Обновил main.py, добавил fs.py и video.py
This commit is contained in:
96
src/utils/video.py
Normal file
96
src/utils/video.py
Normal file
@@ -0,0 +1,96 @@
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import cv2
|
||||
from typing import Generator
|
||||
|
||||
|
||||
class VideoMaker:
|
||||
def images_to_video(
|
||||
self,
|
||||
images_path: Path,
|
||||
output_path: Path,
|
||||
fps: float,
|
||||
image_numerator: str = "img_%08d.png",
|
||||
):
|
||||
"""Converts a sequence of images to a video using ffmpeg."""
|
||||
cmd = f"ffmpeg -framerate {fps} -i {images_path / image_numerator} -c:v libx264 -pix_fmt yuv420p {output_path}"
|
||||
logging.info(f"Running command: {cmd}")
|
||||
result = self.run_command(cmd)
|
||||
if result != 0:
|
||||
logging.error(f"Failed to create video. Command returned {result}")
|
||||
|
||||
def concatenate_videos(
|
||||
self,
|
||||
videos_path: Path,
|
||||
output_path: Path,
|
||||
video_numerator: str = "video_%08d.mp4",
|
||||
):
|
||||
"""Concatenates a sequence of videos using ffmpeg."""
|
||||
cmd = f"ffmpeg -f concat -safe 0 -i <(for f in {videos_path / video_numerator}; do echo \"file '$f'\"; done) -c copy {output_path}"
|
||||
logging.info(f"Running command: {cmd}")
|
||||
result = self.run_command(cmd)
|
||||
if result != 0:
|
||||
logging.error(f"Failed to concatenate videos. Command returned {result}")
|
||||
|
||||
def get_fps(self, video_path: Path) -> float:
|
||||
"""Gets the frames per second (FPS) of a video."""
|
||||
cap = cv2.VideoCapture(str(video_path))
|
||||
if not cap.isOpened():
|
||||
raise ValueError(f"Cannot open video: {video_path}")
|
||||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||
cap.release()
|
||||
logging.debug(f"FPS of video {video_path}: {fps}")
|
||||
return fps
|
||||
|
||||
def get_video_duration(self, video_path: Path) -> float:
|
||||
"""Gets the duration of a video in seconds."""
|
||||
cap = cv2.VideoCapture(str(video_path))
|
||||
if not cap.isOpened():
|
||||
raise ValueError(f"Cannot open video: {video_path}")
|
||||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||
frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
|
||||
cap.release()
|
||||
duration = frame_count / fps
|
||||
logging.debug(f"Duration of video {video_path}: {duration:.2f} seconds")
|
||||
return duration
|
||||
|
||||
def run_command(self, cmd: str) -> int:
|
||||
import subprocess
|
||||
|
||||
try:
|
||||
subprocess.run(cmd, shell=True, check=True, stdout=subprocess.DEVNULL)
|
||||
return 0
|
||||
except subprocess.CalledProcessError as e:
|
||||
logging.error(f"Command failed with error: {e}")
|
||||
return e.returncode
|
||||
|
||||
def video_to_frames_generator(self, video_path: Path, output_dir: Path, chunk_seconds: int = 10) -> Generator[tuple[Path, ...], None, None]:
|
||||
"""Extracts frames from a video and saves them to disk, yielding paths to the saved frames."""
|
||||
|
||||
cap = cv2.VideoCapture(str(video_path))
|
||||
|
||||
if not cap.isOpened():
|
||||
raise ValueError(f"Cannot open video: {video_path}")
|
||||
|
||||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||
frames_per_chunk = int(fps * chunk_seconds)
|
||||
|
||||
frame_index = 0
|
||||
|
||||
while True:
|
||||
paths = []
|
||||
|
||||
for _ in range(frames_per_chunk):
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
cap.release()
|
||||
return
|
||||
|
||||
frame_path = output_dir / f"img_{frame_index:08d}.png"
|
||||
cv2.imwrite(str(frame_path), frame)
|
||||
|
||||
paths.append(frame_path)
|
||||
frame_index += 1
|
||||
|
||||
yield tuple(paths)
|
||||
Reference in New Issue
Block a user