import os import sys import importlib.util import site import json import torch import gradio as gr import torchaudio import numpy as np from huggingface_hub import snapshot_download, hf_hub_download import subprocess import uuid import soundfile as sf import spaces import librosa # --- 1. نصب و راه‌اندازی --- downloaded_resources = { "configs": False, "tokenizer_vq8192": False, "fmt_Vq8192ToMels": False, "vocoder": False } def install_espeak(): try: result = subprocess.run(["which", "espeak-ng"], capture_output=True, text=True) if result.returncode != 0: print("Installing espeak-ng...") subprocess.run(["apt-get", "update"], check=True) subprocess.run(["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"], check=True) except Exception as e: print(f"Error installing espeak-ng: {e}") install_espeak() def patch_langsegment_init(): try: spec = importlib.util.find_spec("LangSegment") if spec is None or spec.origin is None: return init_path = os.path.join(os.path.dirname(spec.origin), '__init__.py') with open(init_path, 'r') as f: lines = f.readlines() modified = False new_lines = [] target_line_prefix = "from .LangSegment import" for line in lines: if line.strip().startswith(target_line_prefix) and ('setLangfilters' in line or 'getLangfilters' in line): mod_line = line.replace(',setLangfilters', '').replace(',getLangfilters', '') mod_line = mod_line.replace('setLangfilters,', '').replace('getLangfilters,', '').rstrip(',') new_lines.append(mod_line + '\n') modified = True else: new_lines.append(line) if modified: with open(init_path, 'w') as f: f.writelines(new_lines) try: import LangSegment importlib.reload(LangSegment) except: pass except: pass patch_langsegment_init() if not os.path.exists("Amphion"): subprocess.run(["git", "clone", "https://github.com/open-mmlab/Amphion.git"]) os.chdir("Amphion") if os.path.dirname(os.path.abspath("Amphion")) not in sys.path: sys.path.append(os.path.dirname(os.path.abspath("Amphion"))) os.makedirs("wav", exist_ok=True) os.makedirs("ckpts/Vevo", exist_ok=True) from models.vc.vevo.vevo_utils import VevoInferencePipeline def save_audio_pcm16(waveform, output_path, sample_rate=24000): try: if isinstance(waveform, torch.Tensor): waveform = waveform.detach().cpu() if waveform.dim() == 2 and waveform.shape[0] == 1: waveform = waveform.squeeze(0) waveform = waveform.numpy() sf.write(output_path, waveform, sample_rate, subtype='PCM_16') except Exception as e: print(f"Save error: {e}") def setup_configs(): if downloaded_resources["configs"]: return config_path = "models/vc/vevo/config" os.makedirs(config_path, exist_ok=True) config_files = ["Vq8192ToMels.json", "Vocoder.json"] for file in config_files: file_path = f"{config_path}/{file}" if not os.path.exists(file_path): try: file_data = hf_hub_download(repo_id="amphion/Vevo", filename=f"config/{file}", repo_type="model") subprocess.run(["cp", file_data, file_path]) except Exception as e: print(f"Error downloading config {file}: {e}") downloaded_resources["configs"] = True setup_configs() device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") inference_pipelines = {} def preload_all_resources(): setup_configs() global downloaded_content_style_tokenizer_path, downloaded_fmt_path, downloaded_vocoder_path if not downloaded_resources["tokenizer_vq8192"]: downloaded_content_style_tokenizer_path = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["tokenizer/vq8192/*"]) downloaded_resources["tokenizer_vq8192"] = True if not downloaded_resources["fmt_Vq8192ToMels"]: downloaded_fmt_path = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vq8192ToMels/*"]) downloaded_resources["fmt_Vq8192ToMels"] = True if not downloaded_resources["vocoder"]: downloaded_vocoder_path = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vocoder/*"]) downloaded_resources["vocoder"] = True downloaded_content_style_tokenizer_path = None downloaded_fmt_path = None downloaded_vocoder_path = None preload_all_resources() def get_pipeline(): if "timbre" in inference_pipelines: return inference_pipelines["timbre"] pipeline = VevoInferencePipeline( content_style_tokenizer_ckpt_path=os.path.join(downloaded_content_style_tokenizer_path, "tokenizer/vq8192"), fmt_cfg_path="./models/vc/vevo/config/Vq8192ToMels.json", fmt_ckpt_path=os.path.join(downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels"), vocoder_cfg_path="./models/vc/vevo/config/Vocoder.json", vocoder_ckpt_path=os.path.join(downloaded_vocoder_path, "acoustic_modeling/Vocoder"), device=device, ) inference_pipelines["timbre"] = pipeline return pipeline # --- 2. الگوریتم برش فوق هوشمند --- def find_advanced_split_points(audio_np, sr): """ پیدا کردن نقاط برش با استراتژی فال‌بک (Fallback Strategy): ۱. تلاش برای پیدا کردن سکوت در بازه ۸ تا ۱۲ ثانیه. ۲. اگر نشد، تلاش در بازه وسیع‌تر ۶ تا ۱۴ ثانیه. ۳. انتخاب نقطه با کمترین انرژی (حتی اگر سکوت نباشد). ۴. تنظیم دقیق روی نزدیک‌ترین Zero-Crossing. """ total_samples = len(audio_np) # تنظیمات بازه جستجو MIN_PREFERRED = 8.0 MAX_PREFERRED = 12.0 MIN_HARD = 6.0 MAX_HARD = 15.0 split_points = [0] current_pos = 0 hop_length = 512 frame_length = 1024 while current_pos < total_samples: # استراتژی ۱: بازه ایده‌آل start_search = current_pos + int(MIN_PREFERRED * sr) end_search = current_pos + int(MAX_PREFERRED * sr) # اگر به انتهای فایل نزدیکیم if start_search >= total_samples: split_points.append(total_samples) break end_search = min(end_search, total_samples) # استراتژی ۲: اگر بازه ایده‌آل خیلی کوتاه است (ته فایل)، گسترش بده if end_search - start_search < sr: # استفاده از بازه سخت (وسیع) start_search = current_pos + int(MIN_HARD * sr) end_search = current_pos + int(MAX_HARD * sr) start_search = min(start_search, total_samples) end_search = min(end_search, total_samples) # برش منطقه جستجو region = audio_np[start_search:end_search] if len(region) == 0: split_points.append(total_samples) break # محاسبه انرژی rms = librosa.feature.rms(y=region, frame_length=frame_length, hop_length=hop_length)[0] # پیدا کردن کم‌انرژی‌ترین نقطه (Local Minimum) min_idx = np.argmin(rms) local_cut_sample = min_idx * hop_length # --- تکنیک Zero Crossing --- # نقطه برش تقریبی را پیدا کردیم. حالا باید دقیقاً روی محور صفر برش دهیم # تا صدای "کلیک" ایجاد نشود. cut_absolute_approx = start_search + local_cut_sample # جستجو در اطراف نقطه تقریبی (±500 نمونه) برای پیدا کردن صفر search_radius = 500 zc_start = max(0, cut_absolute_approx - search_radius) zc_end = min(total_samples, cut_absolute_approx + search_radius) zc_region = audio_np[zc_start:zc_end] # پیدا کردن نزدیک‌ترین عبور از صفر # (جایی که علامت عدد تغییر می‌کند) zero_crossings = np.where(np.diff(np.signbit(zc_region)))[0] if len(zero_crossings) > 0: # نزدیک‌ترین صفر به وسط بازه جستجو closest_zc = zero_crossings[np.argmin(np.abs(zero_crossings - search_radius))] best_cut_absolute = zc_start + closest_zc else: # اگر صفر پیدا نشد (خیلی بعید)، همان نقطه کم‌انرژی را بگیر best_cut_absolute = cut_absolute_approx split_points.append(best_cut_absolute) current_pos = best_cut_absolute return split_points @spaces.GPU() def vevo_timbre(content_wav, reference_wav): session_id = str(uuid.uuid4())[:8] temp_content_path = f"wav/c_{session_id}.wav" temp_reference_path = f"wav/r_{session_id}.wav" output_path = f"wav/out_{session_id}.wav" if content_wav is None or reference_wav is None: raise ValueError("Please upload audio files") try: SR = 24000 # --- ورودی --- if isinstance(content_wav, tuple): content_sr, content_data = content_wav if isinstance(content_wav[0], int) else (content_wav[1], content_wav[0]) else: content_sr, content_data = content_wav if len(content_data.shape) > 1: content_data = np.mean(content_data, axis=1) content_tensor = torch.FloatTensor(content_data).unsqueeze(0) if content_sr != SR: content_tensor = torchaudio.functional.resample(content_tensor, content_sr, SR) content_tensor = content_tensor / (torch.max(torch.abs(content_tensor)) + 1e-6) * 0.95 content_full_np = content_tensor.squeeze().numpy() # --- رفرنس --- if isinstance(reference_wav, tuple): ref_sr, ref_data = reference_wav if isinstance(reference_wav[0], int) else (reference_wav[1], reference_wav[0]) else: ref_sr, ref_data = reference_wav if len(ref_data.shape) > 1: ref_data = np.mean(ref_data, axis=1) ref_tensor = torch.FloatTensor(ref_data).unsqueeze(0) if ref_sr != SR: ref_tensor = torchaudio.functional.resample(ref_tensor, ref_sr, SR) ref_tensor = ref_tensor / (torch.max(torch.abs(ref_tensor)) + 1e-6) * 0.95 if ref_tensor.shape[1] > SR * 20: ref_tensor = ref_tensor[:, :SR * 20] save_audio_pcm16(ref_tensor, temp_reference_path, SR) pipeline = get_pipeline() # --- تقسیم‌بندی پیشرفته --- print(f"[{session_id}] Finding best energy split points (Zero-Crossing)...") split_points = find_advanced_split_points(content_full_np, SR) print(f"[{session_id}] Split into {len(split_points)-1} chunks.") final_output = [] PADDING_SAMPLES = int(2.5 * SR) # کمی پدینگ بیشتر برای اطمینان for i in range(len(split_points) - 1): start = split_points[i] end = split_points[i+1] read_start = max(0, start - PADDING_SAMPLES) read_end = end chunk_input = content_full_np[read_start:read_end] save_audio_pcm16(torch.FloatTensor(chunk_input).unsqueeze(0), temp_content_path, SR) try: gen = pipeline.inference_fm( src_wav_path=temp_content_path, timbre_ref_wav_path=temp_reference_path, flow_matching_steps=32, ) if torch.isnan(gen).any(): gen = torch.nan_to_num(gen, nan=0.0) gen_np = gen.detach().cpu().squeeze().numpy() trim_amount = start - read_start if len(gen_np) > trim_amount: valid_audio = gen_np[trim_amount:] # اتصال if len(final_output) > 0: # اگر برش روی سکوت نبوده (اجباری)، باید کمی بیشتر کراس‌فید کنیم # تا تغییر ناگهانی لحن مخفی شود. fade_len = int(0.03 * SR) # 30ms standard if len(final_output[-1]) > fade_len and len(valid_audio) > fade_len: fade_out = np.linspace(1, 0, fade_len) fade_in = np.linspace(0, 1, fade_len) prev_tail = final_output[-1][-fade_len:] curr_head = valid_audio[:fade_len] mixed = (prev_tail * fade_out) + (curr_head * fade_in) final_output[-1][-fade_len:] = mixed valid_audio = valid_audio[fade_len:] final_output.append(valid_audio) except Exception as e: print(f"Error segment {i}: {e}") # پر کردن جای خالی با سکوت برای به هم نریختن تایم final_output.append(np.zeros(end - start)) if len(final_output) > 0: full_audio = np.concatenate(final_output) else: full_audio = np.zeros(SR) save_audio_pcm16(full_audio, output_path, SR) return output_path finally: if os.path.exists(temp_content_path): os.remove(temp_content_path) if os.path.exists(temp_reference_path): os.remove(temp_reference_path) with gr.Blocks(title="Vevo-Timbre (Pro Logic)") as demo: gr.Markdown("## Vevo-Timbre: Zero-Shot Voice Conversion") gr.Markdown("Robust Splitting: Uses Minimum Energy + Zero Crossing detection to handle fast speech without glitches.") with gr.Row(): with gr.Column(): timbre_content = gr.Audio(label="Source Audio", type="numpy") timbre_reference = gr.Audio(label="Target Timbre", type="numpy") timbre_button = gr.Button("Generate", variant="primary") with gr.Column(): timbre_output = gr.Audio(label="Result") timbre_button.click(vevo_timbre, inputs=[timbre_content, timbre_reference], outputs=timbre_output) demo.launch()