from app.logger_config import logger as logging import numpy as np import gradio as gr import asyncio from fastrtc.webrtc import WebRTC from pydub import AudioSegment import time import os from gradio.utils import get_space from app.logger_config import logger as logging from app.utils import ( generate_coturn_config ) from app.session_utils import ( on_load, on_unload, get_active_sessions, reset_all_active_sessions, ) reset_all_active_sessions() EXAMPLE_FILES = ["data/bonjour.wav", "data/bonjour2.wav"] DEFAULT_FILE = EXAMPLE_FILES[0] def _is_stop_requested(stop_streaming_flags: dict) -> bool: if not isinstance(stop_streaming_flags, dict): return False return bool(stop_streaming_flags.get("stop", False)) def read_and_stream_audio(filepath_to_stream: str, session_id: str, stop_streaming_flags: dict): """ Un générateur synchrone qui lit un fichier audio (via filepath_to_stream) et le streame chunk par chunk d'1 seconde. """ if not session_id: logging.warning( "Aucun session_id fourni, arrêt du stream par sécurité.") return if isinstance(stop_streaming_flags, dict): stop_streaming_flags["stop"] = False else: logging.warning(f" [{session_id}] Stop stop_streaming_flags non initialisés, le stream continuera sans contrôle d'arrêt.") if not filepath_to_stream or not os.path.exists(filepath_to_stream): logging.error(f"[{session_id}] Fichier audio non trouvé ou non spécifié : {filepath_to_stream}") # Tenter d'utiliser le fichier par défaut en cas de problème if os.path.exists(DEFAULT_FILE): logging.warning(f"[{session_id}] Utilisation du fichier par défaut : {DEFAULT_FILE}") filepath_to_stream = DEFAULT_FILE else: logging.error(f"[{session_id}] Fichier par défaut non trouvé. Arrêt du stream.") return logging.info(f"[{session_id}] Préparation du segment audio depuis : {filepath_to_stream}") try: segment = AudioSegment.from_file(filepath_to_stream) chunk_duree_ms = 1000 logging.info(f"[{session_id}] Début du streaming en chunks de {chunk_duree_ms}ms...") for i, chunk in enumerate(segment[::chunk_duree_ms]): iter_start_time = time.perf_counter() logging.info(f"Envoi du chunk {i+1}...") if _is_stop_requested(stop_streaming_flags): logging.info(f"[{session_id}]Signal d'arrêt reçu, arrêt de la boucle.") break output_chunk = ( chunk.frame_rate, np.array(chunk.get_array_of_samples()).reshape(1, -1), ) yield output_chunk iter_end_time = time.perf_counter() processing_duration_ms = (iter_end_time - iter_start_time) * 1000 sleep_duration = (chunk_duree_ms / 1000.0) - (processing_duration_ms / 1000.0) - 0.1 if sleep_duration < 0: sleep_duration = 0.01 # Éviter un temps de sommeil négatif logging.debug(f"[{session_id}]Temps de traitement: {processing_duration_ms:.2f}ms, Sommeil: {sleep_duration:.2f}s") elapsed = 0.0 interval = 0.05 while elapsed < sleep_duration: if _is_stop_requested(stop_streaming_flags): logging.info(f"[{session_id}]Signal d'arrêt reçu pendant l'attente.") break wait_chunk = min(interval, sleep_duration - elapsed) time.sleep(wait_chunk) elapsed += wait_chunk if _is_stop_requested(stop_streaming_flags): break logging.info(f"[{session_id}]Streaming terminé.") except asyncio.CancelledError: logging.info(f"[{session_id}]Stream arrêté par l'utilisateur (CancelledError).") raise except FileNotFoundError: logging.error(f"[{session_id}] Erreur critique : Fichier non trouvé : {filepath_to_stream}") except Exception as e: logging.error(f"[{session_id}] Erreur pendant le stream: {e}", exc_info=True) raise finally: if isinstance(stop_streaming_flags, dict): stop_streaming_flags["stop"] = False logging.info(f"[{session_id}]Signal d'arrêt nettoyé.") def stop_streaming(session_id: str, stop_streaming_flags: dict): """Active le signal d'arrêt pour le générateur.""" logging.info("Bouton Stop cliqué: envoi du signal d'arrêt.") if not isinstance(stop_streaming_flags, dict): stop_streaming_flags = {"stop": True} else: stop_streaming_flags["stop"] = True return stop_streaming_flags # --- Interface Gradio --- with gr.Blocks(theme=gr.themes.Soft()) as demo: session_hash = gr.State() session_hash_box = gr.Textbox(label="Session ID", interactive=False) demo.load(fn=on_load, inputs=None, outputs=[session_hash, session_hash_box]) demo.unload(on_unload) stop_streaming_flags = gr.State(value={"stop": False}) gr.Markdown( "## Application 'Streamer' WebRTC (Serveur -> Client)\n" "Utilisez l'exemple fourni, uploadez un fichier ou enregistrez depuis votre micro, " "puis cliquez sur 'Start' pour écouter le stream." ) # 1. État pour stocker le chemin du fichier à lire active_filepath = gr.State(value=DEFAULT_FILE) with gr.Row(): with gr.Column(): main_audio = gr.Audio( label="Source Audio", sources=["upload", "microphone"], # Combine les deux sources type="filepath", value=DEFAULT_FILE, # Défaut au premier exemple ) with gr.Column(): webrtc_stream = WebRTC( label="Stream Audio", mode="receive", modality="audio", rtc_configuration=generate_coturn_config(), visible=True, # Caché par défaut height = 200, ) # 4. Boutons de contrôle with gr.Row(): with gr.Column(): start_button = gr.Button("Start Streaming", variant="primary") stop_button = gr.Button("Stop Streaming", variant="stop", interactive=False) with gr.Column(): gr.Text() def set_new_file(filepath): """Met à jour l'état avec le nouveau chemin, ou revient au défaut si None.""" if filepath is None: logging.info("Audio effacé, retour au fichier d'exemple par défaut.") new_path = DEFAULT_FILE else: logging.info(f"Nouvelle source audio sélectionnée : {filepath}") new_path = filepath # Retourne la valeur à mettre dans le gr.State return new_path # Mettre à jour le chemin si l'utilisateur upload, efface, ou change le fichier main_audio.change( fn=set_new_file, inputs=[main_audio], outputs=[active_filepath] ) # Mettre à jour le chemin si l'utilisateur termine un enregistrement main_audio.stop_recording( fn=set_new_file, inputs=[main_audio], outputs=[active_filepath] ) # Fonctions pour mettre à jour l'état de l'interface def start_streaming_ui(session_id: str, flags: dict): logging.info("UI : Démarrage du streaming. Désactivation des contrôles.") if not isinstance(flags, dict): flags = {"stop": False} else: flags["stop"] = False return ( gr.Button(interactive=False), gr.Button(interactive=True), gr.Audio(visible=False), flags, ) def stop_streaming_ui(flags: dict): logging.info("UI : Arrêt du streaming. Réactivation des contrôles.") return ( gr.Button(interactive=True), gr.Button(interactive=False), gr.Audio( label="Source Audio", sources=["upload", "microphone"], type="filepath", value=active_filepath.value, visible=True, ), ) ui_components = [ start_button, stop_button, main_audio, ] stream_event = webrtc_stream.stream( fn=read_and_stream_audio, inputs=[active_filepath, session_hash, stop_streaming_flags], outputs=[webrtc_stream], trigger=start_button.click, concurrency_id="audio_stream", # ID de concurrence concurrency_limit=10 ) # Mettre à jour l'interface au clic sur START start_button.click( fn=start_streaming_ui, inputs=[session_hash, stop_streaming_flags], outputs=ui_components + [stop_streaming_flags] ) # Correction : S'assurer que le stream est bien annulé stop_button.click( fn=stop_streaming, inputs=[session_hash, stop_streaming_flags], outputs=[stop_streaming_flags], ).then( fn=stop_streaming_ui, # ENSUITE, mettre à jour l'interface inputs=[stop_streaming_flags], outputs=ui_components ) # --- Active sessions --- with gr.Accordion("📊 Active Sessions", open=False): sessions_table = gr.DataFrame( headers=["session_hash", "file", "start_time", "status"], interactive=False, wrap=True, max_height=200, ) gr.Timer(3.0).tick(fn=get_active_sessions, outputs=sessions_table) if __name__ == "__main__": demo.queue(max_size=10, api_open=False).launch(show_api=False, debug=True)