"""Driver for a frame-interpolation pipeline.

Decodes a video to PNG frames on disk, runs an AMT-G interpolation model
between consecutive frames, and (via :func:`builder`) reassembles the
interleaved frames into a video at double the original frame rate with
ffmpeg's concat demuxer.
"""

import logging
import subprocess
from decimal import Decimal
from pathlib import Path
from time import perf_counter

import cv2
from tqdm import tqdm

from interpolator import Anchor, ImageInterpolator, ModelRunner, get_device

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


def move_images(src_dir: str, interpolated_dir: str, output_dir: str) -> None:
    """Interleave source and interpolated frames into a single sequence.

    Renames (moves) frames into ``output_dir`` as a contiguous
    ``img_XXXXXXXX.png`` sequence ordered src[0], interp[0], src[1],
    interp[1], ... — i.e. each interpolated frame lands between the two
    source frames it was generated from.

    Args:
        src_dir: Directory holding the original ``img_*.png`` frames.
        interpolated_dir: Directory holding the in-between frames
            (expected to contain one fewer frame than ``src_dir``).
        output_dir: Destination directory; created if missing.
    """
    src_path = Path(src_dir)
    interp_path = Path(interpolated_dir)
    out_path = Path(output_dir)
    out_path.mkdir(parents=True, exist_ok=True)

    src_frames = sorted(src_path.glob("img_*.png"))
    interp_frames = sorted(interp_path.glob("img_*.png"))

    index = 0
    for i, src_frame in enumerate(src_frames):
        src_frame.rename(out_path / f"img_{index:08d}.png")
        index += 1
        # There is one fewer interpolated frame than source frames, so the
        # last source frame has no partner.
        if i < len(interp_frames):
            interp_frames[i].rename(out_path / f"img_{index:08d}.png")
            index += 1


def build_file_list(moved_dir: str, list_path: str) -> None:
    """Write an ffmpeg concat-demuxer file list for every frame in a directory.

    Args:
        moved_dir: Directory of already-interleaved ``img_*.png`` frames.
        list_path: Output path of the concat list (one ``file '...'`` line
            per frame, absolute paths).

    Raises:
        ValueError: If ``moved_dir`` contains no matching frames.
        (The original code instead crashed with IndexError on a leftover
        debug ``print(frames[0])``.)
    """
    frames = sorted(Path(moved_dir).glob("img_*.png"))
    if not frames:
        raise ValueError(f"No img_*.png frames found in {moved_dir}")
    with open(list_path, "w") as f:
        for frame in frames:
            # resolve() replaces the former os.path.abspath() call and keeps
            # this consistent with build_ffmpeg_file_list below.
            f.write(f"file '{frame.resolve().as_posix()}'\n")


def build_ffmpeg_file_list(
    frames_dir: str, interpolated_dir: str, list_path: str
) -> None:
    """Write a concat list interleaving original and interpolated frames.

    Unlike :func:`build_file_list`, this reads the two directories directly
    without moving anything on disk.

    Args:
        frames_dir: Directory of original ``img_*.png`` frames (N frames).
        interpolated_dir: Directory of interpolated frames (must be N-1).
        list_path: Output path of the concat list.

    Raises:
        ValueError: If the interpolated count is not exactly N-1.
    """
    frames = sorted(Path(frames_dir).glob("img_*.png"))
    interps = sorted(Path(interpolated_dir).glob("img_*.png"))
    if len(interps) != len(frames) - 1:
        raise ValueError("Interpolated frames must be N-1")
    with open(list_path, "w") as f:
        for i, frame in enumerate(frames):
            f.write(f"file '{frame.resolve().as_posix()}'\n")
            if i < len(interps):
                f.write(f"file '{interps[i].resolve().as_posix()}'\n")


def merge_with_ffmpeg(
    original_video: str,
    file_list: str,
    output_video: str,
) -> None:
    """Concatenate listed frames into a video at double the source FPS.

    Reads the source video's FPS with OpenCV, then invokes ffmpeg's concat
    demuxer over ``file_list`` encoding with libx264rgb.

    Args:
        original_video: Path of the source video (used only to read FPS).
        file_list: Concat-demuxer list produced by one of the builders above.
        output_video: Path of the encoded result.

    Raises:
        ValueError: If the source video cannot be opened or reports a
            non-positive FPS.
        subprocess.CalledProcessError: If ffmpeg exits non-zero.
    """
    cap = cv2.VideoCapture(original_video)
    if not cap.isOpened():
        raise ValueError("Cannot open original video")
    fps = cap.get(cv2.CAP_PROP_FPS)
    cap.release()
    if fps <= 0:
        # A 0/negative FPS would yield an invalid "-r 0" ffmpeg argument.
        raise ValueError(f"Invalid FPS reported for {original_video}: {fps}")

    # Convert via str() so the Decimal matches the reported FPS exactly;
    # Decimal(fps * 2) would inherit binary-float representation error
    # (e.g. 29.97 fps -> 59.93999999999999772...).
    new_fps = Decimal(str(fps)) * 2
    cmd = [
        "ffmpeg",
        "-y",
        "-r",
        str(new_fps.quantize(Decimal("1.0000000000"))),
        "-f",
        "concat",
        "-safe",
        "0",
        "-i",
        file_list,
        "-c:v",
        "libx264rgb",
        output_video,
    ]
    print("Running ffmpeg command:", " ".join(cmd))
    # List-form argv, shell=False: no shell-injection surface.
    subprocess.run(cmd, check=True)


def video_frames_to_disk_generator(
    video_path: str | Path, output_dir: str | Path, chunk_seconds: int = 10
):
    """Decode a video to PNG frames on disk, yielding paths in chunks.

    Args:
        video_path: Video file to decode.
        output_dir: Directory receiving ``img_XXXXXXXX.png`` frames;
            created if missing.
        chunk_seconds: Approximate seconds of video per yielded chunk.

    Yields:
        Tuples of frame paths, roughly ``fps * chunk_seconds`` per chunk.
        The final chunk may be shorter.

    Raises:
        ValueError: If the video cannot be opened.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    # Guard against a zero/negative FPS report: int(fps * chunk_seconds)
    # would be 0 and the original looped forever yielding empty tuples.
    frames_per_chunk = max(int(fps * chunk_seconds), 1)

    frame_index = 0
    try:
        while True:
            paths = []
            for _ in range(frames_per_chunk):
                ret, frame = cap.read()
                if not ret:
                    # BUG FIX: flush the final partial chunk. The original
                    # returned here without yielding, so tail frames were
                    # written to disk but never seen by the consumer.
                    if paths:
                        yield tuple(paths)
                    return
                frame_path = output_dir / f"img_{frame_index:08d}.png"
                cv2.imwrite(str(frame_path), frame)
                paths.append(frame_path)
                frame_index += 1
            yield tuple(paths)
    finally:
        # Release the capture even if the consumer abandons the generator.
        cap.release()


def main():
    """Run the full pipeline: load model, decode video, interpolate frames."""
    start = perf_counter()
    logging.info("Starting video interpolation process")

    config_path = Path("src/config/AMT-G.yaml")
    ckpt_path = Path("src/pretrained/amt-g.pth")
    video_path = Path("source/video.mp4")
    output_dir = Path("output/frames")
    output_interpolated_dir = Path("output/interpolated")
    output_interpolated_dir.mkdir(parents=True, exist_ok=True)

    device = get_device()
    model_runner = ModelRunner(config_path, ckpt_path, device)

    # Anchor tuning per device class; values taken as-is from the original
    # configuration (semantics of resolution/memory/memory_bias live in
    # the interpolator package).
    if device.type in ("cpu", "mps"):
        if device.type == "mps":
            logging.warning(
                "Running on Apple Silicon GPU (MPS) may have limited performance. Consider using a CUDA-enabled GPU for better performance."
            )
        else:
            logging.warning(
                "Running on CPU may be very slow. Consider using a GPU for better performance."
            )
        anchor = Anchor(resolution=8192 * 8192, memory=1, memory_bias=0)
    elif device.type == "cuda":
        anchor = Anchor(
            resolution=1024 * 512, memory=1500 * 1024**2, memory_bias=2500 * 1024**2
        )
    else:
        raise Exception(f"Unsupported device type: {device.type}")

    interpolator = ImageInterpolator(device, anchor, model_runner)
    loaded_time = perf_counter() - start
    logging.info(f"Model loaded and initialized in {loaded_time:.2f} seconds")

    prev_frame_paths = None
    frame_count = 0
    for frame_paths in video_frames_to_disk_generator(video_path, output_dir):
        logging.info(f"Processing frames: {frame_paths}")
        # Bridge the gap between the last frame of the previous chunk and
        # the first frame of this one.
        if prev_frame_paths is not None:
            output_path = output_interpolated_dir / f"img_{frame_count:08d}.png"
            interpolator.interpolate(prev_frame_paths[-1], frame_paths[0], output_path)
            logging.debug(f"Interpolated image saved to: {output_path}")
            frame_count += 1
        # Interpolate between each consecutive pair within the chunk.
        for i in tqdm(range(len(frame_paths) - 1), desc="Interpolating frames"):
            output_path = output_interpolated_dir / f"img_{frame_count:08d}.png"
            interpolator.interpolate(frame_paths[i], frame_paths[i + 1], output_path)
            logging.debug(f"Interpolated image saved to: {output_path}")
            frame_count += 1
        prev_frame_paths = frame_paths

    total_time = perf_counter() - start
    logging.info(f"Video interpolation completed in {total_time:.2f} seconds")


def builder():
    """Assemble the final double-FPS video from previously produced frames."""
    list_path = "file_list.txt"
    video_path = "source/video.mp4"
    output_video = "output/interpolated_video.mp4"
    # NOTE(review): this expects frames already interleaved into
    # 'output/moved_frames' (see move_images), but nothing in this file
    # invokes move_images — confirm that step runs elsewhere.
    build_file_list("output/moved_frames", list_path)
    merge_with_ffmpeg(video_path, list_path, output_video)


if __name__ == "__main__":
    main()