Spaces:
Running
Running
File size: 4,750 Bytes
4ca6263 9c5218d 4ca6263 34995f6 a3951ff 34995f6 4ca6263 9c5218d 4ca6263 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
import os
import re
import shutil
import subprocess
# Platform-specific null device; handed to ffmpeg as a discard output target.
NULL_SINK = "NUL" if os.name == "nt" else "/dev/null"
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
# Application object that the route decorators in this file register against.
app = FastAPI(title="iRecite MVP API")
@app.get("/")
def root():
    """Report that the API is up and point the caller at /docs."""
    # The handler used to be declared twice with the same path and body;
    # a single registration is enough.
    return {"status": "ok", "message": "iRecite MVP API is running. Go to /docs"}
# Directory containing this file; child processes are started with it as cwd.
WORKDIR = os.path.dirname(os.path.abspath(__file__))
# Path to a Windows virtualenv interpreter under WORKDIR.
# NOTE(review): not referenced by the code visible here (run() substitutes
# sys.executable instead) -- confirm whether anything still needs it.
PYTHON = os.path.join(WORKDIR, ".venv", "Scripts", "python.exe")
# Directory where uploaded audio files are saved.
UPLOADS = os.path.join(WORKDIR, "uploads")
# Directory holding pipeline output, including the final API response JSON.
OUTPUT_DIR = os.path.join(WORKDIR, "output")
# File the /analyze endpoint reads back and returns to the client.
API_JSON = os.path.join(OUTPUT_DIR, "api_response.json")
import sys
def run(cmd):
    """
    Execute *cmd* (an argv list) inside WORKDIR and wait for it to finish.

    A leading "python" entry (any letter case) is replaced by the interpreter
    that is running this server, so child scripts share its environment.
    Raises subprocess.CalledProcessError when the command exits non-zero.
    """
    argv = cmd
    wants_generic_python = bool(cmd) and cmd[0].lower() == "python"
    if wants_generic_python:
        argv = [sys.executable] + cmd[1:]
    subprocess.check_call(argv, cwd=WORKDIR)
# Timestamps in silencedetect output may be negative or in exponent form
# (e.g. "-0.0013", "4.5e-05"), so match a full float rather than digits/dots.
_TIMESTAMP = r"(-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)"
_SILENCE_START_RE = re.compile(r"silence_start:\s*" + _TIMESTAMP)
_SILENCE_END_RE = re.compile(r"silence_end:\s*" + _TIMESTAMP)

# A silence only counts as leading/trailing if it touches the corresponding
# edge of the file within this margin (seconds).
_EDGE_TOLERANCE_SEC = 0.10


def _parse_silence_intervals(log_text):
    """Return the silences reported in silencedetect output as (start, end) pairs.

    Pairs are in log order. ``end`` is None for a silence that was still open
    when the log ended.
    """
    intervals = []
    pending_start = None
    for line in log_text.splitlines():
        start_match = _SILENCE_START_RE.search(line)
        if start_match:
            pending_start = float(start_match.group(1))
        end_match = _SILENCE_END_RE.search(line)
        if end_match:
            # An end with no recorded start is treated as starting at 0.
            begin = pending_start if pending_start is not None else 0.0
            intervals.append((begin, float(end_match.group(1))))
            pending_start = None
    if pending_start is not None:
        intervals.append((pending_start, None))
    return intervals


def _probe_duration(wav_path):
    """Return the duration in seconds reported by ffprobe, or None if unavailable."""
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=nw=1:nk=1", wav_path],
            cwd=WORKDIR,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        return float(probe.stdout.strip())
    except (OSError, ValueError):
        # ffprobe missing, or its output was not a number (e.g. empty / "N/A").
        return None


def detect_trim_times(wav_path: str):
    """
    Use ffmpeg silencedetect to get start/end of main speech.

    Only silence that touches the beginning of the file moves the start, and
    only silence that runs to the end of the file moves the end; pauses in the
    middle of the recording are never used as trim points.

    Returns (start_sec, end_sec). If detection fails, or the detected span is
    not longer than one second, returns (0, full_duration), with
    full_duration reported as 0.0 when it could not be determined.
    """
    # Run silencedetect and capture its log (ffmpeg writes it to stderr,
    # which is merged into stdout here).
    try:
        detect = subprocess.run(
            ["ffmpeg", "-i", wav_path, "-af", "silencedetect=noise=-35dB:d=0.35", "-f", "null", NULL_SINK],
            cwd=WORKDIR,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            errors="ignore",
        )
        log_text = detect.stdout or ""
    except OSError:
        # ffmpeg could not be started: fall through to the no-trim result.
        log_text = ""

    intervals = _parse_silence_intervals(log_text)
    full_dur = _probe_duration(wav_path)
    no_trim = (0.0, full_dur if full_dur is not None else 0.0)

    start = 0.0
    end = full_dur
    if intervals:
        first_start, first_end = intervals[0]
        # Leading silence: begins at (or just after) t=0 and has a known end.
        if first_end is not None and first_start <= _EDGE_TOLERANCE_SEC:
            start = max(0.0, first_end)

        last_start, last_end = intervals[-1]
        # Trailing silence: never closed, or closed at the end of the file.
        reaches_eof = last_end is None or (
            full_dur is not None and last_end >= full_dur - _EDGE_TOLERANCE_SEC
        )
        if reaches_eof:
            end = last_start

    if end is None:
        # No trailing silence found and the duration is unknown.
        return no_trim

    # Sanity checks
    if full_dur is not None:
        end = min(end, full_dur)
    if end <= start + 1.0:
        # fallback: don't trim
        return no_trim

    # small padding
    start = max(0.0, start - 0.10)
    end = end + 0.10
    if full_dur is not None:
        end = min(end, full_dur)
    return start, end
@app.post("/analyze")
async def analyze(file: UploadFile = File(...)):
    """
    Run the analysis pipeline on an uploaded audio file and return its result.

    The upload is saved under UPLOADS, converted to 16 kHz mono WAV,
    auto-trimmed with detect_trim_times(), and then passed through the step
    scripts in order. The JSON written to API_JSON is returned with an extra
    "debug" entry holding the trim times.

    Returns a 500 JSONResponse if the pipeline did not produce API_JSON.
    Raises subprocess.CalledProcessError (from run()) if any step exits
    non-zero.
    """
    import json

    os.makedirs(UPLOADS, exist_ok=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Save upload. The filename is chosen by the client, so keep only its last
    # path component (either separator style) to stay inside UPLOADS, and fall
    # back to a fixed name when nothing usable is left.
    client_name = (file.filename or "").replace("\\", "/")
    safe_name = os.path.basename(client_name)
    if safe_name in ("", ".", ".."):
        safe_name = "upload"
    upload_path = os.path.join(UPLOADS, safe_name)
    with open(upload_path, "wb") as f:
        shutil.copyfileobj(file.file, f)

    # Drop any result left by an earlier request so the existence check below
    # reflects this run only.
    try:
        os.remove(API_JSON)
    except FileNotFoundError:
        pass

    # NOTE(review): sample.wav / sample_trim.wav are fixed paths in WORKDIR, so
    # overlapping requests would overwrite each other's audio.

    # Convert to 16k mono wav
    sample_wav = os.path.join(WORKDIR, "sample.wav")
    run(["ffmpeg", "-y", "-i", upload_path, "-ac", "1", "-ar", "16000", sample_wav])

    # Auto trim -> sample_trim.wav
    sample_trim = os.path.join(WORKDIR, "sample_trim.wav")
    start, end = detect_trim_times(sample_wav)
    if end and end > start:
        run(["ffmpeg", "-y", "-i", sample_wav, "-ss", f"{start:.2f}", "-to", f"{end:.2f}", "-ac", "1", "-ar", "16000", sample_trim])
    else:
        shutil.copy(sample_wav, sample_trim)

    # Run pipeline (ordered)
    pipeline_steps = (
        "step7_fallback_phonemes_and_madd.py",  # ensures fallback json exists
        "step8_madd_signal.py",
        "step9_madd_feedback_json.py",
        "step13_arabic_ctc_transcribe.py",  # now writes output/asr_raw.txt automatically
        "step14_align_text_to_canonical.py",
        "step15_global_word_alignment.py",
        "step16b_token_interpolation_timestamps.py",
        "step17_make_api_response.py",
    )
    for script in pipeline_steps:
        run(["python", script])

    if not os.path.exists(API_JSON):
        return JSONResponse({"error": "api_response.json not generated"}, status_code=500)

    with open(API_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)

    # include trim info for debugging
    data["debug"] = {"trim": {"start": round(start, 2), "end": round(end, 2)}}
    return data