Archime committed on
Commit
4c19345
·
1 Parent(s): f482080

add start and stop task

Browse files
Files changed (4) hide show
  1. app.py +52 -37
  2. app/session_utils.py +100 -76
  3. app/stream_utils.py +83 -5
  4. app/ui_utils.py +1 -1
app.py CHANGED
@@ -18,9 +18,11 @@ from app.utils import (
18
  from app.session_utils import (
19
  on_load,
20
  on_unload,
21
- get_active_sessions,
22
- register_session,
23
- reset_all_active_sessions,
 
 
24
  )
25
 
26
  from app.ui_utils import (
@@ -37,27 +39,28 @@ from app.ui_utils import (
37
  from app.stream_utils import (
38
  generate_coturn_config,
39
  read_and_stream_audio,
40
- stop_streaming
 
41
  )
42
 
43
  # --------------------------------------------------------
44
  # Initialization
45
  # --------------------------------------------------------
46
- reset_all_active_sessions()
47
 
48
  theme,css_style = get_custom_theme()
49
 
50
  with gr.Blocks(theme=theme, css=css_style) as demo:
51
  session_hash = gr.State()
52
  session_hash_box = gr.Textbox(label="Session ID", interactive=False, visible=DEBUG)
53
- with gr.Accordion("📊 Active Sessions", open=True ,visible=DEBUG):
54
  sessions_table = gr.DataFrame(
55
  headers=["session_hash", "file", "start_time", "status"],
56
  interactive=False,
57
  wrap=True,
58
  max_height=200,
59
  )
60
- gr.Timer(3.0).tick(fn=get_active_sessions, outputs=sessions_table)
61
 
62
  demo.load(fn=on_load, inputs=None, outputs=[session_hash, session_hash_box])
63
  demo.unload(on_unload)
@@ -226,17 +229,17 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
226
  interactive=False,
227
  visible=False
228
  )
 
229
 
230
  transcription_output = gr.Textbox(
231
  label="Transcription / Translation Result",
232
- placeholder="The output text will appear here...",
233
  lines=10,
234
  interactive=False,
235
  visible=True
236
  )
237
 
238
  start_task_button = gr.Button("Start Task", visible=True)
239
- stop_stream_button = gr.Button("Stop Streaming", visible=False)
240
  stop_task_button = gr.Button("Stop Task", visible=False)
241
 
242
  stop_stream_button.click(
@@ -245,14 +248,50 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
245
  outputs=[stop_streaming_flags],
246
  )
247
 
248
- def stop_task_fn():
249
- return "Task stopped by user."
 
 
 
 
250
 
251
  stop_task_button.click(
252
  fn=stop_task_fn,
253
- inputs=None,
254
  outputs=transcription_output
255
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
256
 
257
  ui_components = [
258
  start_stream_button, stop_stream_button,
@@ -267,30 +306,6 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
267
  concurrency_limit=10,
268
  )
269
 
270
- # def start_transcription(
271
- # session_hash, stop_streaming_flags,
272
- # task_type, lang_source, lang_target,
273
- # chunk_secs, left_context_secs, right_context_secs,
274
- # streaming_policy, alignatt_thr, waitk_lagging,
275
- # exclude_sink_frames, xatt_scores_layer, hallucinations_detector
276
- # ):
277
- # if task_type == "Translation":
278
- # return f"Translation completed ({lang_source} → {lang_target})\n\nTranslated text:\nLorem ipsum..."
279
- # else:
280
- # return f"Transcription completed ({lang_source})\n\nTranscribed text:\nHello everyone, this is a test audio stream..."
281
-
282
- # start_task_button.click(
283
- # fn=start_transcription,
284
- # inputs=[
285
- # session_hash, stop_streaming_flags,
286
- # task_type, lang_source, lang_target,
287
- # chunk_secs, left_context_secs, right_context_secs,
288
- # streaming_policy, alignatt_thr, waitk_lagging,
289
- # exclude_sink_frames, xatt_scores_layer, hallucinations_detector
290
- # ],
291
- # outputs=transcription_output
292
- # )
293
-
294
  # def toggle_task_buttons():
295
  # return (
296
  # gr.update(visible=False),
@@ -307,4 +322,4 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
307
 
308
 
309
  if __name__ == "__main__":
310
- demo.queue(max_size=10, api_open=False).launch(show_api=False,show_error=True, debug=True)
 
18
  from app.session_utils import (
19
  on_load,
20
  on_unload,
21
+ get_active_sessions_hash,
22
+ register_session_hash,
23
+ reset_all_active_sessions_hash,
24
+ get_active_task_flag_file,
25
+
26
  )
27
 
28
  from app.ui_utils import (
 
39
  from app.stream_utils import (
40
  generate_coturn_config,
41
  read_and_stream_audio,
42
+ stop_streaming,
43
+ task
44
  )
45
 
46
  # --------------------------------------------------------
47
  # Initialization
48
  # --------------------------------------------------------
49
+ reset_all_active_sessions_hash()
50
 
51
  theme,css_style = get_custom_theme()
52
 
53
  with gr.Blocks(theme=theme, css=css_style) as demo:
54
  session_hash = gr.State()
55
  session_hash_box = gr.Textbox(label="Session ID", interactive=False, visible=DEBUG)
56
+ with gr.Accordion("📊 Active Sessions Hash", open=True ,visible=DEBUG):
57
  sessions_table = gr.DataFrame(
58
  headers=["session_hash", "file", "start_time", "status"],
59
  interactive=False,
60
  wrap=True,
61
  max_height=200,
62
  )
63
+ gr.Timer(3.0).tick(fn=get_active_sessions_hash, outputs=sessions_table)
64
 
65
  demo.load(fn=on_load, inputs=None, outputs=[session_hash, session_hash_box])
66
  demo.unload(on_unload)
 
229
  interactive=False,
230
  visible=False
231
  )
232
+ stop_stream_button = gr.Button("Stop Streaming", visible=False)
233
 
234
  transcription_output = gr.Textbox(
235
  label="Transcription / Translation Result",
236
+ placeholder="Waiting for output...",
237
  lines=10,
238
  interactive=False,
239
  visible=True
240
  )
241
 
242
  start_task_button = gr.Button("Start Task", visible=True)
 
243
  stop_task_button = gr.Button("Stop Task", visible=False)
244
 
245
  stop_stream_button.click(
 
248
  outputs=[stop_streaming_flags],
249
  )
250
 
251
+ def stop_task_fn(session_hash):
252
+ transcribe_active = get_active_task_flag_file(session_hash)
253
+ if os.path.exists(transcribe_active):
254
+ os.remove(transcribe_active)
255
+ yield "Task stopped by user."
256
+
257
 
258
  stop_task_button.click(
259
  fn=stop_task_fn,
260
+ inputs=session_hash,
261
  outputs=transcription_output
262
  )
263
+ # task(session_hash)
264
+
265
+ def start_transcription(
266
+ session_hash, stop_streaming_flags,
267
+ task_type, lang_source, lang_target,
268
+ chunk_secs, left_context_secs, right_context_secs,
269
+ streaming_policy, alignatt_thr, waitk_lagging,
270
+ exclude_sink_frames, xatt_scores_layer, hallucinations_detector
271
+ ):
272
+ """Stream transcription or translation results in real time."""
273
+
274
+ accumulated = ""
275
+ yield f"Starting {task_type.lower()}...\n\n",gr.update(visible=False),gr.update(visible=True)
276
+
277
+ # Loop over the `task()` generator
278
+ for msg in task(session_hash):
279
+ accumulated += msg
280
+ yield accumulated,gr.update(visible=False),gr.update(visible=True)
281
+
282
+ yield accumulated + "\nDone.",gr.update(visible=True),gr.update(visible=False)
283
+
284
+ start_task_button.click(
285
+ fn=start_transcription,
286
+ inputs=[
287
+ session_hash, stop_streaming_flags,
288
+ task_type, lang_source, lang_target,
289
+ chunk_secs, left_context_secs, right_context_secs,
290
+ streaming_policy, alignatt_thr, waitk_lagging,
291
+ exclude_sink_frames, xatt_scores_layer, hallucinations_detector
292
+ ],
293
+ outputs=[transcription_output,start_task_button,stop_task_button]
294
+ )
295
 
296
  ui_components = [
297
  start_stream_button, stop_stream_button,
 
306
  concurrency_limit=10,
307
  )
308
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
309
  # def toggle_task_buttons():
310
  # return (
311
  # gr.update(visible=False),
 
322
 
323
 
324
  if __name__ == "__main__":
325
+ demo.queue(max_size=10, api_open=False).launch(show_api=False,show_error=True, debug=DEBUG)
app/session_utils.py CHANGED
@@ -8,25 +8,27 @@ import gradio as gr
8
  # TMP_DIR = "/tmp/canary_aed_streaming"
9
 
10
  TMP_DIR = os.getenv("TMP_DIR", "/tmp/canary_aed_streaming")
11
- ACTIVE_SESSIONS_FILE = os.path.join(TMP_DIR, "active_sessions.json")
 
 
12
 
13
 
14
  # ---------------------------
15
  # Helper to manage the JSON
16
  # ---------------------------
17
- def _read_sessions():
18
- if not os.path.exists(ACTIVE_SESSIONS_FILE):
19
  return {}
20
  try:
21
- with open(ACTIVE_SESSIONS_FILE, "r") as f:
22
  return json.load(f)
23
  except Exception:
24
  return {}
25
 
26
 
27
- def _write_sessions(data):
28
- os.makedirs(os.path.dirname(ACTIVE_SESSIONS_FILE), exist_ok=True)
29
- with open(ACTIVE_SESSIONS_FILE, "w") as f:
30
  json.dump(data, f, indent=2)
31
 
32
 
@@ -35,8 +37,8 @@ def _write_sessions(data):
35
  # ---------------------------
36
  def on_load(request: gr.Request):
37
  """Called when a new visitor opens the app."""
38
- session_hash = request.session_hash # ✅ Directly use session_hash as unique ID
39
- sessions = _read_sessions()
40
 
41
  sessions[session_hash] = {
42
  "session_hash": session_hash,
@@ -45,8 +47,8 @@ def on_load(request: gr.Request):
45
  "status": "active",
46
  }
47
 
48
- _write_sessions(sessions)
49
- logging.info(f"[{session_hash}] Session registered (on_load).")
50
 
51
  return session_hash, session_hash # can be used as gr.State + display
52
 
@@ -57,16 +59,16 @@ def on_load(request: gr.Request):
57
  def on_unload(request: gr.Request):
58
  """Called when the visitor closes or refreshes the app."""
59
  sid = request.session_hash
60
- sessions = _read_sessions()
61
 
62
  if sid in sessions:
63
  sessions.pop(sid)
64
- _write_sessions(sessions)
65
- remove_session_data(sid)
66
- unregister_session(sid)
67
- logging.info(f"[{sid}] Session removed (on_unload).")
68
  else:
69
- logging.info(f"[{sid}] No active session found to remove.")
70
 
71
  def ensure_tmp_dir():
72
  """Ensures the base temporary directory exists."""
@@ -76,47 +78,66 @@ def ensure_tmp_dir():
76
  logging.error(f"Failed to create tmp directory {TMP_DIR}: {e}")
77
 
78
 
79
- def reset_all_active_sessions():
80
- """Removes all temporary session files and folders at startup."""
81
  ensure_tmp_dir()
82
 
83
  try:
84
- # --- Remove active sessions file ---
85
- if os.path.exists(ACTIVE_SESSIONS_FILE):
86
- os.remove(ACTIVE_SESSIONS_FILE)
87
- logging.info("Active sessions file reset at startup.")
88
  else:
89
- logging.debug("No active sessions file found to reset.")
90
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91
  except Exception as e:
92
- logging.error(f"Error resetting active sessions: {e}")
93
 
94
- def remove_session_data(session_id: str):
95
- """Removes all temporary files and data related to a specific session."""
96
- if not session_id:
97
- logging.warning("reset_session() called without a valid session_id.")
98
  return
99
 
100
  try:
101
- # --- Remove session from active_sessions.json ---
102
- if os.path.exists(ACTIVE_SESSIONS_FILE):
103
  try:
104
- with open(ACTIVE_SESSIONS_FILE, "r") as f:
105
  data = json.load(f)
106
- if session_id in data:
107
- data.pop(session_id)
108
- with open(ACTIVE_SESSIONS_FILE, "w") as f:
109
  json.dump(data, f, indent=2)
110
- logging.debug(f"[{session_id}] Removed from active_sessions.json.")
111
  except Exception as e:
112
- logging.warning(f"[{session_id}] Failed to update active_sessions.json: {e}")
113
 
114
- # --- Define all possible session file patterns ---
115
  files_to_remove = [
116
- f"progress_{session_id}.json",
117
- # f"stream_stop_flag_{session_id}.txt",
118
- f"transcribe_stop_flag_{session_id}.txt",
119
- f"transcribe_active_{session_id}.txt",
120
  ]
121
 
122
  # --- Remove all temporary files ---
@@ -125,78 +146,79 @@ def remove_session_data(session_id: str):
125
  if os.path.exists(path):
126
  try:
127
  os.remove(path)
128
- logging.debug(f"[{session_id}] Removed file: {fname}")
129
  except Exception as e:
130
- logging.warning(f"[{session_id}] Failed to remove file {fname}: {e}")
131
 
132
  # --- Remove chunk folder if exists ---
133
- chunk_dir = os.path.join(TMP_DIR, f"chunks_{session_id}")
134
  if os.path.isdir(chunk_dir):
135
  try:
136
  shutil.rmtree(chunk_dir)
137
- logging.debug(f"[{session_id}] Removed chunk folder: chunks_{session_id}")
138
  except Exception as e:
139
- logging.warning(f"[{session_id}] Failed to remove chunk folder: {e}")
140
 
141
- logging.info(f"[{session_id}] Session fully reset.")
142
 
143
  except Exception as e:
144
- logging.error(f"[{session_id}] Error during reset_session: {e}")
 
145
  def generate_session_id() -> str:
146
- """Generates a unique session ID."""
147
  sid = str(uuid.uuid4())
148
- logging.debug(f"[{sid}] New session created.")
149
  return sid
150
 
151
 
152
- def register_session(session_id: str, filepath: str):
153
- """Registers a new session."""
154
  ensure_tmp_dir()
155
  data = {}
156
- if os.path.exists(ACTIVE_SESSIONS_FILE):
157
- with open(ACTIVE_SESSIONS_FILE, "r") as f:
158
  try:
159
  data = json.load(f)
160
  except Exception:
161
  data = {}
162
 
163
- data[session_id] = {
164
- "session_hash": session_id,
165
  "file": filepath,
166
  "start_time": datetime.utcnow().strftime("%H:%M:%S"),
167
  "status": "active",
168
  }
169
 
170
- with open(ACTIVE_SESSIONS_FILE, "w") as f:
171
  json.dump(data, f)
172
 
173
- logging.debug(f"[{session_id}] Session registered in active_sessions.json.")
174
 
175
 
176
- def unregister_session(session_id: str):
177
- """Removes a session from the registry."""
178
- if not os.path.exists(ACTIVE_SESSIONS_FILE):
179
  return
180
 
181
  try:
182
- with open(ACTIVE_SESSIONS_FILE, "r") as f:
183
  data = json.load(f)
184
- if session_id in data:
185
- data.pop(session_id)
186
- with open(ACTIVE_SESSIONS_FILE, "w") as f:
187
  json.dump(data, f)
188
- logging.debug(f"[{session_id}] Session unregistered.")
189
  except Exception as e:
190
- logging.error(f"[{session_id}] Error unregistering session: {e}")
191
 
192
 
193
- def get_active_sessions():
194
- """Returns active sessions as a list of rows for the DataFrame."""
195
- if not os.path.exists(ACTIVE_SESSIONS_FILE):
196
  return []
197
 
198
  try:
199
- with open(ACTIVE_SESSIONS_FILE, "r") as f:
200
  data = json.load(f)
201
 
202
  rows = [
@@ -210,11 +232,13 @@ def get_active_sessions():
210
  ]
211
  return rows
212
  except Exception as e:
213
- logging.error(f"Error reading active sessions: {e}")
214
  return []
215
 
216
 
 
 
217
 
218
 
219
-
220
-
 
8
  # TMP_DIR = "/tmp/canary_aed_streaming"
9
 
10
  TMP_DIR = os.getenv("TMP_DIR", "/tmp/canary_aed_streaming")
11
+ ACTIVE_SESSIONS_HASH_FILE = os.path.join(TMP_DIR, "active_sessions_hash.json")
12
+ ACTIVE_TASK_FLAG="task_active_"
13
+ NAME_FOLDER_CHUNKS="chunks_"
14
 
15
 
16
  # ---------------------------
17
  # Helper to manage the JSON
18
  # ---------------------------
19
+ def _read_sessions_hash():
20
+ if not os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
21
  return {}
22
  try:
23
+ with open(ACTIVE_SESSIONS_HASH_FILE, "r") as f:
24
  return json.load(f)
25
  except Exception:
26
  return {}
27
 
28
 
29
+ def _write_sessions_hash(data):
30
+ os.makedirs(os.path.dirname(ACTIVE_SESSIONS_HASH_FILE), exist_ok=True)
31
+ with open(ACTIVE_SESSIONS_HASH_FILE, "w") as f:
32
  json.dump(data, f, indent=2)
33
 
34
 
 
37
  # ---------------------------
38
  def on_load(request: gr.Request):
39
  """Called when a new visitor opens the app."""
40
+ session_hash = request.session_hash
41
+ sessions = _read_sessions_hash()
42
 
43
  sessions[session_hash] = {
44
  "session_hash": session_hash,
 
47
  "status": "active",
48
  }
49
 
50
+ _write_sessions_hash(sessions)
51
+ logging.info(f"[{session_hash}] session_hash registered (on_load).")
52
 
53
  return session_hash, session_hash # can be used as gr.State + display
54
 
 
59
  def on_unload(request: gr.Request):
60
  """Called when the visitor closes or refreshes the app."""
61
  sid = request.session_hash
62
+ sessions = _read_sessions_hash()
63
 
64
  if sid in sessions:
65
  sessions.pop(sid)
66
+ _write_sessions_hash(sessions)
67
+ remove_session_hash_data(sid)
68
+ unregister_session_hash_hash(sid)
69
+ logging.info(f"[{sid}] session_hash removed (on_unload).")
70
  else:
71
+ logging.info(f"[{sid}] No active session_hash found to remove.")
72
 
73
  def ensure_tmp_dir():
74
  """Ensures the base temporary directory exists."""
 
78
  logging.error(f"Failed to create tmp directory {TMP_DIR}: {e}")
79
 
80
 
81
+ def reset_all_active_sessions_hash():
82
+ """Removes all temporary session_hash files and folders at startup."""
83
  ensure_tmp_dir()
84
 
85
  try:
86
+ # --- Remove active session_hashs file ---
87
+ if os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
88
+ os.remove(ACTIVE_SESSIONS_HASH_FILE)
89
+ logging.info("Active session_hashs file reset at startup.")
90
  else:
91
+ logging.debug("No active session_hashs file found to reset.")
92
+ # --- Clean all flag files (stream + transcribe) ---
93
+ for f in os.listdir(TMP_DIR):
94
+ if (
95
+ f.startswith(f"{ACTIVE_TASK_FLAG}")
96
+ ) and f.endswith(".txt"):
97
+ path = os.path.join(TMP_DIR, f)
98
+ try:
99
+ os.remove(path)
100
+ logging.debug(f"Removed leftover flag file: {f}")
101
+ except Exception as e:
102
+ logging.warning(f"Failed to remove flag file {f}: {e}")
103
+
104
+ # --- Clean chunk directories ---
105
+ for name in os.listdir(TMP_DIR):
106
+ path = os.path.join(TMP_DIR, name)
107
+ if os.path.isdir(path) and name.startswith(f"{NAME_FOLDER_CHUNKS}"):
108
+ try:
109
+ shutil.rmtree(path)
110
+ logging.debug(f"Removed leftover chunk folder: {name}")
111
+ except Exception as e:
112
+ logging.warning(f"Failed to remove chunk folder {name}: {e}")
113
+
114
+ logging.info("Temporary session cleanup completed successfully.")
115
  except Exception as e:
116
+ logging.error(f"Error resetting active session_hashs: {e}")
117
 
118
+ def remove_session_hash_data(session_hash: str):
119
+ """Removes all temporary files and data related to a specific session_hash."""
120
+ if not session_hash:
121
+ logging.warning("reset_session() called without a valid session_hash.")
122
  return
123
 
124
  try:
125
+ # --- Remove session_hash from active_sessions.json ---
126
+ if os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
127
  try:
128
+ with open(ACTIVE_SESSIONS_HASH_FILE, "r") as f:
129
  data = json.load(f)
130
+ if session_hash in data:
131
+ data.pop(session_hash)
132
+ with open(ACTIVE_SESSIONS_HASH_FILE, "w") as f:
133
  json.dump(data, f, indent=2)
134
+ logging.debug(f"[{session_hash}] Removed from {ACTIVE_SESSIONS_HASH_FILE}.")
135
  except Exception as e:
136
+ logging.warning(f"[{session_hash}] Failed to update {ACTIVE_SESSIONS_HASH_FILE}: {e}")
137
 
138
+ # --- Define all possible session_hash file patterns ---
139
  files_to_remove = [
140
+ get_active_task_flag_file(session_hash),
 
 
 
141
  ]
142
 
143
  # --- Remove all temporary files ---
 
146
  if os.path.exists(path):
147
  try:
148
  os.remove(path)
149
+ logging.debug(f"[{session_hash}] Removed file: {fname}")
150
  except Exception as e:
151
+ logging.warning(f"[{session_hash}] Failed to remove file {fname}: {e}")
152
 
153
  # --- Remove chunk folder if exists ---
154
+ chunk_dir = os.path.join(TMP_DIR, f"chunks_{session_hash}")
155
  if os.path.isdir(chunk_dir):
156
  try:
157
  shutil.rmtree(chunk_dir)
158
+ logging.debug(f"[{session_hash}] Removed chunk folder: chunks_{session_hash}")
159
  except Exception as e:
160
+ logging.warning(f"[{session_hash}] Failed to remove chunk folder: {e}")
161
 
162
+ logging.info(f"[{session_hash}] session_hash fully reset.")
163
 
164
  except Exception as e:
165
+ logging.error(f"[{session_hash}] Error during reset_session: {e}")
166
+
167
  def generate_session_id() -> str:
168
+ """Generates a unique session_hash ID."""
169
  sid = str(uuid.uuid4())
170
+ logging.debug(f"[{sid}] New session_hash created.")
171
  return sid
172
 
173
 
174
+ def register_session_hash(session_hash: str, filepath: str):
175
+ """Registers a new session_hash."""
176
  ensure_tmp_dir()
177
  data = {}
178
+ if os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
179
+ with open(ACTIVE_SESSIONS_HASH_FILE, "r") as f:
180
  try:
181
  data = json.load(f)
182
  except Exception:
183
  data = {}
184
 
185
+ data[session_hash] = {
186
+ "session_hash": session_hash,
187
  "file": filepath,
188
  "start_time": datetime.utcnow().strftime("%H:%M:%S"),
189
  "status": "active",
190
  }
191
 
192
+ with open(ACTIVE_SESSIONS_HASH_FILE, "w") as f:
193
  json.dump(data, f)
194
 
195
+ logging.debug(f"[{session_hash}] session_hash registered in active_sessions.json.")
196
 
197
 
198
+ def unregister_session_hash_hash(session_hash: str):
199
+ """Removes a session_hash from the registry."""
200
+ if not os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
201
  return
202
 
203
  try:
204
+ with open(ACTIVE_SESSIONS_HASH_FILE, "r") as f:
205
  data = json.load(f)
206
+ if session_hash in data:
207
+ data.pop(session_hash)
208
+ with open(ACTIVE_SESSIONS_HASH_FILE, "w") as f:
209
  json.dump(data, f)
210
+ logging.debug(f"[{session_hash}] session_hash unregistered.")
211
  except Exception as e:
212
+ logging.error(f"[{session_hash}] Error unregistering session_hash: {e}")
213
 
214
 
215
+ def get_active_sessions_hash():
216
+ """Returns active session_hashs as a list of rows for the DataFrame."""
217
+ if not os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
218
  return []
219
 
220
  try:
221
+ with open(ACTIVE_SESSIONS_HASH_FILE, "r") as f:
222
  data = json.load(f)
223
 
224
  rows = [
 
232
  ]
233
  return rows
234
  except Exception as e:
235
+ logging.error(f"Error reading active session_hashs: {e}")
236
  return []
237
 
238
 
239
+ def get_active_task_flag_file(session_hash: str):
240
+ return os.path.join(TMP_DIR, f"{ACTIVE_TASK_FLAG}{session_hash}.txt")
241
 
242
 
243
+ def get_folder_chunks(session_hash: str):
244
+ return os.path.join(TMP_DIR, f"{NAME_FOLDER_CHUNKS}{session_hash}")
app/stream_utils.py CHANGED
@@ -14,14 +14,15 @@ import os
14
  import time
15
  import random
16
 
 
 
 
 
 
17
 
18
  # --------------------------------------------------------
19
  # Utility functions
20
  # --------------------------------------------------------
21
-
22
-
23
-
24
-
25
  def generate_coturn_config():
26
  """
27
  Génère une configuration Coturn complète avec authentification dynamique (use-auth-secret).
@@ -69,7 +70,7 @@ def read_and_stream_audio(filepath_to_stream: str, session_id: str, stop_streami
69
  if not filepath_to_stream or not os.path.exists(filepath_to_stream):
70
  yield from handle_stream_error(session_id, f"Audio file not found: {filepath_to_stream}", stop_streaming_flags)
71
  return
72
-
73
  try:
74
  segment = AudioSegment.from_file(filepath_to_stream)
75
  chunk_duration_ms = 1000
@@ -91,6 +92,16 @@ def read_and_stream_audio(filepath_to_stream: str, session_id: str, stop_streami
91
  logging.debug(f"[{session_id}] Sent chunk {i+1}/{total_chunks} ({progress}%).")
92
 
93
  time.sleep(1)
 
 
 
 
 
 
 
 
 
 
94
  # raise_function() # Optional injected test exception
95
 
96
  logging.info(f"[{session_id}] Audio streaming completed successfully.")
@@ -138,6 +149,73 @@ def _is_stop_requested(stop_streaming_flags: dict) -> bool:
138
 
139
 
140
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
141
  def stop_streaming(session_id: str, stop_streaming_flags: dict):
142
  """Trigger the stop flag for active streaming."""
143
  logging.info(f"[{session_id}] Stop button clicked — sending stop signal.")
 
14
  import time
15
  import random
16
 
17
+ from app.session_utils import (
18
+ get_active_task_flag_file,
19
+ get_stop_task_flag_file,
20
+ get_folder_chunks
21
+ )
22
 
23
  # --------------------------------------------------------
24
  # Utility functions
25
  # --------------------------------------------------------
 
 
 
 
26
  def generate_coturn_config():
27
  """
28
  Génère une configuration Coturn complète avec authentification dynamique (use-auth-secret).
 
70
  if not filepath_to_stream or not os.path.exists(filepath_to_stream):
71
  yield from handle_stream_error(session_id, f"Audio file not found: {filepath_to_stream}", stop_streaming_flags)
72
  return
73
+ transcribe_flag = get_active_task_flag_file(session_id)
74
  try:
75
  segment = AudioSegment.from_file(filepath_to_stream)
76
  chunk_duration_ms = 1000
 
92
  logging.debug(f"[{session_id}] Sent chunk {i+1}/{total_chunks} ({progress}%).")
93
 
94
  time.sleep(1)
95
+ # Save only if transcription is active
96
+ if os.path.exists(transcribe_flag) :
97
+ chunk_dir = get_folder_chunks(session_id)
98
+ if not os.path.exists(chunk_dir) :
99
+ os.makedirs(chunk_dir, exist_ok=True)
100
+ npz_path = os.path.join(chunk_dir, f"chunk_{i:05d}.npz")
101
+ chunk_array = np.array(chunk.get_array_of_samples(), dtype=np.int16)
102
+ np.savez_compressed(npz_path, data=chunk_array, rate=frame_rate)
103
+ logging.debug(f"[{session_id}] Saved chunk {i}/{total_chunks} (transcribe active)")
104
+
105
  # raise_function() # Optional injected test exception
106
 
107
  logging.info(f"[{session_id}] Audio streaming completed successfully.")
 
149
 
150
 
151
 
152
+ def task(session_id: str):
153
+ """Continuously read and delete .npz chunks while task is active."""
154
+ active_flag = get_active_task_flag_file(session_id)
155
+ with open(active_flag, "w") as f:
156
+ f.write("1")
157
+ chunk_dir = get_folder_chunks(session_id)
158
+ logging.info(f"[{session_id}] task started. {chunk_dir}")
159
+
160
+
161
+ try:
162
+ logging.info(f"[{session_id}] task loop started.")
163
+ yield f"Task started for session {session_id}\n\n"
164
+ while os.path.exists(active_flag):
165
+ if not os.path.exists(chunk_dir):
166
+ logging.warning(f"[{session_id}] No chunk directory found for task.")
167
+ yield "No audio chunks yet... waiting for stream.\n"
168
+ time.sleep(0.25)
169
+ continue
170
+ files = sorted(f for f in os.listdir(chunk_dir) if f.endswith(".npz"))
171
+ if not files:
172
+ time.sleep(0.25)
173
+ continue
174
+
175
+ for fname in files:
176
+ fpath = os.path.join(chunk_dir, fname)
177
+ try:
178
+ npz = np.load(fpath)
179
+ samples = npz["data"]
180
+ rate = int(npz["rate"])
181
+
182
+ text = f"Transcribed {fname}: {len(samples)} samples @ {rate}Hz"
183
+ yield f"{text}\n"
184
+ logging.debug(f"[{session_id}] {text}")
185
+
186
+ os.remove(fpath)
187
+ logging.debug(f"[{session_id}] Deleted processed chunk: {fname}")
188
+ except Exception as e:
189
+ logging.error(f"[{session_id}] Error processing {fname}: {e}")
190
+ yield f"Error processing {fname}: {e}\n"
191
+ continue
192
+
193
+ time.sleep(0.25)
194
+ # raise_function()
195
+ yield "\nTask stopped by user or stream ended.\n"
196
+ logging.info(f"[{session_id}] task loop ended (flag removed).")
197
+
198
+ except Exception as e:
199
+ logging.error(f"[{session_id}] task error: {e}", exc_info=True)
200
+ yield f"Unexpected error: {e}\n"
201
+ finally:
202
+ # active_flag = os.path.join(TMP_DIR, f"transcribe_active_{session_id}.txt")
203
+ if os.path.exists(active_flag):
204
+ os.remove(active_flag)
205
+ logging.info(f"[{session_id}] task stopped.")
206
+ try:
207
+ if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
208
+ os.rmdir(chunk_dir)
209
+ logging.debug(f"[{session_id}] Cleaned up empty chunk dir.")
210
+ except Exception as e:
211
+ logging.error(f"[{session_id}] Cleanup error: {e}")
212
+ yield "\nCleanup error: {e}"
213
+ logging.info(f"[{session_id}] Exiting task loop.")
214
+ yield "\nTask finished and cleaned up.\n"
215
+
216
+
217
+
218
+
219
  def stop_streaming(session_id: str, stop_streaming_flags: dict):
220
  """Trigger the stop flag for active streaming."""
221
  logging.info(f"[{session_id}] Stop button clicked — sending stop signal.")
app/ui_utils.py CHANGED
@@ -131,7 +131,7 @@ def handle_additional_outputs(webrtc_stream, msg):
131
  Update UI elements based on streaming progress or errors.
132
  Controls button states, audio visibility, and progress slider.
133
  """
134
- logging.debug(f"Additional output received: {msg}")
135
  # ui_components = [start_stream_button, stop_stream_button,go_to_task, audio_source_step, status_slider,walkthrough]
136
 
137
  progress = float(0)
 
131
  Update UI elements based on streaming progress or errors.
132
  Controls button states, audio visibility, and progress slider.
133
  """
134
+ # logging.debug(f"Additional output received: {msg}")
135
  # ui_components = [start_stream_button, stop_stream_button,go_to_task, audio_source_step, status_slider,walkthrough]
136
 
137
  progress = float(0)