# voice_processing_service.py
import logging
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional

import whisper

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class VoiceProcessor:
    """
    Service for processing voice queries with speech-to-text and translation.

    Features:
    - Speech-to-text using OpenAI Whisper
    - Automatic language detection
    - Arabic-to-English translation
    - Supports 99+ languages
    - Works offline

    Whisper model sizes:
    - tiny:   39M params,   ~1GB RAM,  fast but less accurate
    - base:   74M params,   ~1GB RAM,  balanced (RECOMMENDED for quick start)
    - small:  244M params,  ~2GB RAM,  good accuracy
    - medium: 769M params,  ~5GB RAM,  better accuracy
    - large:  1550M params, ~10GB RAM, best accuracy
    """

    def __init__(self, model_size: str = "base"):
        """
        Initialize the voice processing service.

        Args:
            model_size: Whisper model to use. Options:
                - "tiny" (39M): fast, less accurate
                - "base" (74M): balanced, recommended for development
                - "small" (244M): good accuracy
                - "medium" (769M): better accuracy
                - "large" (1550M): best accuracy, slowest
        """
        logger.info(f"Loading Whisper model: {model_size}")
        logger.info("This may take a few minutes on first run (downloading model)...")

        # Load the Whisper model (downloaded and cached on first run)
        self.model = whisper.load_model(model_size)
        self.model_size = model_size

        logger.info(f"✓ Whisper model '{model_size}' loaded successfully")
        logger.info("Supported languages: 99+ (auto-detected)")
    def transcribe_audio(
        self,
        audio_path: str,
        language: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Transcribe an audio file in its original language.

        Args:
            audio_path: Path to audio file (mp3, wav, m4a, etc.)
            language: Optional language code (e.g., "en", "ar"). If None, auto-detect.

        Returns:
            Dictionary with transcription results:
            {
                "text": "transcribed text",
                "language": "en",
                "language_name": "English",
                "confidence": 0.95
            }
        """
        logger.info(f"Transcribing audio: {audio_path}")

        # Transcribe with Whisper
        result = self.model.transcribe(
            audio_path,
            language=language,
            fp16=False,  # Use fp32 for better compatibility (e.g., CPU-only machines)
        )

        transcription = {
            "text": result["text"].strip(),
            "language": result["language"],
            "language_name": self._get_language_name(result["language"]),
            "confidence": self._calculate_confidence(result),
        }

        logger.info(f"✓ Transcribed: '{transcription['text'][:100]}...'")
        logger.info(f"  Language: {transcription['language_name']} ({transcription['language']})")
        logger.info(f"  Confidence: {transcription['confidence']:.2f}")

        return transcription
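
    # A minimal usage sketch (the file name "meeting.mp3" is hypothetical;
    # assumes ffmpeg is installed, which Whisper needs to decode compressed audio):
    #   vp = VoiceProcessor("base")
    #   out = vp.transcribe_audio("meeting.mp3")
    #   print(out["text"], out["language_name"], out["confidence"])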
    def translate_to_english(self, audio_path: str) -> Dict[str, Any]:
        """
        Transcribe audio and translate it to English (if not already English).

        This is optimized for the use case where you always want English output,
        regardless of the input language.

        Args:
            audio_path: Path to audio file

        Returns:
            Dictionary with translation results:
            {
                "original_text": "النص الأصلي",
                "english_text": "translated text",
                "original_language": "ar",
                "original_language_name": "Arabic",
                "was_translated": true
            }
        """
        logger.info(f"Processing audio for English output: {audio_path}")

        # First, transcribe in the original language (this also detects it)
        original = self.model.transcribe(audio_path, fp16=False)

        # Then run a second pass that translates to English
        translated = self.model.transcribe(
            audio_path,
            task="translate",  # Whisper's built-in translate-to-English task
            fp16=False,
        )

        result = {
            "original_text": original["text"].strip(),
            "english_text": translated["text"].strip(),
            "original_language": original["language"],
            "original_language_name": self._get_language_name(original["language"]),
            "was_translated": original["language"] != "en",
        }

        if result["was_translated"]:
            logger.info(f"✓ Detected {result['original_language_name']}, translated to English")
            logger.info(f"  Original: '{result['original_text'][:100]}...'")
            logger.info(f"  English: '{result['english_text'][:100]}...'")
        else:
            logger.info("✓ Already in English, no translation needed")

        return result
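
    # Note: this method runs two full Whisper decodes (one transcription and
    # one translation). A possible optimization, not implemented here: detect
    # the language first and skip the translate pass for English audio. A
    # sketch using Whisper's documented detection API:
    #   audio = whisper.pad_or_trim(whisper.load_audio(audio_path))
    #   mel = whisper.log_mel_spectrogram(audio).to(self.model.device)
    #   _, probs = self.model.detect_language(mel)
    #   if max(probs, key=probs.get) == "en":
    #       ...  # reuse the plain transcription as the English text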
    def process_voice_query(self, audio_path: str) -> Dict[str, Any]:
        """
        Complete pipeline: transcribe, translate if needed, return query text.

        This is the main method for the voice assistant use case.

        Args:
            audio_path: Path to audio file

        Returns:
            Dictionary ready for division extraction:
            {
                "query": "english text for processing",
                "original_text": "original text if different",
                "language": "ar",
                "language_name": "Arabic",
                "was_translated": true,
                "audio_duration": 5.2
            }
        """
        logger.info(f"Processing voice query: {audio_path}")

        # Get audio duration (load_audio resamples to Whisper's 16 kHz rate)
        audio_info = whisper.load_audio(audio_path)
        duration = len(audio_info) / whisper.audio.SAMPLE_RATE

        # Translate to English (works for all languages)
        result = self.translate_to_english(audio_path)

        return {
            "query": result["english_text"],  # Always English for processing
            "original_text": result["original_text"],
            "language": result["original_language"],
            "language_name": result["original_language_name"],
            "was_translated": result["was_translated"],
            "audio_duration": round(duration, 2),
        }
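
    # Illustrative return value for an Arabic query (all values are made up):
    #   {
    #       "query": "What were last quarter's sales figures?",
    #       "original_text": "ما هي أرقام مبيعات الربع الأخير؟",
    #       "language": "ar",
    #       "language_name": "Arabic",
    #       "was_translated": True,
    #       "audio_duration": 5.2,
    #   }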
    def _get_language_name(self, lang_code: str) -> str:
        """Get the full language name from an ISO 639-1 code."""
        language_names = {
            "en": "English",
            "ar": "Arabic",
            "es": "Spanish",
            "fr": "French",
            "de": "German",
            "zh": "Chinese",
            "ja": "Japanese",
            "ko": "Korean",
            "ru": "Russian",
            "pt": "Portuguese",
            "it": "Italian",
            "nl": "Dutch",
            "tr": "Turkish",
            "pl": "Polish",
            "uk": "Ukrainian",
            "vi": "Vietnamese",
            "th": "Thai",
            "hi": "Hindi",
            "ur": "Urdu",
            # Add more as needed
        }
        return language_names.get(lang_code, lang_code.upper())
    def _calculate_confidence(self, whisper_result: Dict) -> float:
        """
        Estimate a confidence score from a Whisper result.

        Whisper doesn't directly provide confidence, so we estimate it
        from the average log-probability of the decoded segments.
        """
        # If segments are available, average their log-probabilities
        if whisper_result.get("segments"):
            avg_logprob = sum(s.get("avg_logprob", -1.0) for s in whisper_result["segments"])
            avg_logprob /= len(whisper_result["segments"])
            # Map log-probability to an approximate confidence in [0, 1].
            # avg_logprob ranges from -inf to 0, typically -2 to 0 for good
            # transcriptions.
            confidence = max(0.0, min(1.0, (avg_logprob + 2.0) / 2.0))
            return round(confidence, 2)
        # Default confidence when no segments are available
        return 0.85
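
    # Worked example of the mapping above: avg_logprob = -0.5 gives
    # (-0.5 + 2.0) / 2.0 = 0.75; values at or below -2.0 clamp to 0.0,
    # and a perfect log-probability of 0.0 maps to confidence 1.0.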
    def save_uploaded_audio(self, audio_bytes: bytes, filename: str) -> str:
        """
        Save an uploaded audio file to a temporary location.

        Args:
            audio_bytes: Audio file bytes
            filename: Original filename (used only for its extension)

        Returns:
            Path to saved file
        """
        # Create the temp directory if it doesn't exist
        temp_dir = Path(tempfile.gettempdir()) / "voice_assistant_uploads"
        temp_dir.mkdir(exist_ok=True)

        # Save under a random name, preserving the original extension
        file_extension = Path(filename).suffix
        temp_file = temp_dir / f"upload_{os.urandom(8).hex()}{file_extension}"
        temp_file.write_bytes(audio_bytes)

        logger.info(f"Saved uploaded audio to: {temp_file}")
        return str(temp_file)
    def save_audio_array(self, audio_data, sample_rate: int) -> str:
        """
        Save an audio numpy array to a temporary WAV file (for Gradio integration).

        Args:
            audio_data: Audio data as a numpy array
            sample_rate: Sample rate of the audio

        Returns:
            Path to saved WAV file
        """
        # Imported locally so the scipy dependency is only needed on this path
        import numpy as np
        import scipy.io.wavfile as wavfile

        # Create the temp directory if it doesn't exist
        temp_dir = Path(tempfile.gettempdir()) / "voice_assistant_uploads"
        temp_dir.mkdir(exist_ok=True)

        temp_file = temp_dir / f"gradio_{os.urandom(8).hex()}.wav"

        # Convert float audio (expected range [-1.0, 1.0]) to int16,
        # clipping first to avoid integer overflow on out-of-range samples
        if isinstance(audio_data, np.ndarray):
            if audio_data.dtype in (np.float32, np.float64):
                audio_data = (np.clip(audio_data, -1.0, 1.0) * 32767).astype(np.int16)

        wavfile.write(str(temp_file), sample_rate, audio_data)
        logger.info(f"Saved Gradio audio to: {temp_file}")
        return str(temp_file)
    def cleanup_temp_file(self, file_path: str):
        """Delete a temporary audio file."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
                logger.info(f"Cleaned up temp file: {file_path}")
        except Exception as e:
            logger.warning(f"Failed to clean up temp file {file_path}: {e}")