Files
youtube-microservice/app/api/v1/endpoints/tasks.py
2026-02-22 22:38:01 +05:00

26 lines
698 B
Python

from fastapi import APIRouter
from fastapi import HTTPException
from celery.result import AsyncResult
from app.core.celery import celery_app
from app.schemas import TaskStatusResponse
from app.schemas import DownloadResponse
router = APIRouter()
@router.get("/status/{task_id}", response_model=TaskStatusResponse)
def get_task_status(task_id: str):
result = AsyncResult(task_id, app=celery_app)
if result is None:
raise HTTPException(status_code=404, detail="Task not found")
response = TaskStatusResponse(
task_id=task_id,
status=result.status,
result=DownloadResponse(**result.result) if result.successful() else None,
)
return response