import os
import numpy as np
import torch
import gradio as gr
import spaces
from typing import Optional, Tuple
from pathlib import Path
import tempfile
import soundfile as sf


def setup_cache_env():
    """
    Setup cache environment variables.
    Must be called in GPU worker context as well.
    """
    _cache_home = os.path.join(os.path.expanduser("~"), ".cache")
    
    # HuggingFace cache
    os.environ["HF_HOME"] = os.path.join(_cache_home, "huggingface")
    os.environ["HUGGINGFACE_HUB_CACHE"] = os.path.join(_cache_home, "huggingface", "hub")
    
    # ModelScope cache (for FunASR SenseVoice)
    os.environ["MODELSCOPE_CACHE"] = os.path.join(_cache_home, "modelscope")
    
    # Torch Hub cache (for some audio models like ZipEnhancer)
    os.environ["TORCH_HOME"] = os.path.join(_cache_home, "torch")
    
    # Create cache directories
    for d in [os.environ["HF_HOME"], os.environ["MODELSCOPE_CACHE"], os.environ["TORCH_HOME"]]:
        os.makedirs(d, exist_ok=True)


# Setup cache in main process BEFORE any imports
setup_cache_env()
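# Note: on ZeroGPU Spaces, functions decorated with @spaces.GPU can run in a
# separate worker process, so the lazy model loaders below call
# setup_cache_env() again defensively before touching the model hubs.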

os.environ["TOKENIZERS_PARALLELISM"] = "false"
if os.environ.get("HF_REPO_ID", "").strip() == "":
    os.environ["HF_REPO_ID"] = "openbmb/VoxCPM1.5"

# Global model cache for ZeroGPU
_asr_model = None
_voxcpm_model = None
_default_local_model_dir = "./models/VoxCPM1.5"
_zipenhancer_local_path = None  # Will be set after pre-download


def predownload_models():
    """
    Pre-download models at startup (runs in main process, not GPU worker).
    This ensures models are cached before GPU functions are called.
    """
    global _zipenhancer_local_path
    
    print("=" * 50)
    print("Pre-downloading models to cache...")
    print(f"MODELSCOPE_CACHE={os.environ.get('MODELSCOPE_CACHE')}")
    print(f"HF_HOME={os.environ.get('HF_HOME')}")
    print("=" * 50)
    
    # Pre-download ZipEnhancer from ModelScope
    try:
        from modelscope.hub.snapshot_download import snapshot_download as ms_snapshot_download
        zipenhancer_model_id = "iic/speech_zipenhancer_ans_multiloss_16k_base"
        print(f"Pre-downloading ZipEnhancer: {zipenhancer_model_id}")
        _zipenhancer_local_path = ms_snapshot_download(
            zipenhancer_model_id,
            cache_dir=os.environ.get("MODELSCOPE_CACHE"),
        )
        print(f"ZipEnhancer downloaded to: {_zipenhancer_local_path}")
    except Exception as e:
        print(f"Warning: Failed to pre-download ZipEnhancer: {e}")
        _zipenhancer_local_path = None
    
    # Pre-download ASR model (SenseVoice) from ModelScope
    try:
        from modelscope.hub.snapshot_download import snapshot_download as ms_snapshot_download
        asr_model_id = "iic/SenseVoiceSmall"
        print(f"Pre-downloading ASR model: {asr_model_id}")
        asr_local_path = ms_snapshot_download(
            asr_model_id,
            cache_dir=os.environ.get("MODELSCOPE_CACHE"),
        )
        print(f"ASR model downloaded to: {asr_local_path}")
    except Exception as e:
        print(f"Warning: Failed to pre-download ASR model: {e}")
    
    print("=" * 50)
    print("Model pre-download complete!")
    print("=" * 50)


# Run pre-download at startup
predownload_models()


def _resolve_model_dir() -> str:
    """
    Resolve model directory:
    1) Use local checkpoint directory if exists
    2) If HF_REPO_ID env is set, download into models/{repo}
    3) Fallback to 'models'
    """
    if os.path.isdir(_default_local_model_dir):
        return _default_local_model_dir

    repo_id = os.environ.get("HF_REPO_ID", "").strip()
    if len(repo_id) > 0:
        target_dir = os.path.join("models", repo_id.replace("/", "__"))
        if not os.path.isdir(target_dir):
            try:
                from huggingface_hub import snapshot_download
                os.makedirs(target_dir, exist_ok=True)
                print(f"Downloading model from HF repo '{repo_id}' to '{target_dir}' ...")
                snapshot_download(repo_id=repo_id, local_dir=target_dir, local_dir_use_symlinks=False)
            except Exception as e:
                print(f"Warning: HF download failed: {e}. Falling back to 'models'.")
                return "models"
        return target_dir
    return "models"


def get_asr_model():
    """Lazy load ASR model."""
    global _asr_model
    if _asr_model is None:
        # Setup cache env in GPU worker context
        setup_cache_env()
        
        from funasr import AutoModel
        print("Loading ASR model...")
        print(f"  MODELSCOPE_CACHE={os.environ.get('MODELSCOPE_CACHE')}")
        _asr_model = AutoModel(
            model="iic/SenseVoiceSmall",  # ModelScope model ID
            hub="ms",  # Use ModelScope Hub
            disable_update=True,
            log_level='INFO',
            device="cuda:0",
        )
        print("ASR model loaded.")
    return _asr_model


def _get_zipenhancer_local_path():
    """
    Get ZipEnhancer local path from ModelScope cache.
    This works in both main process and GPU worker.
    """
    setup_cache_env()
    try:
        from modelscope.hub.snapshot_download import snapshot_download as ms_snapshot_download
        zipenhancer_model_id = "iic/speech_zipenhancer_ans_multiloss_16k_base"
        # This will use cache if already downloaded
        local_path = ms_snapshot_download(
            zipenhancer_model_id,
            cache_dir=os.environ.get("MODELSCOPE_CACHE"),
        )
        return local_path
    except Exception as e:
        print(f"Warning: Failed to get ZipEnhancer path: {e}")
        return "iic/speech_zipenhancer_ans_multiloss_16k_base"


def get_voxcpm_model():
    """Lazy load VoxCPM model."""
    global _voxcpm_model
    if _voxcpm_model is None:
        # Setup cache env in GPU worker context
        setup_cache_env()
        
        import voxcpm
        print("Loading VoxCPM model...")
        model_dir = _resolve_model_dir()
        print(f"Using model dir: {model_dir}")
        
        # Get ZipEnhancer local path (uses cache if pre-downloaded)
        zipenhancer_path = _get_zipenhancer_local_path()
        print(f"ZipEnhancer path: {zipenhancer_path}")
        
        _voxcpm_model = voxcpm.VoxCPM(
            voxcpm_model_path=model_dir, 
            optimize=True,
            enable_denoiser=True,
            zipenhancer_model_path=zipenhancer_path,
        )
        print("VoxCPM model loaded.")
    return _voxcpm_model


@spaces.GPU(duration=120)
def prompt_wav_recognition(prompt_wav: Optional[str]) -> str:
    """Use ASR to recognize prompt audio text."""
    if prompt_wav is None or not prompt_wav.strip():
        return ""
    asr_model = get_asr_model()
    res = asr_model.generate(input=prompt_wav, language="auto", use_itn=True)
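    # SenseVoice typically prefixes the transcript with tags such as
    # <|zh|><|NEUTRAL|><|Speech|><|woitn|>; splitting on '|>' and keeping the
    # last segment strips the tags and leaves only the plain text.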
    text = res[0]["text"].split('|>')[-1]
    return text


@spaces.GPU(duration=120)
def generate_tts_audio_gpu(
    text_input: str,
    prompt_wav_data: Optional[Tuple[np.ndarray, int]] = None,
    prompt_text_input: Optional[str] = None,
    cfg_value_input: float = 2.0,
    inference_timesteps_input: int = 10,
    do_normalize: bool = True,
    denoise: bool = True,
) -> Tuple[int, np.ndarray]:
    """
    GPU function: Generate speech from text using VoxCPM.
    prompt_wav_data is (audio_array, sample_rate) tuple.
    """
    voxcpm_model = get_voxcpm_model()

    text = (text_input or "").strip()
    if len(text) == 0:
        raise ValueError("Please input text to synthesize.")

    prompt_text = prompt_text_input if prompt_text_input else None
    prompt_wav_path = None

    # If prompt audio data provided, write to temp file for voxcpm
    if prompt_wav_data is not None:
        audio_array, sr = prompt_wav_data
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            sf.write(f.name, audio_array, sr)
            prompt_wav_path = f.name

    try:
        print(f"Generating audio for text: '{text[:60]}...'")
        wav = voxcpm_model.generate(
            text=text,
            prompt_text=prompt_text,
            prompt_wav_path=prompt_wav_path,
            cfg_value=float(cfg_value_input),
            inference_timesteps=int(inference_timesteps_input),
            normalize=do_normalize,
            denoise=denoise,
        )
        return (voxcpm_model.tts_model.sample_rate, wav)
    finally:
        # Cleanup temp file
        if prompt_wav_path and os.path.exists(prompt_wav_path):
            try:
                os.unlink(prompt_wav_path)
            except Exception:
                pass


def generate_tts_audio(
    text_input: str,
    prompt_wav_path_input: Optional[str] = None,
    prompt_text_input: Optional[str] = None,
    cfg_value_input: float = 2.0,
    inference_timesteps_input: int = 10,
    do_normalize: bool = True,
    denoise: bool = True,
) -> Tuple[int, np.ndarray]:
    """
    Wrapper: Read audio file in CPU, then call GPU function.
    """
    prompt_wav_data = None
    
    # Read audio file before entering GPU context
    if prompt_wav_path_input and os.path.exists(prompt_wav_path_input):
        try:
            audio_array, sr = sf.read(prompt_wav_path_input, dtype='float32')
            prompt_wav_data = (audio_array, sr)
            print(f"Loaded prompt audio: {audio_array.shape}, sr={sr}")
        except Exception as e:
            print(f"Warning: Failed to load prompt audio: {e}")
            prompt_wav_data = None
    
    return generate_tts_audio_gpu(
        text_input=text_input,
        prompt_wav_data=prompt_wav_data,
        prompt_text_input=prompt_text_input,
        cfg_value_input=cfg_value_input,
        inference_timesteps_input=inference_timesteps_input,
        do_normalize=do_normalize,
        denoise=denoise,
    )
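# A minimal sketch of programmatic (non-UI) use, assuming the example prompt
# file exists and that `spaces` no-ops the GPU decorator outside Spaces:
#   sr, wav = generate_tts_audio(
#       "Hello from VoxCPM.",
#       prompt_wav_path_input="./examples/example.wav",
#       prompt_text_input="<transcript of example.wav>",
#   )
#   sf.write("output.wav", wav, sr)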


# ---------- UI Builders ----------

def create_demo_interface():
    """Build the Gradio UI for VoxCPM demo."""
    # static assets (logo path)
    try:
        gr.set_static_paths(paths=[Path.cwd().absolute()/"assets"])
    except Exception:
        pass

    with gr.Blocks(
        theme=gr.themes.Soft(
            primary_hue="blue",
            secondary_hue="gray",
            neutral_hue="slate",
            font=[gr.themes.GoogleFont("Inter"), "Arial", "sans-serif"]
        ),
        css="""
        .logo-container {
            text-align: center;
            margin: 0.5rem 0 1rem 0;
        }
        .logo-container img {
            height: 80px;
            width: auto;
            max-width: 200px;
            display: inline-block;
        }
        /* Bold accordion labels */
        #acc_quick details > summary,
        #acc_tips details > summary {
            font-weight: 600 !important;
            font-size: 1.1em !important;
        }
        /* Bold labels for specific checkboxes */
        #chk_denoise label,
        #chk_denoise span,
        #chk_normalize label,
        #chk_normalize span {
            font-weight: 600;
        }
        """
    ) as interface:
        # Header logo
        gr.HTML('<div class="logo-container"><img src="/gradio_api/file=assets/voxcpm-logo.png" alt="VoxCPM Logo"></div>')

        # Quick Start
        with gr.Accordion("📋 Quick Start Guide |快速入门", open=False, elem_id="acc_quick"):
            gr.Markdown("""
            ### How to Use |使用说明
            1. **(Optional) Provide a Voice Prompt** - Upload or record an audio clip to provide the desired voice characteristics for synthesis.  
               **(可选)提供参考声音** - 上传或录制一段音频,为声音合成提供音色、语调和情感等个性化特征
            2. **(Optional) Enter Prompt Text** - If you provided a voice prompt, enter its corresponding transcript here (automatic recognition is available).  
               **(可选)输入参考文本** - 如果提供了参考语音,请输入其对应的文本内容(支持自动识别)。
            3. **Enter target text** - Type the text you want the model to speak.  
               **输入目标文本** - 输入您希望模型朗读的文字内容。
            4. **Generate Speech** - Click the "Generate Speech" button to create your audio.  
               **生成语音** - 点击"生成语音"按钮,即可生成音频。
            """)

        # Pro Tips
        with gr.Accordion("💡 Pro Tips |使用建议", open=False, elem_id="acc_tips"):
            gr.Markdown("""
            ### Prompt Speech Enhancement|参考语音降噪
            - **Enable** to remove background noise with the external ZipEnhancer component and get a clean voice. Note that this limits the audio sampling rate to 16kHz, which caps the achievable cloning quality.  
              **启用**:通过 ZipEnhancer 组件消除背景噪音,但会将音频采样率限制在16kHz,限制克隆上限。
            - **Disable** to preserve all of the original audio's information, including the background ambience, and support voice cloning at sampling rates up to 44.1kHz.  
              **禁用**:保留原始音频的全部信息,包括背景环境声,最高支持44.1kHz的音频复刻。

            ### Text Normalization|文本正则化
            - **Enable** to process general text with an external WeTextProcessing component.  
              **启用**:使用 WeTextProcessing 组件,可支持常见文本的正则化处理。
            - **Disable** to use VoxCPM's native text understanding ability. For example, it supports phoneme input (Chinese phonemes converted with pinyin, e.g. {ni3}{hao3}; English phonemes converted with CMUDict, e.g. {HH AH0 L OW1}) and formula/symbol synthesis, try it!  
              **禁用**:将使用 VoxCPM 内置的文本理解能力。如,支持音素输入(如中文转拼音:{ni3}{hao3};英文转CMUDict:{HH AH0 L OW1})和公式符号合成,尝试一下!

            ### CFG Value|CFG 值
            - **Lower CFG** if the prompt voice sounds strained or overly expressive, or if long text inputs become unstable.  
              **调低**:如果提示语音听起来不自然或过于夸张,或者长文本输入出现稳定性问题。
            - **Higher CFG** for better adherence to the prompt speech style or the input text, or if very short text inputs become unstable.  
              **调高**:为更好地贴合提示音频的风格或输入文本,或者极短文本输入出现稳定性问题。

            ### Inference Timesteps|推理时间步
            - **Lower** for faster synthesis speed.  
              **调低**:合成速度更快。
            - **Higher** for better synthesis quality.  
              **调高**:合成质量更佳。
            """)
            
        # Main controls
        with gr.Row():
            with gr.Column():
                prompt_wav = gr.Audio(
                    sources=["upload", 'microphone'],
                    type="filepath",
                    label="Prompt Speech (Optional, or let VoxCPM improvise)",
                    value="./examples/example.wav",
                )
                DoDenoisePromptAudio = gr.Checkbox(
                    value=False,
                    label="Prompt Speech Enhancement",
                    elem_id="chk_denoise",
                    info="We use ZipEnhancer model to denoise the prompt audio."
                )
                with gr.Row():
                    prompt_text = gr.Textbox(
                        value="Just by listening a few minutes a day, you'll be able to eliminate negative thoughts by conditioning your mind to be more positive.",
                        label="Prompt Text",
                        placeholder="Please enter the prompt text. Automatic recognition is supported, and you can correct the results yourself..."
                    )
                run_btn = gr.Button("Generate Speech", variant="primary")

            with gr.Column():
                cfg_value = gr.Slider(
                    minimum=1.0,
                    maximum=3.0,
                    value=2.0,
                    step=0.1,
                    label="CFG Value (Guidance Scale)",
                    info="Higher values increase adherence to prompt, lower values allow more creativity"
                )
                inference_timesteps = gr.Slider(
                    minimum=4,
                    maximum=30,
                    value=10,
                    step=1,
                    label="Inference Timesteps",
                    info="Number of inference timesteps for generation (higher values may improve quality but slower)"
                )
                with gr.Row():
                    text = gr.Textbox(
                        value="VoxCPM is an innovative end-to-end TTS model from ModelBest, designed to generate highly realistic speech.",
                        label="Target Text",
                    )
                with gr.Row():
                    DoNormalizeText = gr.Checkbox(
                        value=False,
                        label="Text Normalization",
                        elem_id="chk_normalize",
                        info="We use wetext library to normalize the input text."
                    )
                audio_output = gr.Audio(label="Output Audio")

        # Wiring
        run_btn.click(
            fn=generate_tts_audio,
            inputs=[text, prompt_wav, prompt_text, cfg_value, inference_timesteps, DoNormalizeText, DoDenoisePromptAudio],
            outputs=[audio_output],
            show_progress=True,
            api_name="generate",
        )
        prompt_wav.change(fn=prompt_wav_recognition, inputs=[prompt_wav], outputs=[prompt_text])
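        # Because api_name="generate" is set above, the endpoint can also be
        # called remotely; a sketch, assuming a recent gradio_client package:
        #   from gradio_client import Client, handle_file
        #   client = Client("http://localhost:7860/")
        #   audio_path = client.predict(
        #       "Text to speak", handle_file("./examples/example.wav"),
        #       "prompt transcript", 2.0, 10, False, False,
        #       api_name="/generate")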

    return interface


def run_demo(server_name: str = "0.0.0.0", server_port: int = 7860, show_error: bool = True):
    interface = create_demo_interface()
    # Recommended to enable queue on Spaces for better throughput
    interface.queue(max_size=10).launch(server_name=server_name, server_port=server_port, show_error=show_error)


if __name__ == "__main__":
    run_demo()