# VoiceSementle / backend.py
# Author: SJLee-0525 | commit 2b426bd "[CHORE] test12" | 41.1 kB
"""
FastAPI Backend for Komentle Voice Challenge
Handles voice analysis requests and communicates with AI server
"""
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Optional, Dict
from datetime import datetime
from contextlib import asynccontextmanager, AsyncExitStack
import os
import time
import base64
import json
import asyncio
import hashlib
import io
from pathlib import Path
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
import httpx
import logging
from mcp.client.sse import sse_client
from mcp.client.session import ClientSession
from gemini_adapter import call_gemini_with_tools, get_text_from_gemini_response
from pydub import AudioSegment
from pydub.effects import normalize
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Database connection
DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(
    DATABASE_URL,
    pool_size=10,  # base connection pool size
    max_overflow=20,  # maximum number of additional connections
    pool_pre_ping=True,  # validate a connection before it is used
    pool_recycle=3600,  # recreate connections every hour
    connect_args={
        "connect_timeout": 10,  # connection timeout: 10 seconds
        "options": "-c statement_timeout=30000"  # query timeout: 30 seconds
    }
)
# AI Server URL (managed via environment variable) - No longer used, replaced with direct MCP integration
AI_SERVER_URL = os.getenv("AI_SERVER_URL")
# Global VoiceKit MCP session
# Both are (re)assigned by lifespan() and reconnect_voicekit_mcp().
voicekit_session = None
session_stack = None
mcp_lock = None  # Lock for MCP reconnection (created lazily on first reconnect)
# Session tracking for attempt counts
# NOTE(review): entries are only ever added in this file, never removed.
session_attempts = {}  # {session_id: attempt_count}
# VoiceKit result cache (audio_hash -> scores)
# NOTE(review): result caching is disabled in call_ai_server, so these two
# names appear to be unused at the moment.
voicekit_result_cache = {}  # {hash: {"scores": dict, "timestamp": float}}
VOICEKIT_CACHE_TTL = 3600  # 1 hour TTL
async def reconnect_voicekit_mcp():
    """Reconnect to VoiceKit MCP when the connection is lost.

    Serialised through the module-level ``mcp_lock`` so that concurrent
    requests do not open several connections at once. If another caller
    already restored the session (``list_tools`` succeeds), nothing is done.

    On success the module globals ``session_stack`` and ``voicekit_session``
    refer to the new connection.

    Raises:
        Exception: whatever the connection attempt raised; in that case
            ``voicekit_session`` is reset to ``None`` before re-raising.
    """
    global voicekit_session, session_stack, mcp_lock
    if mcp_lock is None:
        # Created lazily; there is no await between the check and the
        # assignment, so two coroutines cannot both create a lock.
        mcp_lock = asyncio.Lock()
    async with mcp_lock:
        # Check if already reconnected by another call
        if voicekit_session is not None:
            try:
                # Test if session is alive
                await voicekit_session.list_tools()
            except Exception as probe_error:
                # Session is dead; fall through to a full reconnect.
                # Cancellation of the current task is not an Exception and
                # therefore propagates instead of being swallowed.
                logger.debug(f"MCP liveness probe failed: {probe_error!r}")
            else:
                logger.info("MCP session already alive, no reconnection needed")
                return
        logger.info("Reconnecting to VoiceKit MCP...")
        # Clean up old session
        if session_stack:
            try:
                await session_stack.aclose()
            except (KeyboardInterrupt, SystemExit):
                # Never hide interpreter shutdown requests.
                raise
            except BaseException as close_error:
                # Best effort, as before: tearing down a broken connection
                # may raise almost anything, and that must not prevent the
                # new connection from being created. Log instead of hiding.
                logger.warning(f"Ignoring error while closing old MCP session: {close_error!r}")
        # Create new session
        session_stack = AsyncExitStack()
        try:
            voicekit_url = "https://mcp-1st-birthday-voicekit.hf.space/gradio_api/mcp/sse"
            read, write = await session_stack.enter_async_context(sse_client(voicekit_url))
            voicekit_session = await session_stack.enter_async_context(
                ClientSession(read, write)
            )
            await voicekit_session.initialize()
            tools_result = await voicekit_session.list_tools()
            logger.info(
                f"βœ“ VoiceKit MCP reconnected. Tools: {[t.name for t in tools_result.tools]}"
            )
        except Exception as e:
            logger.error(f"Failed to reconnect VoiceKit MCP: {e}")
            voicekit_session = None
            raise
def get_audio_hash(audio_bytes: bytes, reference_b64: str, answer_word: str, category: str) -> str:
    """Build the SHA-256 hex key used to identify one analysis request.

    The key material is the SHA-256 hex digest of the user audio, the first
    50 characters of the base64 reference audio, the answer word and the
    category, joined with underscores.
    """
    user_digest = hashlib.sha256(audio_bytes).hexdigest()
    reference_prefix = reference_b64[:50]
    key_material = "_".join(
        map(str, (user_digest, reference_prefix, answer_word, category))
    )
    return hashlib.sha256(key_material.encode()).hexdigest()
def compress_audio(audio_bytes: bytes, target_sample_rate: int = 16000) -> bytes:
    """Re-encode audio as mono WAV at ``target_sample_rate`` before MCP upload.

    Steps, in order: decode with pydub, downmix to mono, resample, normalize
    levels, strip silence (threshold -50, padding 100), export as WAV.

    Args:
        audio_bytes: Original audio bytes.
        target_sample_rate: Sample rate to resample to (default 16000).

    Returns:
        The re-encoded bytes, or ``audio_bytes`` unchanged if any step raised.
    """
    try:
        started_at = time.time()
        size_before = len(audio_bytes)
        segment = AudioSegment.from_file(io.BytesIO(audio_bytes))
        # Voice does not need stereo
        if segment.channels > 1:
            segment = segment.set_channels(1)
        if segment.frame_rate != target_sample_rate:
            segment = segment.set_frame_rate(target_sample_rate)
        segment = normalize(segment)
        segment = segment.strip_silence(silence_thresh=-50, padding=100)
        wav_buffer = io.BytesIO()
        segment.export(
            wav_buffer,
            format="wav",
            parameters=["-ac", "1", "-ar", str(target_sample_rate)],
        )
        compressed_bytes = wav_buffer.getvalue()
        size_after = len(compressed_bytes)
        reduction = (1 - size_after / size_before) * 100
        elapsed_ms = (time.time() - started_at) * 1000
        logger.info(
            f"πŸ—œοΈ Audio compression: {size_before/1024:.1f}KB β†’ {size_after/1024:.1f}KB "
            f"({reduction:.1f}% reduction) in {elapsed_ms:.1f}ms"
        )
        return compressed_bytes
    except Exception as e:
        # Best effort: any failure falls back to the caller's original bytes.
        logger.warning(f"Audio compression failed: {e}, using original")
        return audio_bytes
# Lifespan handler for MCP initialization
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the VoiceKit MCP connection at startup and close it at shutdown.

    A failed startup connection is logged and leaves ``voicekit_session`` as
    ``None``; the application still starts.
    """
    global voicekit_session, session_stack
    logger.info("Initializing VoiceKit MCP connection...")
    session_stack = AsyncExitStack()
    try:
        sse_url = "https://mcp-1st-birthday-voicekit.hf.space/gradio_api/mcp/sse"
        streams = await session_stack.enter_async_context(sse_client(sse_url))
        read_stream, write_stream = streams
        voicekit_session = await session_stack.enter_async_context(
            ClientSession(read_stream, write_stream)
        )
        await voicekit_session.initialize()
        # Listing the tools doubles as a connectivity check
        listing = await voicekit_session.list_tools()
        tool_names = [tool.name for tool in listing.tools]
        logger.info(f"βœ“ VoiceKit MCP connected. Tools: {tool_names}")
    except Exception as e:
        logger.error(f"Failed to initialize VoiceKit MCP: {e}")
        voicekit_session = None

    yield

    # Shutdown: closes whichever stack is current (it may have been replaced
    # by reconnect_voicekit_mcp in the meantime).
    if session_stack:
        await session_stack.aclose()
        logger.info("βœ“ VoiceKit MCP connection closed")
# Application object; lifespan() manages the VoiceKit MCP connection.
app = FastAPI(title="Komentle Voice API", lifespan=lifespan)
# CORS settings
# NOTE(review): every origin, method and header is allowed together with
# allow_credentials=True - confirm this permissive combination is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Mount static files for hint images (directory is created if missing)
os.makedirs("images", exist_ok=True)
app.mount("/images", StaticFiles(directory="images"), name="images")
# Mount static files for reference audio (directory is created if missing)
os.makedirs("reference_audio", exist_ok=True)
app.mount("/reference_audio", StaticFiles(directory="reference_audio"), name="reference_audio")
# ============================================================================
# Performance Optimization: Caches
# ============================================================================
# Cache for base64-encoded reference audio (key: puzzle_number)
reference_audio_cache = {}
# Cache for Gemini-generated hints (key: cache_key from attempt+scores)
# Currently not written to: hint caching is disabled in generate_hints_with_gemini.
hint_cache = {}
# ============================================================================
# Audio Format Handling: Multi-format fallback
# ============================================================================
def load_reference_audio_with_fallback(reference_audio_path: str, puzzle_number: Optional[int] = None) -> Optional[str]:
    """Return the reference audio as base64, trying alternative extensions.

    Lookup order: the in-memory cache (only when ``puzzle_number`` is given),
    the path exactly as stored (leading slashes removed), then the same
    file stem with ``.wav``, ``.mp3``, ``.m4a``, ``.ogg`` and ``.flac``.

    Args:
        reference_audio_path: Path from database (may be wrong extension)
        puzzle_number: Puzzle number used as cache key (optional)

    Returns:
        base64-encoded audio string or None if not found
    """
    if puzzle_number is not None and puzzle_number in reference_audio_cache:
        logger.info(f"βœ“ Using cached reference audio for puzzle #{puzzle_number}")
        return reference_audio_cache[puzzle_number]
    if not reference_audio_path:
        return None

    def encode_and_remember(audio_path):
        """Read the file, base64-encode it and cache it when a key exists."""
        encoded = base64.b64encode(audio_path.read_bytes()).decode("utf-8")
        if puzzle_number is not None:
            reference_audio_cache[puzzle_number] = encoded
        return encoded

    stored_path = Path(reference_audio_path.lstrip("/"))
    # Exact path as stored in the database wins
    if stored_path.exists():
        audio_b64 = encode_and_remember(stored_path)
        logger.info(f"βœ“ Loaded reference audio: {stored_path}")
        return audio_b64

    # Same stem, other extensions (.wav first because VoiceKit prefers it)
    stem, folder = stored_path.stem, stored_path.parent
    for extension in ('.wav', '.mp3', '.m4a', '.ogg', '.flac'):
        candidate = folder / f"{stem}{extension}"
        if not candidate.exists():
            continue
        audio_b64 = encode_and_remember(candidate)
        logger.info(f"βœ“ Loaded reference audio (alternative format): {candidate}")
        return audio_b64

    logger.warning(f"⚠ No reference audio found for: {reference_audio_path}")
    return None
def get_hint_cache_key(attempt: int, scores: dict, category: str) -> str:
    """Derive a hint cache key from the attempt number and the weakest scores.

    Only numeric values in ``scores`` are considered. The two lowest are
    bucketed (``low`` below 30, ``med`` below 70, otherwise ``high``) and
    appended as ``name:bucket`` pairs.

    Args:
        attempt: Attempt number
        scores: Mapping of score name to value; non-numeric entries are ignored
        category: Puzzle category

    Returns:
        Key of the form ``{category}_attempt{attempt}_{name:bucket}_{name:bucket}``
    """
    numeric = {
        name: value
        for name, value in scores.items()
        if isinstance(value, (int, float))
    }
    parts = []
    # sorted() is stable, so ties keep the insertion order of ``scores``
    for name in sorted(numeric, key=numeric.get)[:2]:
        value = numeric[name]
        level = "low" if value < 30 else ("med" if value < 70 else "high")
        parts.append(f"{name}:{level}")
    weakest_str = "_".join(parts)
    return f"{category}_attempt{attempt}_{weakest_str}"
# Response models
# Success payload of POST /api/analyze-voice. analyze_voice_logic fills the
# score fields with values already converted to the 0-100 range.
class AnalysisResponse(BaseModel):
    status: str
    category: str
    answer_word: Optional[str] = None  # Answer word for chatbot context
    reference_audio_path: Optional[str] = None  # Reference audio for voice cloning
    pitch: float
    rhythm: float
    energy: float
    pronunciation: float
    transcript: float
    overall: float
    advice: Optional[str] = None
    is_correct: bool = False
    message: Optional[str] = None
    user_text: Optional[str] = None  # Text spoken by the user (STT result)
# Error payload shape (status + message). Not referenced by the endpoints
# visible in this file.
class ErrorResponse(BaseModel):
    status: str
    message: str
def get_puzzle_by_date(date: str) -> Optional[Dict]:
    """Look up the puzzle scheduled for ``date``.

    Args:
        date: Date string in YYYY-MM-DD format

    Returns:
        dict with puzzle_number, answer_word, puzzle_date (as str), difficulty,
        category and reference_audio_path; None when no row matches or the
        query fails (the failure is logged).
    """
    columns = (
        "puzzle_number",
        "answer_word",
        "puzzle_date",
        "difficulty",
        "category",
        "reference_audio_path",
    )
    try:
        lookup = text(
            """
            SELECT puzzle_number, answer_word, puzzle_date, difficulty, category, reference_audio_path
            FROM puzzles
            WHERE puzzle_date = :date
            LIMIT 1
            """
        )
        with engine.connect() as connection:
            row = connection.execute(lookup, {"date": date}).fetchone()
            if not row:
                return None
            # Column order mirrors the SELECT list above
            puzzle = dict(zip(columns, row))
            puzzle["puzzle_date"] = str(puzzle["puzzle_date"])
            return puzzle
    except Exception as e:
        logger.error(f"Database error: {e}")
        return None
def get_attempt_count(session_id: str) -> int:
    """Increment and return the attempt counter for ``session_id``.

    The first call for a session returns 1. Counters live in the in-memory
    ``session_attempts`` dict.
    """
    global session_attempts
    updated = session_attempts.get(session_id, 0) + 1
    session_attempts[session_id] = updated
    return updated
def list_hint_files(category: str) -> list:
    """Return the file names of hint images under ``images/hints/<category>``.

    Names are grouped by extension in the order jpg, png, jpeg, gif. An empty
    list is returned when the category directory does not exist.
    """
    category_dir = Path("images/hints") / category
    if not category_dir.exists():
        return []
    patterns = ("*.jpg", "*.png", "*.jpeg", "*.gif")
    return [
        image.name
        for pattern in patterns
        for image in category_dir.glob(pattern)
    ]
def _strip_markdown_fence(raw_text: str) -> str:
    """Remove a surrounding Markdown code fence from a model reply, if present."""
    cleaned = raw_text.strip()
    if not cleaned.startswith("```"):
        return cleaned
    # Drop the opening fence line (it may carry a language tag such as "json")
    _, _, body = cleaned.partition("\n")
    body = body.rstrip()
    # Only drop the closing fence when it is really there; a truncated reply
    # without one keeps its last line.
    if body.endswith("```"):
        body = body[:-3]
    return body.strip()


async def generate_hints_with_gemini(
    scores: dict, attempt: int, answer_word: str, category: str
) -> dict:
    """Generate JSON hints using Gemini LLM (caching disabled to ensure unique hints).

    Args:
        scores: VoiceKit scores; pitch, rhythm, energy and pronunciation are
            used to find the two weakest metrics.
        attempt: Attempt number of the session; selects how specific the
            requested hint is.
        answer_word: The puzzle answer (given to the model, which is told not
            to reveal it).
        category: Puzzle category, also used to list available hint images.

    Returns:
        dict of the form ``{"type": ..., "answer": [{"text": ..., "path": ...}]}``.
        A fallback dict of the same shape is returned when Gemini reports an
        error, the reply is not a JSON object, or anything else fails.
    """
    generic_fallback = {
        "type": "advice",
        "answer": [
            {
                "text": "Keep practicing! Focus on your pronunciation.",
                "path": "",
            }
        ],
    }
    try:
        # Hint caching is intentionally disabled: the score-bucket cache key
        # returned identical hints whenever scores were similar.
        # TODO: Revisit with smarter caching strategy (e.g., include previous hint hash)

        # Find weakest metrics
        metrics = {
            k: v
            for k, v in scores.items()
            if k in ["pitch", "rhythm", "energy", "pronunciation"]
        }
        weakest = sorted(metrics.items(), key=lambda x: x[1])[:2]
        weakest_names = [w[0] for w in weakest]
        # List available hint files
        available_hints = list_hint_files(category)
        hint_files_str = (
            ", ".join(available_hints[:5]) if available_hints else "none available"
        )
        # Determine hint type and guidance based on attempt (progressive difficulty)
        if attempt == 1:
            hint_type = "hint"
            guidance = "Give an EXTREMELY VAGUE clue. Don't mention the category yet. Just hint at the general concept."
            category_hint = "Do NOT mention the category on first attempt."
        elif attempt == 2:
            hint_type = "hint"
            guidance = f"Give a VAGUE clue and casually mention it's a {category}. Include an image hint if available."
            category_hint = f"Mention it's a {category} but keep the clue vague."
        elif attempt <= 4:
            hint_type = "hint"
            guidance = f"Give a MORE SPECIFIC clue about this {category}. Include relevant context. Use image if available."
            category_hint = f"Be clear this is a {category} and add more context."
        elif attempt <= 6:
            hint_type = "hint"
            guidance = f"Give a QUITE SPECIFIC hint about this {category}. Can mention era, context, or usage. Include image if helpful."
            category_hint = "Give substantial clues while still not revealing the answer."
        elif attempt <= 10:
            hint_type = "hint"
            guidance = f"Give VERY SPECIFIC hints. Can mention syllable count, rhymes, or first letter. This is attempt {attempt} - be helpful!"
            category_hint = f"User has tried {attempt} times. Give strong hints without saying the answer."
        else:
            hint_type = "advice"
            guidance = f"Attempt {attempt}! Focus on pronunciation coaching for {', '.join(weakest_names)}. Give very strong hints about what to say."
            category_hint = f"After {attempt} attempts, be very helpful while still not directly revealing the answer."
        # Build prompt for Gemini
        prompt = f"""You are a hint generator for "Audio Semantle" - a pronunciation puzzle game where players start blind and must figure out what word to say.
**Current State:**
- Answer word: "{answer_word}" (DO NOT reveal this directly!)
- Category: {category} (this is a {category})
- Attempt number: {attempt} (players have UNLIMITED attempts)
- Scores (0-100): Pitch={scores.get('pitch', 0)}, Rhythm={scores.get('rhythm', 0)}, Energy={scores.get('energy', 0)}, Pronunciation={scores.get('pronunciation', 0)}, Overall={scores.get('overall', 0)}
- Weakest areas: {', '.join(weakest_names)}
- Available hint images: {hint_files_str}
**Task:** {guidance}
**Category Guidance:** {category_hint}
**Hint Examples by Category:**
- If category = "meme": "This viral phrase often appears in funny internet videos..."
- If category = "movie": "This famous movie quote/title was released in..."
- If category = "song": "This classic song by [artist hint] topped the charts..."
**Return ONLY this JSON format, no other text:**
{{
"type": "{hint_type}",
"answer": [
{{
"text": "Your hint or advice text here (can mention category)",
"path": "images/hints/{category}/filename.jpg" OR ""
}}
]
}}
**Rules for Progressive Hints:**
1. Remember: Players start COMPLETELY BLIND - they don't know what to say initially
2. Hints should get progressively more helpful with each attempt
3. For "hint" type: Follow the guidance above based on attempt number
4. For "advice" type: Focus on pronunciation + give strong contextual clues
5. Keep text concise (1-2 sentences max)
6. NEVER reveal the answer directly, but after 10+ attempts be very helpful
7. Return ONLY valid JSON, no markdown, no extra text
"""
        # Call Gemini
        # NOTE(review): this call is not awaited; if the adapter performs
        # blocking network I/O it stalls the event loop - confirm.
        response = call_gemini_with_tools(
            model_name="gemini-2.5-flash",
            system_prompt="You are a JSON generator. Return ONLY valid JSON with no markdown formatting or extra text.",
            messages=[{"role": "user", "content": prompt}],
            tools=[],
            max_tokens=512,  # Reduced from 1024 - hints are 1-2 sentences
        )
        # Extract JSON from response
        response_text, error = get_text_from_gemini_response(response)
        if error:
            logger.error(f"Gemini response error: {error}")
            if not weakest:
                # No metric to name in the advice; use the generic fallback
                return generic_fallback
            return {
                "type": "advice",
                "answer": [
                    {
                        "text": f"Focus on improving {weakest_names[0]} (score: {weakest[0][1]:.0f}/100)",
                        "path": "",
                    }
                ],
            }
        # Clean response text (remove markdown code blocks if present) and parse
        hints_json = json.loads(_strip_markdown_fence(response_text))
        if not isinstance(hints_json, dict):
            # Valid JSON but not an object: callers expect a dict
            logger.error(
                f"Hint generation error: expected a JSON object, got {type(hints_json).__name__}"
            )
            return generic_fallback
        logger.info(f"βœ“ Generated fresh hint for attempt {attempt}, category {category}")
        return hints_json
    except Exception as e:
        logger.error(f"Hint generation error: {e}")
        return generic_fallback
def extract_advice_text(hints_json: dict) -> str:
    """Extract plain text from hints JSON for the advice field.

    Joins the ``text`` of every entry in ``hints_json["answer"]`` with single
    spaces.

    Returns:
        The joined text ("" when there is no ``answer`` list), or
        "Keep practicing!" when ``hints_json`` does not have the expected
        shape (not a dict, entries without ``text``, non-string text, ...).
    """
    try:
        return " ".join(item["text"] for item in hints_json.get("answer", []))
    except (AttributeError, KeyError, TypeError):
        # Malformed hint payload; only these are expected from bad data, so
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return "Keep practicing!"
async def call_ai_server(
    audio_file: bytes,
    session_id: str,
    category: str,
    answer_word: str,
    reference_audio_path: Optional[str] = None,
    puzzle_number: Optional[int] = None,
) -> Dict:
    """Analyze voice using VoiceKit MCP + Gemini for hints.

    Args:
        audio_file: Audio file bytes
        session_id: User session ID (used for the attempt counter)
        category: Puzzle category (meme, movie, song)
        answer_word: Correct answer for this puzzle
        reference_audio_path: Path to reference audio file (from DB)
        puzzle_number: Puzzle number for caching reference audio

    Returns:
        dict: On success the keys pitch, rhythm, energy, pronounciation (sic),
        transcript, overall_score (all scaled to 0.0-1.0), advice, hints,
        is_correct and user_text. On any failure ``{"error": <message>}``;
        this function does not raise, including for the timeout
        HTTPException created inside the retry loop.
    """
    try:
        start_time = time.time()
        if not voicekit_session:
            logger.error("VoiceKit MCP not initialized")
            return {"error": "AI service not available"}
        # Compress audio before processing
        compressed_audio = compress_audio(audio_file)
        # Convert audio bytes to base64
        user_b64 = base64.b64encode(compressed_audio).decode("utf-8")
        logger.info(f"⏱️ Base64 encoding: {(time.time() - start_time)*1000:.1f}ms")
        # Load reference audio (ground truth) with format fallback and caching
        ref_start = time.time()
        reference_b64 = load_reference_audio_with_fallback(reference_audio_path, puzzle_number)
        logger.info(f"⏱️ Reference audio load: {(time.time() - ref_start)*1000:.1f}ms")
        if reference_b64 is None:
            # Fallback: use user audio as reference if GT not available
            # NOTE(review): the recording is then compared with itself -
            # confirm the resulting scores are acceptable.
            reference_b64 = user_b64
            logger.warning("⚠ No reference audio available, using user audio")
        # Track attempt count
        attempt = get_attempt_count(session_id)
        logger.info(f"Session {session_id}: Attempt {attempt}/6")
        # VoiceKit result caching is disabled on purpose: users may submit the
        # same audio several times and every submission is analyzed fresh.
        result = None
        # Call VoiceKit MCP for voice analysis with retry logic and timeout
        max_retries = 3
        timeout_seconds = 20  # Balance between reliability and user wait time (60s max)
        voicekit_start = time.time()
        for retry in range(max_retries):
            try:
                logger.info(f"Calling VoiceKit MCP (attempt {retry + 1}/{max_retries})...")
                result = await asyncio.wait_for(
                    voicekit_session.call_tool(
                        "voicekit_analyze_voice_similarity",
                        {
                            "user_audio_base64": user_b64,
                            "reference_audio_base64": reference_b64,
                            "reference_text": answer_word,
                            "category": category,
                        },
                    ),
                    timeout=timeout_seconds
                )
                voicekit_time = (time.time() - voicekit_start) * 1000
                logger.info(f"βœ“ VoiceKit MCP call successful")
                logger.info(f"⏱️ VoiceKit MCP call: {voicekit_time:.1f}ms")
                logger.info(f"βœ“ Fresh VoiceKit analysis completed")
                break  # Success, exit retry loop
            except asyncio.TimeoutError:
                if retry < max_retries - 1:
                    # Exponential backoff: 0.5s, 1s, 2s
                    retry_delay = 0.5 * (2 ** retry)
                    logger.warning(
                        f"VoiceKit call timed out after {timeout_seconds}s (attempt {retry + 1}/{max_retries}), retrying in {retry_delay}s"
                    )
                    await asyncio.sleep(retry_delay)
                else:
                    logger.error(
                        f"VoiceKit call timed out after {max_retries} attempts"
                    )
                    # Caught by the outer handler and returned as {"error": ...}
                    raise HTTPException(status_code=504, detail="VoiceKit service timeout")
            except Exception as e:
                error_msg = str(e)
                # Check if MCP connection is closed
                if "ClosedResourceError" in error_msg or "ClosedResourceError" in str(type(e)):
                    logger.warning(f"MCP connection closed, attempting to reconnect...")
                    try:
                        await reconnect_voicekit_mcp()
                        logger.info("MCP reconnected, retrying request...")
                        await asyncio.sleep(1)
                        continue  # Retry with new connection
                    except Exception as reconnect_error:
                        logger.error(f"MCP reconnection failed: {reconnect_error}")
                if retry < max_retries - 1:
                    # Exponential backoff: 0.5s, 1s, 2s
                    retry_delay = 0.5 * (2 ** retry)
                    logger.warning(
                        f"VoiceKit call failed (attempt {retry + 1}/{max_retries}): {e}, retrying in {retry_delay}s"
                    )
                    await asyncio.sleep(retry_delay)
                else:
                    logger.error(
                        f"VoiceKit call failed after {max_retries} attempts: {e}"
                    )
                    raise
        if result is None:
            # Reached when the last retry ended in a successful reconnect:
            # the loop ran out of attempts without ever getting a response.
            logger.error(
                f"VoiceKit call produced no result after {max_retries} attempts"
            )
            return {"error": "VoiceKit analysis did not complete"}
        # Parse VoiceKit response
        scores_text = result.content[0].text
        scores = json.loads(scores_text)
        # scores = {pitch, rhythm, energy, pronunciation, transcript, overall}
        logger.info(f"VoiceKit scores: {scores}")
        print(f"\n{'='*50}")
        print(f"[AI RESPONSE] VoiceKit MCP 응닡 데이터:")
        print(f" Raw text: {scores_text}")
        print(f" Parsed scores:")
        for key, value in scores.items():
            print(f" - {key}: {value}")
        print(f"{'='*50}\n")
        # Generate hints with Gemini
        gemini_start = time.time()
        hints_json = await generate_hints_with_gemini(
            scores=scores, attempt=attempt, answer_word=answer_word, category=category
        )
        gemini_time = (time.time() - gemini_start) * 1000
        logger.info(f"⏱️ Gemini hint generation: {gemini_time:.1f}ms")
        logger.info(f"Generated hints: {hints_json}")
        print(f"\n{'='*50}")
        print(f"[AI RESPONSE] Gemini 힌트 응닡 데이터:")
        print(f" {json.dumps(hints_json, ensure_ascii=False, indent=2)}")
        print(f"{'='*50}\n")
        # Total time
        total_time = (time.time() - start_time) * 1000
        logger.info(f"⏱️ TOTAL REQUEST TIME: {total_time:.1f}ms")
        # Format response (convert 0-100 to 0.0-1.0 as Chloe expects)
        return {
            "pitch": scores.get("pitch", 0) / 100.0,
            "rhythm": scores.get("rhythm", 0) / 100.0,
            "energy": scores.get("energy", 0) / 100.0,
            "pronounciation": scores.get("pronunciation", 0)
            / 100.0,  # Note: typo to match Chloe's expectation
            "transcript": scores.get("transcript", 0) / 100.0,
            "overall_score": scores.get("overall", 0) / 100.0,
            "advice": extract_advice_text(hints_json),
            "hints": hints_json,
            "is_correct": scores.get("overall", 0) > 85,
            "user_text": scores.get("user_text", ""),  # STT result
        }
    except Exception as e:
        logger.error(f"AI analysis error: {e}")
        import traceback
        traceback.print_exc()
        return {"error": str(e)}
def convert_to_percentage(value: float) -> float:
    """Scale a 0.0-1.0 score to 0-100, rounded to one decimal place.

    Args:
        value: Score in 0.0-1.0 range

    Returns:
        float: ``value * 100`` rounded to one decimal
    """
    scaled = value * 100
    return round(scaled, 1)
def save_guess_record(
    session_id: str,
    puzzle_number: int,
    pitch: float,
    rhythm: float,
    energy: float,
    pronunciation: float,
    transcript: float,
    overall: float,
    advice: str,
    is_correct: bool,
    user_text: str = "",
) -> bool:
    """Insert one row into ``guess_records`` for an analyzed attempt.

    Args:
        session_id: User session UUID
        puzzle_number: Puzzle number
        pitch: Pitch score (0-100)
        rhythm: Rhythm score (0-100)
        energy: Energy score (0-100)
        pronunciation: Pronunciation score (0-100)
        transcript: Transcript score (0-100)
        overall: Overall score (0-100)
        advice: AI advice
        is_correct: Whether answer is correct
        user_text: STT transcription from MCP

    Returns:
        bool: True if the row was committed, False if anything failed
        (the failure is logged, not raised).
    """
    try:
        insert_stmt = text(
            """
            INSERT INTO guess_records
            (session_id, puzzle_number, pitch, rhythm, energy, pronunciation,
            transcript, overall, advice, is_correct, guess_timestamp, user_text)
            VALUES
            (:session_id, :puzzle_number, :pitch, :rhythm, :energy, :pronunciation,
            :transcript, :overall, :advice, :is_correct, :guess_timestamp, :user_text)
            """
        )
        row_values = dict(
            session_id=session_id,
            puzzle_number=puzzle_number,
            pitch=pitch,
            rhythm=rhythm,
            energy=energy,
            pronunciation=pronunciation,
            transcript=transcript,
            overall=overall,
            advice=advice,
            is_correct=is_correct,
            # Current time as a millisecond timestamp
            guess_timestamp=int(time.time() * 1000),
            user_text=user_text,
        )
        with engine.connect() as connection:
            connection.execute(insert_stmt, row_values)
            connection.commit()
        logger.info(
            f"Saved guess record: session={session_id}, puzzle={puzzle_number}, correct={is_correct}"
        )
        return True
    except Exception as e:
        logger.error(f"Failed to save guess record: {e}")
        return False
@app.get("/")
async def root():
    """Health check endpoint"""
    body = {"status": "ok", "message": "Komentle Voice API"}
    return body
@app.get("/health")
async def health_check():
    """Detailed health check"""
    try:
        with engine.connect() as connection:
            connection.execute(text("SELECT 1"))
    except Exception as e:
        db_status = f"error: {str(e)}"
    else:
        db_status = "ok"
    # "status" reports that the API process answered; the database state is
    # reported separately under "database".
    report = {
        "status": "ok",
        "database": db_status,
        "timestamp": datetime.now().isoformat(),
    }
    return report
async def analyze_voice_logic(audio_bytes: bytes, date: str, session_id: str) -> Dict:
    """Run a full voice analysis for one submission (usable without the HTTP layer).

    Looks up the puzzle for ``date``, sends the audio to ``call_ai_server``,
    converts the 0.0-1.0 scores to 0-100, stores a guess record and returns
    the result.

    Args:
        audio_bytes: Audio file bytes
        date: Date in YYYY-MM-DD format
        session_id: User session UUID

    Returns:
        dict: ``{"status": "success", ...scores...}`` or
        ``{"status": "error", "message": ...}``
    """
    logger.info(f"Received request: date={date}, session_id={session_id}")

    # 1. Get puzzle for the date
    puzzle = get_puzzle_by_date(date)
    if not puzzle:
        return {"status": "error", "message": f"No puzzle found for date: {date}"}
    logger.info(f"Found puzzle: {puzzle['puzzle_number']} - {puzzle['category']}")

    # 2. Run the analysis
    ai_response = await call_ai_server(
        audio_bytes,
        session_id,
        puzzle["category"],
        puzzle["answer_word"],
        puzzle.get("reference_audio_path"),
        puzzle["puzzle_number"],
    )
    if "error" in ai_response:
        return {
            "status": "error",
            "message": f"AI server error: {ai_response['error']}",
        }

    # 3. Convert scores to percentage (0-100) and map fields.
    # The AI layer returns 0.0-1.0; the frontend needs 0-100.
    # (response field -> AI field; "pronounciation" is the AI layer's spelling)
    field_sources = (
        ("pitch", "pitch"),
        ("rhythm", "rhythm"),
        ("energy", "energy"),
        ("pronunciation", "pronounciation"),
        ("transcript", "transcript"),  # transcript accuracy score
        ("overall", "overall_score"),
    )
    percentages = {
        field: convert_to_percentage(ai_response.get(source, 0.0))
        for field, source in field_sources
    }
    advice = ai_response.get("advice", "")
    is_correct = ai_response.get("is_correct", False)  # correctness as judged by the AI layer
    user_text = ai_response.get("user_text", "")  # text spoken by the user (STT result)

    # 4. Save guess record to database (its return value is not checked)
    save_guess_record(
        session_id=session_id,
        puzzle_number=puzzle["puzzle_number"],
        advice=advice,
        is_correct=is_correct,
        user_text=user_text,
        **percentages,
    )
    logger.info(
        f"Analysis complete: category={puzzle['category']}, overall={percentages['overall']}, correct={is_correct}"
    )

    result = {
        "status": "success",
        "category": puzzle["category"],
        "answer_word": puzzle["answer_word"],  # Add answer for chatbot context
        "reference_audio_path": puzzle.get("reference_audio_path"),  # For TTS voice cloning
        **percentages,
        "advice": advice,
        "is_correct": is_correct,
        "user_text": user_text,
    }

    # Console dump of the returned values
    separator = "=" * 50
    print(f"\n{separator}")
    print(f"[SCORING RESULT] analyze_voice_logic 리턴값:")
    for key in (
        "status",
        "category",
        "pitch",
        "rhythm",
        "energy",
        "pronunciation",
        "transcript",
        "overall",
        "is_correct",
        "user_text",
    ):
        print(f" - {key}: {result[key]}")
    shown_advice = result["advice"]
    if len(shown_advice) > 100:
        shown_advice = f"{shown_advice[:100]}..."
    print(f" - advice: {shown_advice}")
    print(f"{separator}\n")
    return result
@app.post("/api/analyze-voice", response_model=AnalysisResponse)
async def analyze_voice(
    audio: UploadFile = File(...), date: str = Form(...), session_id: str = Form(...)
):
    """
    Analyze user voice recording (API endpoint)

    Args:
        audio: Uploaded audio file
        date: Date in YYYY-MM-DD format
        session_id: User session UUID

    Returns:
        AnalysisResponse: Analysis results with scores

    Raises:
        HTTPException: 500 with the error message when the analysis reports an error
    """
    payload = await audio.read()
    outcome = await analyze_voice_logic(payload, date, session_id)
    if outcome.get("status") != "error":
        return AnalysisResponse(**outcome)
    raise HTTPException(
        status_code=500, detail=outcome.get("message", "Unknown error")
    )
@app.get("/api/puzzle/{date}")
async def get_puzzle(date: str):
    """
    Get puzzle information for a specific date

    Args:
        date: Date in YYYY-MM-DD format

    Returns:
        dict: puzzle_number, puzzle_date, difficulty and category

    Raises:
        HTTPException: 404 when no puzzle exists for the date
    """
    puzzle = get_puzzle_by_date(date)
    if not puzzle:
        raise HTTPException(status_code=404, detail=f"No puzzle found for date: {date}")
    # answer_word is deliberately left out of the response
    public_fields = ("puzzle_number", "puzzle_date", "difficulty", "category")
    return {field: puzzle[field] for field in public_fields}
@app.get("/api/dashboard")
async def get_dashboard():
    """
    Retrieve the full dashboard data (today's and all-time statistics combined)

    Returns:
        dict: Flat dashboard statistics with 6 key metrics plus today's puzzle info

    Raises:
        HTTPException: 500 when any query or conversion fails
    """
    try:
        today = datetime.now().strftime("%Y-%m-%d")
        # Today's puzzle
        puzzle_sql = text(
            """
            SELECT
            answer_word,
            reference_audio_path,
            category,
            difficulty,
            puzzle_date
            FROM puzzles
            WHERE puzzle_date = :today
            LIMIT 1
            """
        )
        # Today's statistics
        daily_sql = text(
            """
            SELECT
            puzzle_date,
            participants,
            success_rate,
            total_attempts
            FROM daily_statistics
            WHERE puzzle_date = :today
            """
        )
        # All-time statistics
        overall_sql = text(
            """
            SELECT
            total_participants,
            overall_success_rate,
            total_attempts,
            total_puzzles
            FROM overall_statistics
            """
        )
        with engine.connect() as connection:
            # Today's puzzle info (all None when no puzzle is scheduled)
            puzzle_row = connection.execute(puzzle_sql, {"today": today}).fetchone()
            if puzzle_row:
                answer_word, reference_audio_path, category, difficulty = (
                    puzzle_row[0],
                    puzzle_row[1],
                    puzzle_row[2],
                    puzzle_row[3],
                )
            else:
                answer_word = reference_audio_path = category = difficulty = None

            # Today's statistics (zeros when there is no row yet)
            daily_row = connection.execute(daily_sql, {"today": today}).fetchone()
            if daily_row:
                today_participants = daily_row[1]
                today_success_rate = float(daily_row[2])
                today_attempts = daily_row[3]
            else:
                today_participants, today_success_rate, today_attempts = 0, 0.0, 0

            # All-time statistics (zeros when there is no row)
            overall_row = connection.execute(overall_sql).fetchone()
            if overall_row:
                total_participants = overall_row[0]
                total_success_rate = float(overall_row[1])
                total_attempts = overall_row[2]
                total_puzzles = overall_row[3]
            else:
                total_participants, total_success_rate = 0, 0.0
                total_attempts, total_puzzles = 0, 0

        return {
            # Today's statistics
            "today_participants": today_participants,  # 1. today's participant count
            "today_success_rate": today_success_rate,  # 2. today's success rate
            "today_attempts": today_attempts,  # 5. today's attempt count
            # All-time statistics
            "total_participants": total_participants,  # 3. total participant count
            "total_success_rate": total_success_rate,  # 4. overall success rate
            "total_attempts": total_attempts,  # 6. total attempt count
            # Today's puzzle info
            "answer_word": answer_word,
            "reference_audio_path": reference_audio_path,
            "category": category,
            "difficulty": difficulty,
            # Additional info
            "date": today,
            "total_puzzles": total_puzzles,
        }
    except Exception as e:
        logger.error(f"Failed to get dashboard: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve dashboard")
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn using env configuration.
    import uvicorn
    host = os.getenv("SERVER_HOST")
    # Fall back to uvicorn's default port when BACKEND_PORT is unset or empty;
    # int(None) used to abort start-up with a TypeError.
    port = int(os.getenv("BACKEND_PORT") or "8000")
    uvicorn.run(app, host=host, port=port, log_level="info")