Sada1 / app.py
Opera8's picture
Update app.py
501163c verified
raw
history blame
13.3 kB
import os
import sys
import importlib.util
import site
import json
import torch
import gradio as gr
import torchaudio
import numpy as np
from huggingface_hub import snapshot_download, hf_hub_download
import subprocess
import re
import spaces
import uuid
import soundfile as sf
# --- Configuration and installation ---
# Download-state flags, one per remote resource; all start as "not fetched yet".
downloaded_resources = dict.fromkeys(
    ("configs", "tokenizer_vq8192", "fmt_Vq8192ToMels", "vocoder"),
    False,
)
def install_espeak():
    """Make sure the ``espeak-ng`` binary is available, installing it via apt if not.

    This is best-effort: an installation failure is printed and swallowed so
    that the caller can carry on.

    Returns:
        bool: True if ``espeak-ng`` was already on PATH or both apt-get
        commands completed successfully; False if the installation failed.
    """
    import shutil  # local import: only this function needs it

    # shutil.which does the PATH lookup in-process, so detection no longer
    # depends on an external `which` executable being present.
    if shutil.which("espeak-ng") is not None:
        return True

    print("Installing espeak-ng...")
    try:
        subprocess.run(["apt-get", "update"], check=True)
        subprocess.run(
            ["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"],
            check=True,
        )
    except (subprocess.SubprocessError, OSError) as e:
        # SubprocessError covers a non-zero apt-get exit (CalledProcessError);
        # OSError covers apt-get being missing or not executable.
        print(f"Error installing espeak-ng: {e}")
        return False
    return True
# Run at import time; install_espeak() prints and swallows its own errors.
install_espeak()
def patch_langsegment_init():
    """Remove ``setLangfilters``/``getLangfilters`` from LangSegment's ``__init__.py``.

    The installed package's ``__init__.py`` is located without importing the
    package. Every ``from .LangSegment import ...`` line that mentions either
    name is rewritten without it, and the file is written back only when
    something actually changed. After a rewrite the package is imported and
    reloaded so the patched file is the one in use.

    Best-effort: failures are printed rather than raised, and the function
    always returns None.
    """
    target_line_prefix = "from .LangSegment import"
    removed_names = ("setLangfilters", "getLangfilters")

    def strip_removed(line):
        # Drop each name together with ONE adjacent comma. Spaces or tabs
        # around the comma are tolerated ("a, b" as well as "a,b"); the line
        # ending is never touched, so no blank line is introduced.
        for name in removed_names:
            line = re.sub(rf",[ \t]*{re.escape(name)}\b", "", line)
        # Second pass handles a name that came first in the list and therefore
        # has its comma on the right-hand side.
        for name in removed_names:
            line = re.sub(rf"\b{re.escape(name)}[ \t]*,[ \t]*", "", line)
        return line

    try:
        spec = importlib.util.find_spec("LangSegment")
        if spec is None or spec.origin is None:
            return
        init_path = os.path.join(os.path.dirname(spec.origin), '__init__.py')
        with open(init_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        new_lines = []
        for line in lines:
            is_target = line.strip().startswith(target_line_prefix)
            if is_target and any(name in line for name in removed_names):
                new_lines.append(strip_removed(line))
            else:
                new_lines.append(line)

        if new_lines == lines:
            return

        with open(init_path, 'w', encoding='utf-8') as f:
            f.writelines(new_lines)
    except Exception as e:
        # Still best-effort, but report the failure instead of hiding it.
        print(f"Could not patch LangSegment __init__.py: {e}")
        return

    try:
        import LangSegment
        importlib.reload(LangSegment)
    except Exception as e:
        print(f"Could not reload LangSegment after patching: {e}")
# Apply the LangSegment __init__ patch at import time; the function swallows its own errors.
patch_langsegment_init()
# Fetch the Amphion sources on first run. check=True makes a failed clone stop
# right here with CalledProcessError instead of surfacing one line later as a
# FileNotFoundError from os.chdir.
if not os.path.exists("Amphion"):
    subprocess.run(
        ["git", "clone", "https://github.com/open-mmlab/Amphion.git"],
        check=True,
    )

# Every relative path used below is relative to the Amphion checkout.
os.chdir("Amphion")

# Put the checkout root (now the working directory) on sys.path so that the
# `models.*` import that follows can be resolved.
amphion_root = os.getcwd()
if amphion_root not in sys.path:
    sys.path.append(amphion_root)

# Scratch directory for per-request wav files and the checkpoint cache.
os.makedirs("wav", exist_ok=True)
os.makedirs("ckpts/Vevo", exist_ok=True)
from models.vc.vevo.vevo_utils import VevoInferencePipeline
def save_audio_pcm16(waveform, output_path, sample_rate=24000):
    """Write ``waveform`` to ``output_path`` as a 16-bit PCM audio file.

    Args:
        waveform: A torch tensor or any array that ``sf.write`` accepts. A
            tensor is detached, moved to the CPU and converted to numpy; a
            2-D tensor whose first dimension is 1 has that dimension dropped.
        output_path: Destination file path.
        sample_rate: Sample rate passed to ``sf.write``.

    Any exception is caught and printed; nothing is raised or returned.
    """
    try:
        samples = waveform
        if torch.is_tensor(samples):
            samples = samples.detach().cpu()
            single_row = samples.dim() == 2 and samples.shape[0] == 1
            if single_row:
                samples = samples.squeeze(0)
            samples = samples.numpy()
        sf.write(output_path, samples, sample_rate, subtype='PCM_16')
    except Exception as err:
        print(f"Save error: {err}")
def setup_configs():
    """Ensure the Vevo JSON config files exist under ``models/vc/vevo/config``.

    Each missing file is fetched from the ``amphion/Vevo`` model repo and
    copied into place. Download or copy failures are printed, not raised.

    ``downloaded_resources["configs"]`` is set only once every config file has
    been found or copied, so a failed attempt is retried on the next call
    instead of being recorded as done.
    """
    if downloaded_resources["configs"]:
        return

    import shutil  # local import: only this function needs it

    config_path = "models/vc/vevo/config"
    os.makedirs(config_path, exist_ok=True)

    all_present = True
    for name in ("Vq8192ToMels.json", "Vocoder.json"):
        file_path = f"{config_path}/{name}"
        if os.path.exists(file_path):
            continue
        try:
            file_data = hf_hub_download(repo_id="amphion/Vevo", filename=f"config/{name}", repo_type="model")
            # Copy in-process: a failure raises and is reported below, whereas
            # the exit status of an external `cp` would go unnoticed.
            shutil.copyfile(file_data, file_path)
        except Exception as e:
            all_present = False
            print(f"Error downloading config {name}: {e}")

    if all_present:
        downloaded_resources["configs"] = True
# Make sure the Vevo config files are in place at import time.
setup_configs()
# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Cache of constructed inference pipelines, keyed by name.
inference_pipelines = dict()
def preload_all_resources():
    """Download the Vevo checkpoints, skipping any that are already marked done.

    Calls ``setup_configs()`` first, then fetches the tokenizer, the
    Vq8192ToMels model and the vocoder from the ``amphion/Vevo`` repo with
    ``./ckpts/Vevo`` as the cache directory. Each returned snapshot path is
    stored in the matching module-level ``downloaded_*_path`` variable and the
    corresponding ``downloaded_resources`` flag is set to True.
    """
    global downloaded_content_style_tokenizer_path, downloaded_fmt_path, downloaded_vocoder_path

    def fetch(pattern):
        # The three downloads differ only in their allow-pattern.
        return snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=[pattern],
        )

    setup_configs()

    if not downloaded_resources["tokenizer_vq8192"]:
        downloaded_content_style_tokenizer_path = fetch("tokenizer/vq8192/*")
        downloaded_resources["tokenizer_vq8192"] = True

    if not downloaded_resources["fmt_Vq8192ToMels"]:
        downloaded_fmt_path = fetch("acoustic_modeling/Vq8192ToMels/*")
        downloaded_resources["fmt_Vq8192ToMels"] = True

    if not downloaded_resources["vocoder"]:
        downloaded_vocoder_path = fetch("acoustic_modeling/Vocoder/*")
        downloaded_resources["vocoder"] = True
# Snapshot directories; they stay None until preload_all_resources() fills them in.
(
    downloaded_content_style_tokenizer_path,
    downloaded_fmt_path,
    downloaded_vocoder_path,
) = (None, None, None)

# Download everything at import time.
preload_all_resources()
def get_pipeline():
    """Return the cached "timbre" ``VevoInferencePipeline``, building it on first use.

    The pipeline is constructed from the module-level snapshot paths, the two
    JSON configs under ``./models/vc/vevo/config`` and the module-level
    ``device``, then stored in ``inference_pipelines["timbre"]`` so later
    calls reuse the same object.
    """
    if "timbre" not in inference_pipelines:
        tokenizer_ckpt = os.path.join(downloaded_content_style_tokenizer_path, "tokenizer/vq8192")
        fmt_ckpt = os.path.join(downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels")
        vocoder_ckpt = os.path.join(downloaded_vocoder_path, "acoustic_modeling/Vocoder")
        inference_pipelines["timbre"] = VevoInferencePipeline(
            content_style_tokenizer_ckpt_path=tokenizer_ckpt,
            fmt_cfg_path="./models/vc/vevo/config/Vq8192ToMels.json",
            fmt_ckpt_path=fmt_ckpt,
            vocoder_cfg_path="./models/vc/vevo/config/Vocoder.json",
            vocoder_ckpt_path=vocoder_ckpt,
            device=device,
        )
    return inference_pipelines["timbre"]
def _load_mono_audio(audio, target_sr):
    """Turn a ``(sample_rate, data)`` audio value into a peak-normalised mono tensor."""
    # The expected order is (sample_rate, data); a tuple whose first item is
    # not an int is treated as the reversed (data, sample_rate) order.
    if isinstance(audio, tuple) and not isinstance(audio[0], int):
        data, sample_rate = audio
    else:
        sample_rate, data = audio
    if len(data.shape) > 1:
        # Average over axis 1 to collapse multi-channel input to one channel.
        data = np.mean(data, axis=1)
    tensor = torch.FloatTensor(data).unsqueeze(0)
    if tensor.numel() == 0:
        # torch.max below would fail on an empty tensor with an opaque message.
        raise ValueError("Uploaded audio is empty")
    if sample_rate != target_sr:
        tensor = torchaudio.functional.resample(tensor, sample_rate, target_sr)
    # Scale the peak to 0.95; the epsilon avoids dividing by zero on silence.
    return tensor / (torch.max(torch.abs(tensor)) + 1e-6) * 0.95


def _crossfade_append(head, tail, fade_out, fade_in):
    """Append ``tail`` to ``head``, blending ``tail``'s start into ``head``'s end."""
    n = len(fade_out)
    if n == 0 or len(tail) < n or len(head) < n:
        # Not enough material on one side to blend: plain concatenation.
        return np.concatenate([head, tail])
    # Equal-power blend: old * cos + new * sin over the overlap region.
    head[-n:] = head[-n:] * fade_out + tail[:n] * fade_in
    return np.concatenate([head, tail[n:]])


@spaces.GPU()
def vevo_timbre(content_wav, reference_wav):
    """Convert the timbre of ``content_wav`` to that of ``reference_wav``.

    The content audio is processed in 10-second chunks. Every chunk after the
    first is fed to the model together with the preceding 0.5 s of input, and
    that leading 0.5 s of model output is cross-faded (equal power) into the
    tail of what has already been produced.

    Args:
        content_wav: Source audio as a ``(sample_rate, data)`` pair.
        reference_wav: Target-timbre audio in the same format; only its first
            20 seconds (after resampling to 24 kHz) are used.

    Returns:
        str: Path of the generated wav file, ``wav/out_<session>.wav``.

    Raises:
        ValueError: If either input is None or contains no samples.
    """
    session_id = str(uuid.uuid4())[:8]
    temp_content_path = f"wav/c_{session_id}.wav"
    temp_reference_path = f"wav/r_{session_id}.wav"
    output_path = f"wav/out_{session_id}.wav"

    if content_wav is None or reference_wav is None:
        raise ValueError("Please upload audio files")

    try:
        SR = 24000

        # --- 1. Source audio ---
        # reshape(-1) keeps the array 1-D even for a single-sample input,
        # where squeeze() would produce a 0-d array that has no len().
        content_full_np = _load_mono_audio(content_wav, SR).reshape(-1).numpy()

        # --- 2. Reference audio (capped at 20 seconds) ---
        ref_tensor = _load_mono_audio(reference_wav, SR)
        if ref_tensor.shape[1] > SR * 20:
            ref_tensor = ref_tensor[:, :SR * 20]
        save_audio_pcm16(ref_tensor, temp_reference_path, SR)

        # --- 3. Chunked inference with an equal-power crossfade ---
        pipeline = get_pipeline()

        CHUNK_DURATION = 10.0  # seconds of new audio produced per chunk
        CROSSFADE_SEC = 0.5    # seconds of overlap between neighbouring chunks

        chunk_samples = int(CHUNK_DURATION * SR)
        crossfade_samples = int(CROSSFADE_SEC * SR)
        total_samples = len(content_full_np)

        final_output = np.array([], dtype=np.float32)

        # cos/sin quarter-wave curves: cos^2 + sin^2 == 1, i.e. equal power.
        fade_out_curve = np.cos(np.linspace(0, np.pi/2, crossfade_samples))
        fade_in_curve = np.sin(np.linspace(0, np.pi/2, crossfade_samples))

        print(f"[{session_id}] Processing with 500ms Equal-Power Crossfade...")

        # The cursor advances by one chunk per step; start_idx reaches back by
        # the crossfade length so the model also sees the overlap region.
        for cursor in range(0, total_samples, chunk_samples):
            is_first_chunk = (cursor == 0)
            start_idx = cursor if is_first_chunk else cursor - crossfade_samples
            end_idx = min(total_samples, cursor + chunk_samples)

            current_chunk_input = content_full_np[start_idx:end_idx]
            save_audio_pcm16(torch.FloatTensor(current_chunk_input).unsqueeze(0), temp_content_path, SR)

            try:
                gen = pipeline.inference_fm(
                    src_wav_path=temp_content_path,
                    timbre_ref_wav_path=temp_reference_path,
                    flow_matching_steps=64,
                )

                if torch.isnan(gen).any():
                    gen = torch.nan_to_num(gen, nan=0.0)
                gen_np = gen.detach().cpu().reshape(-1).numpy()

                if is_first_chunk:
                    final_output = np.concatenate([final_output, gen_np])
                else:
                    final_output = _crossfade_append(
                        final_output, gen_np, fade_out_curve, fade_in_curve
                    )
            except Exception as e:
                print(f"Error at {cursor}: {e}")
                # Pad with silence for the NEW audio only (cursor..end_idx).
                # Counting from start_idx would also include the overlap that
                # is already in final_output and shift everything after it.
                missing = end_idx - cursor
                final_output = np.concatenate(
                    [final_output, np.zeros(missing, dtype=np.float32)]
                )

        save_audio_pcm16(final_output, output_path, SR)
        return output_path

    finally:
        # The per-request input files are no longer needed; the output file
        # is kept because its path is returned to the caller.
        if os.path.exists(temp_content_path):
            os.remove(temp_content_path)
        if os.path.exists(temp_reference_path):
            os.remove(temp_reference_path)
# Gradio UI: two audio inputs plus a button in one column, the result in the other.
with gr.Blocks(title="Vevo-Timbre (Pro Stitch)") as demo:
    gr.Markdown(value="## Vevo-Timbre: Zero-Shot Voice Conversion")
    gr.Markdown(value="Professional Stitching: 500ms Equal-Power Crossfade (No Jitter, No Ghosting).")

    with gr.Row():
        with gr.Column():
            timbre_content = gr.Audio(label="Source Audio", type="numpy")
            timbre_reference = gr.Audio(label="Target Timbre", type="numpy")
            timbre_button = gr.Button(value="Generate", variant="primary")
        with gr.Column():
            timbre_output = gr.Audio(label="Result")

    # Clicking the button runs the conversion and shows the returned file.
    timbre_button.click(
        fn=vevo_timbre,
        inputs=[timbre_content, timbre_reference],
        outputs=timbre_output,
    )

demo.launch()