Sada / app.py
Opera8's picture
Update app.py
9454331 verified
raw
history blame
13.6 kB
import os
import sys
import importlib.util
import site
import json
import torch
import gradio as gr
import torchaudio
import numpy as np
from huggingface_hub import snapshot_download, hf_hub_download
import subprocess
import uuid
import soundfile as sf
# --- تنظیمات و نصب پیش‌نیازها ---
downloaded_resources = {
"configs": False,
"tokenizer_vq8192": False,
"fmt_Vq8192ToMels": False,
"vocoder": False
}
def install_espeak():
try:
result = subprocess.run(["which", "espeak-ng"], capture_output=True, text=True)
if result.returncode != 0:
print("Installing espeak-ng...")
subprocess.run(["apt-get", "update"], check=True)
subprocess.run(["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"], check=True)
except Exception as e:
print(f"Error installing espeak-ng: {e}")
install_espeak()
def patch_langsegment_init():
try:
spec = importlib.util.find_spec("LangSegment")
if spec is None or spec.origin is None: return
init_path = os.path.join(os.path.dirname(spec.origin), '__init__.py')
with open(init_path, 'r') as f: lines = f.readlines()
modified = False
new_lines = []
target_line_prefix = "from .LangSegment import"
for line in lines:
if line.strip().startswith(target_line_prefix) and ('setLangfilters' in line or 'getLangfilters' in line):
mod_line = line.replace(',setLangfilters', '').replace(',getLangfilters', '')
mod_line = mod_line.replace('setLangfilters,', '').replace('getLangfilters,', '').rstrip(',')
new_lines.append(mod_line + '\n')
modified = True
else:
new_lines.append(line)
if modified:
with open(init_path, 'w') as f: f.writelines(new_lines)
try:
import LangSegment
importlib.reload(LangSegment)
except: pass
except: pass
patch_langsegment_init()
if not os.path.exists("Amphion"):
subprocess.run(["git", "clone", "https://github.com/open-mmlab/Amphion.git"])
os.chdir("Amphion")
if os.path.dirname(os.path.abspath("Amphion")) not in sys.path:
sys.path.append(os.path.dirname(os.path.abspath("Amphion")))
os.makedirs("wav", exist_ok=True)
os.makedirs("ckpts/Vevo", exist_ok=True)
from models.vc.vevo.vevo_utils import VevoInferencePipeline
def save_audio_pcm16(waveform, output_path, sample_rate=24000):
try:
if isinstance(waveform, torch.Tensor):
waveform = waveform.detach().cpu()
if waveform.dim() == 2 and waveform.shape[0] == 1:
waveform = waveform.squeeze(0)
waveform = waveform.numpy()
sf.write(output_path, waveform, sample_rate, subtype='PCM_16')
except Exception as e:
print(f"Save error: {e}")
def setup_configs():
if downloaded_resources["configs"]: return
config_path = "models/vc/vevo/config"
os.makedirs(config_path, exist_ok=True)
config_files = ["Vq8192ToMels.json", "Vocoder.json"]
for file in config_files:
file_path = f"{config_path}/{file}"
if not os.path.exists(file_path):
try:
file_data = hf_hub_download(repo_id="amphion/Vevo", filename=f"config/{file}", repo_type="model")
subprocess.run(["cp", file_data, file_path])
except Exception as e: print(f"Error downloading config {file}: {e}")
downloaded_resources["configs"] = True
setup_configs()
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
inference_pipelines = {}
def preload_all_resources():
setup_configs()
global downloaded_content_style_tokenizer_path, downloaded_fmt_path, downloaded_vocoder_path
if not downloaded_resources["tokenizer_vq8192"]:
downloaded_content_style_tokenizer_path = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["tokenizer/vq8192/*"])
downloaded_resources["tokenizer_vq8192"] = True
if not downloaded_resources["fmt_Vq8192ToMels"]:
downloaded_fmt_path = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vq8192ToMels/*"])
downloaded_resources["fmt_Vq8192ToMels"] = True
if not downloaded_resources["vocoder"]:
downloaded_vocoder_path = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vocoder/*"])
downloaded_resources["vocoder"] = True
downloaded_content_style_tokenizer_path = None
downloaded_fmt_path = None
downloaded_vocoder_path = None
preload_all_resources()
def get_pipeline():
if "timbre" in inference_pipelines: return inference_pipelines["timbre"]
pipeline = VevoInferencePipeline(
content_style_tokenizer_ckpt_path=os.path.join(downloaded_content_style_tokenizer_path, "tokenizer/vq8192"),
fmt_cfg_path="./models/vc/vevo/config/Vq8192ToMels.json",
fmt_ckpt_path=os.path.join(downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels"),
vocoder_cfg_path="./models/vc/vevo/config/Vocoder.json",
vocoder_ckpt_path=os.path.join(downloaded_vocoder_path, "acoustic_modeling/Vocoder"),
device=device,
)
inference_pipelines["timbre"] = pipeline
return pipeline
@spaces.GPU()
def vevo_timbre(content_wav, reference_wav):
session_id = str(uuid.uuid4())[:8]
temp_content_path = f"wav/c_{session_id}.wav"
temp_reference_path = f"wav/r_{session_id}.wav"
output_path = f"wav/out_{session_id}.wav"
if content_wav is None or reference_wav is None:
raise ValueError("Please upload audio files")
try:
SR = 24000
# --- آماده‌سازی ورودی ---
if isinstance(content_wav, tuple):
content_sr, content_data = content_wav if isinstance(content_wav[0], int) else (content_wav[1], content_wav[0])
else:
content_sr, content_data = content_wav
if len(content_data.shape) > 1: content_data = np.mean(content_data, axis=1)
content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
if content_sr != SR:
content_tensor = torchaudio.functional.resample(content_tensor, content_sr, SR)
content_tensor = content_tensor / (torch.max(torch.abs(content_tensor)) + 1e-6) * 0.95
content_full_np = content_tensor.squeeze().numpy()
# --- آماده‌سازی رفرنس ---
if isinstance(reference_wav, tuple):
ref_sr, ref_data = reference_wav if isinstance(reference_wav[0], int) else (reference_wav[1], reference_wav[0])
else:
ref_sr, ref_data = reference_wav
if len(ref_data.shape) > 1: ref_data = np.mean(ref_data, axis=1)
ref_tensor = torch.FloatTensor(ref_data).unsqueeze(0)
if ref_sr != SR:
ref_tensor = torchaudio.functional.resample(ref_tensor, ref_sr, SR)
ref_tensor = ref_tensor / (torch.max(torch.abs(ref_tensor)) + 1e-6) * 0.95
if ref_tensor.shape[1] > SR * 20: ref_tensor = ref_tensor[:, :SR * 20]
save_audio_pcm16(ref_tensor, temp_reference_path, SR)
# --- استراتژی Center-Only Processing (حذف لرزش) ---
pipeline = get_pipeline()
# تنظیمات اصلی (به ثانیه)
CORE_CHUNK_SEC = 10.0 # مقداری که نهایتاً نگه می‌داریم
PADDING_SEC = 2.0 # مقداری که از هر طرف اضافه می‌خوانیم و دور می‌ریزیم
core_samples = int(CORE_CHUNK_SEC * SR)
padding_samples = int(PADDING_SEC * SR)
total_samples = len(content_full_np)
final_output = []
cursor = 0
print(f"[{session_id}] Starting Center-Only processing...")
while cursor < total_samples:
# محاسبه بازه خواندن از فایل اصلی (Source)
# ما PADDING را از عقب و جلو اضافه می‌کنیم
read_start = max(0, cursor - padding_samples)
read_end = min(total_samples, cursor + core_samples + padding_samples)
# اگر به ته فایل رسیدیم و چیزی برای پردازش نمانده
if cursor >= total_samples:
break
# استخراج تکه "پد شده"
chunk_input = content_full_np[read_start:read_end]
# اگر تکه خیلی کوچک است (انتهای فایل)، فقط پردازش کن
if len(chunk_input) < SR * 0.5:
break
save_audio_pcm16(torch.FloatTensor(chunk_input).unsqueeze(0), temp_content_path, SR)
try:
# تولید صدا با حاشیه امن
gen = pipeline.inference_fm(
src_wav_path=temp_content_path,
timbre_ref_wav_path=temp_reference_path,
flow_matching_steps=64,
)
if torch.isnan(gen).any(): gen = torch.nan_to_num(gen, nan=0.0)
gen_np = gen.detach().cpu().squeeze().numpy()
# --- برش هوشمند (Trimming) ---
# حالا باید حاشیه‌های ناپایدار (لرزش دار) را حذف کنیم
# 1. محاسبه مقدار برش از ابتدا (Front Trim)
# اگر اولین تکه است، ما PADDING نداشتیم (چون read_start=0 بود)
if cursor == 0:
trim_front = 0
else:
# در غیر این صورت، دقیقاً به اندازه PADDING از جلو می‌بریم
trim_front = padding_samples
# 2. محاسبه مقدار برش از انتها (End Trim)
# ما می‌خواهیم فقط به اندازه CORE (10 ثانیه) نگه داریم
# اما باید مراقب انتهای فایل باشیم
# طول معتبر این تکه در خروجی نهایی
valid_length = min(core_samples, total_samples - cursor)
# استخراج بخش مرکزی (Stable Core)
# از trim_front شروع کن و به اندازه valid_length بردار
if len(gen_np) > trim_front:
core_audio = gen_np[trim_front : trim_front + valid_length]
# --- اتصال میکروسکوپی (Micro Crossfade 50ms) ---
# این فقط برای جلوگیری از کلیک دیجیتال است، نه برای تغییر لحن
fade_len = int(0.05 * SR) # 50ms
if len(final_output) > 0 and len(core_audio) > fade_len:
# نرم کردن اتصال
fade_out = np.linspace(1, 0, fade_len)
fade_in = np.linspace(0, 1, fade_len)
# آخرین تکه لیست
prev_tail = final_output[-1][-fade_len:]
curr_head = core_audio[:fade_len]
# اگر سایزها یکی بود میکس کن
if len(prev_tail) == fade_len:
mixed = (prev_tail * fade_out) + (curr_head * fade_in)
final_output[-1][-fade_len:] = mixed
# حذف بخش میکس شده از تکه جدید
core_audio = core_audio[fade_len:]
final_output.append(core_audio)
except Exception as e:
print(f"Error processing chunk at {cursor}: {e}")
missing = min(core_samples, total_samples - cursor)
final_output.append(np.zeros(missing))
# جلو رفتن نشانگر به اندازه هسته اصلی (بدون هم‌پوشانی منطقی)
cursor += core_samples
# چسباندن نهایی
if len(final_output) > 0:
full_audio = np.concatenate(final_output)
else:
full_audio = np.zeros(SR)
save_audio_pcm16(full_audio, output_path, SR)
return output_path
finally:
if os.path.exists(temp_content_path): os.remove(temp_content_path)
if os.path.exists(temp_reference_path): os.remove(temp_reference_path)
with gr.Blocks(title="Vevo-Timbre (Stable Core)") as demo:
gr.Markdown("## Vevo-Timbre: Zero-Shot Voice Conversion")
gr.Markdown("Center-Only Processing Strategy: Generates extra padding and discards unstable edges to remove jitter.")
with gr.Row():
with gr.Column():
timbre_content = gr.Audio(label="Source Audio", type="numpy")
timbre_reference = gr.Audio(label="Target Timbre", type="numpy")
timbre_button = gr.Button("Generate", variant="primary")
with gr.Column():
timbre_output = gr.Audio(label="Result")
timbre_button.click(vevo_timbre, inputs=[timbre_content, timbre_reference], outputs=timbre_output)
demo.launch()