# components/streaming.py — audio-file streaming test harness
# Author: Sidak Singh
# Commit 7b7db64: "question boundary works"
import gradio as gr
import numpy as np
import librosa
import time
from typing import Dict, Any, Optional, Tuple
from .gpt import gen_llm_response
class StreamingManager:
    """Streams an uploaded audio file through the audio processor in small
    chunks, simulating live microphone input for testing.

    While a stream is active the processor's cadence settings are tightened
    so each chunk is processed promptly; the original settings are restored
    when the stream stops, completes, or fails to start.
    """

    def __init__(self, processor):
        """Initialize streaming manager with audio processor.

        Args:
            processor: audio processor exposing ``clear_buffer``,
                ``add_audio``, ``get_transcription``,
                ``force_complete_processing``,
                ``wait_for_processing_complete`` and the attributes
                ``sample_rate``, ``min_process_length``,
                ``process_interval`` and ``processed_length``.
        """
        self.processor = processor
        # Per-stream state; 'active' gates the periodic update callback.
        self.streaming_data = {
            'active': False,
            'audio_data': None,
            'sr': None,
            'chunk_index': 0,
            'total_chunks': 0,
            'chunk_duration': 0.5,
            'chunk_size': 0
        }
        # Store original processor settings for restoration
        self.original_min_process_length = processor.min_process_length
        self.original_process_interval = processor.process_interval

    def _restore_processor_settings(self) -> None:
        """Restore the processor's original cadence settings."""
        self.processor.min_process_length = self.original_min_process_length
        self.processor.process_interval = self.original_process_interval

    def start_file_streaming_test(self, audio_file: str) -> Tuple[str, str, str]:
        """Start streaming an audio file in 0.5-second chunks.

        Args:
            audio_file: path to the uploaded audio file, or None.

        Returns:
            Tuple of (status message, transcription, llm response); the
            last two are always empty at start time.
        """
        if audio_file is None:
            return "Please upload an audio file first", "", ""
        try:
            # Clear buffer and reset state
            self.processor.clear_buffer()
            # Tighten cadence so each 0.5 s chunk triggers processing quickly.
            self.processor.min_process_length = 0.5 * self.processor.sample_rate  # Process every 0.5 seconds
            self.processor.process_interval = 0.3  # Check for processing every 0.3 seconds
            # Load at the file's native sample rate.
            audio_data, sr = librosa.load(audio_file, sr=None)
            # Split into fixed-duration chunks; a trailing partial chunk
            # still counts (ceiling division).
            chunk_duration = 0.5  # 0.5 second chunks
            chunk_size = int(chunk_duration * sr)
            total_chunks = len(audio_data) // chunk_size + (1 if len(audio_data) % chunk_size > 0 else 0)
            # Store streaming data
            self.streaming_data.update({
                'active': True,
                'audio_data': audio_data,
                'sr': sr,
                'chunk_index': 0,
                'total_chunks': total_chunks,
                'chunk_duration': chunk_duration,
                'chunk_size': chunk_size
            })
            return f"Started streaming {len(audio_data)/sr:.1f}s audio file in {total_chunks} chunks", "", ""
        except Exception as e:
            # Bug fix: the cadence settings were already overridden above;
            # put them back so a failed start doesn't leave the processor
            # in test mode.
            self._restore_processor_settings()
            return f"Error loading audio file: {e}", "", ""

    def stop_file_streaming_test(self) -> Tuple[str, str, str]:
        """Stop streaming, flush remaining audio, and get an LLM reply.

        Returns:
            Tuple of (status message, final transcription, llm response).
        """
        self.streaming_data['active'] = False
        # Restore original processor settings
        self._restore_processor_settings()
        # Force complete processing of all remaining audio
        final_transcription = self.processor.force_complete_processing()
        llm_response = ""
        if final_transcription and len(final_transcription) > 0:
            llm_response = gen_llm_response(final_transcription)
        return "Streaming stopped", final_transcription, llm_response

    def update_streaming_test(self) -> Tuple[str, str, str]:
        """Feed the next chunk to the processor; called on each timer tick.

        Returns:
            Tuple of (status message, transcription so far, llm response).
        """
        if not self.streaming_data['active']:
            current_transcription = self.processor.get_transcription()
            return "Not streaming", current_transcription, ""
        try:
            # Check if we've processed all chunks
            if self.streaming_data['chunk_index'] >= self.streaming_data['total_chunks']:
                # Finished streaming: flush remaining audio, then restore
                # settings (after the flush so it still uses test cadence).
                self.streaming_data['active'] = False
                final_transcription = self.processor.force_complete_processing()
                self._restore_processor_settings()
                # Send final transcription to LLM and get response
                llm_response = ""
                if final_transcription and len(final_transcription) > 0:
                    llm_response = gen_llm_response(final_transcription)
                return f"Streaming complete! Processed {self.streaming_data['total_chunks']} chunks", str(final_transcription), llm_response
            # Slice out the current chunk (the last one may be shorter).
            chunk_size = self.streaming_data['chunk_size']
            current_chunk = self.streaming_data['chunk_index']
            start_idx = current_chunk * chunk_size
            end_idx = min((current_chunk + 1) * chunk_size, len(self.streaming_data['audio_data']))
            chunk = self.streaming_data['audio_data'][start_idx:end_idx]
            # Feed the chunk, then wait (up to 2 s) for pending processing
            # so the transcription read below reflects it.
            buffer_size = self.processor.add_audio(chunk, self.streaming_data['sr'])
            self.processor.wait_for_processing_complete(2.0)
            transcription = self.processor.get_transcription()
            # Send transcription to LLM for real-time updates.
            llm_response = ""
            if transcription and len(transcription) > 0:
                llm_response = gen_llm_response(transcription)
            # Update status line shown in the UI.
            buffer_seconds = buffer_size / self.processor.sample_rate
            status = f"Chunk {current_chunk+1}/{self.streaming_data['total_chunks']} | Buffer: {buffer_seconds:.1f}s | Processed: {self.processor.processed_length/self.processor.sample_rate:.1f}s"
            # Move to next chunk
            self.streaming_data['chunk_index'] += 1
            if self.streaming_data['chunk_index'] >= self.streaming_data['total_chunks']:
                print(f"✅ All {self.streaming_data['total_chunks']} chunks processed!")
            return status, str(transcription), llm_response
        except Exception as e:
            # Bug fix: also restore the processor's settings on a mid-stream
            # failure instead of leaving it in test mode until stop is pressed.
            self.streaming_data['active'] = False
            self._restore_processor_settings()
            return f"Streaming error: {e}", "", ""

    def is_active(self) -> bool:
        """Check if streaming is currently active."""
        return self.streaming_data['active']

    def get_streaming_data(self) -> Dict[str, Any]:
        """Return a shallow copy of the current streaming state."""
        return self.streaming_data.copy()
def create_streaming_interface(streaming_manager: StreamingManager) -> Dict[str, Any]:
    """Build the Gradio widgets and event wiring for the streaming test panel.

    Args:
        streaming_manager: the StreamingManager that drives the stream.

    Returns:
        Mapping from component name to the constructed Gradio component.
    """
    with gr.Row():
        test_audio_file = gr.Audio(sources=["upload"], type="filepath", label="Upload Audio File for Testing")
    with gr.Row():
        test_stream_btn = gr.Button("🎵 Start Streaming Test", variant="primary")
        test_stop_btn = gr.Button("⏹️ Stop Streaming", variant="stop")
    with gr.Row():
        test_status = gr.Textbox(label="Streaming Status", interactive=False, placeholder="Upload an audio file and click 'Start Streaming Test'")
    with gr.Row():
        with gr.Column():
            transcription_output = gr.Textbox(label="Live Transcription", lines=5, interactive=False)
        with gr.Column():
            llm_output = gr.Textbox(label="LLM Response", lines=5, interactive=False)
    # Polling timer (0.5 s ticks); stays dormant until a stream begins.
    streaming_timer = gr.Timer(value=0.5, active=False)

    def _on_start(audio_file):
        """Kick off streaming; the timer turns on only if the start succeeded."""
        status, text, reply = streaming_manager.start_file_streaming_test(audio_file)
        return status, text, reply, gr.Timer(active=streaming_manager.is_active())

    def _on_stop():
        """Halt streaming and switch the timer off."""
        status, text, reply = streaming_manager.stop_file_streaming_test()
        return status, text, reply, gr.Timer(active=False)

    def _on_tick():
        """Per-tick refresh; the timer deactivates itself once streaming ends."""
        status, text, reply = streaming_manager.update_streaming_test()
        return status, text, reply, gr.Timer(active=streaming_manager.is_active())

    # Wire buttons and timer to their handlers.
    test_stream_btn.click(
        _on_start,
        inputs=[test_audio_file],
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )
    test_stop_btn.click(
        _on_stop,
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )
    streaming_timer.tick(
        _on_tick,
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )
    return {
        'test_audio_file': test_audio_file,
        'test_stream_btn': test_stream_btn,
        'test_stop_btn': test_stop_btn,
        'test_status': test_status,
        'transcription_output': transcription_output,
        'llm_output': llm_output,
        'streaming_timer': streaming_timer
    }