File size: 3,816 Bytes
6f523af
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

from app.logger_config import logger as logging
from fastrtc.utils import AdditionalOutputs
from pydub import AudioSegment
import asyncio
import os
import time
import numpy as np

# --------------------------------------------------------
# Utility functions
# --------------------------------------------------------

def read_and_stream_audio(filepath_to_stream: str, session_id: str, stop_streaming_flags: dict):
    """
    Read an audio file and stream it chunk by chunk (1s per chunk).
    Handles errors safely and reports structured messages to the client.
    """
    if not session_id:
        yield from handle_stream_error("unknown", "No session_id provided.", stop_streaming_flags)
        return

    if not filepath_to_stream or not os.path.exists(filepath_to_stream):
        yield from handle_stream_error(session_id, f"Audio file not found: {filepath_to_stream}", stop_streaming_flags)
        return

    try:
        segment = AudioSegment.from_file(filepath_to_stream)
        chunk_duration_ms = 1000
        total_chunks = len(segment) // chunk_duration_ms + 1
        logging.info(f"[{session_id}] Starting audio streaming ({total_chunks} chunks).")

        for i, chunk in enumerate(segment[::chunk_duration_ms]):
            if _is_stop_requested(stop_streaming_flags):
                logging.info(f"[{session_id}] Stop signal received. Terminating stream.")
                break

            frame_rate = chunk.frame_rate
            samples = np.array(chunk.get_array_of_samples()).reshape(1, -1)
            progress = round(((i + 1) / total_chunks) * 100, 2)

            yield ((frame_rate, samples), AdditionalOutputs(progress))
            logging.debug(f"[{session_id}] Sent chunk {i+1}/{total_chunks} ({progress}%).")

            time.sleep(1)
            # raise_function()  # Optional injected test exception

        logging.info(f"[{session_id}] Audio streaming completed successfully.")

    except asyncio.CancelledError:
        yield from handle_stream_error(session_id, "Streaming cancelled by user.", stop_streaming_flags)
    except FileNotFoundError as e:
        yield from handle_stream_error(session_id, e, stop_streaming_flags)
    except Exception as e:
        yield from handle_stream_error(session_id, e, stop_streaming_flags)
    finally:
        if isinstance(stop_streaming_flags, dict):
            stop_streaming_flags["stop"] = False
        logging.info(f"[{session_id}] Stop flag reset.")
        yield (None, AdditionalOutputs("STREAM_DONE"))



def handle_stream_error(session_id: str, error: Exception | str, stop_streaming_flags: dict | None = None):
    """
    Handle streaming errors:
    - Log the error
    - Send structured info to client
    - Reset stop flag
    """
    if isinstance(error, Exception):
        msg = f"{type(error).__name__}: {str(error)}"
    else:
        msg = str(error)

    logging.error(f"[{session_id}] Streaming error: {msg}", exc_info=isinstance(error, Exception))

    if isinstance(stop_streaming_flags, dict):
        stop_streaming_flags["stop"] = False

    yield (None, AdditionalOutputs({"error": True, "message": msg}))
    yield (None, AdditionalOutputs("STREAM_DONE"))


def _is_stop_requested(stop_streaming_flags: dict) -> bool:
    """Check if the stop signal was requested."""
    if not isinstance(stop_streaming_flags, dict):
        return False
    return bool(stop_streaming_flags.get("stop", False))



def stop_streaming(session_id: str, stop_streaming_flags: dict):
    """Trigger the stop flag for active streaming."""
    logging.info(f"[{session_id}] Stop button clicked — sending stop signal.")
    if not isinstance(stop_streaming_flags, dict):
        stop_streaming_flags = {"stop": True}
    else:
        stop_streaming_flags["stop"] = True
    return stop_streaming_flags