# iRecite-MVP-API / app.py
import json
import os
import re
import shutil
import subprocess
import sys

from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse

NULL_SINK = "NUL" if os.name == "nt" else "/dev/null"

app = FastAPI(title="iRecite MVP API")
@app.get("/")
def root():
    return {"status": "ok", "message": "iRecite MVP API is running. Go to /docs"}
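# Paths used by the processing pipeline. PYTHON points at the project's Windows
# venv interpreter; note that run() below prefers sys.executable, so the child
# scripts run under the same interpreter as the server regardless of this path.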
WORKDIR = os.path.dirname(os.path.abspath(__file__))
PYTHON = os.path.join(WORKDIR, ".venv", "Scripts", "python.exe")
UPLOADS = os.path.join(WORKDIR, "uploads")
OUTPUT_DIR = os.path.join(WORKDIR, "output")
API_JSON = os.path.join(OUTPUT_DIR, "api_response.json")
def run(cmd):
    # Always run child scripts with the same Python interpreter as the server
    if cmd and cmd[0].lower() == "python":
        cmd = [sys.executable] + cmd[1:]
    subprocess.check_call(cmd, cwd=WORKDIR)
def detect_trim_times(wav_path: str):
    """
    Use ffmpeg silencedetect to get start/end of main speech.
    Returns (start_sec, end_sec). If detection fails, returns (0, full_duration).
    """
    # Run silencedetect and capture output
    p = subprocess.run(
        ["ffmpeg", "-i", wav_path, "-af", "silencedetect=noise=-35dB:d=0.35", "-f", "null", NULL_SINK],
        cwd=WORKDIR,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
        errors="ignore"
    )
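    # The silencedetect filter logs lines that typically look like:
    #   [silencedetect @ 0x...] silence_start: 2.60
    #   [silencedetect @ 0x...] silence_end: 4.28 | silence_duration: 1.68
    # which is what the regexes below pull the timestamps from.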
    txt = p.stdout
    # Find first "silence_end" near the beginning (speech start)
    # and last "silence_start" near the end (speech end)
    silence_end = None
    silence_start_last = None
    for line in txt.splitlines():
        if "silence_end:" in line:
            m = re.search(r"silence_end:\s*([0-9.]+)", line)
            if m and silence_end is None:
                silence_end = float(m.group(1))
        if "silence_start:" in line:
            m = re.search(r"silence_start:\s*([0-9.]+)", line)
            if m:
                silence_start_last = float(m.group(1))
    # Get full duration using ffprobe
    pr = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=nw=1:nk=1", wav_path],
        cwd=WORKDIR,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
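    # With -of default=nw=1:nk=1 (no wrappers, no keys) ffprobe prints only the
    # duration value in seconds, e.g. "83.42", so stdout can be parsed directly.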
    try:
        full_dur = float(pr.stdout.strip())
    except Exception:
        full_dur = None
    start = max(0.0, (silence_end if silence_end is not None else 0.0))
    end = (silence_start_last if silence_start_last is not None else (full_dur if full_dur is not None else 0.0))
    # Sanity checks
    if full_dur is not None:
        end = min(end, full_dur)
    if end <= start + 1.0:
        # fallback: don't trim
        return 0.0, full_dur if full_dur is not None else 0.0
    # small padding
    start = max(0.0, start - 0.10)
    end = end + 0.10
    if full_dur is not None:
        end = min(end, full_dur)
    return start, end
@app.post("/analyze")
async def analyze(file: UploadFile = File(...)):
    os.makedirs(UPLOADS, exist_ok=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    # Save upload (basename only, so a crafted filename cannot escape UPLOADS)
    upload_path = os.path.join(UPLOADS, os.path.basename(file.filename))
    with open(upload_path, "wb") as f:
        shutil.copyfileobj(file.file, f)
    # Convert to 16k mono wav
    sample_wav = os.path.join(WORKDIR, "sample.wav")
    run(["ffmpeg", "-y", "-i", upload_path, "-ac", "1", "-ar", "16000", sample_wav])
    # Auto trim -> sample_trim.wav
    sample_trim = os.path.join(WORKDIR, "sample_trim.wav")
    start, end = detect_trim_times(sample_wav)
    if end and end > start:
        run(["ffmpeg", "-y", "-i", sample_wav, "-ss", f"{start:.2f}", "-to", f"{end:.2f}", "-ac", "1", "-ar", "16000", sample_trim])
    else:
        shutil.copy(sample_wav, sample_trim)
    # Run pipeline (ordered); step17 is expected to write output/api_response.json,
    # which is read and returned below.
    run(["python", "step7_fallback_phonemes_and_madd.py"])  # ensures fallback json exists
    run(["python", "step8_madd_signal.py"])
    run(["python", "step9_madd_feedback_json.py"])
    run(["python", "step13_arabic_ctc_transcribe.py"])  # now writes output/asr_raw.txt automatically
    run(["python", "step14_align_text_to_canonical.py"])
    run(["python", "step15_global_word_alignment.py"])
    run(["python", "step16b_token_interpolation_timestamps.py"])
    run(["python", "step17_make_api_response.py"])
    if not os.path.exists(API_JSON):
        return JSONResponse({"error": "api_response.json not generated"}, status_code=500)
    with open(API_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)
    # include trim info for debugging
    data["debug"] = {"trim": {"start": round(start, 2), "end": round(end, 2)}}
    return data
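
# Run locally (assumes uvicorn is installed):
#   uvicorn app:app --reload
# Then POST an audio file to the /analyze endpoint, e.g.:
#   curl -F "file=@recitation.wav" http://127.0.0.1:8000/analyze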