|
|
""" |
|
|
Gradio Test Page for analyze_voice_logic Function |
|
|
Tests the core voice analysis logic independently |
|
|
""" |
|
|
|
|
|
import gradio as gr |
|
|
import asyncio |
|
|
import uuid |
|
|
from datetime import datetime, timedelta |
|
|
from pathlib import Path |
|
|
import sys |
|
|
import logging |
|
|
from contextlib import AsyncExitStack |
|
|
|
|
|
|
|
|
from backend import analyze_voice_logic, voicekit_session, session_stack |
|
|
import backend |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
mcp_initialized = False |
|
|
|
|
|
|
|
|
async def initialize_mcp(): |
|
|
"""Initialize VoiceKit MCP connection if not already initialized""" |
|
|
global mcp_initialized |
|
|
|
|
|
if mcp_initialized and backend.voicekit_session is not None: |
|
|
logger.info("MCP already initialized") |
|
|
return |
|
|
|
|
|
logger.info("Initializing VoiceKit MCP connection...") |
|
|
backend.session_stack = AsyncExitStack() |
|
|
|
|
|
try: |
|
|
from mcp.client.sse import sse_client |
|
|
from mcp.client.session import ClientSession |
|
|
|
|
|
voicekit_url = "https://mcp-1st-birthday-voicekit.hf.space/gradio_api/mcp/sse" |
|
|
read, write = await backend.session_stack.enter_async_context(sse_client(voicekit_url)) |
|
|
backend.voicekit_session = await backend.session_stack.enter_async_context( |
|
|
ClientSession(read, write) |
|
|
) |
|
|
await backend.voicekit_session.initialize() |
|
|
|
|
|
|
|
|
tools_result = await backend.voicekit_session.list_tools() |
|
|
logger.info(f"β VoiceKit MCP connected. Tools: {[t.name for t in tools_result.tools]}") |
|
|
mcp_initialized = True |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Failed to initialize VoiceKit MCP: {e}") |
|
|
backend.voicekit_session = None |
|
|
raise |
|
|
|
|
|
|
|
|
def format_result_display(result: dict) -> str: |
|
|
"""Format the analysis result for display""" |
|
|
if result.get("status") == "error": |
|
|
return f"β **μλ¬ λ°μ**\n\n{result.get('message', 'Unknown error')}" |
|
|
|
|
|
|
|
|
output = f""" |
|
|
# β
λΆμ μλ£! |
|
|
|
|
|
## π μ μ κ²°κ³Ό |
|
|
- **μ 체 μ μ**: {result.get('overall', 0)}/100 |
|
|
- **μλμ΄ (Pitch)**: {result.get('pitch', 0)}/100 |
|
|
- **λ¦¬λ¬ (Rhythm)**: {result.get('rhythm', 0)}/100 |
|
|
- **μλμ§ (Energy)**: {result.get('energy', 0)}/100 |
|
|
- **λ°μ (Pronunciation)**: {result.get('pronunciation', 0)}/100 |
|
|
- **λμ¬ μ νλ (Transcript)**: {result.get('transcript', 0)}/100 |
|
|
|
|
|
## π― κ²°κ³Ό |
|
|
- **μΉ΄ν
κ³ λ¦¬**: {result.get('category', 'N/A')} |
|
|
- **μ λ΅ μ¬λΆ**: {'β
μ λ΅!' if result.get('is_correct') else 'β μ€λ΅'} |
|
|
|
|
|
## π‘ μ‘°μΈ |
|
|
{result.get('advice', 'μ‘°μΈμ΄ μμ΅λλ€.')} |
|
|
|
|
|
## π¨ ννΈ |
|
|
{result.get('hints', 'ννΈκ° μμ΅λλ€.')} |
|
|
""" |
|
|
return output |
|
|
|
|
|
|
|
|
def get_available_dates(): |
|
|
"""Get list of available puzzle dates""" |
|
|
|
|
|
dates = [] |
|
|
today = datetime.now() |
|
|
for i in range(30): |
|
|
date = today - timedelta(days=i) |
|
|
dates.append(date.strftime("%Y-%m-%d")) |
|
|
return dates |
|
|
|
|
|
|
|
|
async def test_analyze_voice(audio_file, date: str, session_id: str, generate_new_session: bool): |
|
|
""" |
|
|
Test the analyze_voice_logic function |
|
|
|
|
|
Args: |
|
|
audio_file: Uploaded audio file (gr.Audio component returns file path) |
|
|
date: Date string in YYYY-MM-DD format |
|
|
session_id: Session ID (UUID) |
|
|
generate_new_session: If True, generate a new session ID |
|
|
""" |
|
|
try: |
|
|
|
|
|
await initialize_mcp() |
|
|
|
|
|
|
|
|
if generate_new_session or not session_id: |
|
|
session_id = str(uuid.uuid4()) |
|
|
|
|
|
|
|
|
if audio_file is None: |
|
|
return "β μ€λμ€ νμΌμ μ
λ‘λν΄μ£ΌμΈμ.", session_id |
|
|
|
|
|
|
|
|
audio_path = Path(audio_file) |
|
|
if not audio_path.exists(): |
|
|
return f"β μ€λμ€ νμΌμ μ°Ύμ μ μμ΅λλ€: {audio_file}", session_id |
|
|
|
|
|
with open(audio_path, "rb") as f: |
|
|
audio_bytes = f.read() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
result = await analyze_voice_logic(audio_bytes, date, session_id) |
|
|
|
|
|
|
|
|
formatted_result = format_result_display(result) |
|
|
|
|
|
return formatted_result, session_id |
|
|
|
|
|
except Exception as e: |
|
|
import traceback |
|
|
error_msg = f"β **μμΈ λ°μ**\n\n```\n{str(e)}\n\n{traceback.format_exc()}\n```" |
|
|
return error_msg, session_id |
|
|
|
|
|
|
|
|
def sync_test_analyze_voice(audio_file, date: str, session_id: str, generate_new_session: bool): |
|
|
"""Synchronous wrapper for async function""" |
|
|
return asyncio.run(test_analyze_voice(audio_file, date, session_id, generate_new_session)) |
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(title="Analyze Voice Logic Test") as demo: |
|
|
gr.Markdown("# π€ Analyze Voice Logic Test") |
|
|
gr.Markdown("Test the `analyze_voice_logic` function from `backend.py`") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("## π₯ μ
λ ₯") |
|
|
|
|
|
|
|
|
audio_input = gr.Audio( |
|
|
label="μ€λμ€ νμΌ", |
|
|
type="filepath", |
|
|
sources=["upload", "microphone"] |
|
|
) |
|
|
|
|
|
|
|
|
date_input = gr.Dropdown( |
|
|
choices=get_available_dates(), |
|
|
label="λ μ§ μ ν", |
|
|
value=datetime.now().strftime("%Y-%m-%d"), |
|
|
allow_custom_value=True |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Row(): |
|
|
session_id_input = gr.Textbox( |
|
|
label="μΈμ
ID (UUID)", |
|
|
value=str(uuid.uuid4()), |
|
|
interactive=True |
|
|
) |
|
|
generate_session_btn = gr.Checkbox( |
|
|
label="μ μΈμ
ID μμ±", |
|
|
value=False |
|
|
) |
|
|
|
|
|
|
|
|
submit_btn = gr.Button("π λΆμ μμ", variant="primary", size="lg") |
|
|
|
|
|
with gr.Column(scale=2): |
|
|
gr.Markdown("## π κ²°κ³Ό") |
|
|
result_output = gr.Markdown( |
|
|
"κ²°κ³Όκ° μ¬κΈ°μ νμλ©λλ€.", |
|
|
label="λΆμ κ²°κ³Ό" |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Accordion("βΉοΈ μ¬μ© λ°©λ²", open=False): |
|
|
gr.Markdown(""" |
|
|
1. **μ€λμ€ νμΌ**: ν
μ€νΈν μμ± νμΌμ μ
λ‘λνκ±°λ λ§μ΄ν¬λ‘ λ
ΉμνμΈμ |
|
|
2. **λ μ§**: νΌμ¦μ΄ μλ λ μ§λ₯Ό μ ννμΈμ (DBμ ν΄λΉ λ μ§μ νΌμ¦μ΄ μμ΄μΌ ν©λλ€) |
|
|
3. **μΈμ
ID**: μλ μμ±λκ±°λ μ§μ μ
λ ₯ν μ μμ΅λλ€ |
|
|
4. **λΆμ μμ**: λ²νΌμ ν΄λ¦νμ¬ λΆμμ μμν©λλ€ |
|
|
|
|
|
### μ£Όμμ¬ν |
|
|
- μ΄ νμ΄μ§λ λ
립μ μΌλ‘ μ€νλ©λλ€ (Backend μλ² λΆνμ) |
|
|
- MCP μ°κ²°μ μλμΌλ‘ μ΄κΈ°νλ©λλ€ |
|
|
- λ°μ΄ν°λ² μ΄μ€μ ν΄λΉ λ μ§μ νΌμ¦μ΄ μμ΄μΌ ν©λλ€ |
|
|
- VoiceKit MCP (μΈλΆ μλ²)μ Gemini APIκ° μ μ μλν΄μΌ ν©λλ€ |
|
|
""") |
|
|
|
|
|
with gr.Accordion("π λλ²κ·Έ μ 보", open=False): |
|
|
gr.Markdown(""" |
|
|
### μν νμΈ |
|
|
- Database: PostgreSQL μ°κ²° νμΈ (`docker ps | grep voice-komentle-db`) |
|
|
- MCP: VoiceKit MCP μλ² (μΈλΆ) - μλ μ°κ²° |
|
|
- Gemini: GOOGLE_API_KEY νκ²½λ³μ νμΈ |
|
|
|
|
|
### νμ νκ²½ λ³μ |
|
|
```bash |
|
|
DATABASE_URL=postgresql://user:pass@host:port/dbname |
|
|
GOOGLE_API_KEY=your_api_key_here |
|
|
``` |
|
|
""") |
|
|
|
|
|
|
|
|
submit_btn.click( |
|
|
fn=sync_test_analyze_voice, |
|
|
inputs=[audio_input, date_input, session_id_input, generate_session_btn], |
|
|
outputs=[result_output, session_id_input] |
|
|
) |
|
|
|
|
|
|
|
|
def update_session_id(generate_new): |
|
|
if generate_new: |
|
|
return str(uuid.uuid4()) |
|
|
return gr.skip() |
|
|
|
|
|
generate_session_btn.change( |
|
|
fn=update_session_id, |
|
|
inputs=[generate_session_btn], |
|
|
outputs=[session_id_input] |
|
|
) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
demo.launch( |
|
|
server_name="127.0.0.1", |
|
|
server_port=7861, |
|
|
share=False |
|
|
) |
|
|
|
|
|
|