Spaces:
Running
Running
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
# Path of the platform's null device, used as the throw-away output target for
# ffmpeg runs whose log output is the only thing of interest.
NULL_SINK = os.devnull
| from fastapi import FastAPI, UploadFile, File | |
| from fastapi.responses import JSONResponse | |
| app = FastAPI(title="iRecite MVP API") | |
def root():
    """Return a small status payload confirming that the API is running.

    The message points the reader at the interactive documentation under
    ``/docs``. A new dict is built on every call.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file - confirm that it is registered on ``app``.
    return {"status": "ok", "message": "iRecite MVP API is running. Go to /docs"}
# Directory containing this module; every other path below is derived from it.
WORKDIR = os.path.dirname(os.path.abspath(__file__))

# Interpreter inside the project's ``.venv``. Virtual environments use
# ``Scripts\python.exe`` on Windows and ``bin/python`` elsewhere, so pick the
# layout that matches the current platform.
if os.name == "nt":
    PYTHON = os.path.join(WORKDIR, ".venv", "Scripts", "python.exe")
else:
    PYTHON = os.path.join(WORKDIR, ".venv", "bin", "python")

# Where uploaded audio files are stored.
UPLOADS = os.path.join(WORKDIR, "uploads")

# Where the pipeline's output files live, including the final JSON response.
OUTPUT_DIR = os.path.join(WORKDIR, "output")
API_JSON = os.path.join(OUTPUT_DIR, "api_response.json")
| import sys | |
def run(cmd):
    """Execute *cmd* as a child process from ``WORKDIR``; raise if it fails.

    When the first element is spelled ``python`` (in any letter case) it is
    replaced by ``sys.executable`` so that child scripts use the same
    interpreter as the server. ``subprocess.check_call`` raises
    ``subprocess.CalledProcessError`` when the child exits non-zero.
    """
    wants_generic_python = bool(cmd) and cmd[0].lower() == "python"
    argv = ([sys.executable] + cmd[1:]) if wants_generic_python else cmd
    subprocess.check_call(argv, cwd=WORKDIR)
# Matches the "silence_start: <t>" / "silence_end: <t>" events in the log text
# produced by ffmpeg's silencedetect filter. A leading minus sign and an
# exponent are accepted so that such timestamps are not mis-parsed as a
# different (positive, truncated) number.
_SILENCE_EVENT_RE = re.compile(
    r"silence_(start|end):\s*(-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)"
)


def _parse_silence_intervals(log_text):
    """Pair silence_start/silence_end events into ``[start, end]`` lists.

    ``end`` is ``None`` for a silence that was opened but never closed in the
    log. Intervals are returned in the order they appear.
    """
    intervals = []
    for kind, value in _SILENCE_EVENT_RE.findall(log_text):
        timestamp = float(value)
        if kind == "start":
            intervals.append([timestamp, None])
        elif intervals and intervals[-1][1] is None:
            intervals[-1][1] = timestamp
        # A silence_end with no open silence_start cannot be placed, so it is
        # ignored rather than guessed at.
    return intervals


def detect_trim_times(wav_path: str, edge_tolerance: float = 0.30):
    """
    Use ffmpeg silencedetect to get start/end of main speech.

    Only silence that touches an edge of the file is trimmed: a leading
    silence must begin within ``edge_tolerance`` seconds of the start, and a
    trailing silence must either be left open in the log or end within
    ``edge_tolerance`` seconds of the full duration. Pauses in the middle of
    the recording are never used as trim points.

    Args:
        wav_path: Path of the audio file to analyse.
        edge_tolerance: Maximum distance in seconds between a silence and the
            file edge for that silence to count as leading/trailing.

    Returns:
        (start_sec, end_sec). If detection fails or leaves one second or less
        of audio, returns (0, full_duration), with 0.0 standing in for an
        unknown duration.

    Raises:
        OSError: if the ffmpeg or ffprobe executable cannot be started.
    """
    # Run silencedetect and capture its log (ffmpeg writes it to stderr, which
    # is merged into stdout here).
    detect = subprocess.run(
        ["ffmpeg", "-i", wav_path, "-af", "silencedetect=noise=-35dB:d=0.35", "-f", "null", NULL_SINK],
        cwd=WORKDIR,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
        errors="ignore"
    )
    intervals = _parse_silence_intervals(detect.stdout or "")

    # Get full duration using ffprobe
    probe = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=nw=1:nk=1", wav_path],
        cwd=WORKDIR,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    try:
        full_dur = float(probe.stdout.strip())
    except ValueError:
        # Empty or non-numeric output: duration unknown.
        full_dur = None

    untrimmed_end = full_dur if full_dur is not None else 0.0

    # Speech start: end of the first silence, but only if that silence really
    # sits at the beginning of the file.
    start = 0.0
    if intervals:
        first_start, first_end = intervals[0]
        if first_end is not None and first_start <= edge_tolerance:
            start = max(0.0, first_end)

    # Speech end: start of the last silence, but only if that silence runs to
    # the end of the file.
    end = untrimmed_end
    if intervals:
        last_start, last_end = intervals[-1]
        runs_to_eof = last_end is None or (
            full_dur is not None and last_end >= full_dur - edge_tolerance
        )
        if runs_to_eof:
            end = last_start

    # Sanity checks
    if full_dur is not None:
        end = min(end, full_dur)
    if end <= start + 1.0:
        # fallback: don't trim
        return 0.0, untrimmed_end

    # small padding so the cut does not clip the first/last sound
    start = max(0.0, start - 0.10)
    end = end + 0.10
    if full_dur is not None:
        end = min(end, full_dur)
    return start, end
async def analyze(file: UploadFile = File(...)):
    """Run the full analysis pipeline on an uploaded recording.

    Steps: save the upload, convert it to 16 kHz mono WAV, trim edge silence,
    run the ordered pipeline scripts, then return the JSON they produce with
    the trim window attached under ``debug.trim``.

    Returns:
        The parsed contents of ``api_response.json`` plus a ``debug`` entry,
        or a ``JSONResponse`` with status 500 when the pipeline did not
        produce that file.

    Raises:
        subprocess.CalledProcessError: if ffmpeg or a pipeline script exits
            with a non-zero status (propagated from ``run``).
    """
    # NOTE(review): every request uses the same sample.wav / sample_trim.wav /
    # output paths, so overlapping requests can overwrite each other's files.
    # The subprocess calls below are also blocking inside this async handler.
    import json

    os.makedirs(UPLOADS, exist_ok=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Save upload. The filename is supplied by the client, so keep only its
    # final path component (treating both "/" and "\\" as separators) to stop
    # it from escaping the uploads directory, and fall back to a fixed name
    # when nothing usable remains.
    client_name = (file.filename or "").replace("\\", "/")
    safe_name = os.path.basename(client_name)
    if safe_name in ("", ".", ".."):
        safe_name = "upload.bin"
    upload_path = os.path.join(UPLOADS, safe_name)
    with open(upload_path, "wb") as f:
        shutil.copyfileobj(file.file, f)

    # Convert to 16k mono wav
    sample_wav = os.path.join(WORKDIR, "sample.wav")
    run(["ffmpeg", "-y", "-i", upload_path, "-ac", "1", "-ar", "16000", sample_wav])

    # Auto trim -> sample_trim.wav
    sample_trim = os.path.join(WORKDIR, "sample_trim.wav")
    start, end = detect_trim_times(sample_wav)
    if end and end > start:
        run(["ffmpeg", "-y", "-i", sample_wav, "-ss", f"{start:.2f}", "-to", f"{end:.2f}", "-ac", "1", "-ar", "16000", sample_trim])
    else:
        shutil.copy(sample_wav, sample_trim)

    # Remove any response left over from an earlier request so that the
    # existence check after the pipeline reflects this run only.
    try:
        os.remove(API_JSON)
    except FileNotFoundError:
        pass

    # Run pipeline (ordered)
    pipeline_scripts = (
        "step7_fallback_phonemes_and_madd.py",  # ensures fallback json exists
        "step8_madd_signal.py",
        "step9_madd_feedback_json.py",
        "step13_arabic_ctc_transcribe.py",  # now writes output/asr_raw.txt automatically
        "step14_align_text_to_canonical.py",
        "step15_global_word_alignment.py",
        "step16b_token_interpolation_timestamps.py",
        "step17_make_api_response.py",
    )
    for script in pipeline_scripts:
        run(["python", script])

    if not os.path.exists(API_JSON):
        return JSONResponse({"error": "api_response.json not generated"}, status_code=500)

    with open(API_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)

    # include trim info for debugging
    data["debug"] = {"trim": {"start": round(start, 2), "end": round(end, 2)}}
    return data