Archime committed on
Commit 4f560c0 · 1 Parent(s): f76a38c

optimise read_and_stream_audio

Files changed (1)
  1. app/utils.py +88 -76
app/utils.py CHANGED
@@ -5,6 +5,7 @@ import asyncio
 import os
 import time
 import numpy as np
+
 import spaces
 import hmac
 import hashlib
@@ -29,6 +30,11 @@ from app.silero_vad_engine import Silero_Vad_Engine
 from app.streaming_audio_processor import StreamingAudioProcessor,StreamingAudioProcessorConfig
 import nemo.collections.asr as nemo_asr
 READ_SIZE=4000
+import gradio as gr
+from typing import Generator
+from typing import Generator, Tuple, Any, Optional
+GradioAudioYield = Tuple[int, np.ndarray]
+StreamYield = Generator[Tuple[GradioAudioYield, AdditionalOutputs], None, None]
 
 # --------------------------------------------------------
 # Utility functions
@@ -68,7 +74,7 @@ def generate_coturn_config():
 
 
 
-def read_and_stream_audio(filepath_to_stream: str, session_hash_code: str,read_size:int =8000, sample_rate:int =16000):
+def read_and_stream_audio(filepath_to_stream: str, session_hash_code: str,read_size:int =8000, sample_rate:int =16000) -> StreamYield:
     """
     Read an audio file and stream it chunk by chunk (1s per chunk).
     Handles errors safely and reports structured messages to the client.
@@ -84,55 +90,104 @@ def read_and_stream_audio(filepath_to_stream: str, session_hash_code: str,read_s
     try:
         segment = AudioSegment.from_file(filepath_to_stream)
         chunk_duration_ms = int((read_size/sample_rate)*1000)
+        total_duration_ms = len(segment)
         total_chunks = len(segment) // chunk_duration_ms + 1
         start_streaming(session_hash_code)
-        logging.info(f"[{session_hash_code}] Starting audio streaming {filepath_to_stream} ({total_chunks} chunks).")
+        logging.info(f"[{session_hash_code}] Starting stream: {filepath_to_stream} ({total_chunks} chunks, {chunk_duration_ms}ms steps).")
 
-        for i, chunk in enumerate(segment[::chunk_duration_ms]):
+        chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
+        ensure_dir_exists = False
+        for i, start_ms in enumerate(range(0, total_duration_ms, chunk_duration_ms)):
+            end_ms = min(start_ms + chunk_duration_ms, total_duration_ms)
+            chunk = segment[start_ms:end_ms]
             frame_rate = chunk.frame_rate
-            samples = np.array(chunk.get_array_of_samples()).reshape(1, -1)
+            samples_int16 = np.array(chunk.get_array_of_samples(), dtype=np.int16)
+            samples_float = (samples_int16 / 32768.0).astype(np.float32)
+
+            # Handle mono vs. stereo for Gradio
+            if chunk.channels > 1:
+                samples_reshaped = samples_float.reshape(-1, chunk.channels)
+            else:
+                samples_reshaped = samples_float.reshape(1, -1)
+
             progress = round(((i + 1) / total_chunks) * 100, 2)
+            # Send to the client
             if is_stop_requested(session_hash_code):
-                logging.info(f"[{session_hash_code}] Stop signal received. Terminating stream.")
-                yield ((frame_rate, samples), AdditionalOutputs({"stoped": True, "value": "STREAM_STOPED", "session_hash_code" : session_hash_code } ) )
+                logging.info(f"[{session_hash_code}] Stop signal received.")
+                samples = np.array(chunk.get_array_of_samples()).reshape(1, -1)
+                yield (
+                    (sample_rate, samples_reshaped),
+                    AdditionalOutputs({"stoped": True, "value": "STREAM_STOPPED", "session_hash_code": session_hash_code})
+                )
                 break
-
-            yield ((frame_rate, samples), AdditionalOutputs({"progressed": True, "value": progress , "session_hash_code" : session_hash_code} ))
-            logging.debug(f"[{session_hash_code}] Sent chunk {i+1}/{total_chunks} ({progress}%).")
-
-            time.sleep(chunk_duration_ms/1000)
-            # Save only if transcription is active
-            if os.path.exists(task_active_flag) :
-                chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
-                if not os.path.exists(chunk_dir) :
+            yield (
+                (frame_rate, samples_reshaped),
+                AdditionalOutputs({"progressed": True, "value": progress, "session_hash_code": session_hash_code})
+            )
+            if is_active_task(session_hash_code):
+                if not ensure_dir_exists:
                     os.makedirs(chunk_dir, exist_ok=True)
+                    ensure_dir_exists = True
+
                 npz_path = os.path.join(chunk_dir, f"chunk_{i:05d}.npz")
-                chunk_array = np.array(chunk.get_array_of_samples(), dtype=np.int16)
-                if os.path.exists(task_active_flag):
-                    np.savez_compressed(npz_path, data=chunk_array, rate=frame_rate)
-                    logging.debug(f"[{session_hash_code}] Saved chunk {i}/{total_chunks} (transcribe active) ({progress}%) ({npz_path}).")
+                # Compression enabled; careful, it is slow (CPU intensive)
+                if is_active_task(session_hash_code):
+                    np.savez_compressed(npz_path, data=samples_int16, rate=frame_rate)
+                    logging.debug(f"[{session_hash_code}] Saved chunk {i} to {npz_path}")
 
-            # raise_error() # Optional injected test exception
+            time.sleep(chunk_duration_ms/1000)
 
-        logging.info(f"[{session_hash_code}] Audio streaming completed successfully.")
+        raise_error() # Optional injected test exception
+
+        logging.info(f"[{session_hash_code}] Streaming completed.")
 
-    except asyncio.CancelledError:
-        yield from handle_stream_error(session_hash_code, "Streaming cancelled by user.")
-    except FileNotFoundError as e:
-        yield from handle_stream_error(session_hash_code, e)
     except Exception as e:
         yield from handle_stream_error(session_hash_code, e)
     finally:
         remove_active_stream_flag_file(session_hash_code)
-        logging.info(f"[{session_hash_code}] Stop flag reset.")
+        logging.info(f"[{session_hash_code}] Cleanup done.")
+
+
+def handle_stream_error(session_hash_code: str, error: Exception):
+    """
+    Handle streaming errors:
+    - Log the error
+    - Send structured info to client
+    - Reset stop flag
+    """
+    msg = f"{type(error).__name__}: {str(error)}"
+    logging.error(f"[{session_hash_code}] Stream Error: {msg}", exc_info=True)
+
+    remove_active_stream_flag_file(session_hash_code)
+    empty_audio = np.zeros((1, 16000), dtype=np.float32)
 
+    yield (
+        (16000, empty_audio),
+        AdditionalOutputs({"errored": True, "value": msg, "session_hash_code": session_hash_code})
+    )
 
 
 # asr_model = nemo_asr.models.ASRModel.from_pretrained("nvidia/canary-1b-v2")
 asr_model = None
 
+# @spaces.cache
+# def load_model():
+#     logging.info("Loading the NeMo ASR/AST model...")
+#     # Replace with your model loading logic
+#     model = nemo_asr.models.EncDecRNNTModel.restore_from("path/to/model.nemo")
+#     logging.info("Model loaded.")
+#     return model
+
+# # Load it only once at script startup
+# ASR_MODEL = load_model()
+
+
 @spaces.GPU
 def task_fake(session_hash_code: str,
               task_type, lang_source, lang_target,
 
@@ -143,23 +198,6 @@ def task_fake(session_hash_code: str,
     """Continuously read and delete .npz chunks while task is active."""
     global asr_model
     yield ("initializing the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
-    ### TODO
-    ##-----------
-    # conf = CanaryConfig.from_params(
-    #     task_type, SUPPORTED_LANGS_MAP.get(lang_source),SUPPORTED_LANGS_MAP.get(lang_target) ,
-    #     chunk_secs, left_context_secs, right_context_secs,
-    #     streaming_policy, alignatt_thr, waitk_lagging,
-    #     exclude_sink_frames, xatt_scores_layer, hallucinations_detector
-    # )
-
-    # canary_speech_engine = CanarySpeechEngine(asr_model,conf)
-    # silero_vad_engine = Silero_Vad_Engine()
-    # streaming_audio_processor_config = StreamingAudioProcessorConfig(
-    #     read_size=READ_SIZE,
-    #     silence_threshold_chunks=1
-    # )
-    # streamer = StreamingAudioProcessor(speech_engine=canary_speech_engine,vad_engine=silero_vad_engine,cfg=streaming_audio_processor_config)
-    ##-----------
     yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
     yield (f"Task started for session {session_hash_code}", "info", None)
 
@@ -191,29 +229,22 @@ def task_fake(session_hash_code: str,
                 npz = np.load(fpath)
                 samples = npz["data"]
                 rate = int(npz["rate"])
-                ##-----------
-                # new_texts = streamer.process_chunk(samples)
-                # for text in new_texts:
-                #     print(text, end='', flush=True)
-                #     yield (text, "success", text)
-                # logging.debug(f"[{session_hash_code}] {new_texts}")
-                ##-----------
-                ### TODO
                 text = f"Transcribed {fname}: {len(samples)} samples @ {rate}Hz\n"
                 yield (text, "success", fname)
                 os.remove(fpath)
                 logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
+                # raise_error()
+
+            except EOFError as e:
+                logging.warning(f"[{session_hash_code}] Error processing {fname}: {e}")
+                yield (f"EOFError processing {fname}: {e}", "warning", fname)
             except Exception as e:
                 logging.warning(f"[{session_hash_code}] Error processing {fname}: {e}")
                 yield (f"Error processing {fname}: {e}", "warning", fname)
-                continue
+
+                # continue
         time.sleep(0.1)
 
-    # TODO
-    ##-----------
-    # final_text = streamer.finalize_stream()
-    # yield (text, "success", final_text)
-    ##-----------
     yield ("DONE", "done", None)
     logging.info(f"[{session_hash_code}] task loop ended (flag removed).")
 
@@ -338,25 +369,6 @@ def task(session_hash_code: str,
     yield ("Task finished and cleaned up.", "done", None)
 
 
-def handle_stream_error(session_hash_code: str, error: Exception):
-    """
-    Handle streaming errors:
-    - Log the error
-    - Send structured info to client
-    - Reset stop flag
-    """
-    if isinstance(error, Exception):
-        msg = f"{type(error).__name__}: {str(error)}"
-    else:
-        msg = str(error)
-
-    logging.error(f"[{session_hash_code}] Streaming error: {msg}", exc_info=isinstance(error, Exception))
-
-    remove_active_stream_flag_file(session_hash_code)
-
-    yield ((16000,np.zeros(16000, dtype=np.float32).reshape(1, -1)), AdditionalOutputs({"errored": True, "value": msg, "session_hash_code" : session_hash_code}))
-
-
 
 
 # --- Decorator compatibility layer ---
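
Note on usage (not part of the commit): a minimal, hypothetical smoke test for the reworked generator, assuming the app package is importable and that the session flag helpers (start_streaming, is_stop_requested, is_active_task, remove_active_stream_flag_file) behave as in the app; the session hash and audio path below are placeholders. It also spells out the chunk arithmetic the loop relies on: with read_size=8000 samples at sample_rate=16000 Hz, chunk_duration_ms = int((8000/16000)*1000) = 500, so each yield covers 500 ms of audio (the module-level READ_SIZE=4000 would correspond to 250 ms).

    from app.utils import read_and_stream_audio

    SESSION = "debug-session"            # placeholder session hash
    AUDIO_PATH = "samples/example.wav"   # placeholder input file

    # Each yielded chunk spans read_size / sample_rate seconds of audio.
    for (rate, samples), extra in read_and_stream_audio(
        AUDIO_PATH, SESSION, read_size=8000, sample_rate=16000
    ):
        # samples is float32, shaped (1, n) for mono or (n, channels) for stereo
        print(f"chunk @ {rate} Hz, shape={samples.shape}, extra={extra!r}")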