canary_aed_streaming / app / streaming_audio_processor.py
Archime's picture
rename some functions
799a0f6
raw
history blame
7.66 kB
import numpy as np
from app.logger_config import (
logger as logging,
DEBUG
)
from app.interfaces import IStreamingSpeechEngine,IVoiceActivityEngine
from dataclasses import dataclass
@dataclass
class StreamingAudioProcessorConfig:
    """
    Tunable parameters for StreamingAudioProcessor.

    Attributes:
        read_size: Number of samples in each physical chunk handed to the
            processor (also used to turn chunk counts into log timestamps).
        silence_threshold_chunks: Number of consecutive physical chunks the
            VAD must report as silent before the current segment is flushed
            and the transcription state is reset.
        sample_rate: Sample rate of the incoming audio, in Hz.
    """

    # Samples per physical chunk read from the audio source.
    read_size: int = 8_000
    # Consecutive silent chunks that trigger a flush-and-reset.
    silence_threshold_chunks: int = 2
    # Audio sample rate in Hz.
    sample_rate: int = 16_000
    # Further streaming-related parameters (e.g. a VAD padding size in
    # chunks) can be added here.
class StreamingAudioProcessor:
    """
    Manages streaming transcription by combining a speech engine
    and a voice activity detector (VAD).

    Incoming "physical" chunks are accumulated in an internal int16 buffer and
    re-cut into "logical" chunks for the speech engine. The first logical chunk
    after a reset is ``context_samples.chunk + context_samples.right`` samples;
    subsequent ones are ``context_samples.chunk`` samples. After enough
    consecutive silent physical chunks (per the VAD), the buffered remainder is
    flushed to the engine as a last chunk and all state is reset.
    """

    def __init__(self, speech_engine: IStreamingSpeechEngine, vad_engine: IVoiceActivityEngine, cfg: StreamingAudioProcessorConfig):
        """
        Initializes the streaming processor.

        Args:
            speech_engine: The ASR speech engine. It must expose
                ``context_samples.chunk`` and ``context_samples.right``
                (sample counts), ``transcribe_chunk(chunk, is_last_chunk=...)``
                returning an iterable of ``(segment, text)`` pairs, and
                ``reset()``.
            vad_engine: The VAD engine; called with a physical chunk, it
                returns True (speech) / False (silence).
            cfg: The configuration object for this processor.

        Raises:
            ValueError: If ``speech_engine.context_samples.chunk`` is not
                positive.
        """
        logging.info("Initializing StreamingAudioProcessor...")
        self.speech_engine = speech_engine
        self.vad_engine = vad_engine

        # Store config
        self.VAD_SAMPLE_RATE = cfg.sample_rate
        self.read_size = cfg.read_size
        self.SILENCE_THRESHOLD_CHUNKS = cfg.silence_threshold_chunks

        # Samples received but not yet sent to the speech engine.
        self.internal_buffer = np.array([], dtype='int16')

        # Logical chunk sizing: the first chunk after a reset also carries the
        # engine's right-context samples.
        context = self.speech_engine.context_samples
        self.is_first_logical_chunk = True
        self.logical_chunk_size = context.chunk
        self.initial_logical_chunk_size = context.chunk + context.right
        if self.logical_chunk_size <= 0:
            # A non-positive size would make process_chunk() extract empty
            # logical chunks forever while draining the buffer.
            raise ValueError(
                f"speech_engine.context_samples.chunk must be positive, got {self.logical_chunk_size}"
            )

        # Internal VAD state. chunks_count is never reset, so reset timestamps
        # stay relative to the start of the stream.
        self.silent_chunks_count = 0
        self.chunks_count = 0

        logging.info(f" Config: VAD Sample Rate={self.VAD_SAMPLE_RATE}Hz")
        logging.info(f" Config: Physical Read Size={self.read_size} samples")
        logging.info(f" Config: Silence Threshold={self.SILENCE_THRESHOLD_CHUNKS} chunks")
        logging.info(f" Config: Initial Logical Chunk={self.initial_logical_chunk_size} samples")
        logging.info(f" Config: Subsequent Logical Chunk={self.logical_chunk_size} samples")

    def _current_target_size(self):
        """Return the logical chunk size the speech engine expects next."""
        if self.is_first_logical_chunk:
            return self.initial_logical_chunk_size
        return self.logical_chunk_size

    def _append_to_buffer(self, chunk_np, asr_chunk_len):
        """
        Append ``chunk_np`` to the internal buffer and pop a logical chunk if ready.

        Returns:
            The first ``asr_chunk_len`` buffered samples (removed from the
            buffer), or ``None`` if fewer than that are buffered.
        """
        logging.debug(f"Appending {len(chunk_np)} samples to internal buffer (current size: {len(self.internal_buffer)}).")
        self.internal_buffer = np.concatenate((self.internal_buffer, chunk_np))
        return self._pop_logical_chunk(asr_chunk_len)

    def _pop_logical_chunk(self, asr_chunk_len):
        """Remove and return the first ``asr_chunk_len`` buffered samples, or ``None`` if not enough are buffered."""
        if len(self.internal_buffer) < asr_chunk_len:
            logging.debug(f"Buffer size ({len(self.internal_buffer)}) < target ({asr_chunk_len}). Holding.")
            return None
        asr_signal_chunk = self.internal_buffer[:asr_chunk_len]
        self.internal_buffer = self.internal_buffer[asr_chunk_len:]
        logging.debug(f"Extracted logical chunk of {len(asr_signal_chunk)} samples. Buffer remaining: {len(self.internal_buffer)}.")
        return asr_signal_chunk

    def _flush_and_reset(self):
        """
        Flush the buffered remainder to the speech engine as the last chunk,
        then reset the engine and the internal buffer/VAD state.

        Yields:
            Each text the engine returns for the flushed audio, followed by a
            final ``""``.
        """
        if len(self.internal_buffer) > 0:
            final_segment_chunk = self.internal_buffer
            logging.info(f"Flushing segment remainder of {len(final_segment_chunk)} samples.")
        else:
            # Buffer is empty, but still send a silent "flush" so the
            # transcriber finalizes its internal state.
            logging.info("Buffer empty, sending silent flush to finalize segment.")
            final_segment_chunk = np.zeros(self.logical_chunk_size, dtype='int16')

        for _segment, new_text in self.speech_engine.transcribe_chunk(final_segment_chunk, is_last_chunk=True):
            yield new_text

        # Full state reset
        logging.debug("Resetting speech engine state...")
        self.speech_engine.reset()  # Resets the speech engine (decoder state)
        logging.debug("Resetting internal buffer and VAD state.")
        self.internal_buffer = np.array([], dtype='int16')
        self.is_first_logical_chunk = True
        self.silent_chunks_count = 0
        yield ""

    def process_chunk(self, chunk: np.ndarray):
        """
        Process a single physical chunk (e.g., 8000 samples).

        Runs the VAD, buffers the audio, transcribes every logical chunk that is
        ready, and flushes/resets after enough consecutive silent chunks. This
        is a generator: nothing happens until it is iterated.

        Args:
            chunk (np.ndarray): The audio chunk (int16).

        Yields:
            str: Newly transcribed text segments. ``""`` placeholders are also
            yielded: once if no logical chunk was ready, once if no VAD reset
            happened, and always once at the end. A VAD reset yields the final
            flush text(s) followed by ``""``.
        """
        self.chunks_count += 1
        logging.debug(f"--- Processing Physical Chunk {self.chunks_count} ---")

        # --- 1. VAD Logic ---
        has_speech = self.vad_engine(chunk)
        logging.debug(f"VAD result: {'SPEECH' if has_speech else 'SILENCE'}")
        if has_speech:
            self.silent_chunks_count = 0
        else:
            self.silent_chunks_count += 1
            logging.debug(f"Silent chunks count: {self.silent_chunks_count}/{self.SILENCE_THRESHOLD_CHUNKS}")
        silence_reset = self.silent_chunks_count >= self.SILENCE_THRESHOLD_CHUNKS

        # --- 2. Buffering & Transcription Logic ---
        asr_chunk_np = self._append_to_buffer(chunk, self._current_target_size())
        if asr_chunk_np is None:
            yield ""
        # Drain every ready logical chunk, so the buffer cannot grow without
        # bound when physical chunks are larger than logical ones.
        while asr_chunk_np is not None:
            logging.debug(f"Sending logical chunk (size: {len(asr_chunk_np)}) to speech engine...")
            for _segment, new_text in self.speech_engine.transcribe_chunk(asr_chunk_np, is_last_chunk=False):
                logging.info(f"Received new text segment: '{new_text}'")
                yield new_text
            # The engine has now consumed its first (context-padded) chunk;
            # only from here on is the smaller logical size used.
            self.is_first_logical_chunk = False
            asr_chunk_np = self._pop_logical_chunk(self.logical_chunk_size)

        # --- 3. VAD Reset Logic ---
        # Only reset once at least one logical chunk has been sent since the
        # previous reset.
        if silence_reset and not self.is_first_logical_chunk:
            logging.info(f"\n[VAD RESET: SILENCE detected ({self.silent_chunks_count} empty chunks) at {(self.chunks_count * (self.read_size/self.VAD_SAMPLE_RATE)):.2f}s]")
            # Flush the buffer, reset state, and get final text
            for reset_text in self._flush_and_reset():
                logging.info(f"Received final reset text: '{reset_text}'")
                yield reset_text
        else:
            yield ""
        yield ""

    def finalize_stream(self):
        """
        Must be called at the very end of the stream (after the loop breaks).

        Flushes anything remaining in the buffer (or a silent chunk if the
        buffer is empty) as the last chunk and resets all state.

        Yields:
            The final text(s) from the speech engine, followed by ``""``.
        """
        logging.info("Finalizing stream. Flushing final buffer...")
        for reset_text in self._flush_and_reset():
            logging.info(f"Received final flushed text: '{reset_text}'")
            yield reset_text