Spaces:
Running
on
Zero
Running
on
Zero
File size: 3,816 Bytes
6f523af |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
from app.logger_config import logger as logging
from fastrtc.utils import AdditionalOutputs
from pydub import AudioSegment
import asyncio
import os
import time
import numpy as np
# --------------------------------------------------------
# Utility functions
# --------------------------------------------------------
def read_and_stream_audio(filepath_to_stream: str, session_id: str, stop_streaming_flags: dict):
    """
    Read an audio file and stream it chunk by chunk (1s per chunk).

    This is a generator. For every chunk it yields
    ``((frame_rate, samples), AdditionalOutputs(progress))`` where ``samples``
    is the chunk's sample array reshaped to a single row and ``progress`` is
    the percentage of chunks sent so far, rounded to two decimals.

    Termination protocol:
    - Normal completion or a stop request: one final
      ``(None, AdditionalOutputs("STREAM_DONE"))`` is yielded.
    - Any failure (missing session id, missing file, decoding error, ...):
      the items produced by ``handle_stream_error`` are yielded and nothing
      else, so the client sees exactly one ``STREAM_DONE`` marker.

    Args:
        filepath_to_stream: Path of the audio file to decode and stream.
        session_id: Identifier used to tag log lines; must be non-empty.
        stop_streaming_flags: Shared dict whose ``"stop"`` entry is polled
            before each chunk and reset to ``False`` when streaming ends.
    """
    if not session_id:
        yield from handle_stream_error("unknown", "No session_id provided.", stop_streaming_flags)
        return
    if not filepath_to_stream or not os.path.exists(filepath_to_stream):
        yield from handle_stream_error(session_id, f"Audio file not found: {filepath_to_stream}", stop_streaming_flags)
        return

    try:
        segment = AudioSegment.from_file(filepath_to_stream)
        chunk_duration_ms = 1000
        # Ceiling division: a stepped slice produces one chunk per started
        # `chunk_duration_ms` window, so a 2000 ms file has 2 chunks (not 3)
        # and the last chunk reports exactly 100 %.
        total_chunks = (len(segment) + chunk_duration_ms - 1) // chunk_duration_ms
        logging.info(f"[{session_id}] Starting audio streaming ({total_chunks} chunks).")
        for i, chunk in enumerate(segment[::chunk_duration_ms]):
            if _is_stop_requested(stop_streaming_flags):
                logging.info(f"[{session_id}] Stop signal received. Terminating stream.")
                break
            frame_rate = chunk.frame_rate
            # NOTE(review): the samples are flattened into a single row; for
            # multi-channel files confirm the consumer expects this layout.
            samples = np.array(chunk.get_array_of_samples()).reshape(1, -1)
            progress = round(((i + 1) / total_chunks) * 100, 2)
            yield ((frame_rate, samples), AdditionalOutputs(progress))
            logging.debug(f"[{session_id}] Sent chunk {i+1}/{total_chunks} ({progress}%).")
            # Pace the stream: one 1-second chunk per second.
            time.sleep(1)
        else:
            # Only reached when the loop was not interrupted by a stop request.
            logging.info(f"[{session_id}] Audio streaming completed successfully.")
    except asyncio.CancelledError:
        # handle_stream_error already emits STREAM_DONE, so return right away
        # instead of falling through to the final marker below.
        yield from handle_stream_error(session_id, "Streaming cancelled by user.", stop_streaming_flags)
        return
    except Exception as e:
        yield from handle_stream_error(session_id, e, stop_streaming_flags)
        return
    finally:
        # Cleanup only — no `yield` here: yielding while the generator is
        # being closed raises "RuntimeError: generator ignored GeneratorExit".
        if isinstance(stop_streaming_flags, dict):
            stop_streaming_flags["stop"] = False
            logging.info(f"[{session_id}] Stop flag reset.")

    yield (None, AdditionalOutputs("STREAM_DONE"))
def handle_stream_error(session_id: str, error: Exception | str, stop_streaming_flags: dict | None = None):
    """
    Handle streaming errors:
    - Log the error
    - Send structured info to client
    - Reset stop flag

    This is a generator. It yields two items:
    ``(None, AdditionalOutputs({"error": True, "message": msg}))`` followed by
    ``(None, AdditionalOutputs("STREAM_DONE"))``.

    Args:
        session_id: Identifier used to tag the log line.
        error: Either an exception (reported as ``"<TypeName>: <text>"``) or a
            ready-made message string (reported as-is).
        stop_streaming_flags: Optional shared dict; when it is a dict its
            ``"stop"`` entry is set back to ``False``.
    """
    is_exception = isinstance(error, Exception)
    msg = f"{type(error).__name__}: {error}" if is_exception else str(error)

    # Pass the exception object itself rather than `True`: the traceback is
    # then taken from the exception, so it is logged correctly even when this
    # function is not called from inside an `except` block.
    # NOTE(review): assumes the project logger follows the stdlib `logging`
    # API for `exc_info` — confirm against app.logger_config.
    logging.error(f"[{session_id}] Streaming error: {msg}", exc_info=error if is_exception else False)

    if isinstance(stop_streaming_flags, dict):
        stop_streaming_flags["stop"] = False

    yield (None, AdditionalOutputs({"error": True, "message": msg}))
    yield (None, AdditionalOutputs("STREAM_DONE"))
def _is_stop_requested(stop_streaming_flags: dict) -> bool:
"""Check if the stop signal was requested."""
if not isinstance(stop_streaming_flags, dict):
return False
return bool(stop_streaming_flags.get("stop", False))
def stop_streaming(session_id: str, stop_streaming_flags: dict):
    """Raise the stop flag for the active stream and return the flags dict.

    A dict argument is updated in place and returned; any other value is
    replaced by a fresh ``{"stop": True}`` dict.
    """
    logging.info(f"[{session_id}] Stop button clicked — sending stop signal.")
    flags = stop_streaming_flags if isinstance(stop_streaming_flags, dict) else {}
    flags["stop"] = True
    return flags
|