"""iRecite MVP API.

Accepts an uploaded recitation, converts it to 16 kHz mono WAV, trims leading
and trailing silence, runs the analysis pipeline scripts in order, and returns
the JSON document the pipeline writes to ``output/api_response.json``.
"""

import json
import os
import re
import shutil
import subprocess
import sys
from typing import List, Optional, Tuple

from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse

# Output target handed to ffmpeg together with "-f null" (nothing is written).
NULL_SINK = "NUL" if os.name == "nt" else "/dev/null"

app = FastAPI(title="iRecite MVP API")


@app.get("/")
def root():
    """Return a static status payload pointing callers at the /docs page."""
    return {"status": "ok", "message": "iRecite MVP API is running. Go to /docs"}


WORKDIR = os.path.dirname(os.path.abspath(__file__))
# NOTE: PYTHON is not referenced by the code below; run() substitutes
# sys.executable instead. Kept so that external importers keep working.
PYTHON = os.path.join(WORKDIR, ".venv", "Scripts", "python.exe")
UPLOADS = os.path.join(WORKDIR, "uploads")
OUTPUT_DIR = os.path.join(WORKDIR, "output")
API_JSON = os.path.join(OUTPUT_DIR, "api_response.json")

# Pipeline scripts, executed in this order from WORKDIR.
_PIPELINE_SCRIPTS = (
    "step7_fallback_phonemes_and_madd.py",  # ensures fallback json exists
    "step8_madd_signal.py",
    "step9_madd_feedback_json.py",
    "step13_arabic_ctc_transcribe.py",  # now writes output/asr_raw.txt automatically
    "step14_align_text_to_canonical.py",
    "step15_global_word_alignment.py",
    "step16b_token_interpolation_timestamps.py",
    "step17_make_api_response.py",
)

# Matches the "silence_start: <t>" / "silence_end: <t>" events in the
# silencedetect log. The number may be slightly negative or in exponent form.
_SILENCE_EVENT_RE = re.compile(
    r"silence_(start|end):\s*(-?\d+(?:\.\d*)?(?:[eE][-+]?\d+)?)"
)

# A silence counts as leading/trailing only if it touches the file edge
# within this many seconds.
_EDGE_TOLERANCE_SEC = 0.10
# Trimmed speech shorter than this is considered a failed detection.
_MIN_SPEECH_SEC = 1.0
# Padding kept on each side of the detected speech.
_TRIM_PADDING_SEC = 0.10


def run(cmd):
    """Run *cmd* (an argv list) in WORKDIR and raise on a non-zero exit.

    A leading ``"python"`` is replaced with ``sys.executable`` so child
    scripts always run with the same interpreter as the server.

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero code.
        FileNotFoundError: the executable could not be found.
    """
    if cmd and cmd[0].lower() == "python":
        cmd = [sys.executable, *cmd[1:]]
    subprocess.check_call(cmd, cwd=WORKDIR)


def _parse_silence_intervals(log_text: str) -> List[Tuple[float, Optional[float]]]:
    """Pair silencedetect events into ``(start, end)`` intervals.

    ``end`` is ``None`` for a silence that was still open when the log ended.
    An end event with no preceding start is treated as starting at 0.0.
    """
    intervals: List[Tuple[float, Optional[float]]] = []
    open_start: Optional[float] = None
    for kind, value in _SILENCE_EVENT_RE.findall(log_text):
        stamp = float(value)
        if kind == "start":
            if open_start is not None:
                intervals.append((open_start, None))
            open_start = stamp
        else:
            intervals.append((open_start if open_start is not None else 0.0, stamp))
            open_start = None
    if open_start is not None:
        intervals.append((open_start, None))
    return intervals


def _speech_bounds(
    intervals: List[Tuple[float, Optional[float]]],
    full_dur: Optional[float],
) -> Tuple[float, Optional[float]]:
    """Return ``(start, end)`` of speech given silence intervals.

    Only a silence touching the beginning of the file moves ``start``, and
    only a silence reaching the end of the file (or never closed) moves
    ``end``; pauses in the middle of speech are ignored. ``end`` is ``None``
    when neither a trailing silence nor the full duration is known.
    """
    start = 0.0
    end = full_dur
    if not intervals:
        return start, end

    first_start, first_end = intervals[0]
    if first_end is not None and first_start <= _EDGE_TOLERANCE_SEC:
        start = first_end

    last_start, last_end = intervals[-1]
    reaches_eof = last_end is None or (
        full_dur is not None and last_end >= full_dur - _EDGE_TOLERANCE_SEC
    )
    if reaches_eof:
        end = last_start
    return start, end


def detect_trim_times(wav_path: str):
    """Locate the main speech segment of *wav_path* with ffmpeg silencedetect.

    Runs ``ffmpeg -af silencedetect=noise=-35dB:d=0.35`` to find silences and
    ``ffprobe`` to read the full duration.

    Returns:
        ``(start_sec, end_sec)``. When no usable segment is found (less than
        one second of speech would remain), returns ``(0.0, full_duration)``,
        or ``(0.0, 0.0)`` if the duration could not be read either.

    Raises:
        FileNotFoundError: ffmpeg or ffprobe is not available.
    """
    # silencedetect reports on stderr; merge it into stdout and capture.
    detect = subprocess.run(
        ["ffmpeg", "-i", wav_path, "-af", "silencedetect=noise=-35dB:d=0.35",
         "-f", "null", NULL_SINK],
        cwd=WORKDIR,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
        errors="ignore",
    )
    intervals = _parse_silence_intervals(detect.stdout or "")

    # Full duration via ffprobe.
    probe = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=nw=1:nk=1", wav_path],
        cwd=WORKDIR,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    try:
        full_dur = float(probe.stdout.strip())
    except ValueError:
        # Empty or non-numeric output (e.g. "N/A"): duration unknown.
        full_dur = None

    start, end = _speech_bounds(intervals, full_dur)
    if end is not None and full_dur is not None:
        end = min(end, full_dur)

    # Sanity check: fall back to "don't trim" if too little would remain.
    if end is None or end <= start + _MIN_SPEECH_SEC:
        return 0.0, full_dur if full_dur is not None else 0.0

    # Small padding so the cut does not clip the first/last sound.
    start = max(0.0, start - _TRIM_PADDING_SEC)
    end = end + _TRIM_PADDING_SEC
    if full_dur is not None:
        end = min(end, full_dur)
    return start, end


def _safe_upload_name(filename: Optional[str]) -> str:
    """Reduce a client-supplied filename to a bare name safe to join to UPLOADS.

    Directory components (with either separator) and NUL bytes are dropped so
    the upload cannot be written outside the uploads directory.
    """
    cleaned = (filename or "").replace("\x00", "").replace("\\", "/")
    name = os.path.basename(cleaned)
    if name in ("", ".", ".."):
        name = "upload.bin"
    return name


@app.post("/analyze")
async def analyze(file: UploadFile = File(...)):
    """Analyze an uploaded recitation and return the pipeline's JSON result.

    Steps: save the upload, convert it to 16 kHz mono ``sample.wav``, trim
    silence into ``sample_trim.wav``, run the pipeline scripts in order, then
    load ``output/api_response.json`` and attach the trim window under
    ``debug.trim``.

    Returns a JSON error with status 500 if a stage exits with a non-zero code
    or the pipeline does not produce ``api_response.json``.

    NOTE: the subprocess calls below block without awaiting, so requests in
    one worker process run one at a time; the fixed intermediate paths
    (sample.wav, sample_trim.wav, output/) are shared between requests.
    """
    os.makedirs(UPLOADS, exist_ok=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Save upload. The filename is client-controlled, so strip any path parts.
    upload_path = os.path.join(UPLOADS, _safe_upload_name(file.filename))
    with open(upload_path, "wb") as f:
        shutil.copyfileobj(file.file, f)

    sample_wav = os.path.join(WORKDIR, "sample.wav")
    sample_trim = os.path.join(WORKDIR, "sample_trim.wav")

    stage = "convert"
    try:
        # Convert to 16k mono wav
        run(["ffmpeg", "-y", "-i", upload_path, "-ac", "1", "-ar", "16000", sample_wav])

        # Auto trim -> sample_trim.wav
        stage = "trim"
        start, end = detect_trim_times(sample_wav)
        if end and end > start:
            run(["ffmpeg", "-y", "-i", sample_wav, "-ss", f"{start:.2f}",
                 "-to", f"{end:.2f}", "-ac", "1", "-ar", "16000", sample_trim])
        else:
            shutil.copy(sample_wav, sample_trim)

        # Run pipeline (ordered)
        for script in _PIPELINE_SCRIPTS:
            stage = script
            run(["python", script])
    except subprocess.CalledProcessError as exc:
        return JSONResponse(
            {"error": f"pipeline stage failed: {stage}", "returncode": exc.returncode},
            status_code=500,
        )

    if not os.path.exists(API_JSON):
        return JSONResponse({"error": "api_response.json not generated"}, status_code=500)

    with open(API_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)

    # include trim info for debugging
    data["debug"] = {"trim": {"start": round(start, 2), "end": round(end, 2)}}
    return data