import os import numpy as np import torch import gradio as gr import spaces from typing import Optional, Tuple from pathlib import Path import tempfile import soundfile as sf def setup_cache_env(): """ Setup cache environment variables. Must be called in GPU worker context as well. """ _cache_home = os.path.join(os.path.expanduser("~"), ".cache") # HuggingFace cache os.environ["HF_HOME"] = os.path.join(_cache_home, "huggingface") os.environ["HUGGINGFACE_HUB_CACHE"] = os.path.join(_cache_home, "huggingface", "hub") # ModelScope cache (for FunASR SenseVoice) os.environ["MODELSCOPE_CACHE"] = os.path.join(_cache_home, "modelscope") # Torch Hub cache (for some audio models like ZipEnhancer) os.environ["TORCH_HOME"] = os.path.join(_cache_home, "torch") # Create cache directories for d in [os.environ["HF_HOME"], os.environ["MODELSCOPE_CACHE"], os.environ["TORCH_HOME"]]: os.makedirs(d, exist_ok=True) # Setup cache in main process BEFORE any imports setup_cache_env() os.environ["TOKENIZERS_PARALLELISM"] = "false" if os.environ.get("HF_REPO_ID", "").strip() == "": os.environ["HF_REPO_ID"] = "openbmb/VoxCPM1.5" # Global model cache for ZeroGPU _asr_model = None _voxcpm_model = None _default_local_model_dir = "./models/VoxCPM1.5" _zipenhancer_local_path = None # Will be set after pre-download def predownload_models(): """ Pre-download models at startup (runs in main process, not GPU worker). This ensures models are cached before GPU functions are called. """ global _zipenhancer_local_path print("=" * 50) print("Pre-downloading models to cache...") print(f"MODELSCOPE_CACHE={os.environ.get('MODELSCOPE_CACHE')}") print(f"HF_HOME={os.environ.get('HF_HOME')}") print("=" * 50) # Pre-download ZipEnhancer from ModelScope try: from modelscope.hub.snapshot_download import snapshot_download as ms_snapshot_download zipenhancer_model_id = "iic/speech_zipenhancer_ans_multiloss_16k_base" print(f"Pre-downloading ZipEnhancer: {zipenhancer_model_id}") _zipenhancer_local_path = ms_snapshot_download( zipenhancer_model_id, cache_dir=os.environ.get("MODELSCOPE_CACHE"), ) print(f"ZipEnhancer downloaded to: {_zipenhancer_local_path}") except Exception as e: print(f"Warning: Failed to pre-download ZipEnhancer: {e}") _zipenhancer_local_path = None # Pre-download ASR model (SenseVoice) from ModelScope try: from modelscope.hub.snapshot_download import snapshot_download as ms_snapshot_download asr_model_id = "iic/SenseVoiceSmall" print(f"Pre-downloading ASR model: {asr_model_id}") asr_local_path = ms_snapshot_download( asr_model_id, cache_dir=os.environ.get("MODELSCOPE_CACHE"), ) print(f"ASR model downloaded to: {asr_local_path}") except Exception as e: print(f"Warning: Failed to pre-download ASR model: {e}") print("=" * 50) print("Model pre-download complete!") print("=" * 50) # Run pre-download at startup predownload_models() def _resolve_model_dir() -> str: """ Resolve model directory: 1) Use local checkpoint directory if exists 2) If HF_REPO_ID env is set, download into models/{repo} 3) Fallback to 'models' """ if os.path.isdir(_default_local_model_dir): return _default_local_model_dir repo_id = os.environ.get("HF_REPO_ID", "").strip() if len(repo_id) > 0: target_dir = os.path.join("models", repo_id.replace("/", "__")) if not os.path.isdir(target_dir): try: from huggingface_hub import snapshot_download os.makedirs(target_dir, exist_ok=True) print(f"Downloading model from HF repo '{repo_id}' to '{target_dir}' ...") snapshot_download(repo_id=repo_id, local_dir=target_dir, local_dir_use_symlinks=False) except Exception as e: print(f"Warning: HF download failed: {e}. Falling back to 'models'.") return "models" return target_dir return "models" def get_asr_model(): """Lazy load ASR model.""" global _asr_model if _asr_model is None: # Setup cache env in GPU worker context setup_cache_env() from funasr import AutoModel print("Loading ASR model...") print(f" MODELSCOPE_CACHE={os.environ.get('MODELSCOPE_CACHE')}") _asr_model = AutoModel( model="iic/SenseVoiceSmall", # ModelScope model ID hub="ms", # Use ModelScope Hub disable_update=True, log_level='INFO', device="cuda:0", ) print("ASR model loaded.") return _asr_model def _get_zipenhancer_local_path(): """ Get ZipEnhancer local path from ModelScope cache. This works in both main process and GPU worker. """ setup_cache_env() try: from modelscope.hub.snapshot_download import snapshot_download as ms_snapshot_download zipenhancer_model_id = "iic/speech_zipenhancer_ans_multiloss_16k_base" # This will use cache if already downloaded local_path = ms_snapshot_download( zipenhancer_model_id, cache_dir=os.environ.get("MODELSCOPE_CACHE"), ) return local_path except Exception as e: print(f"Warning: Failed to get ZipEnhancer path: {e}") return "iic/speech_zipenhancer_ans_multiloss_16k_base" def get_voxcpm_model(): """Lazy load VoxCPM model.""" global _voxcpm_model if _voxcpm_model is None: # Setup cache env in GPU worker context setup_cache_env() import voxcpm print("Loading VoxCPM model...") model_dir = _resolve_model_dir() print(f"Using model dir: {model_dir}") # Get ZipEnhancer local path (uses cache if pre-downloaded) zipenhancer_path = _get_zipenhancer_local_path() print(f"ZipEnhancer path: {zipenhancer_path}") _voxcpm_model = voxcpm.VoxCPM( voxcpm_model_path=model_dir, optimize=True, enable_denoiser=True, zipenhancer_model_path=zipenhancer_path, ) print("VoxCPM model loaded.") return _voxcpm_model @spaces.GPU(duration=120) def prompt_wav_recognition(prompt_wav: Optional[str]) -> str: """Use ASR to recognize prompt audio text.""" if prompt_wav is None or not prompt_wav.strip(): return "" asr_model = get_asr_model() res = asr_model.generate(input=prompt_wav, language="auto", use_itn=True) text = res[0]["text"].split('|>')[-1] return text @spaces.GPU(duration=120) def generate_tts_audio_gpu( text_input: str, prompt_wav_data: Optional[Tuple[np.ndarray, int]] = None, prompt_text_input: Optional[str] = None, cfg_value_input: float = 2.0, inference_timesteps_input: int = 10, do_normalize: bool = True, denoise: bool = True, ) -> Tuple[int, np.ndarray]: """ GPU function: Generate speech from text using VoxCPM. prompt_wav_data is (audio_array, sample_rate) tuple. """ voxcpm_model = get_voxcpm_model() text = (text_input or "").strip() if len(text) == 0: raise ValueError("Please input text to synthesize.") prompt_text = prompt_text_input if prompt_text_input else None prompt_wav_path = None # If prompt audio data provided, write to temp file for voxcpm if prompt_wav_data is not None: audio_array, sr = prompt_wav_data with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: sf.write(f.name, audio_array, sr) prompt_wav_path = f.name try: print(f"Generating audio for text: '{text[:60]}...'") wav = voxcpm_model.generate( text=text, prompt_text=prompt_text, prompt_wav_path=prompt_wav_path, cfg_value=float(cfg_value_input), inference_timesteps=int(inference_timesteps_input), normalize=do_normalize, denoise=denoise, ) return (voxcpm_model.tts_model.sample_rate, wav) finally: # Cleanup temp file if prompt_wav_path and os.path.exists(prompt_wav_path): try: os.unlink(prompt_wav_path) except Exception: pass def generate_tts_audio( text_input: str, prompt_wav_path_input: Optional[str] = None, prompt_text_input: Optional[str] = None, cfg_value_input: float = 2.0, inference_timesteps_input: int = 10, do_normalize: bool = True, denoise: bool = True, ) -> Tuple[int, np.ndarray]: """ Wrapper: Read audio file in CPU, then call GPU function. """ prompt_wav_data = None # Read audio file before entering GPU context if prompt_wav_path_input and os.path.exists(prompt_wav_path_input): try: audio_array, sr = sf.read(prompt_wav_path_input, dtype='float32') prompt_wav_data = (audio_array, sr) print(f"Loaded prompt audio: {audio_array.shape}, sr={sr}") except Exception as e: print(f"Warning: Failed to load prompt audio: {e}") prompt_wav_data = None return generate_tts_audio_gpu( text_input=text_input, prompt_wav_data=prompt_wav_data, prompt_text_input=prompt_text_input, cfg_value_input=cfg_value_input, inference_timesteps_input=inference_timesteps_input, do_normalize=do_normalize, denoise=denoise, ) # ---------- UI Builders ---------- def create_demo_interface(): """Build the Gradio UI for VoxCPM demo.""" # static assets (logo path) try: gr.set_static_paths(paths=[Path.cwd().absolute()/"assets"]) except Exception: pass with gr.Blocks( theme=gr.themes.Soft( primary_hue="blue", secondary_hue="gray", neutral_hue="slate", font=[gr.themes.GoogleFont("Inter"), "Arial", "sans-serif"] ), css=""" .logo-container { text-align: center; margin: 0.5rem 0 1rem 0; } .logo-container img { height: 80px; width: auto; max-width: 200px; display: inline-block; } /* Bold accordion labels */ #acc_quick details > summary, #acc_tips details > summary { font-weight: 600 !important; font-size: 1.1em !important; } /* Bold labels for specific checkboxes */ #chk_denoise label, #chk_denoise span, #chk_normalize label, #chk_normalize span { font-weight: 600; } """ ) as interface: # Header logo gr.HTML('
