Сделал скелет того, как примерно будет реализован сервис

This commit is contained in:
Viner Abubakirov
2026-02-21 00:19:09 +05:00
parent 8ac132e503
commit 51ec17381f
9 changed files with 62 additions and 34 deletions

View File

@@ -1,4 +1,4 @@
# from app.main import app
from app.main import app
# __all__ = ["app"]
__all__ = ["app"]

View File

@@ -0,0 +1,12 @@
from fastapi import APIRouter
from app.services import YouTubeService
from app.schemas import DownloadResponse
from app.schemas import DownloadRequest
router = APIRouter()
@router.post("/", response_model=DownloadResponse)
async def download_video(data: DownloadRequest):
response_data = YouTubeService.download(data)
return response_data

View File

@@ -0,0 +1,8 @@
from fastapi import APIRouter
from app.api.v1.endpoints import youtube
router = APIRouter()
router.include_router(youtube.router, prefix="/youtube", tags=["YouTube"])

View File

@@ -1,25 +1,11 @@
from app.utils.downloader import HttpStreamingDownloader
from app.utils.uploader import DiskChunkUploadBackend
from app.utils.youtube import YtDlpInfo
from fastapi import FastAPI
from app.api.v1.router import router as v1_router
app = FastAPI()
app.include_router(v1_router, prefix="/api/v1")
def download(url: str):
upload_backend = DiskChunkUploadBackend("trash_holder")
downloader = HttpStreamingDownloader(upload_backend)
youtube = YtDlpInfo(url)
video_url = youtube.get_video_url("480p")
video_name = youtube.title + ".mp4"
audio_url = youtube.get_audio_url()
audio_name = youtube.title + ".mp4a"
# downloader.download(video_url, video_name)
downloader.download(audio_url, audio_name)
def main():
url = "https://youtu.be/OSAOh4L41Wg"
download(url)
if __name__ == "__main__":
main()
@app.get("/ping")
async def ping() -> str:
return "Pong"

View File

@@ -3,6 +3,9 @@ from pydantic import BaseModel
class DownloadRequest(BaseModel):
url: str
quality: str
codec: str
identifier: str
quality: int
class DownloadResponse(BaseModel):
video: str
audio: str

View File

@@ -0,0 +1,13 @@
from app.utils.youtube import YtDlpManager
from app.utils.uploader import S3ChunkUploadBackend
from app.schemas import DownloadRequest, DownloadResponse
class YouTubeService:
@staticmethod
def download(data: DownloadRequest):
key_prefix = f"{data.url}@{data.quality}@"
s3 = S3ChunkUploadBackend(key_prefix)
manager = YtDlpManager(data.url, s3)
video_url = manager.download_video(data.quality)
audio_url = manager.download_audio()
return DownloadResponse(video=video_url, audio=audio_url)

View File

@@ -15,7 +15,7 @@ class ChunkUploadBackend(ABC):
"""Загрузка очередного чанка"""
@abstractmethod
def finish(self) -> any:
def finish(self) -> str:
"""Завершение загрузки"""
@abstractmethod
@@ -109,7 +109,7 @@ class S3ChunkUploadBackend(ChunkUploadBackend):
)
# Сбрасываем части
self.parts = []
return response
return response["Location"]
def abort(self) -> None:
if self.upload_id:

View File

@@ -5,6 +5,9 @@ from dataclasses import dataclass
from app.utils.uploader import ChunkUploadBackend
RESOLUTIONS = [240, 360, 480, 720, 1080, 1440, 2160]
@dataclass
class MediaContent:
url: str
@@ -36,7 +39,9 @@ class YtDlpManager:
"""Возвращает title видео"""
return self.info.get("title", "unknown")
def download_video(self, size: Literal[240, 360, 480, 720, 1080, 1440, 2160]):
def download_video(self, size: int):
if size not in RESOLUTIONS:
raise RuntimeError("Unsupported quality")
command = [
"yt-dlp",
"-f",

View File

@@ -4,15 +4,16 @@ from app.utils.youtube import YtDlpManager
def download(url: str):
from pprint import pprint
# upload_backend = DiskChunkUploadBackend("trash_holder")
upload_backend = S3ChunkUploadBackend("2")
upload_backend = S3ChunkUploadBackend("3")
youtube = YtDlpManager(url, upload_backend)
print("Download Video")
res = youtube.download_video(360)
print(res)
pprint(res)
print("Download Audio")
res = youtube.download_audio()
print(res)
pprint(res)
print("Success")
def main():