File size: 9,661 Bytes
70d2ece
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
from app.logger_config import logger as logging
import numpy as np
import gradio as gr
import asyncio
from fastrtc.webrtc import WebRTC
from pydub import AudioSegment
import time
import os 
from gradio.utils import get_space

from app.logger_config import logger as logging
from app.utils import (
    generate_coturn_config
)
from app.session_utils import (
    on_load,
    on_unload,
    get_active_sessions,
    reset_all_active_sessions,
)
reset_all_active_sessions()
EXAMPLE_FILES = ["data/bonjour.wav", "data/bonjour2.wav"]
DEFAULT_FILE = EXAMPLE_FILES[0] 

def _is_stop_requested(stop_streaming_flags: dict) -> bool:
    if not isinstance(stop_streaming_flags, dict):
        return False
    return bool(stop_streaming_flags.get("stop", False))

def read_and_stream_audio(filepath_to_stream: str, session_id: str, stop_streaming_flags: dict):
    """
    Un générateur synchrone qui lit un fichier audio (via filepath_to_stream)
    et le streame chunk par chunk d'1 seconde.
    """

    if not session_id:
        logging.warning( "Aucun session_id fourni, arrêt du stream par sécurité.")
        return

    if isinstance(stop_streaming_flags, dict):
        stop_streaming_flags["stop"] = False
    else:
        logging.warning(f" [{session_id}] Stop stop_streaming_flags non initialisés, le stream continuera sans contrôle d'arrêt.")

    if not filepath_to_stream or not os.path.exists(filepath_to_stream):
        logging.error(f"[{session_id}] Fichier audio non trouvé ou non spécifié : {filepath_to_stream}")
        # Tenter d'utiliser le fichier par défaut en cas de problème
        if os.path.exists(DEFAULT_FILE):
            logging.warning(f"[{session_id}] Utilisation du fichier par défaut : {DEFAULT_FILE}")
            filepath_to_stream = DEFAULT_FILE
        else:
            logging.error(f"[{session_id}] Fichier par défaut non trouvé. Arrêt du stream.")
            return

    logging.info(f"[{session_id}] Préparation du segment audio depuis : {filepath_to_stream}")

    try:
        segment = AudioSegment.from_file(filepath_to_stream)
        chunk_duree_ms = 1000
        logging.info(f"[{session_id}] Début du streaming en chunks de {chunk_duree_ms}ms...")

        for i, chunk in enumerate(segment[::chunk_duree_ms]):
            iter_start_time = time.perf_counter()
            logging.info(f"Envoi du chunk {i+1}...")

            if _is_stop_requested(stop_streaming_flags):
                logging.info(f"[{session_id}]Signal d'arrêt reçu, arrêt de la boucle.")
                break

            output_chunk = (
                chunk.frame_rate,
                np.array(chunk.get_array_of_samples()).reshape(1, -1),
            )

            yield output_chunk

            iter_end_time = time.perf_counter()
            processing_duration_ms = (iter_end_time - iter_start_time) * 1000
            
            sleep_duration = (chunk_duree_ms / 1000.0) - (processing_duration_ms / 1000.0) - 0.1
            if sleep_duration < 0:
                sleep_duration = 0.01 # Éviter un temps de sommeil négatif
                
            logging.debug(f"[{session_id}]Temps de traitement: {processing_duration_ms:.2f}ms, Sommeil: {sleep_duration:.2f}s")

            elapsed = 0.0
            interval = 0.05
            while elapsed < sleep_duration:
                if _is_stop_requested(stop_streaming_flags):
                    logging.info(f"[{session_id}]Signal d'arrêt reçu pendant l'attente.")
                    break
                wait_chunk = min(interval, sleep_duration - elapsed)
                time.sleep(wait_chunk)
                elapsed += wait_chunk
            if _is_stop_requested(stop_streaming_flags):
                break

        logging.info(f"[{session_id}]Streaming terminé.")

    except asyncio.CancelledError:
        logging.info(f"[{session_id}]Stream arrêté par l'utilisateur (CancelledError).")
        raise
    except FileNotFoundError:
        logging.error(f"[{session_id}] Erreur critique : Fichier non trouvé : {filepath_to_stream}")
    except Exception as e:
        logging.error(f"[{session_id}] Erreur pendant le stream: {e}", exc_info=True)
        raise
    finally:
        if isinstance(stop_streaming_flags, dict):
            stop_streaming_flags["stop"] = False
        logging.info(f"[{session_id}]Signal d'arrêt nettoyé.")


def stop_streaming(session_id: str, stop_streaming_flags: dict):
    """Active le signal d'arrêt pour le générateur."""
    logging.info("Bouton Stop cliqué: envoi du signal d'arrêt.")
    if not isinstance(stop_streaming_flags, dict):
        stop_streaming_flags = {"stop": True}
    else:
        stop_streaming_flags["stop"] = True
    return stop_streaming_flags

# --- Interface Gradio ---

with gr.Blocks(theme=gr.themes.Soft()) as demo:

    session_hash = gr.State() 
    session_hash_box = gr.Textbox(label="Session ID", interactive=False)
    demo.load(fn=on_load, inputs=None, outputs=[session_hash, session_hash_box])
    demo.unload(on_unload)

    stop_streaming_flags = gr.State(value={"stop": False})





    gr.Markdown(
        "## Application 'Streamer' WebRTC (Serveur -> Client)\n"
        "Utilisez l'exemple fourni, uploadez un fichier ou enregistrez depuis votre micro, "
        "puis cliquez sur 'Start' pour écouter le stream."
    )

    # 1. État pour stocker le chemin du fichier à lire
    active_filepath = gr.State(value=DEFAULT_FILE)

    with gr.Row():
        with gr.Column():
            main_audio = gr.Audio(
                label="Source Audio",
                sources=["upload", "microphone"], # Combine les deux sources
                type="filepath",
                value=DEFAULT_FILE, # Défaut au premier exemple
            )
        with gr.Column():
            webrtc_stream = WebRTC(
                label="Stream Audio",
                mode="receive",
                modality="audio",
                rtc_configuration=generate_coturn_config(),
                visible=True, # Caché par défaut
                height = 200,
            )
    # 4. Boutons de contrôle
    with gr.Row():
        with gr.Column():
            start_button = gr.Button("Start Streaming", variant="primary")
            stop_button = gr.Button("Stop Streaming", variant="stop", interactive=False)
        with gr.Column():
            gr.Text()

    def set_new_file(filepath):
        """Met à jour l'état avec le nouveau chemin, ou revient au défaut si None."""
        if filepath is None:
            logging.info("Audio effacé, retour au fichier d'exemple par défaut.")
            new_path = DEFAULT_FILE
        else:
            logging.info(f"Nouvelle source audio sélectionnée : {filepath}")
            new_path = filepath
        # Retourne la valeur à mettre dans le gr.State
        return new_path

    # Mettre à jour le chemin si l'utilisateur upload, efface, ou change le fichier
    main_audio.change(
        fn=set_new_file, 
        inputs=[main_audio], 
        outputs=[active_filepath]
    )
    
    # Mettre à jour le chemin si l'utilisateur termine un enregistrement
    main_audio.stop_recording(
        fn=set_new_file, 
        inputs=[main_audio], 
        outputs=[active_filepath]
    )


    # Fonctions pour mettre à jour l'état de l'interface
    def start_streaming_ui(session_id: str, flags: dict):
        logging.info("UI : Démarrage du streaming. Désactivation des contrôles.")
        if not isinstance(flags, dict):
            flags = {"stop": False}
        else:
            flags["stop"] = False
        return (
            gr.Button(interactive=False),
            gr.Button(interactive=True),
            gr.Audio(visible=False),
            flags,
        )

    def stop_streaming_ui(flags: dict):
        logging.info("UI : Arrêt du streaming. Réactivation des contrôles.")
        return (
            gr.Button(interactive=True),
            gr.Button(interactive=False),
            gr.Audio(
                label="Source Audio",
                sources=["upload", "microphone"],
                type="filepath",
                value=active_filepath.value,
                visible=True,
            ),
        )


    ui_components = [
        start_button, stop_button, 
        main_audio, 
    ]

    stream_event = webrtc_stream.stream(
        fn=read_and_stream_audio,
        inputs=[active_filepath, session_hash, stop_streaming_flags],  
        outputs=[webrtc_stream],
        trigger=start_button.click,
        concurrency_id="audio_stream", # ID de concurrence
        concurrency_limit=10
    )

    # Mettre à jour l'interface au clic sur START
    start_button.click(
        fn=start_streaming_ui,
        inputs=[session_hash, stop_streaming_flags],
        outputs=ui_components + [stop_streaming_flags]
    )

    # Correction : S'assurer que le stream est bien annulé
    stop_button.click(
        fn=stop_streaming, 
        inputs=[session_hash, stop_streaming_flags],
        outputs=[stop_streaming_flags], 
    ).then(
        fn=stop_streaming_ui, # ENSUITE, mettre à jour l'interface
        inputs=[stop_streaming_flags],
        outputs=ui_components
    )
    # --- Active sessions ---
    with gr.Accordion("📊 Active Sessions", open=False):
        sessions_table = gr.DataFrame(
            headers=["session_hash", "file", "start_time", "status"],
            interactive=False,
            wrap=True,
            max_height=200,
        )

    gr.Timer(3.0).tick(fn=get_active_sessions, outputs=sessions_table)

if __name__ == "__main__":
    demo.queue(max_size=10, api_open=False).launch(show_api=False, debug=True)