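# Sada / app.py
# NOTE: the text below is Hugging Face file-viewer page residue captured with the source, not code:
# "Opera8's picture" / "Update app.py" / "82faa29 verified" / "raw" / "history blame" / "12.3 kB"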
import os
import sys
import importlib.util
import site
import json
import torch
import gradio as gr
import torchaudio
import numpy as np
from huggingface_hub import snapshot_download, hf_hub_download
import subprocess
import uuid
import soundfile as sf
import spaces # این خط برای ZeroGPU ضروری است
# --- Settings and prerequisite installation ---
# Flags recording which remote resources have already been fetched, so the
# setup/preload helpers below can skip work that is already done.
downloaded_resources = dict.fromkeys(
    ("configs", "tokenizer_vq8192", "fmt_Vq8192ToMels", "vocoder"),
    False,
)
def install_espeak():
    """Install the ``espeak-ng`` system package when it is not on ``PATH``.

    The check uses ``shutil.which`` instead of spawning the external ``which``
    program, so it also works on images where ``which`` itself is missing
    (previously that case printed an error and skipped the installation).

    Installation is best-effort: a failing or missing ``apt-get`` is reported
    on stdout and never raised to the caller.
    """
    # Local import: the module's import block does not provide shutil.
    import shutil

    if shutil.which("espeak-ng") is not None:
        return

    try:
        print("Installing espeak-ng...")
        subprocess.run(["apt-get", "update"], check=True)
        subprocess.run(
            ["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"],
            check=True,
        )
    except (OSError, subprocess.SubprocessError) as e:
        # OSError: apt-get missing / not executable.
        # SubprocessError: apt-get exited non-zero (check=True).
        print(f"Error installing espeak-ng: {e}")
# Run the espeak-ng check/installation once, at import time.
install_espeak()
def patch_langsegment_init():
    """Drop ``setLangfilters``/``getLangfilters`` from LangSegment's package import.

    Locates the installed ``LangSegment`` package, rewrites any single-line
    ``from .LangSegment import ...`` statement in its ``__init__.py`` so that
    it no longer imports those two names, and reloads the package when the
    file was changed.

    The import list is parsed name by name, so the patch works regardless of
    the spacing around commas and never leaves a dangling comma or an extra
    blank line behind. Lines that do not mention the two names are written
    back untouched.

    The whole operation is best-effort: nothing is raised to the caller, but
    failures are printed instead of being silently discarded.
    """
    removed_names = {"setLangfilters", "getLangfilters"}
    target_line_prefix = "from .LangSegment import"

    try:
        spec = importlib.util.find_spec("LangSegment")
        if spec is None or spec.origin is None:
            return

        init_path = os.path.join(os.path.dirname(spec.origin), '__init__.py')
        with open(init_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        modified = False
        new_lines = []
        for line in lines:
            stripped = line.strip()
            if not stripped.startswith(target_line_prefix):
                new_lines.append(line)
                continue

            # Split the imported names; tolerate optional surrounding parentheses.
            name_list = stripped[len(target_line_prefix):].strip().strip('()')
            entries = [entry.strip() for entry in name_list.split(',')]
            entries = [entry for entry in entries if entry]
            # ``entry.split()[0]`` also matches aliased forms ("name as alias").
            kept = [entry for entry in entries if entry.split()[0] not in removed_names]
            if len(kept) == len(entries):
                new_lines.append(line)
                continue

            indent = line[:len(line) - len(line.lstrip())]
            newline = '\n' if line.endswith('\n') else ''
            if kept:
                new_lines.append(f"{indent}{target_line_prefix} {', '.join(kept)}{newline}")
            else:
                # Nothing left to import: keep a valid statement in its place.
                new_lines.append(f"{indent}pass{newline}")
            modified = True

        if modified:
            with open(init_path, 'w', encoding='utf-8') as f:
                f.writelines(new_lines)
            try:
                import LangSegment
                importlib.reload(LangSegment)
            except Exception as e:
                print(f"Error reloading LangSegment: {e}")
    except Exception as e:
        print(f"Error patching LangSegment: {e}")
# Apply the LangSegment __init__ patch once, at import time.
patch_langsegment_init()

# Fetch the Amphion sources on first start.
# NOTE(review): the clone's exit status is not checked; a failed clone only
# surfaces at the chdir/import below.
if not os.path.exists("Amphion"):
    subprocess.run(["git", "clone", "https://github.com/open-mmlab/Amphion.git"])
# NOTE(review): indentation was lost in this copy of the file; the chdir is
# assumed to be unconditional (otherwise a second start with an existing
# checkout would never enter the repository) - confirm against the original.
os.chdir("Amphion")
# After the chdir, abspath("Amphion") is "<checkout>/Amphion", so its dirname
# is the checkout root (the current directory). Adding it to sys.path makes
# the repository's top-level `models` package importable.
if os.path.dirname(os.path.abspath("Amphion")) not in sys.path:
    sys.path.append(os.path.dirname(os.path.abspath("Amphion")))
# Working directories, relative to the checkout: temporary/output audio and
# the checkpoint download cache.
os.makedirs("wav", exist_ok=True)
os.makedirs("ckpts/Vevo", exist_ok=True)
# Must come after the sys.path setup above.
from models.vc.vevo.vevo_utils import VevoInferencePipeline
def save_audio_pcm16(waveform, output_path, sample_rate=24000):
    """Write *waveform* to *output_path* as a 16-bit PCM audio file.

    Args:
        waveform: A ``torch.Tensor`` or an array accepted by ``sf.write``.
            Tensors are detached, moved to the CPU and converted to NumPy;
            a 2-D tensor whose first dimension is 1 is flattened to 1-D first.
        output_path: Destination file path.
        sample_rate: Sample rate passed to the writer (default 24000).

    Any exception raised during conversion or writing is printed and
    swallowed; the function always returns ``None``.
    """
    try:
        samples = waveform
        if isinstance(samples, torch.Tensor):
            samples = samples.detach().cpu()
            single_channel_2d = samples.dim() == 2 and samples.shape[0] == 1
            if single_channel_2d:
                samples = samples.squeeze(0)
            samples = samples.numpy()
        sf.write(output_path, samples, sample_rate, subtype='PCM_16')
    except Exception as err:
        print(f"Save error: {err}")
def setup_configs():
    """Ensure the Vevo JSON config files exist under ``models/vc/vevo/config``.

    Each missing file is downloaded from the ``amphion/Vevo`` model repo and
    copied into place. The copy uses ``shutil.copyfile`` rather than an
    unchecked external ``cp`` process, so a failed copy is reported.

    ``downloaded_resources["configs"]`` is set to ``True`` only when every
    file was already present or fetched successfully; after a failure the
    flag stays ``False`` so a later call retries instead of skipping.
    Download/copy errors are printed, not raised.
    """
    # Local import: the module's import block does not provide shutil.
    import shutil

    if downloaded_resources["configs"]:
        return

    config_path = "models/vc/vevo/config"
    os.makedirs(config_path, exist_ok=True)

    all_present = True
    for name in ("Vq8192ToMels.json", "Vocoder.json"):
        file_path = f"{config_path}/{name}"
        if os.path.exists(file_path):
            continue
        try:
            file_data = hf_hub_download(
                repo_id="amphion/Vevo",
                filename=f"config/{name}",
                repo_type="model",
            )
            # copyfile follows symlinks by default, so the file contents are
            # copied even if the downloaded path is a link.
            shutil.copyfile(file_data, file_path)
        except Exception as e:
            all_present = False
            print(f"Error downloading config {name}: {e}")

    downloaded_resources["configs"] = all_present
# Fetch the model config files once, at import time.
setup_configs()

# Use CUDA when torch reports it as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Cache of constructed inference pipelines, keyed by name (see get_pipeline).
inference_pipelines = dict()
def preload_all_resources():
    """Download every checkpoint group that has not been fetched yet.

    Runs ``setup_configs()`` first, then fetches the tokenizer, the
    Vq8192ToMels acoustic model and the vocoder from the ``amphion/Vevo``
    repo into ``./ckpts/Vevo``. Each snapshot path is stored in its
    module-level ``downloaded_*_path`` variable and the matching
    ``downloaded_resources`` flag is set so the group is skipped next time.
    """
    setup_configs()
    global downloaded_content_style_tokenizer_path, downloaded_fmt_path, downloaded_vocoder_path

    def fetch_snapshot(pattern):
        # All three groups differ only in the allow-pattern.
        return snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=[pattern],
        )

    if not downloaded_resources["tokenizer_vq8192"]:
        downloaded_content_style_tokenizer_path = fetch_snapshot("tokenizer/vq8192/*")
        downloaded_resources["tokenizer_vq8192"] = True

    if not downloaded_resources["fmt_Vq8192ToMels"]:
        downloaded_fmt_path = fetch_snapshot("acoustic_modeling/Vq8192ToMels/*")
        downloaded_resources["fmt_Vq8192ToMels"] = True

    if not downloaded_resources["vocoder"]:
        downloaded_vocoder_path = fetch_snapshot("acoustic_modeling/Vocoder/*")
        downloaded_resources["vocoder"] = True
# Snapshot locations start out unset; preload_all_resources() assigns them.
downloaded_content_style_tokenizer_path = downloaded_fmt_path = downloaded_vocoder_path = None

# Download all checkpoints once, at import time.
preload_all_resources()
def get_pipeline():
    """Return the shared Vevo timbre pipeline, building it on first use.

    The pipeline is stored in ``inference_pipelines`` under the key
    ``"timbre"``; later calls return that same object. Checkpoint locations
    come from the module-level ``downloaded_*_path`` variables, and the
    pipeline is created on the module-level ``device``.
    """
    try:
        return inference_pipelines["timbre"]
    except KeyError:
        pass  # not built yet

    tokenizer_ckpt = os.path.join(downloaded_content_style_tokenizer_path, "tokenizer/vq8192")
    fmt_ckpt = os.path.join(downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels")
    vocoder_ckpt = os.path.join(downloaded_vocoder_path, "acoustic_modeling/Vocoder")

    built = VevoInferencePipeline(
        content_style_tokenizer_ckpt_path=tokenizer_ckpt,
        fmt_cfg_path="./models/vc/vevo/config/Vq8192ToMels.json",
        fmt_ckpt_path=fmt_ckpt,
        vocoder_cfg_path="./models/vc/vevo/config/Vocoder.json",
        vocoder_ckpt_path=vocoder_ckpt,
        device=device,
    )
    inference_pipelines["timbre"] = built
    return built
def _split_rate_and_samples(audio):
    """Return ``(sample_rate, samples)`` from an audio value.

    Accepts ``(rate, samples)`` as well as the reversed tuple ``(samples, rate)``.
    """
    if isinstance(audio, tuple) and not isinstance(audio[0], (int, np.integer)):
        return audio[1], audio[0]
    rate, samples = audio
    return rate, samples


def _to_normalized_mono(audio, target_sr):
    """Convert an audio value to a peak-normalised mono tensor of shape (1, N) at *target_sr*."""
    rate, samples = _split_rate_and_samples(audio)
    if len(samples.shape) > 1:
        # Multi-channel input: average over axis 1 to get mono.
        samples = np.mean(samples, axis=1)
    tensor = torch.FloatTensor(samples).unsqueeze(0)
    if rate != target_sr:
        tensor = torchaudio.functional.resample(tensor, rate, target_sr)
    # Scale the peak to 0.95; the epsilon avoids dividing by zero on silence.
    return tensor / (torch.max(torch.abs(tensor)) + 1e-6) * 0.95


def _crossfade_with_previous(pieces, generated, core_start, fade_len):
    """Blend the tail of the last stored piece with *generated*'s lead-in, in place.

    The lead-in is the ``fade_len`` samples of *generated* immediately before
    ``core_start``, i.e. the same stretch of the timeline as the tail of the
    previous piece, so the blend joins time-aligned audio and removes nothing.
    """
    if not pieces:
        return
    overlap = min(fade_len, core_start, len(pieces[-1]))
    if overlap <= 0:
        return
    fade_out = np.linspace(1, 0, overlap)
    fade_in = np.linspace(0, 1, overlap)
    lead_in = generated[core_start - overlap:core_start]
    prev_tail = pieces[-1][-overlap:]
    pieces[-1][-overlap:] = (prev_tail * fade_out) + (lead_in * fade_in)


@spaces.GPU()
def vevo_timbre(content_wav, reference_wav):
    """Convert the timbre of *content_wav* to that of *reference_wav*.

    Both inputs are converted to mono, resampled to 24 kHz and peak-normalised.
    The reference is truncated to 20 seconds. The content is processed in
    10-second core windows, each read with up to 2 seconds of extra context on
    both sides; only the core part of every generated chunk is kept
    ("center-only" processing), and consecutive cores are joined with a 50 ms
    crossfade.

    Compared with the earlier implementation, the crossfade now blends the
    previous tail with the time-aligned lead-in of the new chunk instead of
    with the head of the new core, so no 50 ms of audio is dropped at each
    boundary. A chunk whose generation fails or is too short is replaced by
    silence of the same length to keep the timeline intact.

    NOTE(review): trimming assumes the pipeline returns audio sample-aligned
    with its input at 24 kHz - confirm against ``inference_fm``.

    Args:
        content_wav: Source audio as ``(sample_rate, samples)`` (the reversed
            order is also accepted).
        reference_wav: Target-timbre audio in the same format.

    Returns:
        Path of the generated WAV file (``wav/out_<session>.wav``).

    Raises:
        ValueError: If either input is ``None``.
    """
    if content_wav is None or reference_wav is None:
        raise ValueError("Please upload audio files")

    session_id = str(uuid.uuid4())[:8]
    temp_content_path = f"wav/c_{session_id}.wav"
    temp_reference_path = f"wav/r_{session_id}.wav"
    output_path = f"wav/out_{session_id}.wav"

    try:
        SR = 24000

        # --- Prepare the source audio ---
        # squeeze(0) (not squeeze()) keeps a 1-D array even for one sample.
        content_full_np = _to_normalized_mono(content_wav, SR).squeeze(0).numpy()

        # --- Prepare the reference audio (at most 20 seconds) ---
        ref_tensor = _to_normalized_mono(reference_wav, SR)
        if ref_tensor.shape[1] > SR * 20:
            ref_tensor = ref_tensor[:, :SR * 20]
        save_audio_pcm16(ref_tensor, temp_reference_path, SR)

        # --- Center-only processing strategy (removes boundary jitter) ---
        pipeline = get_pipeline()

        # Keep 10 clean seconds per chunk; discard 2 seconds of context per side.
        CORE_CHUNK_SEC = 10.0
        PADDING_SEC = 2.0
        core_samples = int(CORE_CHUNK_SEC * SR)
        padding_samples = int(PADDING_SEC * SR)
        # Short crossfade (50 ms) used only to avoid clicks at the joins.
        fade_len = int(0.05 * SR)

        total_samples = len(content_full_np)
        final_output = []
        cursor = 0

        print(f"[{session_id}] Starting Center-Only processing...")

        while cursor < total_samples:
            # 1. Read a wider window that includes the padding.
            read_start = max(0, cursor - padding_samples)
            read_end = min(total_samples, cursor + core_samples + padding_samples)
            chunk_input = content_full_np[read_start:read_end]

            # Give up on a window shorter than half a second.
            if len(chunk_input) < SR * 0.5:
                break

            save_audio_pcm16(torch.FloatTensor(chunk_input).unsqueeze(0), temp_content_path, SR)

            # Number of useful samples this chunk contributes to the output.
            valid_length = min(core_samples, total_samples - cursor)

            try:
                # 2. Generate the converted audio for this window.
                gen = pipeline.inference_fm(
                    src_wav_path=temp_content_path,
                    timbre_ref_wav_path=temp_reference_path,
                    flow_matching_steps=64,
                )
                if torch.isnan(gen).any():
                    gen = torch.nan_to_num(gen, nan=0.0)
                gen_np = gen.detach().cpu().squeeze().numpy()

                # 3. Trim the padded margins. The front margin is whatever was
                #    read before the cursor (0 for the first chunk).
                trim_front = cursor - read_start

                if len(gen_np) > trim_front:
                    core_audio = gen_np[trim_front:trim_front + valid_length]
                    # 4. Crossfade over the time-aligned overlap before the core.
                    _crossfade_with_previous(final_output, gen_np, trim_front, fade_len)
                    final_output.append(core_audio)
                else:
                    print(f"Generated chunk at {cursor} is too short; inserting silence")
                    final_output.append(np.zeros(valid_length))
            except Exception as e:
                print(f"Error processing chunk at {cursor}: {e}")
                final_output.append(np.zeros(valid_length))

            # 5. Advance by exactly one core window.
            cursor += core_samples

        # Final concatenation (one second of silence if nothing was produced).
        if len(final_output) > 0:
            full_audio = np.concatenate(final_output)
        else:
            full_audio = np.zeros(SR)

        save_audio_pcm16(full_audio, output_path, SR)
        return output_path
    finally:
        # Always remove the per-session temporary inputs.
        if os.path.exists(temp_content_path):
            os.remove(temp_content_path)
        if os.path.exists(temp_reference_path):
            os.remove(temp_reference_path)
# Gradio UI: two audio inputs and a button on the left, the result on the right.
# NOTE(review): indentation was lost in this copy of the file; the nesting
# below (Row > two Columns, click wiring inside Blocks, launch outside) is
# the assumed layout - confirm against the original.
with gr.Blocks(title="Vevo-Timbre (Stable Core)") as demo:
    gr.Markdown("## Vevo-Timbre: Zero-Shot Voice Conversion")
    gr.Markdown("Stable Core Logic: Removes generated artifacts at boundaries.")
    with gr.Row():
        with gr.Column():
            # type="numpy" inputs; vevo_timbre unpacks each as (sample_rate, samples).
            timbre_content = gr.Audio(label="Source Audio", type="numpy")
            timbre_reference = gr.Audio(label="Target Timbre", type="numpy")
            timbre_button = gr.Button("Generate", variant="primary")
        with gr.Column():
            # Receives the output file path returned by vevo_timbre.
            timbre_output = gr.Audio(label="Result")
    # Run the conversion when the button is clicked.
    timbre_button.click(vevo_timbre, inputs=[timbre_content, timbre_reference], outputs=timbre_output)
# Start the app.
demo.launch()