# Hugging Face Spaces page residue ("Spaces: Running on Zero") — kept as a
# comment so the file remains valid Python.
| from app.logger_config import logger as logging | |
| import numpy as np | |
| import gradio as gr | |
| import asyncio | |
| from fastrtc.webrtc import WebRTC | |
| from fastrtc.utils import AdditionalOutputs | |
| from pydub import AudioSegment | |
| import time | |
| import os | |
| from gradio.utils import get_space | |
| from app.logger_config import logger as logging | |
| from app.utils import ( | |
| generate_coturn_config | |
| ) | |
| from app.new_session_utils import ( | |
| on_load, | |
| on_unload, | |
| get_active_sessions, | |
| reset_all_active_sessions, | |
| ) | |
# Clear any sessions left over from a previous process — runs at import time.
reset_all_active_sessions()
# Bundled example audio files; the first entry doubles as the fallback source.
EXAMPLE_FILES = ["data/bonjour.wav", "data/bonjour2.wav"]
DEFAULT_FILE = EXAMPLE_FILES[0]
| def _is_stop_requested(stop_streaming_flags: dict) -> bool: | |
| if not isinstance(stop_streaming_flags, dict): | |
| return False | |
| return bool(stop_streaming_flags.get("stop", False)) | |
def read_and_stream_audio(filepath_to_stream: str, session_id: str, stop_streaming_flags: dict):
    """Synchronous generator that reads an audio file and streams it in
    1-second chunks, paced to real time.

    Args:
        filepath_to_stream: Path of the audio file to stream; falls back to
            ``DEFAULT_FILE`` when missing or unset.
        session_id: Client session identifier; streaming is refused without one.
        stop_streaming_flags: Shared mutable dict; another callback setting
            ``{"stop": True}`` stops the stream cooperatively.

    Yields:
        ``((sample_rate, samples_2d_ndarray), AdditionalOutputs(progress))``
        tuples, where ``progress`` is a percentage; a final
        ``(None, AdditionalOutputs(100))`` marks completion.
    """
    if not session_id:
        logging.warning( "Aucun session_id fourni, arrêt du stream par sécurité.")
        return
    # Re-arm the stop flag at stream start so a previous stop request does not
    # immediately cancel this run.
    if isinstance(stop_streaming_flags, dict):
        stop_streaming_flags["stop"] = False
    else:
        logging.warning(f" [{session_id}] Stop stop_streaming_flags non initialisés, le stream continuera sans contrôle d'arrêt.")
    if not filepath_to_stream or not os.path.exists(filepath_to_stream):
        logging.error(f"[{session_id}] Fichier audio non trouvé ou non spécifié : {filepath_to_stream}")
        # Fall back to the default example file when the source is missing.
        if os.path.exists(DEFAULT_FILE):
            logging.warning(f"[{session_id}] Utilisation du fichier par défaut : {DEFAULT_FILE}")
            filepath_to_stream = DEFAULT_FILE
        else:
            logging.error(f"[{session_id}] Fichier par défaut non trouvé. Arrêt du stream.")
            return
    logging.info(f"[{session_id}] Préparation du segment audio depuis : {filepath_to_stream}")
    try:
        segment = AudioSegment.from_file(filepath_to_stream)
        chunk_duree_ms = 1000
        # FIX: ceiling division. The previous ``len // chunk + 1`` over-counted
        # by one whenever the duration was an exact multiple of the chunk size,
        # so in-loop progress could never reach 100%.
        total_chunks = max(1, -(-len(segment) // chunk_duree_ms))
        logging.info(f"[{session_id}] Début du streaming en chunks de {chunk_duree_ms}ms...")
        for i, chunk in enumerate(segment[::chunk_duree_ms]):
            iter_start_time = time.perf_counter()
            logging.info(f"Envoi du chunk {i+1}...")
            if _is_stop_requested(stop_streaming_flags):
                logging.info(f"[{session_id}]Signal d'arrêt reçu, arrêt de la boucle.")
                break
            output_chunk = (
                chunk.frame_rate,
                np.array(chunk.get_array_of_samples()).reshape(1, -1),
            )
            # Numeric progress percentage sent alongside the audio chunk.
            progress = round(((i + 1) / total_chunks) * 100, 2)
            logging.debug(f"[{session_id}] Progression: {progress}%")
            yield (output_chunk, AdditionalOutputs(progress))
            # Pace to real time: sleep for the remainder of the chunk duration,
            # minus processing time and a 100 ms safety margin.
            iter_end_time = time.perf_counter()
            processing_duration_ms = (iter_end_time - iter_start_time) * 1000
            sleep_duration = (chunk_duree_ms / 1000.0) - (processing_duration_ms / 1000.0) - 0.1
            if sleep_duration < 0:
                sleep_duration = 0.01  # avoid a negative sleep time
            logging.debug(f"[{session_id}]Temps de traitement: {processing_duration_ms:.2f}ms, Sommeil: {sleep_duration:.2f}s")
            # Sleep in small slices so a stop request is honoured quickly.
            elapsed = 0.0
            interval = 0.05
            while elapsed < sleep_duration:
                if _is_stop_requested(stop_streaming_flags):
                    logging.info(f"[{session_id}]Signal d'arrêt reçu pendant l'attente.")
                    break
                wait_chunk = min(interval, sleep_duration - elapsed)
                time.sleep(wait_chunk)
                elapsed += wait_chunk
            if _is_stop_requested(stop_streaming_flags):
                break
        logging.info(f"[{session_id}]Streaming terminé.")
    except asyncio.CancelledError:
        logging.info(f"[{session_id}]Stream arrêté par l'utilisateur (CancelledError).")
        raise
    except FileNotFoundError:
        logging.error(f"[{session_id}] Erreur critique : Fichier non trouvé : {filepath_to_stream}")
    except Exception as e:
        logging.error(f"[{session_id}] Erreur pendant le stream: {e}", exc_info=True)
        raise
    finally:
        # Only cleanup belongs here — see the FIX note below about yielding.
        if isinstance(stop_streaming_flags, dict):
            stop_streaming_flags["stop"] = False
            logging.info(f"[{session_id}]Signal d'arrêt nettoyé.")
    # FIX: the final progress signal used to be yielded inside ``finally``.
    # Yielding in a generator's finally raises RuntimeError when the generator
    # is closed (GeneratorExit on client disconnect). It is now emitted on the
    # normal exit path, and as a plain number (100) instead of a dict, which
    # handle_additional_outputs' float() conversion rejected.
    yield (None, AdditionalOutputs(100))
def stop_streaming(session_id: str, stop_streaming_flags: dict):
    """Raise the shared stop flag so the streaming generator exits.

    Mutates and returns the existing dict when one is supplied; otherwise
    returns a fresh flag dict.
    """
    logging.info("Bouton Stop cliqué: envoi du signal d'arrêt.")
    flags = stop_streaming_flags if isinstance(stop_streaming_flags, dict) else {}
    flags["stop"] = True
    return flags
def handle_additional_outputs(status_slider, progress_value):
    """Update the progress slider from a streamed progress value.

    The slider is visible only while progress is strictly between 0 and 100;
    unparseable values are treated as 0 (hidden).
    """
    logging.debug(f"📡 Additional output received: {progress_value}")
    try:
        progress = float(progress_value)
    except (ValueError, TypeError):
        progress = 0
    if 0 < progress < 100:
        return gr.update(visible=True, value=progress)
    # Outside the active range: hide the slider, pinned to the nearest bound.
    return gr.update(visible=False, value=100 if progress >= 100 else 0)
# --- Gradio interface ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # Per-session identifier, filled in by on_load when the page opens.
    session_hash = gr.State()
    session_hash_box = gr.Textbox(label="Session ID", interactive=False)
    demo.load(fn=on_load, inputs=None, outputs=[session_hash, session_hash_box])
    demo.unload(on_unload)
    # Shared mutable flag polled by the streaming generator for cooperative stop.
    stop_streaming_flags = gr.State(value={"stop": False})
    gr.Markdown(
        "## Application 'Streamer' WebRTC (Serveur -> Client)\n"
        "Utilisez l'exemple fourni, uploadez un fichier ou enregistrez depuis votre micro, "
        "puis cliquez sur 'Start' pour écouter le stream."
    )
    # 1. State holding the path of the file to stream.
    active_filepath = gr.State(value=DEFAULT_FILE)
    with gr.Row(equal_height=True):
        with gr.Column(elem_id="column_source", scale=1):
            with gr.Group(elem_id="centered_content"):
                main_audio = gr.Audio(
                    label="Source Audio",
                    sources=["upload", "microphone"],  # combines both input sources
                    type="filepath",
                    value=DEFAULT_FILE,  # defaults to the first example file
                )
                status_slider = gr.Slider(0, 100, value=0, label="Progression du streaming", interactive=False, visible=False )
        with gr.Column():
            webrtc_stream = WebRTC(
                label="Stream Audio",
                mode="receive",
                modality="audio",
                rtc_configuration=generate_coturn_config(),
                # NOTE(review): original comment said "hidden by default" but the
                # value is True — confirm which is intended.
                visible=True,
                height = 200,
            )
    # 4. Control buttons
    with gr.Row():
        with gr.Column():
            start_button = gr.Button("Start Streaming", variant="primary")
            stop_button = gr.Button("Stop Streaming", variant="stop", interactive=False)
        with gr.Column():
            gr.Text()
| def set_new_file(filepath): | |
| """Met à jour l'état avec le nouveau chemin, ou revient au défaut si None.""" | |
| if filepath is None: | |
| logging.info("Audio effacé, retour au fichier d'exemple par défaut.") | |
| new_path = DEFAULT_FILE | |
| else: | |
| logging.info(f"Nouvelle source audio sélectionnée : {filepath}") | |
| new_path = filepath | |
| # Retourne la valeur à mettre dans le gr.State | |
| return new_path | |
    # Update the active path when the user uploads, clears, or changes the file.
    main_audio.change(
        fn=set_new_file,
        inputs=[main_audio],
        outputs=[active_filepath]
    )
    # Update the active path when the user finishes a microphone recording.
    main_audio.stop_recording(
        fn=set_new_file,
        inputs=[main_audio],
        outputs=[active_filepath]
    )
| # Fonctions pour mettre à jour l'état de l'interface | |
| def start_streaming_ui(session_id: str, flags: dict): | |
| logging.info("UI : Démarrage du streaming. Désactivation des contrôles.") | |
| if not isinstance(flags, dict): | |
| flags = {"stop": False} | |
| else: | |
| flags["stop"] = False | |
| return ( | |
| gr.Button(interactive=False), | |
| gr.Button(interactive=True), | |
| gr.Audio(visible=False), | |
| flags, | |
| ) | |
    def stop_streaming_ui(flags: dict):
        """Re-enable the controls and restore the audio widget after streaming stops.

        ``flags`` is wired as an event input but is not read here.
        """
        logging.info("UI : Arrêt du streaming. Réactivation des contrôles.")
        return (
            gr.Button(interactive=True),
            gr.Button(interactive=False),
            gr.Audio(
                label="Source Audio",
                sources=["upload", "microphone"],
                type="filepath",
                # NOTE(review): active_filepath.value reads the gr.State
                # component's default value, not the per-session value — this
                # likely resets the widget to DEFAULT_FILE instead of the
                # user's file. Consider passing active_filepath as an event
                # input instead. Verify against Gradio State semantics.
                value=active_filepath.value,
                visible=True,
            ),
        )
    # Components toggled together by the start/stop UI helpers.
    ui_components = [
        start_button, stop_button,
        main_audio,
    ]
    # Stream generator: triggered by Start, yields audio chunks to the WebRTC widget.
    stream_event = webrtc_stream.stream(
        fn=read_and_stream_audio,
        inputs=[active_filepath, session_hash, stop_streaming_flags],
        outputs=[webrtc_stream],
        trigger=start_button.click,
        concurrency_id="audio_stream",
        concurrency_limit=10
    )
    # Progress values yielded via AdditionalOutputs drive the slider.
    webrtc_stream.on_additional_outputs(
        fn=handle_additional_outputs,
        inputs=[status_slider],
        outputs=[status_slider],
        concurrency_id="additional_outputs_audio_stream",
        concurrency_limit=10
    )
    # Update the interface when START is clicked.
    start_button.click(
        fn=start_streaming_ui,
        inputs=[session_hash, stop_streaming_flags],
        outputs=ui_components + [stop_streaming_flags]
    )
    # Fix: make sure the stream is properly cancelled —
    # raise the stop flag first, THEN restore the interface.
    stop_button.click(
        fn=stop_streaming,
        inputs=[session_hash, stop_streaming_flags],
        outputs=[stop_streaming_flags],
    ).then(
        fn=stop_streaming_ui,
        inputs=[stop_streaming_flags],
        outputs=ui_components
    )
    # --- Active sessions ---
    with gr.Accordion("📊 Active Sessions", open=False):
        sessions_table = gr.DataFrame(
            headers=["session_hash", "file", "start_time", "status"],
            interactive=False,
            wrap=True,
            max_height=200,
        )
    # Refresh the sessions table every 3 seconds.
    gr.Timer(3.0).tick(fn=get_active_sessions, outputs=sessions_table)
# --------------------------------------------------------
# CSS
# --------------------------------------------------------
custom_css = """
#column_source {
    display: flex;
    flex-direction: column;
    justify-content: center;
    align-items: center;
    gap: 1rem;
    margin-top: auto;
    margin-bottom: auto;
}
#column_source .gr-row {
    padding-top: 12px;
    padding-bottom: 12px;
}
"""
# NOTE(review): assigning demo.css after Blocks creation may be ignored by
# some Gradio versions — css is normally passed to gr.Blocks(css=...); confirm.
demo.css = custom_css
# --------------------------------------------------------
# MAIN
# --------------------------------------------------------
if __name__ == "__main__":
    # Queue bounds concurrent requests; API endpoints/docs are disabled.
    demo.queue(max_size=10, api_open=False).launch(show_api=False, debug=True)