ContactSearchAssistant / voice_processing_service.py
Muhammed Essam
Initial commit: Voice Assistant demo
8ef276c
# voice_processing_service.py
import logging
import whisper
import os
import tempfile
from typing import Dict, Any, Optional
from pathlib import Path
# Set up logging.
# NOTE: basicConfig() runs at import time and configures the root logger at
# INFO level, so importing this module affects process-wide log output.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by VoiceProcessor below.
logger = logging.getLogger(__name__)
class VoiceProcessor:
    """
    Service for processing voice queries with speech-to-text and translation.

    Features:
    - Speech-to-text using OpenAI Whisper
    - Automatic language detection
    - Translation of non-English speech (e.g. Arabic) to English
    - Supports 99+ languages
    - Works offline (the model is downloaded on first run)

    Whisper Model Sizes:
    - tiny: 39M params, ~1GB RAM, fast but less accurate
    - base: 74M params, ~1GB RAM, balanced (RECOMMENDED for quick start)
    - small: 244M params, ~2GB RAM, good accuracy
    - medium: 769M params, ~5GB RAM, better accuracy
    - large: 1550M params, ~10GB RAM, best accuracy
    """

    # Sub-directory of the system temp dir that holds uploaded/recorded audio.
    _UPLOAD_DIR_NAME = "voice_assistant_uploads"

    # Maximum number of characters of a transcript echoed into the log.
    _LOG_PREVIEW_CHARS = 100

    # Longest file extension (without the dot) kept from an uploaded filename.
    _MAX_SUFFIX_CHARS = 10

    # Display names for common language codes. Built once at class creation
    # instead of on every lookup; unknown codes fall back to the upper-cased
    # code (see _get_language_name).
    _LANGUAGE_NAMES = {
        "en": "English",
        "ar": "Arabic",
        "es": "Spanish",
        "fr": "French",
        "de": "German",
        "zh": "Chinese",
        "ja": "Japanese",
        "ko": "Korean",
        "ru": "Russian",
        "pt": "Portuguese",
        "it": "Italian",
        "nl": "Dutch",
        "tr": "Turkish",
        "pl": "Polish",
        "uk": "Ukrainian",
        "vi": "Vietnamese",
        "th": "Thai",
        "hi": "Hindi",
        "ur": "Urdu",
        # Add more as needed
    }

    def __init__(self, model_size: str = "base"):
        """
        Initialize the voice processing service.

        Args:
            model_size: Whisper model to use. Options:
                - "tiny" (39M) - Fast, less accurate
                - "base" (74M) - Balanced, recommended for development
                - "small" (244M) - Good accuracy
                - "medium" (769M) - Better accuracy
                - "large" (1550M) - Best accuracy, slowest
        """
        logger.info(f"Loading Whisper model: {model_size}")
        logger.info("This may take a few minutes on first run (downloading model)...")

        # Load Whisper model (this downloads the model on first run).
        self.model = whisper.load_model(model_size)
        self.model_size = model_size

        logger.info(f"✓ Whisper model '{model_size}' loaded successfully")
        logger.info("Supported languages: 99+ (auto-detected)")

    def transcribe_audio(
        self,
        audio_path: str,
        language: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Transcribe audio file in its original language.

        Args:
            audio_path: Path to audio file (mp3, wav, m4a, etc.)
            language: Optional language code (e.g., "en", "ar"). If None, auto-detect.

        Returns:
            Dictionary with transcription results:
            {
                "text": "transcribed text",
                "language": "en",
                "language_name": "English",
                "confidence": 0.95
            }
        """
        logger.info(f"Transcribing audio: {audio_path}")

        # Transcribe with Whisper
        result = self.model.transcribe(
            audio_path,
            language=language,
            fp16=False  # Use fp32 for better compatibility
        )

        transcription = {
            "text": result["text"].strip(),
            "language": result["language"],
            "language_name": self._get_language_name(result["language"]),
            "confidence": self._calculate_confidence(result)
        }

        logger.info(f"✓ Transcribed: '{self._preview(transcription['text'])}'")
        logger.info(f" Language: {transcription['language_name']} ({transcription['language']})")
        logger.info(f" Confidence: {transcription['confidence']:.2f}")
        return transcription

    def translate_to_english(self, audio_path: str) -> Dict[str, Any]:
        """
        Transcribe audio and translate to English (if not already English).

        This is optimized for the use case where you always want English output,
        regardless of the input language. The translation pass is only run when
        the detected language is not English; for English audio the transcript
        itself is returned as the English text.

        Args:
            audio_path: Path to audio file

        Returns:
            Dictionary with translation results:
            {
                "original_text": "النص الأصلي",
                "english_text": "translated text",
                "original_language": "ar",
                "original_language_name": "Arabic",
                "was_translated": true
            }
        """
        logger.info(f"Processing audio for English output: {audio_path}")

        # First, transcribe in the original language (this also detects it).
        original = self.model.transcribe(audio_path, fp16=False)
        source_language = original["language"]
        original_text = original["text"].strip()
        was_translated = source_language != "en"

        if was_translated:
            # Second pass: translate to English. Passing the language detected
            # above keeps both passes consistent and avoids re-detecting it.
            translated = self.model.transcribe(
                audio_path,
                task="translate",  # This translates to English
                language=source_language,
                fp16=False
            )
            english_text = translated["text"].strip()
        else:
            # Already English: a second full model pass would only repeat
            # the transcription, so reuse the first result.
            english_text = original_text

        result = {
            "original_text": original_text,
            "english_text": english_text,
            "original_language": source_language,
            "original_language_name": self._get_language_name(source_language),
            "was_translated": was_translated
        }

        if was_translated:
            logger.info(f"✓ Detected {result['original_language_name']}, translated to English")
            logger.info(f" Original: '{self._preview(original_text)}'")
            logger.info(f" English: '{self._preview(english_text)}'")
        else:
            logger.info("✓ Already in English, no translation needed")
        return result

    def process_voice_query(self, audio_path: str) -> Dict[str, Any]:
        """
        Complete pipeline: transcribe, translate if needed, return query text.

        This is the main method for the voice assistant use case.

        Args:
            audio_path: Path to audio file

        Returns:
            Dictionary ready for division extraction:
            {
                "query": "english text for processing",
                "original_text": "original text if different",
                "language": "ar",
                "language_name": "Arabic",
                "was_translated": true,
                "audio_duration": 5.2
            }
        """
        logger.info(f"Processing voice query: {audio_path}")

        # Audio duration in seconds = decoded sample count / sample rate.
        samples = whisper.load_audio(audio_path)
        duration = len(samples) / whisper.audio.SAMPLE_RATE

        # Translate to English (works for all languages)
        result = self.translate_to_english(audio_path)

        return {
            "query": result["english_text"],  # Always English for processing
            "original_text": result["original_text"],
            "language": result["original_language"],
            "language_name": result["original_language_name"],
            "was_translated": result["was_translated"],
            "audio_duration": round(duration, 2)
        }

    def _get_language_name(self, lang_code: str) -> str:
        """Get full language name from code (upper-cased code if unknown)."""
        return self._LANGUAGE_NAMES.get(lang_code, lang_code.upper())

    def _calculate_confidence(self, whisper_result: Dict) -> float:
        """
        Calculate confidence score from Whisper result.

        Whisper doesn't directly provide confidence, so we estimate it from
        the per-segment average log-probabilities.

        Args:
            whisper_result: Result dictionary; its optional "segments" list is
                read, and each segment's "avg_logprob" is used (-1.0 if absent).

        Returns:
            Confidence in [0.0, 1.0], rounded to 2 decimals. When there are no
            segments a fixed default of 0.85 is returned.
        """
        segments = whisper_result.get("segments")
        if segments:
            # Mean of the segments' average log-probabilities.
            avg_logprob = sum(s.get("avg_logprob", -1.0) for s in segments) / len(segments)
            # Linearly map [-2, 0] onto [0, 1] and clamp; logprob ranges from
            # -inf to 0, typically -2 to 0 for good transcriptions.
            confidence = max(0.0, min(1.0, (avg_logprob + 2.0) / 2.0))
            return round(confidence, 2)

        # Default confidence.
        # NOTE(review): this is also returned for results with no segments at
        # all (e.g. nothing transcribed) - confirm that is intended.
        return 0.85

    @classmethod
    def _preview(cls, text: str) -> str:
        """Return text shortened for logging; '...' is added only if truncated."""
        if len(text) <= cls._LOG_PREVIEW_CHARS:
            return text
        return text[:cls._LOG_PREVIEW_CHARS] + "..."

    @classmethod
    def _safe_suffix(cls, filename: Optional[str]) -> str:
        """Return a sanitized extension ('.ext' or '') taken from filename."""
        raw_suffix = Path(filename or "").suffix
        # The name comes from the uploader: keep only alphanumeric characters
        # and cap the length so nothing unexpected ends up in the temp path.
        cleaned = "".join(ch for ch in raw_suffix[1:] if ch.isalnum())
        cleaned = cleaned[:cls._MAX_SUFFIX_CHARS]
        return f".{cleaned}" if cleaned else ""

    @classmethod
    def _upload_dir(cls) -> Path:
        """Return the temp directory for audio files, creating it if needed."""
        temp_dir = Path(tempfile.gettempdir()) / cls._UPLOAD_DIR_NAME
        temp_dir.mkdir(parents=True, exist_ok=True)
        return temp_dir

    @staticmethod
    def _prepare_wav_samples(audio_data):
        """
        Convert audio samples into an array that is safe to write as WAV.

        Floating-point input is treated as normalized to [-1.0, 1.0]: NaNs are
        replaced by silence, out-of-range values are clipped (instead of
        wrapping around when cast), and the result is scaled to int16.
        Non-float input is returned as a numpy array without rescaling.
        """
        import numpy as np

        samples = np.asarray(audio_data)
        if np.issubdtype(samples.dtype, np.floating):
            if samples.dtype.itemsize < 4:
                # float16 cannot represent 32767 exactly; widen before scaling.
                samples = samples.astype(np.float32)
            samples = np.clip(np.nan_to_num(samples), -1.0, 1.0)
            samples = (samples * 32767).astype(np.int16)
        return samples

    def save_uploaded_audio(self, audio_bytes: bytes, filename: str) -> str:
        """
        Save uploaded audio file to temporary location.

        The file is stored under a random name; only a sanitized version of
        the original file extension is kept.

        Args:
            audio_bytes: Audio file bytes
            filename: Original filename (only its extension is used)

        Returns:
            Path to saved file
        """
        file_extension = self._safe_suffix(filename)
        temp_file = self._upload_dir() / f"upload_{os.urandom(8).hex()}{file_extension}"
        temp_file.write_bytes(audio_bytes)
        logger.info(f"Saved uploaded audio to: {temp_file}")
        return str(temp_file)

    def save_audio_array(self, audio_data, sample_rate: int) -> str:
        """
        Save audio numpy array to temporary WAV file (for Gradio integration).

        Args:
            audio_data: Audio data as numpy array (or array-like). Float data
                is expected in [-1.0, 1.0] and is clipped to that range.
            sample_rate: Sample rate of the audio

        Returns:
            Path to saved WAV file
        """
        # Imported lazily so the module does not require scipy unless this
        # method is actually used.
        import scipy.io.wavfile as wavfile

        temp_file = self._upload_dir() / f"gradio_{os.urandom(8).hex()}.wav"
        samples = self._prepare_wav_samples(audio_data)
        wavfile.write(str(temp_file), sample_rate, samples)
        logger.info(f"Saved Gradio audio to: {temp_file}")
        return str(temp_file)

    def cleanup_temp_file(self, file_path: str):
        """
        Delete temporary audio file.

        Best-effort: a missing file is ignored and any other failure is only
        logged as a warning, never raised.
        """
        if not file_path:
            return
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # Already gone - nothing to clean up.
            pass
        except (OSError, TypeError, ValueError) as e:
            logger.warning(f"Failed to cleanup temp file {file_path}: {e}")
        else:
            logger.info(f"Cleaned up temp file: {file_path}")