Spaces:
Running
on
Zero
Running
on
Zero
| import numpy as np | |
| from app.logger_config import ( | |
| logger as logging, | |
| DEBUG | |
| ) | |
| from app.interfaces import IStreamingSpeechEngine,IVoiceActivityEngine | |
| from dataclasses import dataclass | |
@dataclass
class StreamingAudioProcessorConfig:
    """Configuration settings for the StreamingAudioProcessor.

    Declared as a dataclass so callers can override any field by keyword,
    e.g. ``StreamingAudioProcessorConfig(read_size=4000)``; calling it with
    no arguments yields the defaults below.

    Attributes:
        read_size: Nominal physical chunk size in samples. The processor logs
            it and uses it to estimate elapsed time in its VAD-reset log line.
        silence_threshold_chunks: Number of consecutive physical chunks the VAD
            reports as silence before the current segment is flushed and reset.
        sample_rate: Audio sample rate in Hz.
    """
    read_size: int = 8000
    silence_threshold_chunks: int = 2
    sample_rate: int = 16000
    # Add other streaming-related parameters here
    # e.g., vad_padding_chunks: int = 0
class StreamingAudioProcessor:
    """
    Manages streaming transcription by combining a speech engine
    and a voice activity detector (VAD).

    This class handles internal audio buffering and VAD state. Physical
    chunks of any size are accumulated in an internal buffer and re-cut into
    the logical chunk sizes the speech engine expects:
    ``context_samples.chunk + context_samples.right`` samples for the first
    logical chunk of a segment and ``context_samples.chunk`` samples after
    that. Once the VAD reports ``silence_threshold_chunks`` consecutive silent
    physical chunks, the segment is flushed and all segment state is reset.
    """

    def __init__(self, speech_engine: "IStreamingSpeechEngine", vad_engine: "IVoiceActivityEngine", cfg: "StreamingAudioProcessorConfig"):
        """
        Initializes the streaming processor.

        Args:
            speech_engine: The ASR speech engine. It must expose
                ``context_samples.chunk`` and ``context_samples.right``
                (sample counts), ``transcribe_chunk(chunk, is_last_chunk=...)``
                yielding ``(segment, text)`` pairs, and ``reset()``.
            vad_engine: The VAD engine; called with a physical chunk, and its
                result is treated as truthy when the chunk contains speech.
            cfg: The configuration object for this processor (reads
                ``sample_rate``, ``read_size`` and ``silence_threshold_chunks``).

        Raises:
            ValueError: If the speech engine's logical chunk sizes are not
                positive (the buffer-draining loop could never terminate).
        """
        logging.info("Initializing StreamingAudioProcessor...")
        self.speech_engine = speech_engine
        self.vad_engine = vad_engine

        # Store config
        self.VAD_SAMPLE_RATE = cfg.sample_rate
        self.read_size = cfg.read_size
        self.SILENCE_THRESHOLD_CHUNKS = cfg.silence_threshold_chunks

        # Internal buffer state: samples received but not yet sent to the engine.
        self.internal_buffer = np.array([], dtype='int16')

        # Internal logic state
        context = self.speech_engine.context_samples
        self.is_first_logical_chunk = True
        self.logical_chunk_size = context.chunk
        self.initial_logical_chunk_size = context.chunk + context.right
        if self.logical_chunk_size <= 0 or self.initial_logical_chunk_size <= 0:
            raise ValueError(
                "Speech engine logical chunk sizes must be positive "
                f"(chunk={self.logical_chunk_size}, initial={self.initial_logical_chunk_size})."
            )

        # Internal VAD state
        self.silent_chunks_count = 0
        # Physical chunks seen over the whole stream (never reset; used for timestamps).
        self.chunks_count = 0

        logging.info(f"  Config: VAD Sample Rate={self.VAD_SAMPLE_RATE}Hz")
        logging.info(f"  Config: Physical Read Size={self.read_size} samples")
        logging.info(f"  Config: Silence Threshold={self.SILENCE_THRESHOLD_CHUNKS} chunks")
        logging.info(f"  Config: Initial Logical Chunk={self.initial_logical_chunk_size} samples")
        logging.info(f"  Config: Subsequent Logical Chunk={self.logical_chunk_size} samples")

    def _current_target_size(self):
        """Returns the logical chunk size the speech engine expects next."""
        return self.initial_logical_chunk_size if self.is_first_logical_chunk else self.logical_chunk_size

    def _append_to_buffer(self, chunk_np, asr_chunk_len):
        """
        Appends a NumPy chunk to the internal buffer and returns a logical chunk if ready.

        At most one logical chunk is removed per call; use
        ``_pop_logical_chunk`` to drain any further complete chunks.

        Returns:
            The first ``asr_chunk_len`` buffered samples, or ``None`` when
            fewer samples are buffered.
        """
        logging.debug(f"Appending {len(chunk_np)} samples to internal buffer (current size: {len(self.internal_buffer)}).")
        self.internal_buffer = np.concatenate((self.internal_buffer, chunk_np))
        return self._pop_logical_chunk(asr_chunk_len)

    def _pop_logical_chunk(self, asr_chunk_len):
        """Removes and returns the first ``asr_chunk_len`` buffered samples, or ``None`` if not enough are buffered."""
        if len(self.internal_buffer) < asr_chunk_len:
            logging.debug(f"Buffer size ({len(self.internal_buffer)}) < target ({asr_chunk_len}). Holding.")
            return None
        asr_signal_chunk = self.internal_buffer[:asr_chunk_len]
        self.internal_buffer = self.internal_buffer[asr_chunk_len:]
        logging.debug(f"Extracted logical chunk of {len(asr_signal_chunk)} samples. Buffer remaining: {len(self.internal_buffer)}.")
        return asr_signal_chunk

    def _flush_and_reset(self):
        """
        Flushes the remaining buffer to the transcriber and resets the segment state.

        Generator: yields each text piece produced by the final
        ``transcribe_chunk(..., is_last_chunk=True)`` call, then a trailing
        empty string once the engine, buffer and VAD state have been reset.
        """
        if len(self.internal_buffer) > 0:
            final_segment_chunk = self.internal_buffer
            logging.info(f"Flushing segment remainder of {len(final_segment_chunk)} samples.")
        else:
            # Buffer is empty, but send a silent "flush"
            # to force the transcriber to finalize its internal state.
            logging.info("Buffer empty, sending silent flush to finalize segment.")
            final_segment_chunk = np.zeros(self.logical_chunk_size, dtype='int16')
        for _seg, new_text in self.speech_engine.transcribe_chunk(final_segment_chunk, is_last_chunk=True):
            yield new_text

        # Full state reset
        logging.debug("Resetting speech engine state...")
        self.speech_engine.reset()  # Resets the speech engine (decoder state)
        logging.debug("Resetting internal buffer and VAD state.")
        self.internal_buffer = np.array([], dtype='int16')
        self.is_first_logical_chunk = True
        self.silent_chunks_count = 0
        yield ""

    def process_chunk(self, chunk: np.ndarray):
        """
        Processes a single physical chunk (e.g., 8000 samples).

        Manages VAD, buffering, and transcription. This is a generator, so
        nothing happens until it is iterated.

        Args:
            chunk (np.ndarray): The audio chunk (int16).

        Yields:
            str: New transcribed text pieces, in order. Empty strings are
            yielded as placeholders (when no logical chunk was ready, when no
            VAD reset occurred, and once more at the end of every call).
        """
        self.chunks_count += 1
        logging.debug(f"--- Processing Physical Chunk {self.chunks_count} ---")

        # --- 1. VAD Logic ---
        has_speech = self.vad_engine(chunk)
        logging.debug(f"VAD result: {'SPEECH' if has_speech else 'SILENCE'}")
        if has_speech:
            self.silent_chunks_count = 0
        else:
            self.silent_chunks_count += 1
        logging.debug(f"Silent chunks count: {self.silent_chunks_count}/{self.SILENCE_THRESHOLD_CHUNKS}")
        silence_reset = self.silent_chunks_count >= self.SILENCE_THRESHOLD_CHUNKS

        # --- 2. Buffering & Transcription Logic ---
        # Drain every logical chunk that is ready. Taking only one per call
        # would let the buffer (and latency) grow without bound whenever a
        # physical chunk is larger than the logical chunk size.
        sent_any = False
        asr_chunk_np = self._append_to_buffer(chunk, self._current_target_size())
        while asr_chunk_np is not None:
            sent_any = True
            logging.debug(f"Sending logical chunk (size: {len(asr_chunk_np)}) to speech engine...")
            for _seg, new_text in self.speech_engine.transcribe_chunk(asr_chunk_np, is_last_chunk=False):
                logging.info(f"Received new text segment: '{new_text}'")
                yield new_text
            # Only a chunk actually sent to the engine ends the "first logical
            # chunk" phase, so the larger initial size is always used first.
            self.is_first_logical_chunk = False
            asr_chunk_np = self._pop_logical_chunk(self._current_target_size())
        if not sent_any:
            yield ""

        # --- 3. VAD Reset Logic ---
        # Reset only once at least one logical chunk has been sent to the
        # engine in the current segment.
        if silence_reset and not self.is_first_logical_chunk:
            logging.info(f"\n[VAD RESET: SILENCE detected ({self.silent_chunks_count} empty chunks) at {(self.chunks_count * (self.read_size/self.VAD_SAMPLE_RATE)):.2f}s]")
            # Flush the buffer, reset state, and get final text
            for reset_text in self._flush_and_reset():
                logging.info(f"Received final reset text: '{reset_text}'")
                yield reset_text
        else:
            yield ""
        yield ""

    def finalize_stream(self):
        """
        Must be called at the very end of the stream (after the loop breaks).

        Generator: flushes anything remaining in the buffer (or a silent
        chunk if it is empty) with ``is_last_chunk=True``, yields the
        resulting text pieces, then a trailing empty string after the reset.
        """
        logging.info("Finalizing stream. Flushing final buffer...")
        for reset_text in self._flush_and_reset():
            logging.info(f"Received final flushed text: '{reset_text}'")
            yield reset_text