from app.logger_config import logger as logging from fastrtc.utils import AdditionalOutputs from pydub import AudioSegment import asyncio import os import time import numpy as np # -------------------------------------------------------- # Utility functions # -------------------------------------------------------- def read_and_stream_audio(filepath_to_stream: str, session_id: str, stop_streaming_flags: dict): """ Read an audio file and stream it chunk by chunk (1s per chunk). Handles errors safely and reports structured messages to the client. """ if not session_id: yield from handle_stream_error("unknown", "No session_id provided.", stop_streaming_flags) return if not filepath_to_stream or not os.path.exists(filepath_to_stream): yield from handle_stream_error(session_id, f"Audio file not found: {filepath_to_stream}", stop_streaming_flags) return try: segment = AudioSegment.from_file(filepath_to_stream) chunk_duration_ms = 1000 total_chunks = len(segment) // chunk_duration_ms + 1 logging.info(f"[{session_id}] Starting audio streaming ({total_chunks} chunks).") for i, chunk in enumerate(segment[::chunk_duration_ms]): if _is_stop_requested(stop_streaming_flags): logging.info(f"[{session_id}] Stop signal received. Terminating stream.") break frame_rate = chunk.frame_rate samples = np.array(chunk.get_array_of_samples()).reshape(1, -1) progress = round(((i + 1) / total_chunks) * 100, 2) yield ((frame_rate, samples), AdditionalOutputs(progress)) logging.debug(f"[{session_id}] Sent chunk {i+1}/{total_chunks} ({progress}%).") time.sleep(1) # raise_function() # Optional injected test exception logging.info(f"[{session_id}] Audio streaming completed successfully.") except asyncio.CancelledError: yield from handle_stream_error(session_id, "Streaming cancelled by user.", stop_streaming_flags) except FileNotFoundError as e: yield from handle_stream_error(session_id, e, stop_streaming_flags) except Exception as e: yield from handle_stream_error(session_id, e, stop_streaming_flags) finally: if isinstance(stop_streaming_flags, dict): stop_streaming_flags["stop"] = False logging.info(f"[{session_id}] Stop flag reset.") yield (None, AdditionalOutputs("STREAM_DONE")) def handle_stream_error(session_id: str, error: Exception | str, stop_streaming_flags: dict | None = None): """ Handle streaming errors: - Log the error - Send structured info to client - Reset stop flag """ if isinstance(error, Exception): msg = f"{type(error).__name__}: {str(error)}" else: msg = str(error) logging.error(f"[{session_id}] Streaming error: {msg}", exc_info=isinstance(error, Exception)) if isinstance(stop_streaming_flags, dict): stop_streaming_flags["stop"] = False yield (None, AdditionalOutputs({"error": True, "message": msg})) yield (None, AdditionalOutputs("STREAM_DONE")) def _is_stop_requested(stop_streaming_flags: dict) -> bool: """Check if the stop signal was requested.""" if not isinstance(stop_streaming_flags, dict): return False return bool(stop_streaming_flags.get("stop", False)) def stop_streaming(session_id: str, stop_streaming_flags: dict): """Trigger the stop flag for active streaming.""" logging.info(f"[{session_id}] Stop button clicked — sending stop signal.") if not isinstance(stop_streaming_flags, dict): stop_streaming_flags = {"stop": True} else: stop_streaming_flags["stop"] = True return stop_streaming_flags