import gradio as gr
import numpy as np
import librosa
import time
from typing import Dict, Any, Optional, Tuple

from .gpt import gen_llm_response


class StreamingManager:
    """Manages audio file streaming functionality for testing purposes"""

    def __init__(self, processor):
        """Initialize streaming manager with audio processor.

        Args:
            processor: audio processor; assumed to expose ``clear_buffer``,
                ``add_audio``, ``get_transcription``,
                ``force_complete_processing``,
                ``wait_for_processing_complete``, ``sample_rate``,
                ``processed_length``, and the tunables
                ``min_process_length`` / ``process_interval``
                (TODO confirm against the processor implementation).
        """
        self.processor = processor
        # Per-session streaming state; reset each time a stream starts.
        self.streaming_data = {
            'active': False,
            'audio_data': None,
            'sr': None,
            'chunk_index': 0,
            'total_chunks': 0,
            'chunk_duration': 0.5,
            'chunk_size': 0
        }
        # Store original processor settings for restoration
        self.original_min_process_length = processor.min_process_length
        self.original_process_interval = processor.process_interval

    def _restore_processor_settings(self) -> None:
        """Restore the processor tunables captured at construction time."""
        self.processor.min_process_length = self.original_min_process_length
        self.processor.process_interval = self.original_process_interval

    def start_file_streaming_test(self, audio_file: str) -> Tuple[str, str, str]:
        """Start streaming an audio file in chunks.

        Args:
            audio_file: filepath of the uploaded audio (may be None).

        Returns:
            (status, transcription, llm_response) strings for the UI.
        """
        if audio_file is None:
            return "Please upload an audio file first", "", ""

        try:
            # Clear buffer and reset state
            self.processor.clear_buffer()

            # Tighten processor settings for the streaming test; the
            # originals are restored when streaming stops or errors out.
            self.processor.min_process_length = 0.5 * self.processor.sample_rate  # Process every 0.5 seconds
            self.processor.process_interval = 0.3  # Check for processing every 0.3 seconds

            # Load audio file at its native sample rate (sr=None).
            audio_data, sr = librosa.load(audio_file, sr=None)

            # Calculate chunks (ceiling division: a trailing partial
            # chunk still counts as one chunk).
            chunk_duration = 0.5  # 0.5 second chunks
            chunk_size = int(chunk_duration * sr)
            total_chunks = len(audio_data) // chunk_size + (1 if len(audio_data) % chunk_size > 0 else 0)

            # Store streaming data
            self.streaming_data.update({
                'active': True,
                'audio_data': audio_data,
                'sr': sr,
                'chunk_index': 0,
                'total_chunks': total_chunks,
                'chunk_duration': chunk_duration,
                'chunk_size': chunk_size
            })

            return f"Started streaming {len(audio_data)/sr:.1f}s audio file in {total_chunks} chunks", "", ""

        except Exception as e:
            # Fix: the tunables were already modified above; undo that so a
            # failed load does not leave the processor misconfigured.
            self._restore_processor_settings()
            return f"Error loading audio file: {e}", "", ""

    def stop_file_streaming_test(self) -> Tuple[str, str, str]:
        """Stop streaming test.

        Restores the processor settings, flushes any remaining buffered
        audio, and sends the final transcription to the LLM.

        Returns:
            (status, final transcription, llm_response) strings.
        """
        self.streaming_data['active'] = False

        # Restore original processor settings
        self._restore_processor_settings()

        # Force complete processing of all remaining audio
        final_transcription = self.processor.force_complete_processing()

        llm_response = ""
        if final_transcription and len(final_transcription) > 0:
            llm_response = gen_llm_response(final_transcription)

        return "Streaming stopped", final_transcription, llm_response

    def update_streaming_test(self) -> Tuple[str, str, str]:
        """Update function called periodically during streaming.

        Feeds the next chunk into the processor and returns the current
        (status, transcription, llm_response) strings for the UI.
        """
        if not self.streaming_data['active']:
            current_transcription = self.processor.get_transcription()
            return "Not streaming", current_transcription, ""

        try:
            # Check if we've processed all chunks
            if self.streaming_data['chunk_index'] >= self.streaming_data['total_chunks']:
                # Finished streaming
                self.streaming_data['active'] = False

                # Force complete processing of all remaining audio
                final_transcription = self.processor.force_complete_processing()

                # Restore settings after processing is complete
                self._restore_processor_settings()

                # Send final transcription to LLM and get response
                llm_response = ""
                if final_transcription and len(final_transcription) > 0:
                    llm_response = gen_llm_response(final_transcription)

                return f"Streaming complete! Processed {self.streaming_data['total_chunks']} chunks", str(final_transcription), llm_response

            # Get current chunk info
            chunk_size = self.streaming_data['chunk_size']
            current_chunk = self.streaming_data['chunk_index']
            start_idx = current_chunk * chunk_size
            end_idx = min((current_chunk + 1) * chunk_size, len(self.streaming_data['audio_data']))

            # Extract and process chunk
            chunk = self.streaming_data['audio_data'][start_idx:end_idx]

            # Add chunk to processor
            buffer_size = self.processor.add_audio(chunk, self.streaming_data['sr'])

            # Wait for any pending processing to complete before getting transcription
            self.processor.wait_for_processing_complete(2.0)

            # Get current transcription
            transcription = self.processor.get_transcription()

            # Send transcription to LLM and get response (for real-time updates)
            llm_response = ""
            if transcription and len(transcription) > 0:
                llm_response = gen_llm_response(transcription)

            # Update status
            buffer_seconds = buffer_size / self.processor.sample_rate
            status = f"Chunk {current_chunk+1}/{self.streaming_data['total_chunks']} | Buffer: {buffer_seconds:.1f}s | Processed: {self.processor.processed_length/self.processor.sample_rate:.1f}s"

            # Move to next chunk
            self.streaming_data['chunk_index'] += 1

            # Check if this was the last chunk
            if self.streaming_data['chunk_index'] >= self.streaming_data['total_chunks']:
                print(f"✅ All {self.streaming_data['total_chunks']} chunks processed!")

            return status, str(transcription), llm_response

        except Exception as e:
            self.streaming_data['active'] = False
            # Fix: restore the tunables on error so the processor is not
            # left with the test-only settings.
            self._restore_processor_settings()
            return f"Streaming error: {e}", "", ""

    def is_active(self) -> bool:
        """Check if streaming is currently active"""
        return self.streaming_data['active']

    def get_streaming_data(self) -> Dict[str, Any]:
        """Get current streaming data (shallow copy)"""
        return self.streaming_data.copy()


def create_streaming_interface(streaming_manager: StreamingManager) -> Dict[str, Any]:
    """Create Gradio interface components for streaming functionality.

    Args:
        streaming_manager: the StreamingManager driving the test stream.

    Returns:
        Dict mapping component names to the created Gradio components.
    """
    with gr.Row():
        test_audio_file = gr.Audio(sources=["upload"], type="filepath", label="Upload Audio File for Testing")
    with gr.Row():
        test_stream_btn = gr.Button("🎵 Start Streaming Test", variant="primary")
        test_stop_btn = gr.Button("⏹️ Stop Streaming", variant="stop")
    with gr.Row():
        test_status = gr.Textbox(label="Streaming Status", interactive=False, placeholder="Upload an audio file and click 'Start Streaming Test'")
    with gr.Row():
        with gr.Column():
            transcription_output = gr.Textbox(label="Live Transcription", lines=5, interactive=False)
        with gr.Column():
            llm_output = gr.Textbox(label="LLM Response", lines=5, interactive=False)

    # Timer for streaming updates (every 0.5 seconds)
    streaming_timer = gr.Timer(value=0.5, active=False)

    # Event handlers
    def start_and_activate_timer(audio_file):
        status, transcription, llm_response = streaming_manager.start_file_streaming_test(audio_file)
        # Only run the timer when the stream actually started.
        timer_active = streaming_manager.is_active()
        return status, transcription, llm_response, gr.Timer(active=timer_active)

    def stop_and_deactivate_timer():
        status, transcription, llm_response = streaming_manager.stop_file_streaming_test()
        return status, transcription, llm_response, gr.Timer(active=False)

    def update_with_timer_control():
        status, transcription, llm_response = streaming_manager.update_streaming_test()
        # Keep timer active if still streaming
        timer_active = streaming_manager.is_active()
        return status, transcription, llm_response, gr.Timer(active=timer_active)

    # Connect event handlers
    test_stream_btn.click(
        start_and_activate_timer,
        inputs=[test_audio_file],
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )
    test_stop_btn.click(
        stop_and_deactivate_timer,
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )
    # Timer tick updates with automatic deactivation when done
    streaming_timer.tick(
        update_with_timer_control,
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )

    return {
        'test_audio_file': test_audio_file,
        'test_stream_btn': test_stream_btn,
        'test_stop_btn': test_stop_btn,
        'test_status': test_status,
        'transcription_output': transcription_output,
        'llm_output': llm_output,
        'streaming_timer': streaming_timer
    }