# canary_aed_streaming/app/silero_vad_engine.py
# Commit 11c4a5a (Archime): add nemo_asr and silero_vad Engine
import os
import subprocess
import torch
import numpy as np
import onnxruntime
import warnings
from app.interfaces import IVoiceActivityEngine
from app.logger_config import (
logger as logging,
DEBUG
)
class VoiceActivityDetection():
    """Stateful wrapper around a Silero VAD ONNX model.

    The model is evaluated on fixed-size chunks (512 samples at 16000Hz,
    256 samples at 8000Hz). Between chunks the wrapper carries the recurrent
    state returned by the session and the trailing samples of the previous
    chunk (64 at 16000Hz, 32 at 8000Hz), which are prepended to the next one.
    """

    def __init__(self, force_onnx_cpu=True):
        """Fetch the model file if needed and create the ONNX session.

        Args:
            force_onnx_cpu: When True and the CPU execution provider is
                available, run the session on CPU; otherwise the CUDA
                execution provider is requested.

        Raises:
            Exception: Whatever ``onnxruntime.InferenceSession`` raises is
                logged and re-raised unchanged.
        """
        logging.info("Initializing VoiceActivityDetection...")
        path = self.download()

        opts = onnxruntime.SessionOptions()
        opts.log_severity_level = 3  # Suppress ONNX runtime's own logs
        opts.inter_op_num_threads = 1
        opts.intra_op_num_threads = 1

        try:
            if force_onnx_cpu and 'CPUExecutionProvider' in onnxruntime.get_available_providers():
                self.session = onnxruntime.InferenceSession(path, providers=['CPUExecutionProvider'], sess_options=opts)
                logging.info("ONNX VAD session created with CPUExecutionProvider.")
            else:
                self.session = onnxruntime.InferenceSession(path, providers=['CUDAExecutionProvider'], sess_options=opts)
                logging.info("ONNX VAD session created with CUDAExecutionProvider.")
        except Exception as e:
            logging.critical(f"Failed to create ONNX InferenceSession: {e}", exc_info=True)
            raise

        self.reset_states()

        # Only the file name identifies the model variant; a directory that
        # merely contains "16k" in its name must not influence the result.
        if '16k' in os.path.basename(path):
            logging.warning('This VAD model supports only 16000 sampling rate!')
            self.sample_rates = [16000]
        else:
            logging.info("VAD model supports 8000Hz and 16000Hz.")
            self.sample_rates = [8000, 16000]

    def _validate_input(self, x, sr: int):
        """Normalize an audio tensor to 2-D and a supported sampling rate.

        A 1-D tensor gets a leading batch dimension. Rates that are a positive
        multiple of 16000 are reduced to 16000Hz by keeping every
        ``sr // 16000``-th sample (plain decimation, no filtering).

        Args:
            x: Audio tensor with one or two dimensions.
            sr: Sampling rate of ``x`` in Hz.

        Returns:
            Tuple ``(x, sr)`` with ``x`` two-dimensional and ``sr`` one of
            ``self.sample_rates``.

        Raises:
            ValueError: If ``x`` has more than two dimensions or the
                (possibly reduced) rate is not supported.
        """
        if x.dim() == 1:
            x = x.unsqueeze(0)
        if x.dim() > 2:
            logging.error(f"Too many dimensions for input audio chunk: {x.dim()}")
            raise ValueError(f"Too many dimensions for input audio chunk {x.dim()}")

        # `sr > 0` keeps zero/negative rates from yielding an invalid slice
        # step; they fall through to the unsupported-rate error below.
        if sr != 16000 and sr > 0 and (sr % 16000 == 0):
            step = sr // 16000
            x = x[:, ::step]
            # Log before overwriting `sr` so the original rate is reported.
            logging.debug(f"Downsampled input audio to 16000Hz from {sr}Hz.")
            sr = 16000

        if sr not in self.sample_rates:
            logging.error(f"Unsupported sampling rate: {sr}. Supported: {self.sample_rates}")
            raise ValueError(f"Supported sampling rates: {self.sample_rates} (or multiply of 16000)")
        return x, sr

    def reset_states(self, batch_size=1):
        """Clear the recurrent state, the carried context and call history.

        Args:
            batch_size: Batch dimension of the freshly zeroed state tensor.
        """
        logging.debug(f"Resetting VAD states for batch_size: {batch_size}")
        self._state = torch.zeros((2, batch_size, 128)).float()
        # Empty context marks "no previous chunk"; __call__ allocates it lazily.
        self._context = torch.zeros(0)
        self._last_sr = 0
        self._last_batch_size = 0

    def __call__(self, x, sr: int):
        """Run the model on a single chunk and update the streaming state.

        Args:
            x: Audio chunk, 1-D or ``(batch, samples)``; after validation it
                must hold exactly 512 samples at 16000Hz or 256 at 8000Hz.
            sr: Sampling rate of ``x`` in Hz.

        Returns:
            torch.Tensor built from the session's first output (presumably
            one speech probability per batch row -- confirm against the model).

        Raises:
            ValueError: If the input is invalid, the chunk has the wrong
                number of samples, or the rate is not 8000/16000.
        """
        x, sr = self._validate_input(x, sr)
        num_samples = 512 if sr == 16000 else 256

        if x.shape[-1] != num_samples:
            logging.error(f"Invalid audio chunk size: {x.shape[-1]}. Expected {num_samples} for {sr}Hz.")
            raise ValueError(f"Provided number of samples is {x.shape[-1]} (Supported values: 256 for 8000 sample rate, 512 for 16000)")

        batch_size = x.shape[0]
        context_size = 64 if sr == 16000 else 32

        if not self._last_batch_size:
            logging.debug("First call, resetting states.")
            self.reset_states(batch_size)
        if (self._last_sr) and (self._last_sr != sr):
            logging.warning(f"Sample rate changed ({self._last_sr}Hz -> {sr}Hz). Resetting states.")
            self.reset_states(batch_size)
        if (self._last_batch_size) and (self._last_batch_size != batch_size):
            logging.warning(f"Batch size changed ({self._last_batch_size} -> {batch_size}). Resetting states.")
            self.reset_states(batch_size)

        if not len(self._context):
            self._context = torch.zeros(batch_size, context_size)

        # Cast to float32 so a float64 chunk does not promote the concatenated
        # input to float64; the context and state tensors are float32.
        x = torch.cat([self._context, x.float()], dim=1)

        if sr not in (8000, 16000):
            # This should be caught by _validate_input, but as a safeguard:
            logging.critical(f"Unexpected sample rate in VAD __call__: {sr}")
            raise ValueError(f"Unexpected sample rate in VAD __call__: {sr}")

        ort_inputs = {'input': x.numpy(), 'state': self._state.numpy(), 'sr': np.array(sr, dtype='int64')}
        out, state = self.session.run(None, ort_inputs)
        self._state = torch.from_numpy(state)

        # Keep the tail of this chunk as context for the next call.
        self._context = x[..., -context_size:]
        self._last_sr = sr
        self._last_batch_size = batch_size

        return torch.from_numpy(out)

    def audio_forward(self, x, sr: int):
        """Run the model over a whole audio signal, chunk by chunk.

        The streaming state is reset first. The signal is zero-padded on the
        right to a whole number of chunks and each chunk is passed through
        ``__call__`` in order.

        Args:
            x: Audio tensor, 1-D or ``(batch, samples)``.
            sr: Sampling rate of ``x`` in Hz.

        Returns:
            torch.Tensor with the per-chunk outputs concatenated along
            dimension 1; an empty ``(batch, 0)`` tensor for zero-length audio.

        Raises:
            ValueError: If the input fails validation.
        """
        x, sr = self._validate_input(x, sr)
        self.reset_states()
        num_samples = 512 if sr == 16000 else 256

        # Zero-length audio yields no chunks; torch.cat would fail on an
        # empty list, so return an empty result explicitly.
        if x.shape[1] == 0:
            return torch.zeros((x.shape[0], 0))

        remainder = x.shape[1] % num_samples
        if remainder:
            pad_num = num_samples - remainder
            logging.debug(f"Padding audio input with {pad_num} samples.")
            x = torch.nn.functional.pad(x, (0, pad_num), 'constant', value=0.0)

        outs = []
        # Explicit loop: every call mutates the state used by the next one.
        for i in range(0, x.shape[1], num_samples):
            outs.append(self(x[:, i:i + num_samples], sr))

        return torch.cat(outs, dim=1).cpu()

    @staticmethod
    def download(model_url="https://github.com/snakers4/silero-vad/raw/v5.0/files/silero_vad.onnx"):
        """Return the local path of the VAD model, downloading it if missing.

        The file is cached as ``~/.cache/silero_vad/silero_vad.onnx``. The
        download goes to a temporary sibling file that is renamed into place
        only on success, so an interrupted or failed ``wget`` never leaves a
        truncated file that later runs would mistake for a valid model. An
        existing but empty file is treated as missing.

        Args:
            model_url: URL passed to ``wget``.

        Returns:
            Path of the cached model file.

        Raises:
            subprocess.CalledProcessError: If ``wget`` exits with an error.
            OSError: If ``wget`` cannot be started or the file cannot be moved.
        """
        target_dir = os.path.expanduser("~/.cache/silero_vad/")
        os.makedirs(target_dir, exist_ok=True)
        model_filename = os.path.join(target_dir, "silero_vad.onnx")

        if os.path.exists(model_filename) and os.path.getsize(model_filename) > 0:
            logging.info(f"VAD model already exists at {model_filename}.")
            return model_filename

        logging.info(f"Downloading VAD model to {model_filename}...")
        # Per-process suffix so two processes do not write the same temp file.
        partial_filename = f"{model_filename}.{os.getpid()}.part"
        try:
            subprocess.run(["wget", "-O", partial_filename, model_url], check=True)
            os.replace(partial_filename, model_filename)
        except (subprocess.CalledProcessError, OSError) as e:
            logging.critical(f"Failed to download the model using wget: {e}")
            raise
        finally:
            # After a successful rename the temp file no longer exists.
            if os.path.exists(partial_filename):
                os.remove(partial_filename)

        logging.info("VAD model downloaded successfully.")
        return model_filename
class Silero_Vad_Engine(IVoiceActivityEngine):
    """Threshold-based speech detector built on ``VoiceActivityDetection``."""

    def __init__(self, threshold: float = 0.5, frame_rate: int = 16000):
        """
        Initializes the Silero_Vad_Engine with a voice activity detection model and a threshold.

        Args:
            threshold (float, optional): The probability threshold for detecting voice activity. Defaults to 0.5.
            frame_rate (int, optional): Sampling rate in Hz of the frames passed to ``__call__``; it is
                forwarded unchanged to the model. Defaults to 16000.
        """
        logging.info(f"Initializing Silero_Vad_Engine with threshold: {threshold} and frame_rate: {frame_rate}Hz.")
        self.model = VoiceActivityDetection()
        self.threshold = threshold
        self.frame_rate = frame_rate

    def __call__(self, audio_frame):
        """
        Determines if the given audio frame contains speech by comparing the detected speech probability against
        the threshold.

        Args:
            audio_frame (np.ndarray): The audio frame to be analyzed for voice activity. It is expected to be a
                NumPy array of audio samples.

        Returns:
            bool: True if the speech probability exceeds the threshold, indicating the presence of voice activity;
                False otherwise (including for an empty frame).
        """
        # Copy into a float32 array: a float64 frame (e.g. produced by
        # `int16_array / 32768.0`) would otherwise reach the model as float64.
        audio_tensor = torch.from_numpy(np.array(audio_frame, dtype=np.float32))

        # An empty frame produces no model output to compare; treat it as
        # "no speech" instead of failing on an empty reduction.
        if audio_tensor.numel() == 0:
            logging.debug("VAD check skipped: empty audio frame.")
            return False

        # Get speech probabilities (first batch row)
        speech_probs = self.model.audio_forward(audio_tensor, self.frame_rate)[0]

        # Some chunk exceeds the threshold exactly when the maximum does.
        max_prob = torch.max(speech_probs).item()
        is_speech = max_prob > self.threshold
        logging.debug(f"VAD check result: {is_speech} (Max prob: {max_prob:.4f})")
        return is_speech