"""Gradio front-end for the VoxCPM TTS service backed by a Ray Serve API.

NOTE(review): the original source had lost all its newlines (one giant
physical line); this file is a reconstruction into valid, formatted Python.
All runtime strings (log messages, payload keys, endpoints) are preserved
byte-for-byte from the visible source.
"""
import os
import sys
import logging
import traceback
import numpy as np
import gradio as gr
from typing import Optional, Tuple
import soundfile as sf
from pathlib import Path
import requests
import json
import base64
import io
import tempfile
import uuid
import time

# Logging: stream to stdout and append to app.log (UTF-8).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('app.log', mode='a', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)

# Startup banner: record interpreter, cwd and relevant env vars for debugging.
logger.info("="*50)
logger.info("🚀 VoxCPM应用启动中...")
logger.info(f"Python版本: {sys.version}")
logger.info(f"工作目录: {os.getcwd()}")
logger.info(f"环境变量PORT: {os.environ.get('PORT', '未设置')}")
logger.info(f"环境变量RAY_SERVE_URL: {os.environ.get('RAY_SERVE_URL', '未设置')}")
logger.info("="*50)


class RayServeVoxCPMClient:
    """Client wrapper that talks to Ray Serve TTS API."""

    def __init__(self) -> None:
        """Resolve the API base URL and verify the service via /health.

        Raises:
            Exception: re-raised after logging if the health check fails.
        """
        logger.info("📡 初始化RayServeVoxCPMClient...")
        try:
            # Ray Serve API URL (can be overridden via env)
            self.RAY_SERVE_DEFAULT_URL = "https://d09162224-pytorch251-cuda124-u-5512-iyr4lse3-8970.550c.cloud"
            self.api_url = self._resolve_server_url()
            logger.info(f"🔗 准备连接到Ray Serve API: {self.api_url}")

            # Test connection
            logger.info("⏳ 测试Ray Serve连接...")
            health_start = time.time()
            health_response = requests.get(f"{self.api_url}/health", timeout=10)
            health_response.raise_for_status()
            health_time = time.time() - health_start
            logger.info(f"✅ 成功连接到Ray Serve API: {self.api_url} (耗时: {health_time:.3f}秒)")
        except Exception as e:
            logger.error(f"❌ 初始化RayServeVoxCPMClient失败: {e}")
            logger.error(f"错误详情: {traceback.format_exc()}")
            raise

    # ----------- Helpers -----------
    def _resolve_server_url(self) -> str:
        """Resolve Ray Serve API base URL, prefer env RAY_SERVE_URL."""
        return os.environ.get("RAY_SERVE_URL", self.RAY_SERVE_DEFAULT_URL).rstrip("/")

    def _audio_file_to_base64(self, audio_file_path: str) -> str:
        """Read an audio file and return its contents base64-encoded.

        Args:
            audio_file_path: path of the audio file to encode.

        Returns:
            The file's bytes as a base64 ASCII string.
        """
        try:
            with open(audio_file_path, 'rb') as f:
                audio_bytes = f.read()
            return base64.b64encode(audio_bytes).decode('utf-8')
        except Exception as e:
            logger.error(f"音频文件转base64失败: {e}")
            raise

    def _base64_to_audio_array(self, base64_audio: str, sample_rate: int = 16000) -> Tuple[int, np.ndarray]:
        """Decode a base64 WAV payload into a (sample_rate, int16 array) pair.

        Args:
            base64_audio: base64-encoded audio bytes (WAV container expected).
            sample_rate: nominally the expected rate; the actual rate read
                from the file is what gets returned (parameter kept for
                interface compatibility).

        Returns:
            (sample_rate, audio_array) where audio_array is mono int16,
            the format Gradio's Audio component expects.
        """
        try:
            audio_bytes = base64.b64decode(base64_audio)

            # soundfile needs a real file; write to a temp .wav and read back.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
                tmp_file.write(audio_bytes)
                tmp_file_path = tmp_file.name

            try:
                audio_data, sr = sf.read(tmp_file_path, dtype='float32')
                # Down-mix to mono by keeping the first channel.
                if audio_data.ndim == 2:
                    audio_data = audio_data[:, 0]
                # Clip to [-1, 1] before scaling so samples at/over full
                # scale can't wrap around when cast to int16.
                audio_int16 = (np.clip(audio_data, -1.0, 1.0) * 32767).astype(np.int16)
                return sr, audio_int16
            finally:
                # Best-effort cleanup of the temp file.
                try:
                    os.unlink(tmp_file_path)
                except OSError:
                    pass
        except Exception as e:
            logger.error(f"base64转音频数组失败: {e}")
            raise

    # ----------- Functional endpoints -----------
    def prompt_wav_recognition(self, prompt_wav: Optional[str]) -> str:
        """Use Ray Serve ASR API for speech recognition.

        Args:
            prompt_wav: path of the reference audio; None/blank skips ASR.

        Returns:
            The recognized text, or "" on any failure (errors are logged,
            never raised, so the UI degrades gracefully).
        """
        logger.info(f"🎵 开始语音识别,输入文件: {prompt_wav}")
        if prompt_wav is None or not prompt_wav.strip():
            logger.info("⚠️ 没有提供音频文件,跳过语音识别")
            return ""
        try:
            start_time = time.time()
            logger.info(f"📁 处理音频文件: {prompt_wav}")

            convert_start = time.time()
            audio_base64 = self._audio_file_to_base64(prompt_wav)
            convert_time = time.time() - convert_start
            logger.info(f"🔄 音频转base64耗时: {convert_time:.3f}秒")

            logger.info("📡 调用Ray Serve ASR API...")
            asr_request = {
                "reqid": str(uuid.uuid4()),
                "audio_data": audio_base64,
                "language": "auto",
                "use_itn": True
            }

            api_start = time.time()
            response = requests.post(
                f"{self.api_url}/asr",
                json=asr_request,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            api_time = time.time() - api_start

            result_data = response.json()
            total_time = time.time() - start_time
            logger.info(f"⏱️ ASR API请求耗时: {api_time:.3f}秒")
            logger.info(f"⏱️ ASR总耗时: {total_time:.3f}秒")
            logger.info(f"✅ 语音识别完成,响应: {result_data}")

            # code == 3000 is the service's success marker.
            if result_data.get("code") == 3000:
                recognized_text = result_data.get("text", "")
                logger.info(f"🎯 识别结果: '{recognized_text}'")
                return recognized_text
            else:
                logger.warning(f"⚠️ ASR识别失败: {result_data.get('message', 'Unknown error')}")
                return ""
        except Exception as e:
            logger.error(f"❌ 语音识别失败: {e}")
            logger.error(f"错误详情: {traceback.format_exc()}")
            return ""

    def _call_ray_serve_generate(
        self,
        text: str,
        prompt_wav_path: Optional[str] = None,
        prompt_text: Optional[str] = None,
        cfg_value: float = 2.0,
        inference_timesteps: int = 10,
        do_normalize: bool = True,
        denoise: bool = True,
    ) -> Tuple[int, np.ndarray]:
        """
        Call Ray Serve /generate API and return (sample_rate, waveform).

        Raises:
            RuntimeError: on transport failure or a non-3000 service code.
        """
        logger.info(f"🔥 调用Ray Serve生成API,文本: '{text[:60]}...'")
        try:
            start_time = time.time()

            prepare_start = time.time()
            audio_config = {
                "voice_type": "default",  # default voice unless cloning below
                "encoding": "wav",
                "speed_ratio": 1.0,
                "cfg_value": cfg_value,
                "inference_timesteps": inference_timesteps
            }

            # With both a reference wav and its transcript, switch to
            # voice-clone mode (voice_type cleared to None).
            if prompt_wav_path and prompt_text:
                logger.info("🎭 使用语音克隆模式")
                convert_start = time.time()
                audio_base64 = self._audio_file_to_base64(prompt_wav_path)
                convert_time = time.time() - convert_start
                logger.info(f"🔄 参考音频转base64耗时: {convert_time:.3f}秒")
                audio_config.update({
                    "voice_type": None,
                    "prompt_wav": audio_base64,
                    "prompt_text": prompt_text
                })
            else:
                logger.info("🎤 使用默认语音模式")

            request_data = {
                "audio": audio_config,
                "request": {
                    "reqid": str(uuid.uuid4()),
                    "text": text,
                    "operation": "query",
                    "do_normalize": do_normalize,
                    "denoise": denoise
                }
            }
            prepare_time = time.time() - prepare_start
            logger.info(f"⏱️ 请求数据准备耗时: {prepare_time:.3f}秒")

            logger.info(f"📡 发送请求到Ray Serve: {self.api_url}/generate")
            logger.info(f"📊 请求参数: CFG={cfg_value}, 推理步数={inference_timesteps}, 文本长度={len(text)}")

            api_start = time.time()
            response = requests.post(
                f"{self.api_url}/generate",
                json=request_data,
                headers={"Content-Type": "application/json"},
                timeout=120  # TTS may take a while
            )
            response.raise_for_status()
            api_time = time.time() - api_start

            result_data = response.json()
            logger.info(f"⏱️ TTS API请求耗时: {api_time:.3f}秒")
            logger.info(f"✅ Ray Serve响应: code={result_data.get('code')}, message={result_data.get('message')}")

            if result_data.get("code") == 3000:
                audio_base64 = result_data.get("data", "")
                if not audio_base64:
                    raise RuntimeError("Ray Serve返回的音频数据为空")

                decode_start = time.time()
                sample_rate, audio_array = self._base64_to_audio_array(audio_base64)
                decode_time = time.time() - decode_start
                total_time = time.time() - start_time

                duration_ms = result_data.get('addition', {}).get('duration', 'unknown')
                logger.info(f"🔄 音频解码耗时: {decode_time:.3f}秒")
                logger.info(f"⏱️ TTS总耗时: {total_time:.3f}秒")
                logger.info(f"🎵 音频生成成功,采样率: {sample_rate}, 时长: {duration_ms}ms")
                logger.info(f"📈 性能指标: API={api_time:.3f}s, 解码={decode_time:.3f}s, 总计={total_time:.3f}s")
                return sample_rate, audio_array
            else:
                error_msg = result_data.get("message", "Unknown error")
                raise RuntimeError(f"Ray Serve生成失败: {error_msg}")
        except requests.exceptions.RequestException as e:
            logger.error(f"❌ Ray Serve请求失败: {e}")
            raise RuntimeError(f"Failed to connect Ray Serve TTS service: {e}. Check RAY_SERVE_URL='{self.api_url}' and service status")
        except Exception as e:
            logger.error(f"❌ Ray Serve调用异常: {e}")
            raise

    def generate_tts_audio(
        self,
        text_input: str,
        prompt_wav_path_input: Optional[str] = None,
        prompt_text_input: Optional[str] = None,
        cfg_value_input: float = 2.0,
        inference_timesteps_input: int = 10,
        do_normalize: bool = True,
        denoise: bool = True,
    ) -> Tuple[int, np.ndarray]:
        """Validate inputs, then delegate to _call_ray_serve_generate.

        Returns:
            (sample_rate, waveform) suitable for a Gradio Audio output.

        Raises:
            ValueError: if the input text is empty after stripping.
        """
        logger.info("🎤 开始TTS音频生成...")
        logger.info(f"📝 输入文本: '{text_input[:60]}{'...' if len(text_input) > 60 else ''}'")
        logger.info(f"🎵 参考音频: {prompt_wav_path_input or '无'}")
        logger.info(f"📄 参考文本: '{prompt_text_input[:30]}{'...' if prompt_text_input and len(prompt_text_input) > 30 else ''}' " if prompt_text_input else "无")
        logger.info(f"⚙️ CFG值: {cfg_value_input}, 推理步数: {inference_timesteps_input}")
        logger.info(f"🔧 文本正规化: {do_normalize}, 音频降噪: {denoise}")
        try:
            full_start_time = time.time()
            text = (text_input or "").strip()
            if len(text) == 0:
                logger.error("❌ 输入文本为空")
                raise ValueError("Please input text to synthesize.")

            prompt_wav_path = prompt_wav_path_input or ""
            prompt_text = prompt_text_input or ""
            # Fall back to documented defaults when the UI passes None.
            cfg_value = cfg_value_input if cfg_value_input is not None else 2.0
            inference_timesteps = inference_timesteps_input if inference_timesteps_input is not None else 10

            logger.info("🚀 调用Ray Serve TTS生成引擎...")
            generate_start = time.time()
            sr, wav_np = self._call_ray_serve_generate(
                text=text,
                prompt_wav_path=prompt_wav_path,
                prompt_text=prompt_text,
                cfg_value=cfg_value,
                inference_timesteps=inference_timesteps,
                do_normalize=do_normalize,
                denoise=denoise,
            )
            generate_time = time.time() - generate_start
            full_time = time.time() - full_start_time

            logger.info(f"✅ TTS生成完成,采样率: {sr}, 音频长度: {len(wav_np) if hasattr(wav_np, '__len__') else 'unknown'}")
            logger.info(f"🏁 完整TTS流程耗时: {full_time:.3f}秒 (生成={generate_time:.3f}s)")
            return (sr, wav_np)
        except Exception as e:
            logger.error(f"❌ TTS音频生成失败: {e}")
            logger.error(f"错误详情: {traceback.format_exc()}")
            raise


# ---------- UI Builders ----------
def create_demo_interface(client: RayServeVoxCPMClient):
    """Build the Gradio UI for Gradio API VoxCPM client.

    NOTE(review): the original source is truncated mid-way through this
    function (inside a gr.HTML(...) call); only the visible portion is
    reconstructed here and the remainder of the UI must be restored from
    the upstream file.
    """
    logger.info("🎨 开始创建Gradio界面...")
    try:
        assets_path = Path.cwd().absolute()/"assets"
        logger.info(f"📁 设置静态资源路径: {assets_path}")
        gr.set_static_paths(paths=[assets_path])
        logger.info("✅ 静态资源路径设置完成")
    except Exception as e:
        logger.warning(f"⚠️ 静态资源路径设置失败: {e}")
        logger.warning("继续创建界面...")

    with gr.Blocks(
        theme=gr.themes.Soft(
            primary_hue="blue",
            secondary_hue="gray",
            neutral_hue="slate",
            font=[gr.themes.GoogleFont("Inter"), "Arial", "sans-serif"]
        ),
        css="""
        .logo-container { text-align: center; margin: 0.5rem 0 1rem 0; }
        .logo-container img { height: 80px; width: auto; max-width: 200px; display: inline-block; }
        /* Bold labels for specific checkboxes */
        #chk_denoise label, #chk_denoise span,
        #chk_normalize label, #chk_normalize span { font-weight: 600; }
        """
    ) as interface:
        # TODO(review): original markup lost to truncation — the source ends
        # at "gr.HTML('". Placeholder keeps the module importable.
        gr.HTML('')
    return interface