Files
AMT-Apple/main.py
2026-04-01 21:41:05 +05:00

183 lines
5.6 KiB
Python

import logging
import subprocess
from pathlib import Path
from typing import Generator
import cv2
from tqdm import tqdm
from time import perf_counter
from interpolator import get_device
from interpolator import ImageInterpolator
from interpolator import ModelRunner, Anchor
# Configure the root logger once at import time: INFO and above, with each
# record prefixed by its timestamp and level name.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
from pathlib import Path
def move_images(src_dir: str, interpolated_dir: str, output_dir: str):
src_dir = Path(src_dir)
interpolated_dir = Path(interpolated_dir)
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
index = 0
src_frames = sorted(src_dir.glob("img_*.png"))
interp_frames = sorted(interpolated_dir.glob("img_*.png"))
for i in range(len(src_frames)):
output_frame = output_dir / f"img_{index:08d}.png"
src_frames[i].rename(output_frame)
index += 1
if i < len(interp_frames):
output_interp = output_dir / f"img_{index:08d}.png"
interp_frames[i].rename(output_interp)
index += 1
def video_frames_to_disk_generator(
    video_path: str | Path,
    output_dir: str | Path,
    chunk_seconds: int = 10,
) -> Generator[tuple[Path, ...], None, None]:
    """Decode a video to PNG frames on disk, yielding their paths chunk by chunk.

    Frames are written to ``output_dir`` as ``img_{index:08d}.png`` using a
    running index that continues across chunks. Each yielded tuple holds the
    paths of up to ``int(fps * chunk_seconds)`` consecutive frames; the last
    tuple may be shorter when the video does not divide evenly into chunks.

    Args:
        video_path: Video file to decode.
        output_dir: Directory the frames are written to; created if missing.
        chunk_seconds: Approximate duration of video covered by each chunk.

    Yields:
        Non-empty tuples of frame paths, in decode order.

    Raises:
        ValueError: If the video cannot be opened.
        OSError: If a decoded frame cannot be written to disk.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")

        chunk_len = cap.get(cv2.CAP_PROP_FPS) * chunk_seconds
        # OpenCV reports 0 for properties the backend cannot provide. A chunk
        # size of 0 would yield empty tuples forever, and NaN/inf cannot be
        # converted to int, so fall back to one frame per chunk in those cases
        # (the comparison is False for NaN).
        if 1 <= chunk_len < float("inf"):
            frames_per_chunk = int(chunk_len)
        else:
            frames_per_chunk = 1

        frame_index = 0
        paths: list[Path] = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = output_dir / f"img_{frame_index:08d}.png"
            if not cv2.imwrite(str(frame_path), frame):
                raise OSError(f"Failed to write frame: {frame_path}")
            paths.append(frame_path)
            frame_index += 1
            if len(paths) == frames_per_chunk:
                yield tuple(paths)
                paths = []

        # Flush the trailing partial chunk; without this the frames decoded
        # after the last full chunk would be on disk but never reported.
        if paths:
            yield tuple(paths)
    finally:
        # Runs on normal exhaustion, on errors, and when the consumer stops
        # iterating early (generator close), so the capture is never leaked.
        cap.release()
def main():
    """Interpolate one in-between frame for every consecutive frame pair of the video.

    Loads the model, dumps the input video to ``output/frames`` chunk by chunk,
    and writes one interpolated PNG per consecutive pair of source frames to
    ``output/interpolated`` (including the pair that straddles two chunks).
    """
    started_at = perf_counter()
    logging.info("Starting video interpolation process")

    config_path = Path("src/config/AMT-G.yaml")
    ckpt_path = Path("src/pretrained/amt-g.pth")
    video_path = Path("example/video.mp4")
    frames_dir = Path("output/frames")
    interpolated_dir = Path("output/interpolated")
    interpolated_dir.mkdir(parents=True, exist_ok=True)

    device = get_device()
    model_runner = ModelRunner(config_path, ckpt_path, device)

    # Anchor settings differ per backend; the values are passed through as-is.
    if device.type == "cuda":
        anchor = Anchor(
            resolution=1024 * 512,
            memory=1500 * 1024**2,
            memory_bias=2500 * 1024**2,
        )
    elif device.type == "mps":
        logging.warning(
            "Running on Apple Silicon GPU (MPS) may have limited performance. Consider using a CUDA-enabled GPU for better performance."
        )
        anchor = Anchor(resolution=8192 * 8192, memory=1, memory_bias=0)
    elif device.type == "cpu":
        logging.warning(
            "Running on CPU may be very slow. Consider using a GPU for better performance."
        )
        anchor = Anchor(resolution=8192 * 8192, memory=1, memory_bias=0)
    else:
        raise Exception(f"Unsupported device type: {device.type}")

    interpolator = ImageInterpolator(device, anchor, model_runner)
    logging.info(
        f"Model loaded and initialized in {perf_counter() - started_at:.2f} seconds"
    )

    written = 0

    def interpolate_pair(first, second):
        """Write the frame between ``first`` and ``second`` under the next index."""
        nonlocal written
        target = interpolated_dir / f"img_{written:08d}.png"
        interpolator.interpolate(first, second, target)
        logging.debug(f"Interpolated image saved to: {target}")
        written += 1

    previous_chunk = None
    for frame_paths in video_frames_to_disk_generator(video_path, frames_dir):
        logging.info(f"Processing frames: {len(frame_paths)}")

        # Bridge the chunk boundary: last frame of the previous chunk to the
        # first frame of this one.
        if previous_chunk is not None:
            interpolate_pair(previous_chunk[-1], frame_paths[0])

        neighbours = list(zip(frame_paths, frame_paths[1:]))
        for first, second in tqdm(neighbours, desc="Interpolating frames"):
            interpolate_pair(first, second)

        previous_chunk = frame_paths

    elapsed = perf_counter() - started_at
    logging.info(f"Video interpolation completed in {elapsed:.2f} seconds")
def builder():
    """Assemble the final video from the original and interpolated frames.

    Interleaves ``output/frames`` and ``output/interpolated`` into
    ``output/moved`` via ``move_images``, then runs ffmpeg to encode that image
    sequence at twice the original frame rate, taking the audio from the
    original video, into ``output/interpolated_video.mp4``.

    Raises:
        ValueError: If the original video cannot be opened or reports no
            usable frame rate.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    frames_dir = "output/frames"
    interpolated_dir = "output/interpolated"
    moved_dir = "output/moved"
    video_path = "example/video.mp4"
    output_video = "output/interpolated_video.mp4"

    move_images(frames_dir, interpolated_dir, moved_dir)

    # The capture is only needed to read the source frame rate; release it
    # immediately so the file handle is not held while ffmpeg runs.
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError("Cannot open original video")
        fps = cap.get(cv2.CAP_PROP_FPS)
    finally:
        cap.release()

    # `not fps > 0` also rejects NaN; either would otherwise reach ffmpeg as an
    # invalid -framerate and fail with a far less obvious message.
    if not fps > 0:
        raise ValueError(f"Cannot determine frame rate of original video: {fps}")

    cmd = [
        "ffmpeg",
        "-y",  # overwrite the output file without prompting
        # One interpolated frame sits between each source pair, so the frame
        # rate is doubled to keep the original duration.
        "-framerate", str(fps * 2),
        "-i", f"{moved_dir}/img_%08d.png",
        "-i", video_path,
        "-c:v", "libx264",
        "-c:a", "copy",
        "-shortest",
        output_video,
    ]
    logging.info("Running ffmpeg command to build final video: " + " ".join(cmd))
    subprocess.run(cmd, check=True)
def cleanup():
    """Remove the pipeline's working directories under ``output/``.

    Every directory is created first (if absent) so that the subsequent
    removal never fails on a missing path; the parent ``output`` directory is
    left in place.
    """
    import os
    import shutil

    scratch_dirs = ("output/frames", "output/interpolated", "output/moved")

    # Create all of them before deleting any, mirroring a two-phase reset.
    for directory in scratch_dirs:
        os.makedirs(directory, exist_ok=True)
    for directory in scratch_dirs:
        shutil.rmtree(directory)
if __name__ == "__main__":
    # Pipeline stages, run strictly in order: clear stale working directories,
    # interpolate, encode the final video, then clear the working directories
    # again. A failing stage aborts the run and skips the stages after it.
    for stage in (cleanup, main, builder, cleanup):
        stage()