"""Async helpers for text-to-speech (ElevenLabs) and speech-to-text (Groq Whisper)."""

import asyncio
import contextlib
import inspect
import os
import tempfile
import time
from pathlib import Path
from typing import Optional


async def text_to_speech(text: str, output_path: Optional[str] = None) -> str:
    """
    Convert text to speech using ElevenLabs

    Args:
        text: Text to convert
        output_path: Output audio file path. When omitted, a timestamped
            file under ``data/outputs/`` is used.

    Returns:
        Path to generated audio file, or an empty string if generation failed
        (the error is printed, not raised).

    Raises:
        ValueError: If ELEVENLABS_API_KEY is not set.
    """
    api_key = os.getenv('ELEVENLABS_API_KEY')
    if not api_key:
        raise ValueError("ELEVENLABS_API_KEY not set")

    try:
        # Imported lazily so the module can be loaded without the SDK installed.
        from elevenlabs.client import AsyncElevenLabs
        from elevenlabs import VoiceSettings

        client = AsyncElevenLabs(api_key=api_key)

        # Generate audio
        audio_stream = client.text_to_speech.convert(
            text=text,
            voice_id="21m00Tcm4TlvDq8ikWAM",  # Rachel voice
            model_id="eleven_monolingual_v1",
            voice_settings=VoiceSettings(
                stability=0.5,
                similarity_boost=0.75,
                style=0.5,
                use_speaker_boost=True,
            ),
        )
        # Depending on the SDK version, convert() hands back either an async
        # iterator directly or an awaitable resolving to one. Awaiting an
        # async generator raises TypeError, so only await when it is awaitable.
        if inspect.isawaitable(audio_stream):
            audio_stream = await audio_stream

        # Save audio
        if output_path is None:
            # Wall-clock timestamp: the event loop clock is monotonic and can
            # repeat across process restarts, overwriting earlier outputs.
            output_path = f"data/outputs/speech_{int(time.time())}.mp3"

        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        # Write audio chunks
        with open(output_path, 'wb') as audio_file:
            async for chunk in audio_stream:
                audio_file.write(chunk)

        return output_path

    except Exception as e:
        print(f"TTS Error: {e}")
        # Return empty path on error
        return ""


async def speech_to_text(audio_path: str) -> str:
    """
    Convert speech to text using Groq Whisper

    Args:
        audio_path: Path to audio file

    Returns:
        Transcribed text, or an empty string if transcription failed
        (the error is printed, not raised).

    Raises:
        ValueError: If GROQ_API_KEY is not set.
    """
    api_key = os.getenv('GROQ_API_KEY')
    if not api_key:
        raise ValueError("GROQ_API_KEY not set for STT")

    try:
        # Imported lazily so the module can be loaded without the SDK installed.
        from groq import AsyncGroq

        client = AsyncGroq(api_key=api_key)

        with open(audio_path, 'rb') as audio_file:
            transcription = await client.audio.transcriptions.create(
                file=audio_file,
                model="whisper-large-v3",
                response_format="text",
            )

        # NOTE(review): with response_format="text" this is presumably a plain
        # string - confirm against the installed groq SDK version.
        return transcription

    except Exception as e:
        print(f"STT Error: {e}")
        return ""


async def process_audio_input(audio_data: bytes) -> str:
    """
    Process audio input from microphone

    The bytes are written to a uniquely named temporary ``.wav`` file, passed
    to :func:`speech_to_text`, and the file is removed afterwards even when
    transcription raises.

    Args:
        audio_data: Raw audio bytes

    Returns:
        Transcribed text (empty string if transcription failed)

    Raises:
        ValueError: Propagated from speech_to_text when GROQ_API_KEY is not set.
    """
    # A unique temp file per call: a fixed path would let concurrent calls
    # overwrite each other's audio.
    fd, temp_path = tempfile.mkstemp(prefix="voice_input_", suffix=".wav")
    try:
        with os.fdopen(fd, 'wb') as temp_file:
            temp_file.write(audio_data)

        # Transcribe
        return await speech_to_text(temp_path)
    finally:
        # Cleanup runs on success and on error alike.
        with contextlib.suppress(FileNotFoundError):
            os.remove(temp_path)