30 lines
840 B
Python
30 lines
840 B
Python
from app.core.celery import celery_app
|
|
from app.services import YouTubeService
|
|
from app.schemas import DownloadRequest
|
|
|
|
|
|
@celery_app.task(
    name="download_youtube",
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_kwargs={"max_retries": 3},
)
def download_youtube(self, url: str, quality: int) -> dict:
    """Run a YouTube download as the Celery task ``download_youtube``.

    The arguments are wrapped in a ``DownloadRequest``, handed to
    ``YouTubeService.download``, and the service response is returned
    as the result of its ``model_dump()`` call.

    The task is configured to auto-retry on any ``Exception`` with
    ``max_retries`` set to 3 and ``retry_backoff`` enabled.

    Args:
        self: The bound task instance (``bind=True``); not used in the body.
        url: Passed to ``DownloadRequest`` as ``url``.
        quality: Passed to ``DownloadRequest`` as ``quality``.

    Returns:
        Whatever ``response.model_dump()`` yields for the service response.
    """
    payload = DownloadRequest(url=url, quality=quality)
    return YouTubeService.download(payload).model_dump()
|
|
|
|
|
|
# NOTE(review): max_retries is 0 here while autoretry_for and retry_backoff
# are still set -- presumably retries are intentionally disabled for v2;
# confirm this is the intended configuration.
@celery_app.task(
    name="download_youtube_v2",
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_kwargs={"max_retries": 0},
)
def download_youtube_v2(self, url: str, quality: int) -> dict:
    """Run a YouTube download as the Celery task ``download_youtube_v2``.

    The arguments are wrapped in a ``DownloadRequest``, handed to
    ``YouTubeService.download_v2``, and the service response is returned
    as the result of its ``model_dump()`` call.

    Args:
        self: The bound task instance (``bind=True``); not used in the body.
        url: Passed to ``DownloadRequest`` as ``url``.
        quality: Passed to ``DownloadRequest`` as ``quality``.

    Returns:
        Whatever ``response.model_dump()`` yields for the service response.
    """
    payload = DownloadRequest(url=url, quality=quality)
    return YouTubeService.download_v2(payload).model_dump()
|