|
|
""" |
|
|
Gradio UI for Chloe's Voice Komentle Game |
|
|
Connects to FastAPI backend for voice analysis |
|
|
""" |
|
|
|
|
|
import os |
|
|
|
|
|
# Point GRADIO_TEMP_DIR at a "gradio_uploads" folder that lives next to this
# script, creating the folder on first run.
# NOTE(review): these statements sit ahead of the `import gradio` line further
# down -- presumably so Gradio sees the variable when it is imported. Keep this
# ordering unless the Gradio docs confirm it does not matter.
_upload_dir = os.path.join(os.path.dirname(__file__), "gradio_uploads")
os.makedirs(_upload_dir, exist_ok=True)  # exist_ok: no error when it is already there
os.environ["GRADIO_TEMP_DIR"] = _upload_dir
|
|
|
|
|
import gradio as gr |
|
|
from datetime import datetime |
|
|
import uuid |
|
|
import asyncio |
|
|
from sqlalchemy import create_engine, text |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
# Load variables from a .env file (python-dotenv) before the os.getenv()
# lookups further down read DATABASE_URL, SERVER_HOST and FRONTEND_PORT.
load_dotenv()
|
|
|
|
|
|
|
|
from backend import ( |
|
|
analyze_voice_logic, |
|
|
get_puzzle_by_date, |
|
|
lifespan, |
|
|
app as backend_app, |
|
|
) |
|
|
|
|
|
|
|
|
# Connection string is taken from the environment; it is None when unset.
DATABASE_URL = os.getenv("DATABASE_URL")

# Options handed through to the database driver's connect() call.
# NOTE(review): these look like PostgreSQL/libpq settings (10 s connect
# timeout, statement_timeout of 30000 ms) -- confirm against the driver in use.
_CONNECT_ARGS = {
    "connect_timeout": 10,
    "options": "-c statement_timeout=30000",
}

# Pooled SQLAlchemy engine: up to 10 pooled + 20 overflow connections, each
# connection is pinged before use and recycled once it is an hour old.
# NOTE(review): nothing in the visible part of this file uses `engine`.
engine = create_engine(
    DATABASE_URL,
    connect_args=_CONNECT_ARGS,
    pool_recycle=3600,
    pool_pre_ping=True,
    max_overflow=20,
    pool_size=10,
)

# Identifier passed to the backend with every analysis request. It is a single
# module-level value, so every caller of this process shares it until
# reset_session() replaces it.
session_id = f"{uuid.uuid4()}"

# NOTE(review): never read or updated in the visible part of this file.
backend_initialized = False
|
|
|
|
|
|
|
|
def _render_scores(result, category):
    """Build the markdown score table from the backend result dict."""
    pitch = result.get("pitch", 0.0)
    rhythm = result.get("rhythm", 0.0)
    energy = result.get("energy", 0.0)
    pronunciation = result.get("pronunciation", 0.0)
    transcript = result.get("transcript", 0.0)
    overall = result.get("overall", 0.0)

    # Joined line by line so the markdown does not depend on source indentation.
    return "\n".join([
        "",
        "### π μ μ μμΈ",
        "",
        f"**μΉ΄ν\x85κ³ λ¦¬:** {category.upper()}",
        "",
        f"- **λ°μ (Pronunciation):** {pronunciation:.1f}/100",
        f"- **μλμ΄ (Pitch):** {pitch:.1f}/100",
        f"- **λ¦¬λ¬ (Rhythm):** {rhythm:.1f}/100",
        f"- **μλμ§ (Energy):** {energy:.1f}/100",
        f"- **μ μ¬ (Transcript):** {transcript:.1f}/100",
        f"- **μ 체 (Overall):** {overall:.1f}/100",
        "",
    ])


def _render_hints(hints):
    """Turn a hint payload into ``(markdown_text, image_path_or_None)``."""
    if not isinstance(hints, dict) or "answer" not in hints:
        return "", None

    if hints.get("type", "hint") == "hint":
        parts = ["π‘ **ννΈ:**\n\n"]
    else:
        parts = ["π― **λ°μ μ‘°μΈ:**\n\n"]

    image_path = None
    for item in hints.get("answer") or []:
        if not isinstance(item, dict):
            # Skip malformed entries instead of failing the whole analysis.
            continue
        parts.append(f"{item.get('text', '')}\n\n")
        candidate = item.get("path", "")
        # Only one image can be shown; the last existing path wins.
        if candidate and os.path.exists(candidate):
            image_path = candidate

    return "".join(parts), image_path


async def analyze_voice_async(audio_file, date_str):
    """
    Analyze a recording by calling the backend logic directly.

    Reads the recorded file, passes its bytes to ``analyze_voice_logic``
    together with ``date_str`` and the module-level ``session_id``, and formats
    the returned dict for the UI.

    Args:
        audio_file: Path to the recorded audio file, or None if nothing was
            recorded.
        date_str: Date string used for the puzzle lookup.

    Returns:
        tuple: (result_text, scores_text, hint_text, image_path). On any
        failure the first element carries the error message and the remaining
        three are "", "", None.

    NOTE(review): the user-facing strings in this function look like UTF-8
    Korean text that was decoded with the wrong codec. They are kept as found;
    the ``\\x85`` escapes stand for the NEL character embedded in them. Restore
    them from a clean copy of the file.
    """
    if audio_file is None:
        return "β μ€λμ€λ₯Ό λ¨Όμ λ\x85Ήμν΄μ£ΌμΈμ!", "", "", None

    try:
        with open(audio_file, "rb") as f:
            audio_bytes = f.read()

        result = await analyze_voice_logic(audio_bytes, date_str, session_id)

        if result.get("status") == "error":
            return f"β {result.get('message', 'Unknown error')}", "", "", None

        # A present-but-None category would otherwise break .upper() below.
        category = str(result.get("category") or "unknown")
        overall = result.get("overall", 0.0)
        advice = result.get("advice", "")
        is_correct = result.get("is_correct", False)

        # Previously hard-coded to {}, which made the hint rendering
        # unreachable. NOTE(review): assumes the backend publishes the payload
        # under "hints" (all sibling fields are named after their result key);
        # when the key is absent the behaviour is the same as before.
        hints = result.get("hints") or {}

        if is_correct:
            result_msg = f"π μ λ΅μ\x85λλ€! μ 체 μ μ: {overall:.1f}/100"
        else:
            result_msg = f"π μ 체 μ μ: {overall:.1f}/100 - λ€μ μλν΄λ³΄μΈμ!"

        scores_text = _render_scores(result, category)
        hint_text, hint_image = _render_hints(hints)

        # Fall back to the free-form advice when there is no structured hint.
        if not hint_text and advice:
            hint_text = f"π¬ **μ‘°μΈ:**\n\n{advice}"

        return result_msg, scores_text, hint_text, hint_image

    except Exception as exc:
        # UI boundary: surface the failure as text instead of raising.
        return f"β μ€λ₯ λ°μ: {exc}", "", "", None
|
|
|
|
|
|
|
|
def analyze_voice(audio_file, date_str):
    """Blocking entry point for the UI: run analyze_voice_async to completion.

    Each call goes through asyncio.run(), i.e. a fresh event loop is created
    and closed per request. Returns whatever the coroutine returns.
    """
    pending = analyze_voice_async(audio_file, date_str)
    return asyncio.run(pending)
|
|
|
|
|
|
|
|
def get_today_puzzle():
    """Return a markdown summary of today's puzzle.

    Looks the puzzle up via get_puzzle_by_date() using the local date in
    YYYY-MM-DD form. Returns a "not found" message when the lookup yields
    nothing, and an error message (never an exception) when anything fails.
    """
    try:
        puzzle = get_puzzle_by_date(datetime.now().strftime("%Y-%m-%d"))

        if not puzzle:
            return "β μ€λμ νΌμ¦μ μ°Ύμ μ μμ΅λλ€."

        # Fields are read in the order they appear in the rendered text.
        date_value = puzzle.get('puzzle_date', 'N/A')
        number_value = puzzle.get('puzzle_number', 'N/A')
        category_value = puzzle.get('category', 'N/A').upper()
        difficulty_value = puzzle.get('difficulty', 'N/A')

        lines = [
            "",
            "### π\x85 μ€λμ νΌμ¦",
            "",
            f"**λ μ§:** {date_value}",
            f"**νΌμ¦ λ²νΈ:** #{number_value}",
            f"**μΉ΄ν\x85κ³ λ¦¬:** {category_value}",
            f"**λμ΄λ:** {difficulty_value}",
            "",
            "μ λ΅ λ¨μ΄λ₯Ό λ°μν΄λ³΄μΈμ! (μ΅λ 6ν μλ)",
            "",
        ]
        return "\n".join(lines)

    except Exception as exc:
        return f"β νΌμ¦ μ 보λ₯Ό κ°μ Έμ¬ μ μμ΅λλ€: {exc}"
|
|
|
|
|
|
|
|
def reset_session():
    """Start a new game by replacing the module-level session id.

    Returns the four values for the result, scores, hint and hint-image
    outputs: a confirmation message followed by cleared values.
    """
    global session_id

    fresh_id = uuid.uuid4()
    session_id = str(fresh_id)

    cleared_outputs = (
        "β\x85 μ κ²μ μμ! μ€λμ€λ₯Ό λ\x85Ήμν΄μ£ΌμΈμ.",
        "",
        "",
        None,
    )
    return cleared_outputs
|
|
|
|
|
|
|
|
|
|
|
# Page layout and event wiring for the game UI.
with gr.Blocks(title="Chloe's Voice Komentle") as demo:
    gr.Markdown("# π€ Chloe's Voice Komentle")

    # Puzzle summary plus a manual refresh button.
    with gr.Row():
        # A callable is passed (not its result) so Gradio evaluates it on each
        # page load; calling it here would freeze the panel at whatever puzzle
        # was current when the process started.
        puzzle_info = gr.Markdown(value=get_today_puzzle)
        refresh_btn = gr.Button("π νΌμ¦ μ 보 μλ‘κ³ μΉ¨", size="sm")

    with gr.Row():
        # Left column: recording and submission controls.
        with gr.Column(scale=1):
            gr.Markdown("### ποΈ μμ± λ\x85Ήμ")
            audio_input = gr.Audio(
                sources=["microphone"],
                type="filepath",
                label="λ§μ΄ν¬λ‘ λ\x85Ήμ",
                format="wav",
            )

            date_input = gr.Textbox(
                label="λ μ§ (YYYY-MM-DD)",
                # Callable for the same reason as above: the default date must
                # follow the calendar, not the process start time.
                value=lambda: datetime.now().strftime("%Y-%m-%d"),
                interactive=True,
            )

            submit_btn = gr.Button("π― λΆμνκΈ°", variant="primary", size="lg")
            reset_btn = gr.Button("π μ κ²μ μμ", variant="secondary")

        # Right column: result headline and score breakdown.
        with gr.Column(scale=1):
            gr.Markdown("### π κ²°κ³Ό")
            result_output = gr.Markdown(label="κ²°κ³Ό")
            scores_output = gr.Markdown(label="μ μ μμΈ")

    # Hint text and optional hint image, side by side.
    with gr.Row():
        with gr.Column():
            hint_output = gr.Markdown(label="ννΈ λ° μ‘°μΈ")

        with gr.Column():
            hint_image = gr.Image(label="ννΈ μ΄λ―Έμ§", show_label=True)

    # Both handlers return four values, one per output component, in this order.
    submit_btn.click(
        fn=analyze_voice,
        inputs=[audio_input, date_input],
        outputs=[result_output, scores_output, hint_output, hint_image],
    )

    reset_btn.click(
        fn=reset_session,
        inputs=[],
        outputs=[result_output, scores_output, hint_output, hint_image],
    )

    refresh_btn.click(fn=get_today_puzzle, inputs=[], outputs=[puzzle_info])

    gr.Markdown("---\n**Powered by:** VoiceKit MCP + Gemini AI")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    import threading
    import time

    # Longest time (seconds) the UI launch is held back for the backend.
    BACKEND_INIT_TIMEOUT_S = 60.0

    print("β³ Initializing VoiceKit MCP...")

    # Set by the backend thread once the lifespan context has been entered.
    backend_ready = threading.Event()

    async def init_backend():
        """Enter the backend lifespan and hold it open for the process lifetime."""
        async with lifespan(backend_app):
            print("β VoiceKit MCP initialized")
            backend_ready.set()
            # Wait on an event nobody sets, so the context is never exited
            # while the UI is running.
            await asyncio.Event().wait()

    def run_backend_init():
        """Thread target: drive init_backend() on its own event loop."""
        asyncio.run(init_backend())

    backend_thread = threading.Thread(target=run_backend_init, daemon=True)
    backend_thread.start()

    # Wait for the real readiness signal instead of sleeping a fixed 5 s.
    # Stop early when the thread has died (startup raised) or on timeout.
    deadline = time.monotonic() + BACKEND_INIT_TIMEOUT_S
    while (
        not backend_ready.is_set()
        and backend_thread.is_alive()
        and time.monotonic() < deadline
    ):
        backend_ready.wait(timeout=0.2)

    backend_initialized = backend_ready.is_set()
    if backend_initialized:
        print("β Backend initialized")
    else:
        # Best effort, as before: the UI is still launched.
        print("Backend did not finish initializing; launching the UI anyway")

    server_host = os.getenv("SERVER_HOST")
    # int(None) raised TypeError when FRONTEND_PORT was unset; None lets
    # Gradio choose its default port instead.
    port_setting = os.getenv("FRONTEND_PORT")
    frontend_port = int(port_setting) if port_setting else None

    demo.launch(
        server_name=server_host,
        server_port=frontend_port,
        share=False,
        show_error=True,
        allowed_paths=[os.path.join(os.path.dirname(__file__), "hints", "audio")],
    )
|
|
|