130 lines
4.2 KiB
Python
130 lines
4.2 KiB
Python
import yt_dlp
|
||
from app.core.config import settings
|
||
|
||
class MediaInfo:
|
||
def __init__(self, format: dict, id: int):
|
||
self.__format = format
|
||
self.id = id
|
||
|
||
@property
|
||
def filesize(self):
|
||
return self.__format.get("filesize")
|
||
|
||
@property
|
||
def codec(self):
|
||
if vcodec := self.__format.get("vcodec"):
|
||
return vcodec
|
||
return self.__format.get("acodec")
|
||
|
||
|
||
class YtDlpManager:
|
||
def __init__(self, url: str):
|
||
self.url = url
|
||
self._extract_info()
|
||
self._set_video_codecs()
|
||
|
||
@property
|
||
def resolutions(self):
|
||
return sorted(self._resolutions.keys())
|
||
|
||
def best_audio(self) -> MediaInfo | None:
|
||
"""
|
||
Получает аудио дорожку с наилучшим качеством звучания
|
||
"""
|
||
if self.info.get("acodec") in ("none", None):
|
||
return None
|
||
ids = str(self.info.get("format_id", "")).split("+")
|
||
if len(ids) == 2:
|
||
audio_id = ids[1]
|
||
else:
|
||
audio_id = ids[0]
|
||
|
||
for f in self.info["formats"]:
|
||
if f.get("format_id") == audio_id:
|
||
return MediaInfo(f, audio_id)
|
||
return None
|
||
|
||
def best_video(self, height: int | None = None, codec: str | None = None):
|
||
"""
|
||
Возвращает видео дорожку с наилучшим качеством с указанными параметрами
|
||
"""
|
||
if height is None:
|
||
if codec is None:
|
||
height = self.info.get("height")
|
||
else:
|
||
height = max(self._codecs.get(codec, [0]))
|
||
|
||
if not self._video_exist(codec, height):
|
||
return None
|
||
|
||
iterator = reversed(self.info["formats"])
|
||
for f in iterator:
|
||
if f.get("height", 0) == height:
|
||
if codec is None:
|
||
return MediaInfo(f, f.get("format_id"))
|
||
if f.get("codec") == codec:
|
||
return MediaInfo(f, f.get("format_id"))
|
||
return None
|
||
|
||
def download(self, video_id: str | None = None, audio_id: str | None = None):
|
||
if video_id is None and audio_id is None:
|
||
format_id = self.info.get(
|
||
"format_id",
|
||
)
|
||
else:
|
||
format_id = "" + str(video_id) if video_id is not None else ""
|
||
if audio_id is not None:
|
||
if len(format_id) > 0:
|
||
format_id += "+"
|
||
format_id += str(audio_id)
|
||
|
||
ydl_opts = {
|
||
"format": f"{video_id}+{audio_id}",
|
||
"merge_output_format": "mp4",
|
||
"outtmpl": f"{settings.MEDIA_DIR}/%(title)s.%(ext)s",
|
||
}
|
||
|
||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||
info = ydl.extract_info(self.url, download=True)
|
||
return ydl.prepare_filename(info)
|
||
|
||
def _extract_info(self):
|
||
ydl_opts = {
|
||
"quiet": True,
|
||
"no_warnings": True,
|
||
"extract_flat": False,
|
||
}
|
||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||
self.info = ydl.extract_info(self.url, download=False)
|
||
if self.info is None:
|
||
raise Exception("Не удалось получить информацию о видео")
|
||
|
||
def _set_video_codecs(self):
|
||
self._codecs: dict[str, list[int]] = {}
|
||
self._resolutions: dict[int, list[str]] = {}
|
||
for f in self.info["formats"]:
|
||
codec = f.get("vcodec")
|
||
if codec in ("none", None):
|
||
continue
|
||
|
||
height = f.get("height")
|
||
if height not in self._resolutions:
|
||
self._resolutions[height] = []
|
||
if codec not in self._codecs:
|
||
self._codecs[codec] = []
|
||
self._codecs[codec].append(height)
|
||
self._resolutions[height].append(codec)
|
||
|
||
def _video_exist(self, codec: str = "", resolution: int = 0):
|
||
if codec:
|
||
if resolution:
|
||
if resolutions := self._codecs.get(codec, []):
|
||
return resolution in resolutions
|
||
return True
|
||
if resolution:
|
||
if codec:
|
||
if codecs := self._resolutions.get(resolution, []):
|
||
return codec in codecs
|
||
return True
|
||
return False
|