Archime commited on
Commit
5c8a18f
·
1 Parent(s): 010fb88

add progress_stream

Browse files
Files changed (2) hide show
  1. app.py +100 -70
  2. app/session_utils.py +31 -22
app.py CHANGED
@@ -6,10 +6,9 @@ from fastrtc.webrtc import WebRTC
6
  from pydub import AudioSegment
7
  import time
8
  import os
 
9
  import spaces
10
  from app.utils import generate_coturn_config
11
-
12
- # ✅ Import des fonctions externalisées
13
  from app.session_utils import (
14
  generate_session_id,
15
  register_session,
@@ -18,20 +17,19 @@ from app.session_utils import (
18
  stop_file_path,
19
  create_stop_flag,
20
  clear_stop_flag,
 
21
  )
22
 
23
- # --- Constants ---
 
 
24
  EXAMPLE_FILES = ["data/bonjour.wav", "data/bonjour2.wav"]
25
  DEFAULT_FILE = EXAMPLE_FILES[0]
26
 
27
 
28
- # --- Streaming logic ---
29
  @spaces.GPU
30
  def read_and_stream_audio(filepath_to_stream: str, session_id: str):
31
- """
32
- Stream audio chunks for a specific session.
33
- Stops when the corresponding stop flag file exists.
34
- """
35
  stop_file = stop_file_path(session_id)
36
  logging.debug(f"[{session_id}] read_and_stream_audio() started with file: {filepath_to_stream}")
37
 
@@ -41,28 +39,31 @@ def read_and_stream_audio(filepath_to_stream: str, session_id: str):
41
  filepath_to_stream = DEFAULT_FILE
42
  logging.warning(f"[{session_id}] Using default file: {DEFAULT_FILE}")
43
  else:
44
- logging.error(f"[{session_id}] Default file missing. Abort stream.")
45
  return
46
 
47
- # Clear any leftover stop flag
48
  clear_stop_flag(session_id)
49
-
50
- # Register session
51
  register_session(session_id, filepath_to_stream)
 
52
 
53
  try:
54
  segment = AudioSegment.from_file(filepath_to_stream)
55
  chunk_ms = 1000
56
- logging.info(f"[{session_id}] Starting streaming ({chunk_ms}ms chunks)...")
 
57
 
58
- for i, chunk in enumerate(segment[::chunk_ms]):
59
  if os.path.exists(stop_file):
60
- logging.info(f"[{session_id}] Stop flag detected at chunk {i+1}. Ending stream.")
61
  clear_stop_flag(session_id)
62
  break
63
 
64
  iter_start = time.perf_counter()
65
- logging.debug(f"[{session_id}] Sending chunk {i+1}...")
 
 
 
 
66
 
67
  output_chunk = (
68
  chunk.frame_rate,
@@ -71,81 +72,104 @@ def read_and_stream_audio(filepath_to_stream: str, session_id: str):
71
  yield output_chunk
72
 
73
  process_ms = (time.perf_counter() - iter_start) * 1000
74
- sleep_s = max((chunk_ms / 1000.0) - (process_ms / 1000.0) - 0.1, 0.01)
75
- logging.debug(f"[{session_id}] Chunk {i+1} processed in {process_ms:.2f}ms, sleep={sleep_s:.2f}s")
76
- time.sleep(sleep_s)
 
77
 
78
- logging.info(f"[{session_id}] Stream finished normally.")
79
 
80
  except asyncio.CancelledError:
81
  logging.info(f"[{session_id}] Stream cancelled by user.")
82
  raise
83
  except Exception as e:
84
- logging.error(f"[{session_id}] Error during streaming: {e}", exc_info=True)
85
  raise
86
  finally:
87
  unregister_session(session_id)
88
  clear_stop_flag(session_id)
89
- logging.debug(f"[{session_id}] Exiting read_and_stream_audio().")
 
 
90
 
91
 
92
- # --- Stop Streaming ---
93
  def stop_streaming(session_id: str):
94
  create_stop_flag(session_id)
95
  logging.info(f"[{session_id}] Stop button clicked → stop flag created.")
96
  return None
97
 
98
 
99
- # --- Gradio UI ---
 
 
 
 
 
 
 
 
 
 
 
 
 
100
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
101
- gr.Markdown(
102
- "## Application 'Streamer' WebRTC (multi-utilisateur)\n"
103
- "Chaque utilisateur contrôle son propre flux audio, avec suivi des sessions actives."
104
- )
105
 
106
- # State
107
  session_id = gr.State(value=generate_session_id)
108
  active_filepath = gr.State(value=DEFAULT_FILE)
109
 
110
- with gr.Row():
111
- with gr.Column():
112
- main_audio = gr.Audio(
113
- label="Source Audio",
114
- sources=["upload", "microphone"],
115
- type="filepath",
116
- value=DEFAULT_FILE,
117
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
118
  with gr.Column():
119
  webrtc_stream = WebRTC(
120
- label="Flux Audio",
121
  mode="receive",
122
  modality="audio",
123
  rtc_configuration=generate_coturn_config(),
124
  visible=True,
125
- height=200,
126
  )
127
 
128
- with gr.Row():
129
- with gr.Column():
130
- start_button = gr.Button("Start Streaming", variant="primary")
131
- stop_button = gr.Button("Stop Streaming", variant="stop", interactive=False)
132
- with gr.Column():
133
- gr.Text()
134
-
135
- # --- Audio selection ---
136
  def set_new_file(filepath):
137
  return filepath if filepath else DEFAULT_FILE
138
 
139
  main_audio.change(fn=set_new_file, inputs=[main_audio], outputs=[active_filepath])
140
  main_audio.stop_recording(fn=set_new_file, inputs=[main_audio], outputs=[active_filepath])
141
 
142
- # --- UI state updates ---
143
  def start_streaming_ui(session_id):
144
  logging.debug(f"[{session_id}] UI: Start clicked → disabling controls.")
145
  return {
146
  start_button: gr.Button(interactive=False),
147
  stop_button: gr.Button(interactive=True),
148
  main_audio: gr.Audio(visible=False),
 
 
149
  }
150
 
151
  def stop_streaming_ui(session_id):
@@ -154,17 +178,18 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
154
  start_button: gr.Button(interactive=True),
155
  stop_button: gr.Button(interactive=False),
156
  main_audio: gr.Audio(
157
- label="Source Audio",
158
  sources=["upload", "microphone"],
159
  type="filepath",
160
  value=DEFAULT_FILE,
161
  visible=True,
162
  ),
 
 
163
  }
164
 
165
- ui_components = [start_button, stop_button, main_audio]
166
 
167
- # --- Streaming event ---
168
  webrtc_stream.stream(
169
  fn=read_and_stream_audio,
170
  inputs=[active_filepath, session_id],
@@ -174,37 +199,42 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
174
  concurrency_limit=20,
175
  )
176
 
177
- # --- Buttons ---
178
- start_button.click(
179
- fn=start_streaming_ui,
180
- inputs=[session_id],
181
- outputs=ui_components,
182
- )
183
-
184
- stop_button.click(
185
- fn=stop_streaming,
186
- inputs=[session_id],
187
- outputs=[webrtc_stream],
188
- ).then(
189
- fn=stop_streaming_ui,
190
- inputs=[session_id],
191
- outputs=ui_components,
192
  )
193
 
194
- # --- Sessions Table ---
195
- with gr.Accordion("📊 Sessions actives", open=False):
196
  sessions_table = gr.DataFrame(
197
  headers=["session_id", "file", "start_time", "status"],
198
  interactive=False,
199
  wrap=True,
200
- label="Utilisateurs connectés",
201
  max_height=200,
202
  )
203
 
204
- # ✅ Timer pour mise à jour automatique du tableau
205
  timer = gr.Timer(3.0)
206
  timer.tick(fn=get_active_sessions, outputs=sessions_table)
207
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
208
 
209
  if __name__ == "__main__":
210
  demo.queue(max_size=50, api_open=False).launch(show_api=False, debug=True)
 
6
  from pydub import AudioSegment
7
  import time
8
  import os
9
+ import json
10
  import spaces
11
  from app.utils import generate_coturn_config
 
 
12
  from app.session_utils import (
13
  generate_session_id,
14
  register_session,
 
17
  stop_file_path,
18
  create_stop_flag,
19
  clear_stop_flag,
20
+ reset_active_sessions,
21
  )
22
 
23
+ # Reset active sessions at startup
24
+ reset_active_sessions()
25
+
26
  EXAMPLE_FILES = ["data/bonjour.wav", "data/bonjour2.wav"]
27
  DEFAULT_FILE = EXAMPLE_FILES[0]
28
 
29
 
 
30
  @spaces.GPU
31
  def read_and_stream_audio(filepath_to_stream: str, session_id: str):
32
+ """Stream audio chunks for a specific session."""
 
 
 
33
  stop_file = stop_file_path(session_id)
34
  logging.debug(f"[{session_id}] read_and_stream_audio() started with file: {filepath_to_stream}")
35
 
 
39
  filepath_to_stream = DEFAULT_FILE
40
  logging.warning(f"[{session_id}] Using default file: {DEFAULT_FILE}")
41
  else:
42
+ logging.error(f"[{session_id}] Default file missing. Aborting.")
43
  return
44
 
 
45
  clear_stop_flag(session_id)
 
 
46
  register_session(session_id, filepath_to_stream)
47
+ progress_path = f"/tmp/progress_{session_id}.json"
48
 
49
  try:
50
  segment = AudioSegment.from_file(filepath_to_stream)
51
  chunk_ms = 1000
52
+ total_chunks = len(segment) // chunk_ms + 1
53
+ logging.info(f"[{session_id}] Streaming {total_chunks} chunks...")
54
 
55
+ for i, chunk in enumerate(segment[::chunk_ms], start=1):
56
  if os.path.exists(stop_file):
57
+ logging.info(f"[{session_id}] Stop flag detected at chunk {i}. Stopping.")
58
  clear_stop_flag(session_id)
59
  break
60
 
61
  iter_start = time.perf_counter()
62
+ logging.debug(f"[{session_id}] Sending chunk {i}/{total_chunks}...")
63
+
64
+ progress_data = {"value": i / total_chunks, "text": f"Streaming... ({i}/{total_chunks})"}
65
+ with open(progress_path, "w") as f:
66
+ json.dump(progress_data, f)
67
 
68
  output_chunk = (
69
  chunk.frame_rate,
 
72
  yield output_chunk
73
 
74
  process_ms = (time.perf_counter() - iter_start) * 1000
75
+ time.sleep(max((chunk_ms / 1000.0) - (process_ms / 1000.0) - 0.1, 0.01))
76
+
77
+ with open(progress_path, "w") as f:
78
+ json.dump({"value": 1.0, "text": "Streaming completed ✅"}, f)
79
 
80
+ logging.info(f"[{session_id}] Stream completed successfully.")
81
 
82
  except asyncio.CancelledError:
83
  logging.info(f"[{session_id}] Stream cancelled by user.")
84
  raise
85
  except Exception as e:
86
+ logging.error(f"[{session_id}] Stream error: {e}", exc_info=True)
87
  raise
88
  finally:
89
  unregister_session(session_id)
90
  clear_stop_flag(session_id)
91
+ if os.path.exists(progress_path):
92
+ os.remove(progress_path)
93
+ logging.debug(f"[{session_id}] Stream closed.")
94
 
95
 
 
96
  def stop_streaming(session_id: str):
97
  create_stop_flag(session_id)
98
  logging.info(f"[{session_id}] Stop button clicked → stop flag created.")
99
  return None
100
 
101
 
102
+ def get_session_progress(session_id: str):
103
+ """Read progress data from /tmp/progress_<session_id>.json."""
104
+ progress_path = f"/tmp/progress_{session_id}.json"
105
+ if not os.path.exists(progress_path):
106
+ return 0.0, ""
107
+ try:
108
+ with open(progress_path, "r") as f:
109
+ data = json.load(f)
110
+ return data.get("value", 0.0), data.get("text", "")
111
+ except Exception as e:
112
+ logging.error(f"[{session_id}] Progress read error: {e}")
113
+ return 0.0, ""
114
+
115
+
116
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
117
+ gr.Markdown("## 🎧 WebRTC Audio Streamer (Multi-user)\nEach user controls their own audio stream with live progress.")
 
 
 
118
 
 
119
  session_id = gr.State(value=generate_session_id)
120
  active_filepath = gr.State(value=DEFAULT_FILE)
121
 
122
+ with gr.Row(equal_height=True):
123
+ with gr.Column(elem_id="column_source", scale=1):
124
+ with gr.Group(elem_id="centered_content"):
125
+ main_audio = gr.Audio(
126
+ label="Audio Source",
127
+ sources=["upload", "microphone"],
128
+ type="filepath",
129
+ value=DEFAULT_FILE,
130
+ )
131
+
132
+ progress_bar = gr.Slider(
133
+ label="Streaming Progress",
134
+ minimum=0,
135
+ maximum=1,
136
+ value=0,
137
+ step=0.01,
138
+ interactive=False,
139
+ visible=False,
140
+ )
141
+
142
+ progress_text = gr.Textbox(label="Stream Status", interactive=False, visible=False)
143
+
144
+ with gr.Row():
145
+ with gr.Column(scale=1, min_width=0):
146
+ start_button = gr.Button("▶️ Start Streaming", variant="primary")
147
+ with gr.Column(scale=1, min_width=0):
148
+ stop_button = gr.Button("⏹️ Stop Streaming", variant="stop", interactive=False)
149
+
150
  with gr.Column():
151
  webrtc_stream = WebRTC(
152
+ label="Audio Stream",
153
  mode="receive",
154
  modality="audio",
155
  rtc_configuration=generate_coturn_config(),
156
  visible=True,
 
157
  )
158
 
 
 
 
 
 
 
 
 
159
  def set_new_file(filepath):
160
  return filepath if filepath else DEFAULT_FILE
161
 
162
  main_audio.change(fn=set_new_file, inputs=[main_audio], outputs=[active_filepath])
163
  main_audio.stop_recording(fn=set_new_file, inputs=[main_audio], outputs=[active_filepath])
164
 
 
165
  def start_streaming_ui(session_id):
166
  logging.debug(f"[{session_id}] UI: Start clicked → disabling controls.")
167
  return {
168
  start_button: gr.Button(interactive=False),
169
  stop_button: gr.Button(interactive=True),
170
  main_audio: gr.Audio(visible=False),
171
+ progress_bar: gr.Slider(value=0, visible=True),
172
+ progress_text: gr.Textbox(value="Streaming started...", visible=False),
173
  }
174
 
175
  def stop_streaming_ui(session_id):
 
178
  start_button: gr.Button(interactive=True),
179
  stop_button: gr.Button(interactive=False),
180
  main_audio: gr.Audio(
181
+ label="Audio Source",
182
  sources=["upload", "microphone"],
183
  type="filepath",
184
  value=DEFAULT_FILE,
185
  visible=True,
186
  ),
187
+ progress_bar: gr.Slider(value=0, visible=False),
188
+ progress_text: gr.Textbox(value="", visible=False),
189
  }
190
 
191
+ ui_components = [start_button, stop_button, main_audio, progress_bar, progress_text]
192
 
 
193
  webrtc_stream.stream(
194
  fn=read_and_stream_audio,
195
  inputs=[active_filepath, session_id],
 
199
  concurrency_limit=20,
200
  )
201
 
202
+ start_button.click(fn=start_streaming_ui, inputs=[session_id], outputs=ui_components)
203
+ stop_button.click(fn=stop_streaming, inputs=[session_id], outputs=[webrtc_stream]).then(
204
+ fn=stop_streaming_ui, inputs=[session_id], outputs=ui_components
 
 
 
 
 
 
 
 
 
 
 
 
205
  )
206
 
207
+ with gr.Accordion("📊 Active Sessions", open=False):
 
208
  sessions_table = gr.DataFrame(
209
  headers=["session_id", "file", "start_time", "status"],
210
  interactive=False,
211
  wrap=True,
212
+ label="Connected Users",
213
  max_height=200,
214
  )
215
 
 
216
  timer = gr.Timer(3.0)
217
  timer.tick(fn=get_active_sessions, outputs=sessions_table)
218
 
219
+ progress_timer = gr.Timer(1.0)
220
+ progress_timer.tick(fn=get_session_progress, inputs=[session_id], outputs=[progress_bar, progress_text])
221
+
222
+ custom_css = """
223
+ #column_source {
224
+ display: flex;
225
+ flex-direction: column;
226
+ justify-content: center;
227
+ align-items: center;
228
+ gap: 1rem;
229
+ margin-top: auto;
230
+ margin-bottom: auto;
231
+ }
232
+ #column_source .gr-row {
233
+ padding-top: 12px;
234
+ padding-bottom: 12px;
235
+ }
236
+ """
237
+ demo.css = custom_css
238
 
239
  if __name__ == "__main__":
240
  demo.queue(max_size=50, api_open=False).launch(show_api=False, debug=True)
app/session_utils.py CHANGED
@@ -7,17 +7,27 @@ from app.logger_config import logger as logging
7
  ACTIVE_SESSIONS_FILE = "/tmp/active_sessions.json"
8
 
9
 
10
- # --- ID Management ---
 
 
 
 
 
 
 
 
 
 
 
11
  def generate_session_id() -> str:
12
- """Génère un identifiant unique de session."""
13
  sid = str(uuid.uuid4())
14
  logging.debug(f"[{sid}] New session created.")
15
  return sid
16
 
17
 
18
- # --- Active Session Registry ---
19
  def register_session(session_id: str, filepath: str):
20
- """Ajoute une session au registre."""
21
  data = {}
22
  if os.path.exists(ACTIVE_SESSIONS_FILE):
23
  with open(ACTIVE_SESSIONS_FILE, "r") as f:
@@ -36,11 +46,11 @@ def register_session(session_id: str, filepath: str):
36
  with open(ACTIVE_SESSIONS_FILE, "w") as f:
37
  json.dump(data, f)
38
 
39
- logging.debug(f"[{session_id}] Registered session in active_sessions.json.")
40
 
41
 
42
  def unregister_session(session_id: str):
43
- """Retire une session du registre."""
44
  if not os.path.exists(ACTIVE_SESSIONS_FILE):
45
  return
46
 
@@ -51,13 +61,13 @@ def unregister_session(session_id: str):
51
  data.pop(session_id)
52
  with open(ACTIVE_SESSIONS_FILE, "w") as f:
53
  json.dump(data, f)
54
- logging.debug(f"[{session_id}] Unregistered session.")
55
  except Exception as e:
56
  logging.error(f"[{session_id}] Error unregistering session: {e}")
57
 
58
 
59
  def get_active_sessions():
60
- """Retourne les sessions actives sous forme de tableau pour le DataFrame."""
61
  if not os.path.exists(ACTIVE_SESSIONS_FILE):
62
  return []
63
 
@@ -65,29 +75,28 @@ def get_active_sessions():
65
  with open(ACTIVE_SESSIONS_FILE, "r") as f:
66
  data = json.load(f)
67
 
68
- rows = []
69
- for session in data.values():
70
- rows.append([
71
- session.get("session_id", ""),
72
- session.get("file", ""),
73
- session.get("start_time", ""),
74
- session.get("status", ""),
75
- ])
 
76
  return rows
77
-
78
  except Exception as e:
79
- logging.error(f"Erreur lecture sessions actives: {e}")
80
  return []
81
 
82
 
83
- # --- Stop Flag Management ---
84
  def stop_file_path(session_id: str) -> str:
85
- """Retourne le chemin du fichier stop d'une session."""
86
  return f"/tmp/stream_stop_flag_{session_id}.txt"
87
 
88
 
89
  def create_stop_flag(session_id: str):
90
- """Crée le fichier de stop pour cette session."""
91
  path = stop_file_path(session_id)
92
  with open(path, "w") as f:
93
  f.write("1")
@@ -95,7 +104,7 @@ def create_stop_flag(session_id: str):
95
 
96
 
97
  def clear_stop_flag(session_id: str):
98
- """Supprime le fichier de stop s'il existe."""
99
  path = stop_file_path(session_id)
100
  if os.path.exists(path):
101
  os.remove(path)
 
7
  ACTIVE_SESSIONS_FILE = "/tmp/active_sessions.json"
8
 
9
 
10
+ def reset_active_sessions():
11
+ """Removes or clears the active sessions file at startup."""
12
+ try:
13
+ if os.path.exists(ACTIVE_SESSIONS_FILE):
14
+ os.remove(ACTIVE_SESSIONS_FILE)
15
+ logging.info("Active sessions file reset at startup.")
16
+ else:
17
+ logging.debug("No active sessions found to reset.")
18
+ except Exception as e:
19
+ logging.error(f"Error resetting active sessions: {e}")
20
+
21
+
22
  def generate_session_id() -> str:
23
+ """Generates a unique session ID."""
24
  sid = str(uuid.uuid4())
25
  logging.debug(f"[{sid}] New session created.")
26
  return sid
27
 
28
 
 
29
  def register_session(session_id: str, filepath: str):
30
+ """Registers a new session."""
31
  data = {}
32
  if os.path.exists(ACTIVE_SESSIONS_FILE):
33
  with open(ACTIVE_SESSIONS_FILE, "r") as f:
 
46
  with open(ACTIVE_SESSIONS_FILE, "w") as f:
47
  json.dump(data, f)
48
 
49
+ logging.debug(f"[{session_id}] Session registered in active_sessions.json.")
50
 
51
 
52
  def unregister_session(session_id: str):
53
+ """Removes a session from the registry."""
54
  if not os.path.exists(ACTIVE_SESSIONS_FILE):
55
  return
56
 
 
61
  data.pop(session_id)
62
  with open(ACTIVE_SESSIONS_FILE, "w") as f:
63
  json.dump(data, f)
64
+ logging.debug(f"[{session_id}] Session unregistered.")
65
  except Exception as e:
66
  logging.error(f"[{session_id}] Error unregistering session: {e}")
67
 
68
 
69
  def get_active_sessions():
70
+ """Returns active sessions as a list of rows for the DataFrame."""
71
  if not os.path.exists(ACTIVE_SESSIONS_FILE):
72
  return []
73
 
 
75
  with open(ACTIVE_SESSIONS_FILE, "r") as f:
76
  data = json.load(f)
77
 
78
+ rows = [
79
+ [
80
+ s.get("session_id", ""),
81
+ s.get("file", ""),
82
+ s.get("start_time", ""),
83
+ s.get("status", ""),
84
+ ]
85
+ for s in data.values()
86
+ ]
87
  return rows
 
88
  except Exception as e:
89
+ logging.error(f"Error reading active sessions: {e}")
90
  return []
91
 
92
 
 
93
  def stop_file_path(session_id: str) -> str:
94
+ """Returns the stop-flag file path for a given session."""
95
  return f"/tmp/stream_stop_flag_{session_id}.txt"
96
 
97
 
98
  def create_stop_flag(session_id: str):
99
+ """Creates a stop-flag file for this session."""
100
  path = stop_file_path(session_id)
101
  with open(path, "w") as f:
102
  f.write("1")
 
104
 
105
 
106
  def clear_stop_flag(session_id: str):
107
+ """Deletes the stop-flag file if it exists."""
108
  path = stop_file_path(session_id)
109
  if os.path.exists(path):
110
  os.remove(path)