import os
import sqlite3
import uuid
from contextlib import closing, suppress

import ffmpeg
import httpx
import uvicorn
from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel
from starlette.responses import FileResponse, JSONResponse

# --- Configuration ---
app = FastAPI()

DOWNLOAD_DIR = "downloads"  # source videos are stored here while a job runs
ENCODED_DIR = "encoded"  # re-encoded results served by /download
DB_FILE = "jobs.db"  # SQLite file holding one row per job

os.makedirs(DOWNLOAD_DIR, exist_ok=True)
os.makedirs(ENCODED_DIR, exist_ok=True)
# ---------------------


# --- Database Setup ---
def get_db() -> sqlite3.Connection:
    """Open a new connection to the jobs database.

    Rows are returned as ``sqlite3.Row`` so they can be converted with
    ``dict(row)``. The caller owns the connection and must close it; note that
    using a connection as a context manager only commits or rolls back, it
    does not close, so callers wrap it in ``contextlib.closing``.
    """
    conn = sqlite3.connect(DB_FILE)
    conn.row_factory = sqlite3.Row
    return conn


def init_db() -> None:
    """Create the ``jobs`` table if it does not exist yet."""
    with closing(get_db()) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                task_id TEXT PRIMARY KEY,
                status TEXT NOT NULL,
                file_name TEXT,
                error_message TEXT
            )
        ''')
        conn.commit()


# --- Background Re-encoding Task ---
def _remove_if_exists(path: str) -> None:
    """Delete *path*, ignoring the case where it is already gone."""
    with suppress(FileNotFoundError):
        os.remove(path)


def _describe_error(exc: Exception) -> str:
    """Return the most useful message available for *exc*.

    Exceptions that carry a non-empty ``stderr`` attribute (as the ffmpeg
    error raised with ``capture_stderr=True`` does) report that output;
    anything else falls back to ``str(exc)``. Undecodable bytes are replaced
    rather than raising, so describing a failure can never itself fail.
    """
    stderr = getattr(exc, "stderr", None)
    if isinstance(stderr, bytes):
        stderr = stderr.decode(errors="replace")
    if isinstance(stderr, str) and stderr.strip():
        return stderr
    return str(exc)


def process_video_task(url: str, task_id: str) -> None:
    """Download the video at *url*, re-encode it, and record progress.

    The job row identified by *task_id* moves through the statuses
    ``downloading`` -> ``encoding`` -> ``complete``. On any failure the status
    becomes ``error`` and ``error_message`` is filled in; the exception is not
    re-raised. The downloaded source file is always removed, and a partially
    written output file is removed when the job did not complete.

    Args:
        url: Location of the source video, fetched with an HTTP GET.
        task_id: Primary key of the job row; also used to name the temp files.
    """
    temp_in_path = os.path.join(DOWNLOAD_DIR, f"{task_id}_in.mp4")
    temp_out_path = os.path.join(ENCODED_DIR, f"{task_id}_out.mp4")
    succeeded = False

    # closing() guarantees the connection is released even if cleanup raises.
    with closing(get_db()) as db:
        try:
            # 1. Update DB: Downloading
            db.execute(
                "UPDATE jobs SET status = 'downloading' WHERE task_id = ?",
                (task_id,),
            )
            db.commit()

            # Stream to disk in chunks so the whole video is never held in memory.
            with httpx.stream("GET", url, follow_redirects=True, timeout=600.0) as r:
                r.raise_for_status()
                with open(temp_in_path, "wb") as f:
                    for chunk in r.iter_bytes(chunk_size=8192):
                        f.write(chunk)

            # 2. Update DB: Encoding
            db.execute(
                "UPDATE jobs SET status = 'encoding' WHERE task_id = ?",
                (task_id,),
            )
            db.commit()

            # Full re-encode to H.264 video / AAC audio. This step is CPU-intensive.
            (
                ffmpeg
                .input(temp_in_path)
                .output(
                    temp_out_path,
                    vcodec='libx264',
                    crf=23,
                    acodec='aac',
                    movflags='+faststart',
                )
                .run(capture_stdout=True, capture_stderr=True)
            )

            # 3. Update DB: Complete
            final_file_name = f"{task_id}_out.mp4"
            db.execute(
                "UPDATE jobs SET status = 'complete', file_name = ? WHERE task_id = ?",
                (final_file_name, task_id),
            )
            db.commit()
            succeeded = True
        except Exception as e:
            # Top-level boundary of the background job: record the failure in
            # the job row instead of letting it disappear with the worker.
            error_msg = _describe_error(e)
            print(f"Error processing task {task_id}: {error_msg}")
            db.execute(
                "UPDATE jobs SET status = 'error', error_message = ? WHERE task_id = ?",
                (error_msg, task_id),
            )
            db.commit()
        finally:
            _remove_if_exists(temp_in_path)
            if not succeeded:
                # Never leave a truncated file where /download could serve it.
                _remove_if_exists(temp_out_path)


# --- API Endpoints ---
class VideoRequest(BaseModel):
    """Request body for ``POST /process``."""

    url: str


@app.post("/process")
async def start_processing(request: VideoRequest, background_tasks: BackgroundTasks):
    """START the encoding job.

    Inserts a ``queued`` job row, schedules the download/re-encode as a
    background task, and returns the new ``task_id`` immediately.
    """
    task_id = str(uuid.uuid4())

    with closing(get_db()) as db:
        db.execute("INSERT INTO jobs (task_id, status) VALUES (?, 'queued')", (task_id,))
        db.commit()

    # NOTE(security): request.url is caller-supplied and is fetched server-side
    # with redirects enabled, so this endpoint can be used to reach internal
    # hosts (SSRF). Restrict allowed hosts before exposing it to untrusted
    # clients.
    background_tasks.add_task(process_video_task, request.url, task_id)

    return {"status": "queued", "task_id": task_id}


@app.get("/status/{task_id}")
async def get_status(task_id: str):
    """CHECK the status of the encoding job.

    Returns the job row as a JSON object, or a 404 response with
    ``{"status": "not_found"}`` when no job has that id.
    """
    with closing(get_db()) as db:
        job = db.execute("SELECT * FROM jobs WHERE task_id = ?", (task_id,)).fetchone()

    if not job:
        return JSONResponse(status_code=404, content={"status": "not_found"})
    return dict(job)  # Convert DB row to a dict


@app.get("/download/{file_name}")
async def download_file(file_name: str):
    """DOWNLOAD the final re-encoded file.

    Only bare file names inside ``ENCODED_DIR`` are served; anything else gets
    a 404 response with ``{"status": "file_not_found"}``.
    """
    # Reject anything that is not a plain file name so a crafted value cannot
    # escape ENCODED_DIR (path traversal).
    is_plain_name = (
        file_name not in ("", ".", "..")
        and os.path.basename(file_name) == file_name
    )
    if not is_plain_name:
        return JSONResponse(status_code=404, content={"status": "file_not_found"})

    file_path = os.path.join(ENCODED_DIR, file_name)
    # isfile (not exists): a directory cannot be sent as a file response.
    if not os.path.isfile(file_path):
        return JSONResponse(status_code=404, content={"status": "file_not_found"})
    return FileResponse(file_path, media_type='video/mp4', filename=file_name)


# --- Server Start ---
@app.on_event("startup")
async def startup_event() -> None:
    """Initialise the jobs database when the application starts."""
    init_db()  # Create the database and table on startup
    print("Database initialized.")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)