98 lines
3.4 KiB
Python
98 lines
3.4 KiB
Python
import os
|
|
import logging
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Generator
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
class VideoMaker:
    """Helpers for building videos with ffmpeg and inspecting them with OpenCV.

    ffmpeg is invoked as an external process through :meth:`run_command`;
    failures are logged rather than raised. The OpenCV-based readers raise
    ``ValueError`` when a video cannot be opened.
    """

    def images_to_video(
        self,
        images_path: Path,
        output_path: Path,
        fps: float,
        image_numerator: str = "img_%08d.png",
    ):
        """Convert a numbered sequence of images into an H.264 video using ffmpeg.

        Args:
            images_path: Directory containing the input images.
            output_path: Video file to write. An existing file is overwritten
                (``-y``), matching :meth:`concatenate_videos`.
            fps: Frame rate passed to ffmpeg's ``-framerate`` option.
            image_numerator: printf-style file name pattern of the images
                inside ``images_path``.

        Returns:
            None. A non-zero ffmpeg exit status is logged as an error.
        """
        # An argument list (no shell) keeps paths containing spaces or shell
        # metacharacters intact. "-nostdin" + "-y" stop ffmpeg from waiting on
        # an overwrite prompt that would be invisible with output silenced.
        cmd = [
            "ffmpeg",
            "-nostdin",
            "-y",
            "-framerate",
            str(fps),
            "-i",
            str(images_path / image_numerator),
            "-c:v",
            "libx264",
            "-pix_fmt",
            "yuv420p",
            str(output_path),
        ]
        logging.info("Running command: %s", " ".join(cmd))
        result = self.run_command(cmd)
        if result != 0:
            logging.error("Failed to create video. Command returned %s", result)

    def concatenate_videos(
        self,
        videos_path: Path,
        output_path: Path,
        video_numerator: str = "video_%08d.mp4",
    ):
        """Concatenate every ``*.mp4`` in a directory using ffmpeg's concat demuxer.

        Inputs are joined in sorted path order with stream copy (``-c copy``).

        Args:
            videos_path: Directory whose ``*.mp4`` files are concatenated.
            output_path: Video file to write; overwritten if it exists. If it
                lives inside ``videos_path`` it is excluded from the inputs.
            video_numerator: Currently unused; kept so existing callers that
                pass it keep working.

        Returns:
            None. Failures (no inputs, non-zero ffmpeg exit) are logged.
        """
        import tempfile  # local import: only this method needs it

        # Never feed the output file back in as one of its own inputs.
        target = output_path.resolve()
        videos = sorted(
            video for video in videos_path.glob("*.mp4") if video.resolve() != target
        )
        if not videos:
            logging.error(
                "Failed to concatenate videos. No .mp4 files found in %s", videos_path
            )
            return

        # A unique temporary list file avoids clobbering an unrelated
        # "file.txt" in the working directory and is safe for concurrent runs.
        fd, list_file = tempfile.mkstemp(suffix=".txt", text=True)
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                for video in videos:
                    # The list file is no longer next to the videos, so write
                    # absolute paths; escape single quotes for the list syntax.
                    escaped = str(video.resolve()).replace("'", "'\\''")
                    f.write(f"file '{escaped}'\n")

            cmd = [
                "ffmpeg",
                "-nostdin",
                "-y",
                "-f",
                "concat",
                "-safe",
                "0",
                "-i",
                list_file,
                "-c",
                "copy",
                str(output_path),
            ]
            logging.info("Running command: %s", " ".join(cmd))
            result = self.run_command(cmd)
            if result != 0:
                logging.error(
                    "Failed to concatenate videos. Command returned %s", result
                )
        finally:
            # Remove the list file even if writing it or running ffmpeg raised.
            os.remove(list_file)

    @staticmethod
    def _open_capture(video_path: Path):
        """Open ``video_path`` with OpenCV, raising ``ValueError`` if that fails."""
        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            cap.release()
            raise ValueError(f"Cannot open video: {video_path}")
        return cap

    def get_fps(self, video_path: Path) -> float:
        """Return the frames per second (FPS) OpenCV reports for a video.

        Raises:
            ValueError: If the video cannot be opened.
        """
        cap = self._open_capture(video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
        finally:
            cap.release()
        logging.debug("FPS of video %s: %s", video_path, fps)
        return fps

    def get_video_duration(self, video_path: Path) -> float:
        """Return the duration of a video in seconds (frame count / FPS).

        Raises:
            ValueError: If the video cannot be opened, or if the reported FPS
                is not positive (the duration would be undefined).
        """
        cap = self._open_capture(video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        finally:
            cap.release()
        if fps <= 0:
            # Surface a descriptive error instead of a bare ZeroDivisionError.
            raise ValueError(
                f"Cannot determine duration of video {video_path}: reported FPS is {fps}"
            )
        duration = frame_count / fps
        logging.debug("Duration of video %s: %.2f seconds", video_path, duration)
        return duration

    def run_command(self, cmd: "str | list[str]") -> int:
        """Run a command with its output discarded and return its exit status.

        Args:
            cmd: Either a shell command line (run through the shell, as
                before) or a list of arguments (run directly, without a shell).

        Returns:
            0 on success, otherwise the process's non-zero exit status. When
            an argument list cannot be started at all, 127 is returned if the
            executable was not found and 126 for any other OS error (the
            values a shell would report).
        """
        try:
            subprocess.run(
                cmd,
                shell=isinstance(cmd, str),
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except subprocess.CalledProcessError as e:
            logging.error("Command failed with error: %s", e)
            return e.returncode
        except OSError as e:
            # Only reachable without a shell: the program could not be started.
            logging.error("Command failed with error: %s", e)
            return 127 if isinstance(e, FileNotFoundError) else 126
        return 0

    @staticmethod
    def _iter_chunks(cap, frames_per_chunk: int):
        """Yield tuples of up to ``frames_per_chunk`` frames read from ``cap``.

        ``cap`` only needs a ``read()`` method returning ``(ok, frame)``.
        Reading stops at the first unsuccessful read; frames collected so far
        are yielded as a final, shorter chunk instead of being discarded.
        """
        chunk = []
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            chunk.append(frame)
            if len(chunk) == frames_per_chunk:
                yield tuple(chunk)
                chunk = []
        if chunk:
            yield tuple(chunk)

    def video_to_frames_generator(
        self, video_path: Path, output_dir: Path, chunk_seconds: int = 10
    ) -> Generator[tuple[np.ndarray, ...], None, None]:
        """Read a video and yield its frames in chunks of ``chunk_seconds``.

        Each yielded item is a tuple of in-memory frames covering
        ``int(fps * chunk_seconds)`` frames; the final chunk may be shorter.
        Nothing is written to disk.

        Args:
            video_path: Video file to read.
            output_dir: Currently unused; kept for backward compatibility.
            chunk_seconds: Length of each chunk in seconds of video.

        Raises:
            ValueError: If the video cannot be opened, or if
                ``int(fps * chunk_seconds)`` is less than one frame. Because
                this is a generator, errors surface on the first iteration.
        """
        cap = self._open_capture(video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            frames_per_chunk = int(fps * chunk_seconds)
            if frames_per_chunk < 1:
                # A zero-sized chunk would otherwise yield empty tuples forever.
                raise ValueError(
                    f"Cannot chunk video {video_path}: fps={fps} and "
                    f"chunk_seconds={chunk_seconds} give less than one frame per chunk"
                )
            yield from self._iter_chunks(cap, frames_per_chunk)
        finally:
            # Runs on exhaustion, on error, and when the consumer closes the
            # generator early, so the capture handle is always released.
            cap.release()