import gradio as gr
import librosa
from typing import Any, Dict, Optional, Tuple

from .gpt import gen_llm_response


class StreamingManager:
    """Manages audio file streaming functionality for testing purposes."""

    def __init__(self, processor):
        """Initialize the streaming manager with an audio processor."""
        self.processor = processor
        self.streaming_data = {
            'active': False,
            'audio_data': None,
            'sr': None,
            'chunk_index': 0,
            'total_chunks': 0,
            'chunk_duration': 0.5,
            'chunk_size': 0
        }
        # Store original processor settings for restoration
        self.original_min_process_length = processor.min_process_length
        self.original_process_interval = processor.process_interval

    def start_file_streaming_test(self, audio_file: Optional[str]) -> Tuple[str, str, str]:
        """Start streaming an audio file in chunks."""
        if audio_file is None:
            return "Please upload an audio file first", "", ""
        try:
            # Clear buffer and reset state
            self.processor.clear_buffer()

            # Adjust processor settings for the streaming test
            self.processor.min_process_length = 0.5 * self.processor.sample_rate  # Process every 0.5 seconds of audio
            self.processor.process_interval = 0.3  # Check for processing every 0.3 seconds

            # Load the audio file at its native sample rate
            audio_data, sr = librosa.load(audio_file, sr=None)

            # Calculate chunks (ceiling division so a partial final chunk is counted)
            chunk_duration = 0.5  # 0.5-second chunks
            chunk_size = int(chunk_duration * sr)
            total_chunks = len(audio_data) // chunk_size + (1 if len(audio_data) % chunk_size > 0 else 0)

            # Store streaming data
            self.streaming_data.update({
                'active': True,
                'audio_data': audio_data,
                'sr': sr,
                'chunk_index': 0,
                'total_chunks': total_chunks,
                'chunk_duration': chunk_duration,
                'chunk_size': chunk_size
            })

            #print(f"🎵 Starting stream: {len(audio_data)/sr:.1f}s audio, {total_chunks} chunks of {chunk_duration}s each")
            return f"Started streaming {len(audio_data)/sr:.1f}s audio file in {total_chunks} chunks", "", ""
        except Exception as e:
            return f"Error loading audio file: {e}", "", ""

    def stop_file_streaming_test(self) -> Tuple[str, str, str]:
        """Stop the streaming test."""
        self.streaming_data['active'] = False

        # Restore original processor settings
        self.processor.min_process_length = self.original_min_process_length
        self.processor.process_interval = self.original_process_interval

        # Force complete processing of all remaining audio
        final_transcription = self.processor.force_complete_processing()

        llm_response = ""
        if final_transcription and len(final_transcription) > 0:
            llm_response = gen_llm_response(final_transcription)

        return "Streaming stopped", final_transcription, llm_response

    def update_streaming_test(self) -> Tuple[str, str, str]:
        """Update function called periodically during streaming."""
        if not self.streaming_data['active']:
            current_transcription = self.processor.get_transcription()
            return "Not streaming", current_transcription, ""
        try:
            # Check if we've processed all chunks
            if self.streaming_data['chunk_index'] >= self.streaming_data['total_chunks']:
                # Finished streaming
                self.streaming_data['active'] = False

                # Force complete processing of all remaining audio
                final_transcription = self.processor.force_complete_processing()

                # Restore settings after processing is complete
                self.processor.min_process_length = self.original_min_process_length
                self.processor.process_interval = self.original_process_interval

                # Send the final transcription to the LLM and get a response
                llm_response = ""
                if final_transcription and len(final_transcription) > 0:
                    llm_response = gen_llm_response(final_transcription)

                return (
                    f"Streaming complete! Processed {self.streaming_data['total_chunks']} chunks",
                    str(final_transcription),
                    llm_response,
                )

            # Get current chunk info
            chunk_size = self.streaming_data['chunk_size']
            current_chunk = self.streaming_data['chunk_index']
            start_idx = current_chunk * chunk_size
            end_idx = min((current_chunk + 1) * chunk_size, len(self.streaming_data['audio_data']))

            # Extract and process chunk
            chunk = self.streaming_data['audio_data'][start_idx:end_idx]
            #print(f"Processing chunk {current_chunk + 1}/{self.streaming_data['total_chunks']}: samples {start_idx}-{end_idx} ({len(chunk)} samples)")

            # Add chunk to processor
            buffer_size = self.processor.add_audio(chunk, self.streaming_data['sr'])

            # Wait for any pending processing to complete before reading the transcription
            self.processor.wait_for_processing_complete(2.0)

            # Get the current transcription
            transcription = self.processor.get_transcription()

            # Send the transcription to the LLM and get a response (for real-time updates)
            llm_response = ""
            if transcription and len(transcription) > 0:
                llm_response = gen_llm_response(transcription)

            # Update status
            buffer_seconds = buffer_size / self.processor.sample_rate
            status = (
                f"Chunk {current_chunk + 1}/{self.streaming_data['total_chunks']} | "
                f"Buffer: {buffer_seconds:.1f}s | "
                f"Processed: {self.processor.processed_length / self.processor.sample_rate:.1f}s"
            )

            # Move to the next chunk
            self.streaming_data['chunk_index'] += 1

            # Check if this was the last chunk
            if self.streaming_data['chunk_index'] >= self.streaming_data['total_chunks']:
                print(f"✅ All {self.streaming_data['total_chunks']} chunks processed!")

            return status, str(transcription), llm_response
        except Exception as e:
            self.streaming_data['active'] = False
            return f"Streaming error: {e}", "", ""

    def is_active(self) -> bool:
        """Check if streaming is currently active."""
        return self.streaming_data['active']

    def get_streaming_data(self) -> Dict[str, Any]:
        """Get a copy of the current streaming data."""
        return self.streaming_data.copy()
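

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): StreamingManager is
# written against a processor object whose interface is only implied by the
# calls above. The Protocol below spells out that assumed interface; the name
# "AudioProcessorLike" and the exact type hints are hypothetical.
# ---------------------------------------------------------------------------
from typing import Protocol


class AudioProcessorLike(Protocol):
    sample_rate: int            # samples per second the processor works at
    processed_length: int       # number of samples already transcribed
    min_process_length: float   # minimum buffered samples before processing runs
    process_interval: float     # seconds between processing checks

    def clear_buffer(self) -> None: ...
    def add_audio(self, chunk, sr: int) -> int: ...  # returns current buffer size in samples
    def wait_for_processing_complete(self, timeout: float) -> None: ...
    def get_transcription(self) -> str: ...
    def force_complete_processing(self) -> str: ...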


def create_streaming_interface(streaming_manager: StreamingManager) -> Dict[str, Any]:
    """Create Gradio interface components for the streaming functionality."""
    with gr.Row():
        test_audio_file = gr.Audio(sources=["upload"], type="filepath", label="Upload Audio File for Testing")

    with gr.Row():
        test_stream_btn = gr.Button("🎵 Start Streaming Test", variant="primary")
        test_stop_btn = gr.Button("⏹️ Stop Streaming", variant="stop")

    with gr.Row():
        test_status = gr.Textbox(
            label="Streaming Status",
            interactive=False,
            placeholder="Upload an audio file and click 'Start Streaming Test'",
        )

    with gr.Row():
        with gr.Column():
            transcription_output = gr.Textbox(label="Live Transcription", lines=5, interactive=False)
        with gr.Column():
            llm_output = gr.Textbox(label="LLM Response", lines=5, interactive=False)

    # Timer for streaming updates (every 0.5 seconds), inactive until streaming starts
    streaming_timer = gr.Timer(value=0.5, active=False)

    # Event handlers
    def start_and_activate_timer(audio_file):
        status, transcription, llm_response = streaming_manager.start_file_streaming_test(audio_file)
        if streaming_manager.is_active():
            return status, transcription, llm_response, gr.Timer(active=True)
        else:
            return status, transcription, llm_response, gr.Timer(active=False)

    def stop_and_deactivate_timer():
        status, transcription, llm_response = streaming_manager.stop_file_streaming_test()
        return status, transcription, llm_response, gr.Timer(active=False)

    def update_with_timer_control():
        status, transcription, llm_response = streaming_manager.update_streaming_test()
        # Keep the timer active only while streaming continues
        timer_active = streaming_manager.is_active()
        return status, transcription, llm_response, gr.Timer(active=timer_active)

    # Connect event handlers
    test_stream_btn.click(
        start_and_activate_timer,
        inputs=[test_audio_file],
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )
    test_stop_btn.click(
        stop_and_deactivate_timer,
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )

    # Timer tick updates with automatic deactivation when done
    streaming_timer.tick(
        update_with_timer_control,
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )

    return {
        'test_audio_file': test_audio_file,
        'test_stream_btn': test_stream_btn,
        'test_stop_btn': test_stop_btn,
        'test_status': test_status,
        'transcription_output': transcription_output,
        'llm_output': llm_output,
        'streaming_timer': streaming_timer
    }
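

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module): one way the factory
# above might be wired into a Gradio app. "AudioProcessor" stands in for
# whatever concrete processor this Space actually uses and is purely a
# hypothetical placeholder, so the sketch is left commented out.
# ---------------------------------------------------------------------------
# from .processor import AudioProcessor  # hypothetical concrete processor
#
# processor = AudioProcessor()
# manager = StreamingManager(processor)
#
# with gr.Blocks() as demo:
#     gr.Markdown("## Streaming transcription test")
#     components = create_streaming_interface(manager)
#
# demo.launch()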