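"""Audio file streaming test utilities.

StreamingManager feeds an uploaded audio file to the audio processor in short
chunks to simulate live input, and create_streaming_interface builds the Gradio
components, timer, and event wiring for the streaming test UI.
"""
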
import gradio as gr
import numpy as np
import librosa
from typing import Dict, Any, Tuple
from .gpt import gen_llm_response


class StreamingManager:
    """Manages audio file streaming functionality for testing purposes"""

    def __init__(self, processor):
        """Initialize streaming manager with audio processor"""
        self.processor = processor
        self.streaming_data = {
            'active': False,
            'audio_data': None,
            'sr': None,
            'chunk_index': 0,
            'total_chunks': 0,
            'chunk_duration': 0.5,
            'chunk_size': 0
        }

        # Store original processor settings for restoration
        self.original_min_process_length = processor.min_process_length
        self.original_process_interval = processor.process_interval

    def start_file_streaming_test(self, audio_file: str) -> Tuple[str, str, str]:
        """Load an audio file and start streaming it in 0.5 s chunks.

        Returns a (status, transcription, llm_response) tuple for the UI outputs.
        """
        if audio_file is None:
            return "Please upload an audio file first", "", ""

        try:
            # Clear buffer and reset state
            self.processor.clear_buffer()

            # Adjust processor settings for streaming test
            self.processor.min_process_length = 0.5 * self.processor.sample_rate  # Process every 0.5 seconds
            self.processor.process_interval = 0.3  # Check for processing every 0.3 seconds

            # Load audio file
            audio_data, sr = librosa.load(audio_file, sr=None)

            # Calculate chunks
            chunk_duration = 0.5  # 0.5 second chunks
            chunk_size = int(chunk_duration * sr)
            total_chunks = len(audio_data) // chunk_size + (1 if len(audio_data) % chunk_size > 0 else 0)

            # Store streaming data
            self.streaming_data.update({
                'active': True,
                'audio_data': audio_data,
                'sr': sr,
                'chunk_index': 0,
                'total_chunks': total_chunks,
                'chunk_duration': chunk_duration,
                'chunk_size': chunk_size
            })

            #print(f"🎵 Starting stream: {len(audio_data)/sr:.1f}s audio, {total_chunks} chunks of {chunk_duration}s each")
            return f"Started streaming {len(audio_data)/sr:.1f}s audio file in {total_chunks} chunks", "", ""

        except Exception as e:
            return f"Error loading audio file: {e}", "", ""

    def stop_file_streaming_test(self) -> Tuple[str, str, str]:
        """Stop the streaming test, restore processor settings, and flush remaining audio."""
        self.streaming_data['active'] = False

        # Restore original processor settings
        self.processor.min_process_length = self.original_min_process_length
        self.processor.process_interval = self.original_process_interval

        # Force complete processing of all remaining audio
        final_transcription = self.processor.force_complete_processing()
        llm_response = ""
        if final_transcription:
            llm_response = gen_llm_response(final_transcription)

        return "Streaming stopped", final_transcription, llm_response

    def update_streaming_test(self) -> Tuple[str, str, str]:
        """Feed the next chunk to the processor; called on each streaming timer tick."""
        if not self.streaming_data['active']:
            current_transcription = self.processor.get_transcription()
            return "Not streaming", current_transcription, ""

        try:
            # Check if we've processed all chunks
            if self.streaming_data['chunk_index'] >= self.streaming_data['total_chunks']:
                # Finished streaming
                self.streaming_data['active'] = False

                # Force complete processing of all remaining audio
                final_transcription = self.processor.force_complete_processing()

                # Restore settings after processing is complete
                self.processor.min_process_length = self.original_min_process_length
                self.processor.process_interval = self.original_process_interval

                # Send final transcription to LLM and get response
                llm_response = ""
                if final_transcription:
                    llm_response = gen_llm_response(final_transcription)

                return f"Streaming complete! Processed {self.streaming_data['total_chunks']} chunks", str(final_transcription), llm_response

            # Get current chunk info
            chunk_size = self.streaming_data['chunk_size']
            current_chunk = self.streaming_data['chunk_index']
            start_idx = current_chunk * chunk_size
            end_idx = min((current_chunk + 1) * chunk_size, len(self.streaming_data['audio_data']))

            # Extract and process chunk
            chunk = self.streaming_data['audio_data'][start_idx:end_idx]
            #print(f"Processing chunk {current_chunk + 1}/{self.streaming_data['total_chunks']}: samples {start_idx}-{end_idx} ({len(chunk)} samples)")

            # Add chunk to processor
            buffer_size = self.processor.add_audio(chunk, self.streaming_data['sr'])

            # Wait for any pending processing to complete before getting transcription
            self.processor.wait_for_processing_complete(2.0)

            # Get current transcription
            transcription = self.processor.get_transcription()

            # Send transcription to LLM and get response (for real-time updates)
            llm_response = ""
            if transcription:
                llm_response = gen_llm_response(transcription)

            # Update status
            buffer_seconds = buffer_size / self.processor.sample_rate
            status = f"Chunk {current_chunk+1}/{self.streaming_data['total_chunks']} | Buffer: {buffer_seconds:.1f}s | Processed: {self.processor.processed_length/self.processor.sample_rate:.1f}s"

            # Move to next chunk
            self.streaming_data['chunk_index'] += 1

            # Check if this was the last chunk
            if self.streaming_data['chunk_index'] >= self.streaming_data['total_chunks']:
                print(f"✅ All {self.streaming_data['total_chunks']} chunks processed!")

            return status, str(transcription), llm_response

        except Exception as e:
            self.streaming_data['active'] = False
            return f"Streaming error: {e}", "", ""

    def is_active(self) -> bool:
        """Check if streaming is currently active"""
        return self.streaming_data['active']

    def get_streaming_data(self) -> Dict[str, Any]:
        """Get current streaming data"""
        return self.streaming_data.copy()


def create_streaming_interface(streaming_manager: StreamingManager) -> Dict[str, Any]:
    """Create the Gradio interface components for the streaming test.

    Intended to be called inside an active gr.Blocks context; returns a dict
    mapping component names to the created components.
    """

    with gr.Row():
        test_audio_file = gr.Audio(sources=["upload"], type="filepath", label="Upload Audio File for Testing")

    with gr.Row():
        test_stream_btn = gr.Button("🎵 Start Streaming Test", variant="primary")
        test_stop_btn = gr.Button("⏹️ Stop Streaming", variant="stop")

    with gr.Row():
        test_status = gr.Textbox(label="Streaming Status", interactive=False, placeholder="Upload an audio file and click 'Start Streaming Test'")

    with gr.Row():
        with gr.Column():
            transcription_output = gr.Textbox(label="Live Transcription", lines=5, interactive=False)
        with gr.Column():
            llm_output = gr.Textbox(label="LLM Response", lines=5, interactive=False)

    # Timer for streaming updates (every 0.5 seconds)
    streaming_timer = gr.Timer(value=0.5, active=False)

    # Event handlers
    def start_and_activate_timer(audio_file):
        status, transcription, llm_response = streaming_manager.start_file_streaming_test(audio_file)
        if streaming_manager.is_active():
            return status, transcription, llm_response, gr.Timer(active=True)
        else:
            return status, transcription, llm_response, gr.Timer(active=False)

    def stop_and_deactivate_timer():
        status, transcription, llm_response = streaming_manager.stop_file_streaming_test()
        return status, transcription, llm_response, gr.Timer(active=False)

    def update_with_timer_control():
        status, transcription, llm_response = streaming_manager.update_streaming_test()
        # Keep timer active if still streaming
        timer_active = streaming_manager.is_active()
        return status, transcription, llm_response, gr.Timer(active=timer_active)

    # Connect event handlers
    test_stream_btn.click(
        start_and_activate_timer,
        inputs=[test_audio_file],
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )

    test_stop_btn.click(
        stop_and_deactivate_timer,
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )

    # Timer tick updates with automatic deactivation when done
    streaming_timer.tick(
        update_with_timer_control,
        outputs=[test_status, transcription_output, llm_output, streaming_timer]
    )

    return {
        'test_audio_file': test_audio_file,
        'test_stream_btn': test_stream_btn,
        'test_stop_btn': test_stop_btn,
        'test_status': test_status,
        'transcription_output': transcription_output,
        'llm_output': llm_output,
        'streaming_timer': streaming_timer
    }
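

# ---------------------------------------------------------------------------
# Minimal usage sketch: wiring the streaming test UI into a standalone Gradio
# app. `DummyProcessor` is a hypothetical stand-in that exposes only the
# attributes and methods StreamingManager actually calls; it buffers audio but
# performs no transcription. Running this block assumes the module is executed
# as part of its package (e.g. `python -m <package>.<module>`) so that the
# relative `.gpt` import above resolves.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    class DummyProcessor:
        """Hypothetical stub mirroring the processor interface used above."""

        def __init__(self, sample_rate: int = 16000):
            self.sample_rate = sample_rate
            self.min_process_length = 2.0 * sample_rate  # samples needed before processing
            self.process_interval = 1.0                  # seconds between processing checks
            self.processed_length = 0                    # samples already processed
            self._buffer = np.zeros(0, dtype=np.float32)

        def clear_buffer(self):
            self._buffer = np.zeros(0, dtype=np.float32)
            self.processed_length = 0

        def add_audio(self, chunk, sr):
            # A real processor would resample `chunk` from `sr` to self.sample_rate;
            # the stub just appends and reports the buffer size in samples.
            self._buffer = np.concatenate([self._buffer, np.asarray(chunk, dtype=np.float32)])
            return len(self._buffer)

        def wait_for_processing_complete(self, timeout: float) -> bool:
            return True  # nothing runs asynchronously in the stub

        def get_transcription(self) -> str:
            return ""  # a real processor would return the accumulated transcript

        def force_complete_processing(self) -> str:
            self.processed_length = len(self._buffer)
            return ""

    with gr.Blocks() as demo:
        manager = StreamingManager(DummyProcessor())
        create_streaming_interface(manager)

    demo.launch()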