6 Commits

Author SHA1 Message Date
Viner Abubakirov
c7acd66974 переименовал runner на run 2026-04-04 22:06:27 +05:00
Viner Abubakirov
2d67b72128 Перевел импорты модулей в относительные пути 2026-04-04 11:57:41 +05:00
c91cf6b53a Merge pull request 'dev' (#2) from dev into main
Reviewed-on: #2
2026-04-03 18:28:31 +05:00
Viner Abubakirov
61f8e0abe1 Поменял, метод формирования видео 2026-04-03 17:03:10 +05:00
Viner Abubakirov
c72e34f9dc checkout presets.py from dev 2026-04-02 18:31:54 +05:00
Viner Abubakirov
faf7aa8e81 Начал менять логику pipeline 2026-04-02 18:26:36 +05:00
15 changed files with 292 additions and 239 deletions

192
main.py
View File

@@ -1,193 +1,7 @@
import logging import logging
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from src.runner import run
import tqdm
from src.config import presets from src.config import presets
from src.utils.fs import FileSystem
from src.utils.video import VideoMaker
from src.interpolator import (
ImageInterpolator,
Anchor,
get_device,
get_vram_available,
ModelRunner,
)
if TYPE_CHECKING:
import torch
def performing_warning_message(device: "torch.device"):
if device.type in ("cpu", "mps"):
if device.type == "mps":
logging.warning(
"Running on Apple Silicon GPU (MPS) may have limited performance. Consider using a CUDA-enabled GPU for better performance."
)
else:
logging.warning(
"Running on CPU may be very slow. Consider using a GPU for better performance."
)
elif device.type == "cuda":
pass
else:
raise Exception(f"Unsupported device type: {device.type}")
def init_fs(base_path: Path) -> FileSystem:
fs = FileSystem(base_path)
fs.clear_directory(fs.frames_path)
fs.clear_directory(fs.interpolated_path)
fs.clear_directory(fs.moved_path)
fs.clear_directory(fs.video_part_path)
return fs
def init_video_maker() -> VideoMaker:
return VideoMaker()
def init_device() -> "torch.device":
device = get_device()
performing_warning_message(device)
vram_available = get_vram_available(device)
logging.info(f"Available VRAM: {vram_available / (1024 ** 3):.2f} GB")
return device
def init_anchor(device: "torch.device") -> Anchor:
if device.type in ("cpu", "mps"):
return Anchor(resolution=8192 * 8192, memory=1, memory_bias=0)
elif device.type == "cuda":
return Anchor(
resolution=1024 * 512, memory=1500 * 1024**2, memory_bias=2500 * 1024**2
)
else:
raise Exception(f"Unsupported device type: {device.type}")
def init_model_runner(
config: Path, checkpoint_path: Path, device: "torch.device"
) -> ModelRunner:
return ModelRunner(config, checkpoint_path, device)
def init_interpolator(
model_runner: ModelRunner, device: "torch.device"
) -> ImageInterpolator:
anchor = init_anchor(device)
return ImageInterpolator(device, anchor, model_runner)
class InterpolationPipeline:
def __init__(
self,
config: Path,
checkpoint_path: Path,
base_path: Path,
):
self.fs = init_fs(base_path)
self.video_maker = init_video_maker()
self.device = init_device()
self.model_runner = init_model_runner(config, checkpoint_path, self.device)
self.interpolator = init_interpolator(self.model_runner, self.device)
def run(self, video_path: Path, output_video: str):
prev_frame_path = None
frame_count = 0
part = 0
source_frame_length = 0
chunk_seconds = 10
length = self.video_maker.get_video_duration(video_path)
last_part_seconds = 1 if length % chunk_seconds else 0
total_parts = int(length // chunk_seconds) + last_part_seconds
fps = self.video_maker.get_fps(video_path)
logging.info(f"Video FPS: {fps}")
fps *= 2 # Doubling FPS
for frame_paths in self.video_maker.video_to_frames_generator(
video_path, self.fs.frames_path, chunk_seconds
):
logging.info(f"Processing frames: {len(frame_paths)}")
if prev_frame_path is not None:
img1 = prev_frame_path[-1]
img2 = frame_paths[0]
output_path = self.fs.interpolated_path / f"img_{frame_count:08d}.png"
self.interpolator.interpolate(img1, img2, output_path)
logging.debug(f"Interpolated image saved to: {output_path}")
self._merge_frames_to_video(
self.fs.video_part_path / f"video_{part:08d}.mp4",
fps,
source_frame_length=source_frame_length,
)
logging.info(f"Finished processing part {part:08d}")
frame_count += 1
part += 1
for i in tqdm.tqdm(
range(len(frame_paths) - 1),
desc=f"Processing video frames {part + 1} / {total_parts}",
):
img1 = frame_paths[i]
img2 = frame_paths[i + 1]
output_path = self.fs.interpolated_path / f"img_{i:08d}.png"
self.interpolator.interpolate(img1, img2, output_path)
logging.debug(f"Interpolated image saved to: {output_path}")
frame_count += 1
source_frame_length = len(frame_paths)
prev_frame_path = frame_paths
self._merge_frames_to_video(
self.fs.video_part_path / f"video_{part:08d}.mp4",
fps,
source_frame_length=source_frame_length,
)
logging.info(f"Finished processing part {part:08d}")
self._merge_video_parts(self.fs.output_path / output_video)
logging.info(
f"Video interpolation completed. Output saved to: {self.fs.output_path / output_video}"
)
def _merge_frames_to_video(
self, output_video: Path, fps: float, source_frame_length: int = 0
):
self._move_frames(source_frame_length)
self.video_maker.images_to_video(self.fs.moved_path, output_video, fps)
def _merge_video_parts(self, output_video: Path):
self.video_maker.concatenate_videos(self.fs.video_part_path, output_video)
self.fs.clear_directory(self.fs.video_part_path)
def _move_frames(self, source_frame_length: int = 0):
self.fs.clear_directory(self.fs.moved_path)
src_frames = sorted(self.fs.frames_path.glob("*.png"))
interpolated_frames = sorted(self.fs.interpolated_path.glob("*.png"))
index = 0
for i in range(source_frame_length):
moved_frame_path = self.fs.moved_path / f"img_{index:08d}.png"
src_frames[i].rename(moved_frame_path)
index += 1
if i < len(interpolated_frames):
moved_interpolated_path = self.fs.moved_path / f"img_{index:08d}.png"
interpolated_frames[i].rename(moved_interpolated_path)
index += 1
logging.info(
f"Moved {len(src_frames)} source frames and {len(interpolated_frames)} interpolated frames to {self.fs.moved_path}"
)
def runner(
base_path: Path,
video_path: Path,
output_video: str,
preset: presets.Preset = presets.LARGE,
):
pipeline = InterpolationPipeline(
config=preset.config,
checkpoint_path=preset.checkpoint,
base_path=base_path,
)
pipeline.run(video_path, output_video)
def main(): def main():
@@ -216,11 +30,11 @@ def main():
default="global", default="global",
) )
args = parser.parse_args() args = parser.parse_args()
runner( run(
base_path=Path(args.base_path), base_path=Path(args.base_path),
video_path=Path(args.video_path), video_path=Path(args.video_path),
output_video=args.output, output_video=args.output,
preset=getattr(presets, args.preset.upper()) preset=getattr(presets, args.preset.upper()),
) )

View File

@@ -10,7 +10,7 @@ save_dir: work_dir
eval_interval: 1 eval_interval: 1
network: network:
name: src.networks.AMT-G.Model name: AMT-G.Model
params: params:
corr_radius: 3 corr_radius: 3
corr_lvls: 4 corr_lvls: 4

View File

@@ -10,7 +10,7 @@ save_dir: work_dir
eval_interval: 1 eval_interval: 1
network: network:
name: src.networks.AMT-L.Model name: AMT-L.Model
params: params:
corr_radius: 3 corr_radius: 3
corr_lvls: 4 corr_lvls: 4

View File

@@ -10,7 +10,7 @@ save_dir: work_dir
eval_interval: 1 eval_interval: 1
network: network:
name: src.networks.AMT-S.Model name: AMT-S.Model
params: params:
corr_radius: 3 corr_radius: 3
corr_lvls: 4 corr_lvls: 4

View File

@@ -19,6 +19,6 @@ LARGE = Preset(
) )
GLOBAL = Preset( GLOBAL = Preset(
config=Path("src/config/AMT-g.yaml"), config=Path("src/config/AMT-G.yaml"),
checkpoint=Path("src/pretrained/amt-g.pth"), checkpoint=Path("src/pretrained/amt-g.pth"),
) )

View File

@@ -4,11 +4,10 @@ from pathlib import Path
import torch import torch
import numpy as np import numpy as np
from omegaconf import OmegaConf, DictConfig from omegaconf import OmegaConf, DictConfig
from imageio import imread, imwrite
from src.utils.torch import img2tensor, check_dim_and_resize, tensor2img from .utils.torch import img2tensor, check_dim_and_resize, tensor2img
from src.utils.build import build_from_cfg from .utils.build import build_from_cfg
from src.utils.padder import InputPadder from .utils.padder import InputPadder
class Anchor: class Anchor:
@@ -83,7 +82,7 @@ class ImageInterpolator:
f"Initialized ImageInterpolator with device: {device}, anchor: {anchor}, available VRAM: {self.vram_available} bytes" f"Initialized ImageInterpolator with device: {device}, anchor: {anchor}, available VRAM: {self.vram_available} bytes"
) )
def interpolate(self, image1: Path, image2: Path, output_path: Path): def interpolate(self, image1: np.ndarray, image2: np.ndarray) -> np.ndarray:
""" """
Interpolates between two images and saves the result. Interpolates between two images and saves the result.
Args: Args:
@@ -92,8 +91,8 @@ class ImageInterpolator:
output_path (Path): Path to save the interpolated image (only png and jpg formats are supported) output_path (Path): Path to save the interpolated image (only png and jpg formats are supported)
""" """
logging.debug(f"Reading images: {image1} and {image2}") logging.debug(f"Reading images: {image1} and {image2}")
tensor1 = img2tensor(imread(image1)).to(self.device) tensor1 = img2tensor(image1).to(self.device)
tensor2 = img2tensor(imread(image2)).to(self.device) tensor2 = img2tensor(image2).to(self.device)
logging.debug( logging.debug(
f"Image shapes after conversion to tensors: {tensor1.shape}, {tensor2.shape}" f"Image shapes after conversion to tensors: {tensor1.shape}, {tensor2.shape}"
) )
@@ -122,8 +121,7 @@ class ImageInterpolator:
logging.debug(f"Interpolated image shape before unpadding: {interpolated.shape}") logging.debug(f"Interpolated image shape before unpadding: {interpolated.shape}")
(interpolated,) = padder.unpad(interpolated) (interpolated,) = padder.unpad(interpolated)
logging.debug(f"Interpolated image shape after unpadding: {interpolated.shape}") logging.debug(f"Interpolated image shape after unpadding: {interpolated.shape}")
imwrite(output_path, tensor2img(interpolated.cpu())) return tensor2img(interpolated.cpu())
logging.debug(f"Saved interpolated image to: {output_path}")
def scale(self, height: int, width: int) -> float: def scale(self, height: int, width: int) -> float:
scale = ( scale = (

View File

@@ -2,10 +2,10 @@ from typing import Optional
import torch import torch
import torch.nn as nn import torch.nn as nn
from src.networks.blocks.raft import coords_grid, BasicUpdateBlock, BidirCorrBlock from .blocks.raft import coords_grid, BasicUpdateBlock, BidirCorrBlock
from src.networks.blocks.feat_enc import LargeEncoder from .blocks.feat_enc import LargeEncoder
from src.networks.blocks.ifrnet import resize, Encoder, InitDecoder, IntermediateDecoder from .blocks.ifrnet import resize, Encoder, InitDecoder, IntermediateDecoder
from src.networks.blocks.multi_flow import multi_flow_combine, MultiFlowDecoder from .blocks.multi_flow import multi_flow_combine, MultiFlowDecoder
class Model(nn.Module): class Model(nn.Module):

View File

@@ -1,10 +1,10 @@
import torch import torch
import torch.nn as nn import torch.nn as nn
from src.networks.blocks.raft import coords_grid, BasicUpdateBlock, BidirCorrBlock from .blocks.raft import coords_grid, BasicUpdateBlock, BidirCorrBlock
from src.networks.blocks.feat_enc import BasicEncoder from .blocks.feat_enc import BasicEncoder
from src.networks.blocks.ifrnet import resize, Encoder, InitDecoder, IntermediateDecoder from .blocks.ifrnet import resize, Encoder, InitDecoder, IntermediateDecoder
from src.networks.blocks.multi_flow import multi_flow_combine, MultiFlowDecoder from .blocks.multi_flow import multi_flow_combine, MultiFlowDecoder
class Model(nn.Module): class Model(nn.Module):

View File

@@ -1,9 +1,9 @@
import torch import torch
import torch.nn as nn import torch.nn as nn
from src.networks.blocks.raft import coords_grid, SmallUpdateBlock, BidirCorrBlock from .blocks.raft import coords_grid, SmallUpdateBlock, BidirCorrBlock
from src.networks.blocks.feat_enc import SmallEncoder from .blocks.feat_enc import SmallEncoder
from src.networks.blocks.ifrnet import resize, Encoder, InitDecoder, IntermediateDecoder from .blocks.ifrnet import resize, Encoder, InitDecoder, IntermediateDecoder
from src.networks.blocks.multi_flow import multi_flow_combine, MultiFlowDecoder from .blocks.multi_flow import multi_flow_combine, MultiFlowDecoder
class Model(nn.Module): class Model(nn.Module):

View File

@@ -1,7 +1,7 @@
import torch import torch
import torch.nn as nn import torch.nn as nn
from src.utils.flow_utils import warp from ..utils.flow_utils import warp
from src.networks.blocks.ifrnet import convrelu, resize, ResBlock from .blocks.ifrnet import convrelu, resize, ResBlock
class Encoder(nn.Module): class Encoder(nn.Module):

View File

@@ -1,7 +1,7 @@
import torch import torch
import torch.nn as nn import torch.nn as nn
import torch.nn.functional as F import torch.nn.functional as F
from src.utils.flow_utils import warp from ...utils.flow_utils import warp
def resize(x, scale_factor): def resize(x, scale_factor):

View File

@@ -1,7 +1,7 @@
import torch import torch
import torch.nn as nn import torch.nn as nn
from src.utils.flow_utils import warp from ...utils.flow_utils import warp
from src.networks.blocks.ifrnet import convrelu, resize, ResBlock from .ifrnet import convrelu, resize, ResBlock
def multi_flow_combine( def multi_flow_combine(

196
src/runner.py Normal file
View File

@@ -0,0 +1,196 @@
import logging
from pathlib import Path
from typing import TYPE_CHECKING
from cv2 import imwrite
import tqdm
from .config import presets
from .utils.fs import FileSystem
from .utils.video import VideoMaker
from .interpolator import (
ImageInterpolator,
Anchor,
get_device,
get_vram_available,
ModelRunner,
)
if TYPE_CHECKING:
import torch
import numpy as np
def performing_warning_message(device: "torch.device"):
if device.type in ("cpu", "mps"):
if device.type == "mps":
logging.warning(
"Running on Apple Silicon GPU (MPS) may have limited performance. Consider using a CUDA-enabled GPU for better performance."
)
else:
logging.warning(
"Running on CPU may be very slow. Consider using a GPU for better performance."
)
elif device.type == "cuda":
pass
else:
raise Exception(f"Unsupported device type: {device.type}")
def init_fs(base_path: Path) -> FileSystem:
fs = FileSystem(base_path)
fs.clear_directory(fs.frames_path)
fs.clear_directory(fs.interpolated_path)
fs.clear_directory(fs.moved_path)
fs.clear_directory(fs.video_part_path)
return fs
def init_video_maker() -> VideoMaker:
return VideoMaker()
def init_device() -> "torch.device":
device = get_device()
performing_warning_message(device)
vram_available = get_vram_available(device)
logging.info(f"Available VRAM: {vram_available / (1024**3):.2f} GB")
return device
def init_anchor(device: "torch.device") -> Anchor:
if device.type in ("cpu", "mps"):
return Anchor(resolution=8192 * 8192, memory=1, memory_bias=0)
elif device.type == "cuda":
return Anchor(
resolution=1024 * 512, memory=1500 * 1024**2, memory_bias=2500 * 1024**2
)
else:
raise Exception(f"Unsupported device type: {device.type}")
def init_model_runner(
config: Path, checkpoint_path: Path, device: "torch.device"
) -> ModelRunner:
return ModelRunner(config, checkpoint_path, device)
def init_interpolator(
model_runner: ModelRunner, device: "torch.device"
) -> ImageInterpolator:
anchor = init_anchor(device)
return ImageInterpolator(device, anchor, model_runner)
class InterpolationPipeline:
def __init__(
self,
config: Path,
checkpoint_path: Path,
base_path: Path,
):
self.fs = init_fs(base_path)
self.video_maker = init_video_maker()
self.device = init_device()
self.model_runner = init_model_runner(config, checkpoint_path, self.device)
self.interpolator = init_interpolator(self.model_runner, self.device)
def run(self, video_path: Path, output_video: str):
prev_frames: tuple["np.ndarray", ...] = tuple()
interpolated_frames: list["np.ndarray"] = []
part = 0
chunk_seconds = 10
length = self.video_maker.get_video_duration(video_path)
last_part_seconds = 1 if length % chunk_seconds else 0
total_parts = int(length // chunk_seconds) + last_part_seconds
fps = self.video_maker.get_fps(video_path)
logging.info(f"Video FPS: {fps}")
fps *= 2 # Doubling FPS
width, height = self.video_maker.get_size(video_path)
for frames in self.video_maker.video_to_frames_generator(
video_path, self.fs.frames_path, chunk_seconds
):
logging.info(f"Processing frames: {len(frames)}")
if prev_frames:
img1 = prev_frames[-1]
img2 = frames[0]
img1_2 = self.interpolator.interpolate(img1, img2)
interpolated_frames.append(img1_2)
generator = self._frame_generator(prev_frames, interpolated_frames)
part_path = self.fs.video_part_path / f"video_{part:08d}.mp4"
self.video_maker.images_to_video_pipeline(
generator, part_path, width, height, fps
)
interpolated_frames = []
logging.info(f"Finished processing part {part:08d}")
part += 1
for i in tqdm.tqdm(
range(len(frames) - 1),
desc=f"Processing video frames {part + 1} / {total_parts}",
):
img1 = frames[i]
img2 = frames[i + 1]
img1_2 = self.interpolator.interpolate(img1, img2)
interpolated_frames.append(img1_2)
prev_frames = frames
generator = self._frame_generator(prev_frames, interpolated_frames)
part_path = self.fs.video_part_path / f"video_{part:08d}.mp4"
self.video_maker.images_to_video_pipeline(
generator, part_path, width, height, fps
)
logging.info(f"Finished processing part {part:08d}")
self._merge_video_parts(self.fs.output_path / output_video)
logging.info(
f"Video interpolation completed. Output saved to: {self.fs.output_path / output_video}"
)
def _save_images(
self,
source: tuple["np.ndarray", ...],
interpolated: list["np.ndarray"],
):
logging.info("Saving images...")
self.fs.clear_directory(self.fs.moved_path)
index = 0
for i, frame in enumerate(source):
name = self.fs.moved_path / f"img_{index:08d}.png"
index += 1
imwrite(name, frame)
if i < len(interpolated):
name = self.fs.moved_path / f"img_{index:08d}.png"
index += 1
imwrite(name, interpolated[i])
logging.info("Success...")
def _merge_frames_to_video(self, output_video: Path, fps: float):
self.video_maker.images_to_video(self.fs.moved_path, output_video, fps)
def _merge_video_parts(self, output_video: Path):
self.video_maker.concatenate_videos(self.fs.video_part_path, output_video)
self.fs.clear_directory(self.fs.video_part_path)
def _frame_generator(
self,
source: tuple["np.ndarray", ...],
interpolated: list["np.ndarray"],
):
for i, frame in enumerate(source):
yield frame
if i < len(interpolated):
yield interpolated[i]
def run(
base_path: Path,
video_path: Path,
output_video: str,
preset: presets.Preset = presets.LARGE,
):
pipeline = InterpolationPipeline(
config=preset.config,
checkpoint_path=preset.checkpoint,
base_path=base_path,
)
pipeline.run(video_path, output_video)

View File

@@ -1,16 +1,19 @@
import importlib
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from ..networks import AMT_G, AMT_L, AMT_S
if TYPE_CHECKING: if TYPE_CHECKING:
from omegaconf import DictConfig from omegaconf import DictConfig
def base_build_fn(module: str, cls: str, params: dict):
return getattr(importlib.import_module(module, package=None), cls)(**params)
def build_from_cfg(config: "DictConfig"): def build_from_cfg(config: "DictConfig"):
packages = {
"AMT-G": AMT_G,
"AMT-L": AMT_L,
"AMT-S": AMT_S
}
module, cls = config["name"].rsplit(".", 1) module, cls = config["name"].rsplit(".", 1)
params: dict = config.get("params", {}) params: dict = config.get("params", {})
return base_build_fn(module, cls, params) return getattr(packages[module], cls)(**params)

View File

@@ -2,9 +2,10 @@ import os
import logging import logging
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import Generator, Iterable
import cv2 import cv2
from typing import Generator import numpy as np
class VideoMaker: class VideoMaker:
@@ -35,7 +36,7 @@ class VideoMaker:
with open(file, "w") as f: with open(file, "w") as f:
for video in videos: for video in videos:
f.write(f"file '{video}'\n") f.write(f"file '{video}'\n")
cmd = f"ffmpeg -f concat -safe 0 -i {file} -c copy {output_path}" cmd = f"ffmpeg -y -f concat -safe 0 -i {file} -c copy {output_path}"
logging.info(f"Running command: {cmd}") logging.info(f"Running command: {cmd}")
result = self.run_command(cmd) result = self.run_command(cmd)
if result != 0: if result != 0:
@@ -66,7 +67,13 @@ class VideoMaker:
def run_command(self, cmd: str) -> int: def run_command(self, cmd: str) -> int:
try: try:
subprocess.run(cmd, shell=True, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) subprocess.run(
cmd,
shell=True,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return 0 return 0
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
logging.error(f"Command failed with error: {e}") logging.error(f"Command failed with error: {e}")
@@ -74,7 +81,7 @@ class VideoMaker:
def video_to_frames_generator( def video_to_frames_generator(
self, video_path: Path, output_dir: Path, chunk_seconds: int = 10 self, video_path: Path, output_dir: Path, chunk_seconds: int = 10
) -> Generator[tuple[Path, ...], None, None]: ) -> Generator[tuple[np.ndarray, ...], None, None]:
"""Extracts frames from a video and saves them to disk, yielding paths to the saved frames.""" """Extracts frames from a video and saves them to disk, yielding paths to the saved frames."""
cap = cv2.VideoCapture(str(video_path)) cap = cv2.VideoCapture(str(video_path))
@@ -85,21 +92,56 @@ class VideoMaker:
fps = cap.get(cv2.CAP_PROP_FPS) fps = cap.get(cv2.CAP_PROP_FPS)
frames_per_chunk = int(fps * chunk_seconds) frames_per_chunk = int(fps * chunk_seconds)
frame_index = 0
while True: while True:
paths = [] paths = []
for _ in range(frames_per_chunk): for _ in range(frames_per_chunk):
ret, frame = cap.read() ret, frame = cap.read()
if not ret: if not ret:
cap.release() cap.release()
return return
paths.append(frame)
frame_path = output_dir / f"img_{frame_index:08d}.png"
cv2.imwrite(str(frame_path), frame)
paths.append(frame_path)
frame_index += 1
yield tuple(paths) yield tuple(paths)
def images_to_video_pipeline(
self,
frames: Iterable[np.ndarray],
output_path: Path,
width: int,
height: int,
fps: float,
):
pipeline = subprocess.Popen(
[
"ffmpeg",
"-y",
"-f", "rawvideo",
"-vcodec", "rawvideo",
"-pix_fmt", "bgr24",
"-s", f"{width}x{height}",
"-r", str(fps),
"-i", "-",
"-an",
"-vcodec", "libx264",
"-pix_fmt", "yuv420p",
str(output_path),
],
stdin=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
if pipeline.stdin is None:
raise Exception("STDIN closed")
for frame in frames:
pipeline.stdin.write(frame.tobytes())
pipeline.stdin.close()
pipeline.wait()
def get_size(self, video_path: Path) -> tuple[int, int]:
cap = cv2.VideoCapture(str(video_path))
if not cap.isOpened():
raise ValueError(f"Cannot open video: {video_path}")
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
return width, height