File size: 10,393 Bytes
70d2ece
 
 
 
 
d1f7785
70d2ece
 
1dd92bc
70d2ece
 
 
1dd92bc
 
70d2ece
d1f7785
70d2ece
 
 
 
 
1dd92bc
 
 
 
70d2ece
 
1dd92bc
 
70d2ece
1dd92bc
 
 
70d2ece
1dd92bc
70d2ece
 
 
 
1dd92bc
 
70d2ece
1dd92bc
 
 
 
70d2ece
1dd92bc
 
 
 
70d2ece
1dd92bc
70d2ece
 
 
 
1dd92bc
 
70d2ece
1dd92bc
 
 
 
 
 
 
 
 
 
 
 
 
70d2ece
 
 
1dd92bc
 
 
70d2ece
1dd92bc
70d2ece
1dd92bc
70d2ece
 
1dd92bc
 
d1f7785
70d2ece
1dd92bc
 
 
 
 
 
 
70d2ece
 
1dd92bc
 
 
70d2ece
1dd92bc
70d2ece
 
 
1dd92bc
 
70d2ece
 
 
1dd92bc
 
70d2ece
 
 
 
 
 
1dd92bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d1f7785
 
 
 
 
1dd92bc
 
 
 
 
 
 
 
 
 
 
d1f7785
1dd92bc
 
 
 
 
 
 
 
 
 
 
 
 
 
d1f7785
70d2ece
1dd92bc
 
 
70d2ece
 
1dd92bc
70d2ece
 
 
 
 
 
 
1dd92bc
 
70d2ece
 
 
 
d1f7785
 
 
 
1dd92bc
 
d1f7785
1dd92bc
 
 
 
d1f7785
 
70d2ece
 
1dd92bc
70d2ece
 
 
1dd92bc
 
70d2ece
1dd92bc
70d2ece
 
 
 
 
 
1dd92bc
70d2ece
1dd92bc
70d2ece
 
1dd92bc
70d2ece
 
 
1dd92bc
 
70d2ece
1dd92bc
70d2ece
 
 
1dd92bc
70d2ece
 
1dd92bc
 
d1f7785
1dd92bc
d1f7785
 
1dd92bc
 
d1f7785
1dd92bc
70d2ece
 
1dd92bc
 
70d2ece
1dd92bc
70d2ece
1dd92bc
70d2ece
1dd92bc
70d2ece
 
 
 
 
 
 
 
 
 
d1f7785
 
1dd92bc
d1f7785
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1dd92bc
 
d1f7785
1dd92bc
d1f7785
70d2ece
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
from app.logger_config import logger as logging
import numpy as np
import gradio as gr
import asyncio
from fastrtc.webrtc import WebRTC
from fastrtc.utils import AdditionalOutputs
from pydub import AudioSegment
import time
import os
from gradio.utils import get_space

from app.utils import (
    generate_coturn_config,
    raise_function
)
from app.new_session_utils import (
    on_load,
    on_unload,
    get_active_sessions,
    reset_all_active_sessions,
)

# --------------------------------------------------------
# Initialization
# --------------------------------------------------------
reset_all_active_sessions()
EXAMPLE_FILES = ["data/bonjour.wav", "data/bonjour2.wav"]
DEFAULT_FILE = EXAMPLE_FILES[0]


# --------------------------------------------------------
# Utility functions
# --------------------------------------------------------
def _is_stop_requested(stop_streaming_flags: dict) -> bool:
    """Check if the stop signal was requested."""
    if not isinstance(stop_streaming_flags, dict):
        return False
    return bool(stop_streaming_flags.get("stop", False))


def handle_stream_error(session_id: str, error: Exception | str, stop_streaming_flags: dict | None = None):
    """
    Handle streaming errors:
    - Log the error
    - Send structured info to client
    - Reset stop flag
    """
    if isinstance(error, Exception):
        msg = f"{type(error).__name__}: {str(error)}"
    else:
        msg = str(error)

    logging.error(f"[{session_id}] Streaming error: {msg}", exc_info=isinstance(error, Exception))

    if isinstance(stop_streaming_flags, dict):
        stop_streaming_flags["stop"] = False

    yield (None, AdditionalOutputs({"error": True, "message": msg}))
    yield (None, AdditionalOutputs("STREAM_DONE"))


def read_and_stream_audio(filepath_to_stream: str, session_id: str, stop_streaming_flags: dict):
    """
    Read an audio file and stream it chunk by chunk (1s per chunk).
    Handles errors safely and reports structured messages to the client.
    """
    if not session_id:
        yield from handle_stream_error("unknown", "No session_id provided.", stop_streaming_flags)
        return

    if not filepath_to_stream or not os.path.exists(filepath_to_stream):
        yield from handle_stream_error(session_id, f"Audio file not found: {filepath_to_stream}", stop_streaming_flags)
        return

    try:
        segment = AudioSegment.from_file(filepath_to_stream)
        chunk_duration_ms = 1000
        total_chunks = len(segment) // chunk_duration_ms + 1
        logging.info(f"[{session_id}] Starting audio streaming ({total_chunks} chunks).")

        for i, chunk in enumerate(segment[::chunk_duration_ms]):
            if _is_stop_requested(stop_streaming_flags):
                logging.info(f"[{session_id}] Stop signal received. Terminating stream.")
                break

            frame_rate = chunk.frame_rate
            samples = np.array(chunk.get_array_of_samples()).reshape(1, -1)
            progress = round(((i + 1) / total_chunks) * 100, 2)

            yield ((frame_rate, samples), AdditionalOutputs(progress))
            logging.debug(f"[{session_id}] Sent chunk {i+1}/{total_chunks} ({progress}%).")

            time.sleep(0.9)
            raise_function()  # Optional injected test exception

        logging.info(f"[{session_id}] Audio streaming completed successfully.")

    except asyncio.CancelledError:
        yield from handle_stream_error(session_id, "Streaming cancelled by user.", stop_streaming_flags)
    except FileNotFoundError as e:
        yield from handle_stream_error(session_id, e, stop_streaming_flags)
    except Exception as e:
        yield from handle_stream_error(session_id, e, stop_streaming_flags)
    finally:
        if isinstance(stop_streaming_flags, dict):
            stop_streaming_flags["stop"] = False
        logging.info(f"[{session_id}] Stop flag reset.")
        yield (None, AdditionalOutputs("STREAM_DONE"))


def stop_streaming(session_id: str, stop_streaming_flags: dict):
    """Trigger the stop flag for active streaming."""
    logging.info(f"[{session_id}] Stop button clicked β€” sending stop signal.")
    if not isinstance(stop_streaming_flags, dict):
        stop_streaming_flags = {"stop": True}
    else:
        stop_streaming_flags["stop"] = True
    return stop_streaming_flags


def handle_additional_outputs(start_button, stop_button, main_audio, status_slider, progress_value):
    """
    Update UI elements based on streaming progress or errors.
    Controls button states, audio visibility, and progress slider.
    """
    logging.debug(f"Additional output received: {progress_value}")

    # Handle structured error message
    if isinstance(progress_value, dict) and progress_value.get("error"):
        msg = progress_value.get("message", "Unknown error.")
        logging.error(f"[stream_ui] Client-side error: {msg}")
        return (
            gr.update(interactive=True),   # start_button enabled
            gr.update(interactive=False),  # stop_button disabled
            gr.update(visible=True),       # audio re-shown
            gr.update(visible=False, value=0),  # slider hidden
        )

    try:
        progress = float(progress_value)
    except (ValueError, TypeError):
        progress = 0

    # --- Stream not started ---
    if progress <= 0:
        return (
            gr.update(interactive=True),   # start_button enabled
            gr.update(interactive=False),  # stop_button disabled
            gr.update(visible=True),       # audio visible
            gr.update(visible=False, value=0),  # slider hidden
        )

    # --- Stream finished ---
    if progress >= 100:
        return (
            gr.update(interactive=True),   # start_button re-enabled
            gr.update(interactive=False),  # stop_button disabled
            gr.update(visible=True),       # audio visible
            gr.update(visible=False, value=100), # slider hidden
        )

    # --- Stream in progress ---
    return (
        gr.update(interactive=False),      # start_button disabled
        gr.update(interactive=True),       # stop_button enabled
        gr.update(visible=False),          # hide audio
        gr.update(visible=True, value=progress), # show progress
    )


# --------------------------------------------------------
# Gradio Interface
# --------------------------------------------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:

    session_hash = gr.State()
    session_hash_box = gr.Textbox(label="Session ID", interactive=False)
    demo.load(fn=on_load, inputs=None, outputs=[session_hash, session_hash_box])
    demo.unload(on_unload)

    stop_streaming_flags = gr.State(value={"stop": False})

    gr.Markdown(
        "## WebRTC Audio Streamer (Server β†’ Client)\n"
        "Upload or record an audio file, then click **Start** to listen to the streamed audio."
    )

    active_filepath = gr.State(value=DEFAULT_FILE)

    with gr.Row(equal_height=True):
        with gr.Column(elem_id="column_source", scale=1):
            with gr.Group(elem_id="centered_content"):
                main_audio = gr.Audio(
                    label="Audio File",
                    sources=["upload", "microphone"],
                    type="filepath",
                    value=DEFAULT_FILE,
                )
                status_slider = gr.Slider(
                    0, 100, value=0, label="Streaming Progress", interactive=False, visible=False
                )

        with gr.Column():
            webrtc_stream = WebRTC(
                label="Live",
                mode="receive",
                modality="audio",
                rtc_configuration=generate_coturn_config(),
                visible=True,
                height=200,
            )

    with gr.Row():
        with gr.Column():
            start_button = gr.Button("Start Streaming", variant="primary")
            stop_button = gr.Button("Stop Streaming", variant="stop", interactive=False)

    def set_new_file(filepath):
        """Update active audio path or reset to default if empty."""
        if filepath is None:
            logging.info("[ui] Audio cleared β€” reverting to default example file.")
            new_path = DEFAULT_FILE
        else:
            logging.info(f"[ui] New audio source selected: {filepath}")
            new_path = filepath
        return new_path

    main_audio.change(fn=set_new_file, inputs=[main_audio], outputs=[active_filepath])
    main_audio.stop_recording(fn=set_new_file, inputs=[main_audio], outputs=[active_filepath])

    ui_components = [start_button, stop_button, main_audio, status_slider]

    stream_event = webrtc_stream.stream(
        fn=read_and_stream_audio,
        inputs=[active_filepath, session_hash, stop_streaming_flags],
        outputs=[webrtc_stream],
        trigger=start_button.click,
        concurrency_id="audio_stream",
        concurrency_limit=10,
    )

    webrtc_stream.on_additional_outputs(
        fn=handle_additional_outputs,
        inputs=ui_components,
        outputs=ui_components,
        concurrency_id="additional_outputs_audio_stream",
        concurrency_limit=10,
    )

    start_button.click(fn=None, inputs=None, outputs=None)

    stop_button.click(
        fn=stop_streaming,
        inputs=[session_hash, stop_streaming_flags],
        outputs=[stop_streaming_flags],
    )

    with gr.Accordion("πŸ“Š Active Sessions", open=False):
        sessions_table = gr.DataFrame(
            headers=["session_hash", "file", "start_time", "status"],
            interactive=False,
            wrap=True,
            max_height=200,
        )

    gr.Timer(3.0).tick(fn=get_active_sessions, outputs=sessions_table)


# --------------------------------------------------------
# Custom CSS
# --------------------------------------------------------
custom_css = """
#column_source {
    display: flex;
    flex-direction: column;
    justify-content: center;
    align-items: center;
    gap: 1rem;
    margin-top: auto;
    margin-bottom: auto;
}
#column_source .gr-row {
    padding-top: 12px;
    padding-bottom: 12px;
}
"""
demo.css = custom_css


# --------------------------------------------------------
# Main
# --------------------------------------------------------
if __name__ == "__main__":
    demo.queue(max_size=10, api_open=False).launch(show_api=False, debug=True)