Tberox / app.py
understanding's picture
Create app.py
5e53604 verified
raw
history blame
4.19 kB
import httpx
import ffmpeg
import os
import sqlite3
import uvicorn
import uuid
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from starlette.responses import FileResponse, JSONResponse
# --- Configuration ---
app = FastAPI()
DOWNLOAD_DIR = "downloads"
ENCODED_DIR = "encoded"
DB_FILE = "jobs.db"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
os.makedirs(ENCODED_DIR, exist_ok=True)
# ---------------------
# --- Database Setup ---
def get_db():
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
return conn
def init_db():
with get_db() as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS jobs (
task_id TEXT PRIMARY KEY,
status TEXT NOT NULL,
file_name TEXT,
error_message TEXT
)
''')
conn.commit()
# --- Background Re-encoding Task ---
def process_video_task(url: str, task_id: str):
db = get_db()
temp_in_path = os.path.join(DOWNLOAD_DIR, f"{task_id}_in.mp4")
temp_out_path = os.path.join(ENCODED_DIR, f"{task_id}_out.mp4")
try:
# 1. Update DB: Downloading
db.execute("UPDATE jobs SET status = 'downloading' WHERE task_id = ?", (task_id,))
db.commit()
with httpx.stream("GET", url, follow_redirects=True, timeout=600.0) as r:
r.raise_for_status()
with open(temp_in_path, 'wb') as f:
for chunk in r.iter_bytes(chunk_size=8192):
f.write(chunk)
# 2. Update DB: Encoding
db.execute("UPDATE jobs SET status = 'encoding' WHERE task_id = ?", (task_id,))
db.commit()
# ⚠️ THIS IS THE 100% CPU RE-ENCODE YOU WANTED ⚠️
(
ffmpeg
.input(temp_in_path)
.output(temp_out_path, vcodec='libx264', crf=23, acodec='aac', movflags='+faststart')
.run(capture_stdout=True, capture_stderr=True)
)
# 3. Update DB: Complete
final_file_name = f"{task_id}_out.mp4"
db.execute("UPDATE jobs SET status = 'complete', file_name = ? WHERE task_id = ?", (final_file_name, task_id))
db.commit()
except Exception as e:
error_msg = str(e)
if hasattr(e, 'stderr'): # Get error from ffmpeg
error_msg = e.stderr.decode()
print(f"Error processing task {task_id}: {error_msg}")
db.execute("UPDATE jobs SET status = 'error', error_message = ? WHERE task_id = ?", (error_msg, task_id))
db.commit()
finally:
if os.path.exists(temp_in_path):
os.remove(temp_in_path)
db.close()
# --- API Endpoints ---
class VideoRequest(BaseModel):
url: str
@app.post("/process")
async def start_processing(request: VideoRequest, background_tasks: BackgroundTasks):
""" START the encoding job. """
task_id = str(uuid.uuid4())
with get_db() as db:
db.execute("INSERT INTO jobs (task_id, status) VALUES (?, 'queued')", (task_id,))
db.commit()
background_tasks.add_task(process_video_task, request.url, task_id)
return {"status": "queued", "task_id": task_id}
@app.get("/status/{task_id}")
async def get_status(task_id: str):
""" CHECK the status of the encoding job. """
with get_db() as db:
job = db.execute("SELECT * FROM jobs WHERE task_id = ?", (task_id,)).fetchone()
if not job:
return JSONResponse(status_code=404, content={"status": "not_found"})
return dict(job) # Convert DB row to a dict
@app.get("/download/{file_name}")
async def download_file(file_name: str):
""" DOWNLOAD the final re-encoded file. """
file_path = os.path.join(ENCODED_DIR, file_name)
if not os.path.exists(file_path):
return JSONResponse(status_code=404, content={"status": "file_not_found"})
return FileResponse(file_path, media_type='video/mp4', filename=file_name)
# --- Server Start ---
@app.on_event("startup")
async def startup_event():
init_db() # Create the database and table on startup
print("Database initialized.")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)