Sada1 / app.py
Opera8's picture
Update app.py
3b10964 verified
raw
history blame
14.8 kB
import os
import sys
import importlib.util
import site
import json
import torch
import gradio as gr
import torchaudio
import numpy as np
from huggingface_hub import snapshot_download, hf_hub_download
import subprocess
import re
import spaces
import uuid
import soundfile as sf
# منابع ضروری
downloaded_resources = {
"configs": False,
"tokenizer_vq8192": False,
"fmt_Vq8192ToMels": False,
"vocoder": False
}
def install_espeak():
try:
result = subprocess.run(["which", "espeak-ng"], capture_output=True, text=True)
if result.returncode != 0:
print("Installing espeak-ng...")
subprocess.run(["apt-get", "update"], check=True)
subprocess.run(["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"], check=True)
except Exception as e:
print(f"Error installing espeak-ng: {e}")
install_espeak()
def patch_langsegment_init():
try:
spec = importlib.util.find_spec("LangSegment")
if spec is None or spec.origin is None: return
init_path = os.path.join(os.path.dirname(spec.origin), '__init__.py')
if not os.path.exists(init_path):
for site_pkg_path in site.getsitepackages():
potential_path = os.path.join(site_pkg_path, 'LangSegment', '__init__.py')
if os.path.exists(potential_path):
init_path = potential_path
break
else: return
with open(init_path, 'r') as f: lines = f.readlines()
modified = False
new_lines = []
target_line_prefix = "from .LangSegment import"
for line in lines:
if line.strip().startswith(target_line_prefix) and ('setLangfilters' in line or 'getLangfilters' in line):
mod_line = line.replace(',setLangfilters', '').replace(',getLangfilters', '')
mod_line = mod_line.replace('setLangfilters,', '').replace('getLangfilters,', '').rstrip(',')
new_lines.append(mod_line + '\n')
modified = True
else:
new_lines.append(line)
if modified:
with open(init_path, 'w') as f: f.writelines(new_lines)
try:
import LangSegment
importlib.reload(LangSegment)
except: pass
except: pass
patch_langsegment_init()
if not os.path.exists("Amphion"):
subprocess.run(["git", "clone", "https://github.com/open-mmlab/Amphion.git"])
os.chdir("Amphion")
else:
if not os.getcwd().endswith("Amphion"):
os.chdir("Amphion")
if os.path.dirname(os.path.abspath("Amphion")) not in sys.path:
sys.path.append(os.path.dirname(os.path.abspath("Amphion")))
os.makedirs("wav", exist_ok=True)
os.makedirs("ckpts/Vevo", exist_ok=True)
from models.vc.vevo.vevo_utils import VevoInferencePipeline
def save_audio_pcm16(waveform, output_path, sample_rate=24000):
try:
if isinstance(waveform, torch.Tensor):
waveform = waveform.detach().cpu()
if waveform.dim() == 2 and waveform.shape[0] == 1:
waveform = waveform.squeeze(0)
waveform = waveform.numpy()
sf.write(output_path, waveform, sample_rate, subtype='PCM_16')
except Exception as e:
print(f"Save error: {e}")
raise e
def setup_configs():
if downloaded_resources["configs"]: return
config_path = "models/vc/vevo/config"
os.makedirs(config_path, exist_ok=True)
config_files = ["Vq8192ToMels.json", "Vocoder.json"]
for file in config_files:
file_path = f"{config_path}/{file}"
if not os.path.exists(file_path):
try:
file_data = hf_hub_download(repo_id="amphion/Vevo", filename=f"config/{file}", repo_type="model")
subprocess.run(["cp", file_data, file_path])
except Exception as e: print(f"Error downloading config {file}: {e}")
downloaded_resources["configs"] = True
setup_configs()
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")
inference_pipelines = {}
def preload_all_resources():
print("Preloading resources...")
setup_configs()
global downloaded_content_style_tokenizer_path, downloaded_fmt_path, downloaded_vocoder_path
if not downloaded_resources["tokenizer_vq8192"]:
local_dir = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["tokenizer/vq8192/*"])
downloaded_content_style_tokenizer_path = local_dir
downloaded_resources["tokenizer_vq8192"] = True
if not downloaded_resources["fmt_Vq8192ToMels"]:
local_dir = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vq8192ToMels/*"])
downloaded_fmt_path = local_dir
downloaded_resources["fmt_Vq8192ToMels"] = True
if not downloaded_resources["vocoder"]:
local_dir = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vocoder/*"])
downloaded_vocoder_path = local_dir
downloaded_resources["vocoder"] = True
print("Resources ready.")
downloaded_content_style_tokenizer_path = None
downloaded_fmt_path = None
downloaded_vocoder_path = None
preload_all_resources()
def get_pipeline():
if "timbre" in inference_pipelines:
return inference_pipelines["timbre"]
pipeline = VevoInferencePipeline(
content_style_tokenizer_ckpt_path=os.path.join(downloaded_content_style_tokenizer_path, "tokenizer/vq8192"),
fmt_cfg_path="./models/vc/vevo/config/Vq8192ToMels.json",
fmt_ckpt_path=os.path.join(downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels"),
vocoder_cfg_path="./models/vc/vevo/config/Vocoder.json",
vocoder_ckpt_path=os.path.join(downloaded_vocoder_path, "acoustic_modeling/Vocoder"),
device=device,
)
inference_pipelines["timbre"] = pipeline
return pipeline
@spaces.GPU()
def vevo_timbre(content_wav, reference_wav):
session_id = str(uuid.uuid4())[:8]
temp_content_path = f"wav/c_{session_id}.wav"
temp_reference_path = f"wav/r_{session_id}.wav"
output_path = f"wav/out_{session_id}.wav"
if content_wav is None or reference_wav is None:
raise ValueError("Please upload audio files")
try:
# --- پردازش ورودی ---
if isinstance(content_wav, tuple):
content_sr, content_data = content_wav if isinstance(content_wav[0], int) else (content_wav[1], content_wav[0])
else:
content_sr, content_data = content_wav
if len(content_data.shape) > 1 and content_data.shape[1] > 1:
content_data = np.mean(content_data, axis=1)
content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
if content_sr != 24000:
content_tensor = torchaudio.functional.resample(content_tensor, content_sr, 24000)
content_sr = 24000
content_tensor = content_tensor / (torch.max(torch.abs(content_tensor)) + 1e-6) * 0.95
# --- پردازش رفرنس ---
if isinstance(reference_wav, tuple):
ref_sr, ref_data = reference_wav if isinstance(reference_wav[0], int) else (reference_wav[1], reference_wav[0])
else:
ref_sr, ref_data = reference_wav
if len(ref_data.shape) > 1 and ref_data.shape[1] > 1:
ref_data = np.mean(ref_data, axis=1)
ref_tensor = torch.FloatTensor(ref_data).unsqueeze(0)
if ref_sr != 24000:
ref_tensor = torchaudio.functional.resample(ref_tensor, ref_sr, 24000)
ref_sr = 24000
ref_tensor = ref_tensor / (torch.max(torch.abs(ref_tensor)) + 1e-6) * 0.95
if ref_tensor.shape[1] > 24000 * 20:
ref_tensor = ref_tensor[:, :24000 * 20]
save_audio_pcm16(ref_tensor, temp_reference_path, ref_sr)
# --- منطق حرفه‌ای Warm-up Context Stitching ---
pipeline = get_pipeline()
SR = 24000
STEP_SIZE = 10 * SR # هر 10 ثانیه جلو می‌رویم
WARMUP_SIZE = 3 * SR # 3 ثانیه کانتکست (نگاه به عقب) برای گرم شدن
CROSSFADE_SIZE = 1 * SR # 1 ثانیه میکس برای نرم کردن اتصال
total_samples = content_tensor.shape[1]
print(f"[{session_id}] Duration: {total_samples/SR:.2f}s. Studio Mode (Warm-up + Crossfade)...")
final_audio = []
previous_tail = None # نگهداری ۱ ثانیه آخر تکه قبلی برای میکس
# حلقه روی تکه‌ها
current_pos = 0
while current_pos < total_samples:
# محاسبه دقیق بازه ورودی
# اگر اولین تکه نیستیم، 3 ثانیه عقب‌تر شروع می‌کنیم (Warm-up)
if current_pos == 0:
start_input = 0
warmup_cut = 0
else:
start_input = max(0, current_pos - WARMUP_SIZE)
warmup_cut = current_pos - start_input # مقداری که باید از اول خروجی دور بریزیم
# پایان این تکه (10 ثانیه جلوتر + 1 ثانیه اضافه برای میکس بعدی)
end_input = min(current_pos + STEP_SIZE + CROSSFADE_SIZE, total_samples)
# اگر دیتایی نمانده، تمام
if start_input >= end_input:
break
# استخراج تکه ورودی
chunk_tensor = content_tensor[:, start_input:end_input]
save_audio_pcm16(chunk_tensor, temp_content_path, SR)
print(f"[{session_id}] Processing chunk starting at {current_pos/SR:.1f}s (with context)")
try:
gen = pipeline.inference_fm(
src_wav_path=temp_content_path,
timbre_ref_wav_path=temp_reference_path,
flow_matching_steps=64, # کیفیت 64 پله‌ای
)
if torch.isnan(gen).any(): gen = torch.nan_to_num(gen, nan=0.0)
if gen.dim() == 1: gen = gen.unsqueeze(0)
gen = gen.cpu().squeeze(0).numpy()
# 1. حذف قسمت Warm-up (که قبلاً ساخته شده بود)
if warmup_cut > 0:
# اما صبر کن! ما باید CROSSFADE_SIZE تا قبل از نقطه برش را نگه داریم برای میکس
# پس برش را کمی عقب‌تر می‌زنیم تا همپوشانی داشته باشیم
valid_start = warmup_cut - CROSSFADE_SIZE
if valid_start < 0: valid_start = 0 # نباید پیش بیاد
gen = gen[valid_start:]
# الان `gen` شامل: [همپوشانی با قبلی] + [تکه جدید] + [همپوشانی با بعدی] است.
# 2. میکس با تکه قبلی (اگر وجود دارد)
if previous_tail is not None:
# جدا کردن قسمت همپوشانی از این تکه
overlap_part = gen[:CROSSFADE_SIZE]
new_part = gen[CROSSFADE_SIZE:]
# اگر سایزها یکی بود میکس کن
if len(overlap_part) == len(previous_tail):
alpha = np.linspace(0, 1, len(overlap_part))
blended = (previous_tail * (1 - alpha)) + (overlap_part * alpha)
final_audio.append(blended)
else:
# فال‌بک (نباید پیش بیاد)
final_audio.append(previous_tail)
# حالا قسمت جدید را پردازش می‌کنیم
# باید قسمت انتهایی را برای دور بعد ذخیره کنیم
if len(new_part) > CROSSFADE_SIZE and end_input < total_samples:
# ذخیره دم برای دور بعد
previous_tail = new_part[-CROSSFADE_SIZE:]
# اضافه کردن بدنه اصلی
final_audio.append(new_part[:-CROSSFADE_SIZE])
else:
# تکه آخر است، کلش را اضافه کن
final_audio.append(new_part)
previous_tail = None
else:
# تکه اول است
if len(gen) > CROSSFADE_SIZE and end_input < total_samples:
previous_tail = gen[-CROSSFADE_SIZE:]
final_audio.append(gen[:-CROSSFADE_SIZE])
else:
final_audio.append(gen)
previous_tail = None
current_pos += STEP_SIZE
except Exception as e:
print(f"Error in chunk: {e}")
# در صورت خطا، پرش کن (بهتر از قطع شدن است)
current_pos += STEP_SIZE
# اضافه کردن سکوت
final_audio.append(np.zeros(STEP_SIZE))
previous_tail = None
# چسباندن نهایی
if len(final_audio) > 0:
full_audio = np.concatenate(final_audio)
else:
full_audio = np.zeros(24000)
save_audio_pcm16(full_audio, output_path, SR)
return output_path
finally:
if os.path.exists(temp_content_path): os.remove(temp_content_path)
if os.path.exists(temp_reference_path): os.remove(temp_reference_path)
with gr.Blocks(title="Vevo-Timbre (Studio)") as demo:
gr.Markdown("## Vevo-Timbre: Zero-Shot Voice Conversion")
gr.Markdown("نسخه استودیویی: بدون پرش، بدون تداخل زمانی.")
with gr.Row():
with gr.Column():
timbre_content = gr.Audio(label="Source Audio", type="numpy")
timbre_reference = gr.Audio(label="Target Timbre", type="numpy")
timbre_button = gr.Button("Generate", variant="primary")
with gr.Column():
timbre_output = gr.Audio(label="Result")
timbre_button.click(vevo_timbre, inputs=[timbre_content, timbre_reference], outputs=timbre_output)
demo.launch()