import os
import sys
import importlib.util
import torch
import gradio as gr
import torchaudio
import numpy as np
from huggingface_hub import snapshot_download, hf_hub_download
import subprocess
import shutil
import uuid
import soundfile as sf
import spaces
import librosa
# --- 1. Installation and setup ---
downloaded_resources = {
"configs": False,
"tokenizer_vq8192": False,
"fmt_Vq8192ToMels": False,
"vocoder": False
}
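# espeak-ng is installed system-wide; Amphion's text frontend is assumed to
# rely on it for phonemization.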
def install_espeak():
try:
        if shutil.which("espeak-ng") is None:
print("Installing espeak-ng...")
subprocess.run(["apt-get", "update"], check=True)
subprocess.run(["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"], check=True)
except Exception as e:
print(f"Error installing espeak-ng: {e}")
install_espeak()
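# Some LangSegment releases appear to have dropped setLangfilters/getLangfilters;
# strip them from the package's __init__ so the import does not fail.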
def patch_langsegment_init():
try:
spec = importlib.util.find_spec("LangSegment")
if spec is None or spec.origin is None: return
init_path = os.path.join(os.path.dirname(spec.origin), '__init__.py')
with open(init_path, 'r') as f: lines = f.readlines()
modified = False
new_lines = []
target_line_prefix = "from .LangSegment import"
for line in lines:
if line.strip().startswith(target_line_prefix) and ('setLangfilters' in line or 'getLangfilters' in line):
mod_line = line.replace(',setLangfilters', '').replace(',getLangfilters', '')
mod_line = mod_line.replace('setLangfilters,', '').replace('getLangfilters,', '').rstrip(',')
new_lines.append(mod_line + '\n')
modified = True
else:
new_lines.append(line)
if modified:
with open(init_path, 'w') as f: f.writelines(new_lines)
try:
import LangSegment
importlib.reload(LangSegment)
            except Exception: pass
    except Exception: pass
patch_langsegment_init()
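# Clone Amphion and make the repo root importable (VevoInferencePipeline lives
# under models/vc/vevo inside the repo).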
if not os.path.exists("Amphion"):
    subprocess.run(["git", "clone", "https://github.com/open-mmlab/Amphion.git"], check=True)
os.chdir("Amphion")
if os.getcwd() not in sys.path:
    sys.path.append(os.getcwd())
os.makedirs("wav", exist_ok=True)
os.makedirs("ckpts/Vevo", exist_ok=True)
from models.vc.vevo.vevo_utils import VevoInferencePipeline
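# Write mono audio to disk as a 16-bit PCM WAV; accepts either a torch tensor
# or a numpy array.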
def save_audio_pcm16(waveform, output_path, sample_rate=24000):
try:
if isinstance(waveform, torch.Tensor):
waveform = waveform.detach().cpu()
if waveform.dim() == 2 and waveform.shape[0] == 1:
waveform = waveform.squeeze(0)
waveform = waveform.numpy()
sf.write(output_path, waveform, sample_rate, subtype='PCM_16')
except Exception as e:
print(f"Save error: {e}")
def setup_configs():
if downloaded_resources["configs"]: return
config_path = "models/vc/vevo/config"
os.makedirs(config_path, exist_ok=True)
config_files = ["Vq8192ToMels.json", "Vocoder.json"]
for file in config_files:
file_path = f"{config_path}/{file}"
if not os.path.exists(file_path):
try:
file_data = hf_hub_download(repo_id="amphion/Vevo", filename=f"config/{file}", repo_type="model")
subprocess.run(["cp", file_data, file_path])
except Exception as e: print(f"Error downloading config {file}: {e}")
downloaded_resources["configs"] = True
setup_configs()
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
inference_pipelines = {}
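# Download all checkpoints once at startup so the GPU-decorated handler below
# does not stall on network I/O during a request.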
def preload_all_resources():
setup_configs()
global downloaded_content_style_tokenizer_path, downloaded_fmt_path, downloaded_vocoder_path
if not downloaded_resources["tokenizer_vq8192"]:
downloaded_content_style_tokenizer_path = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["tokenizer/vq8192/*"])
downloaded_resources["tokenizer_vq8192"] = True
if not downloaded_resources["fmt_Vq8192ToMels"]:
downloaded_fmt_path = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vq8192ToMels/*"])
downloaded_resources["fmt_Vq8192ToMels"] = True
if not downloaded_resources["vocoder"]:
downloaded_vocoder_path = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vocoder/*"])
downloaded_resources["vocoder"] = True
downloaded_content_style_tokenizer_path = None
downloaded_fmt_path = None
downloaded_vocoder_path = None
preload_all_resources()
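# Lazily build a single VevoInferencePipeline and cache it; the checkpoints are
# already on disk thanks to preload_all_resources().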
def get_pipeline():
if "timbre" in inference_pipelines: return inference_pipelines["timbre"]
pipeline = VevoInferencePipeline(
content_style_tokenizer_ckpt_path=os.path.join(downloaded_content_style_tokenizer_path, "tokenizer/vq8192"),
fmt_cfg_path="./models/vc/vevo/config/Vq8192ToMels.json",
fmt_ckpt_path=os.path.join(downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels"),
vocoder_cfg_path="./models/vc/vevo/config/Vocoder.json",
vocoder_ckpt_path=os.path.join(downloaded_vocoder_path, "acoustic_modeling/Vocoder"),
device=device,
)
inference_pipelines["timbre"] = pipeline
return pipeline
# --- 2. Smart splitting algorithm ---
def find_advanced_split_points(audio_np, sr):
"""
پیدا کردن نقاط برش با استراتژی فال‌بک (Fallback Strategy):
۱. تلاش برای پیدا کردن سکوت در بازه ۸ تا ۱۲ ثانیه.
۲. اگر نشد، تلاش در بازه وسیع‌تر ۶ تا ۱۴ ثانیه.
۳. انتخاب نقطه با کمترین انرژی (حتی اگر سکوت نباشد).
۴. تنظیم دقیق روی نزدیک‌ترین Zero-Crossing.
"""
total_samples = len(audio_np)
    # Search window settings
MIN_PREFERRED = 8.0
MAX_PREFERRED = 12.0
MIN_HARD = 6.0
MAX_HARD = 15.0
split_points = [0]
current_pos = 0
hop_length = 512
frame_length = 1024
while current_pos < total_samples:
        # Strategy 1: the preferred window
start_search = current_pos + int(MIN_PREFERRED * sr)
end_search = current_pos + int(MAX_PREFERRED * sr)
        # If we are close to the end of the file
if start_search >= total_samples:
split_points.append(total_samples)
break
end_search = min(end_search, total_samples)
        # Strategy 2: if the preferred window is too short (end of file), widen it
if end_search - start_search < sr:
            # Fall back to the hard (wide) window
start_search = current_pos + int(MIN_HARD * sr)
end_search = current_pos + int(MAX_HARD * sr)
start_search = min(start_search, total_samples)
end_search = min(end_search, total_samples)
        # Slice out the search region
region = audio_np[start_search:end_search]
if len(region) == 0:
split_points.append(total_samples)
break
        # Compute frame-wise energy
rms = librosa.feature.rms(y=region, frame_length=frame_length, hop_length=hop_length)[0]
        # Find the lowest-energy frame (local minimum)
min_idx = np.argmin(rms)
local_cut_sample = min_idx * hop_length
        # --- Zero-crossing refinement ---
        # We now have an approximate cut point; snap it exactly onto a zero
        # crossing so the cut does not produce an audible "click".
cut_absolute_approx = start_search + local_cut_sample
        # Search around the approximate point (±500 samples) for a zero
search_radius = 500
zc_start = max(0, cut_absolute_approx - search_radius)
zc_end = min(total_samples, cut_absolute_approx + search_radius)
zc_region = audio_np[zc_start:zc_end]
        # Find the nearest zero crossing
        # (where the sample sign flips)
zero_crossings = np.where(np.diff(np.signbit(zc_region)))[0]
if len(zero_crossings) > 0:
            # The zero crossing closest to the middle of the search window
closest_zc = zero_crossings[np.argmin(np.abs(zero_crossings - search_radius))]
best_cut_absolute = zc_start + closest_zc
else:
            # If no zero crossing was found (very unlikely), keep the low-energy point
best_cut_absolute = cut_absolute_approx
split_points.append(best_cut_absolute)
current_pos = best_cut_absolute
return split_points
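# Worked example (approximate): for a 30 s clip at 24 kHz (720,000 samples),
# the first pass searches samples 192,000-288,000 (the 8-12 s window) for the
# quietest RMS frame, snaps that cut to the nearest zero crossing, then repeats
# from the cut, typically yielding three chunks of roughly 8-12 s each.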
@spaces.GPU()
def vevo_timbre(content_wav, reference_wav):
session_id = str(uuid.uuid4())[:8]
temp_content_path = f"wav/c_{session_id}.wav"
temp_reference_path = f"wav/r_{session_id}.wav"
output_path = f"wav/out_{session_id}.wav"
if content_wav is None or reference_wav is None:
raise ValueError("Please upload audio files")
try:
SR = 24000
        # --- Source input ---
        # Gradio "numpy" audio arrives as (sample_rate, data); the isinstance
        # check below also tolerates a reversed tuple.
if isinstance(content_wav, tuple):
content_sr, content_data = content_wav if isinstance(content_wav[0], int) else (content_wav[1], content_wav[0])
else:
content_sr, content_data = content_wav
if len(content_data.shape) > 1: content_data = np.mean(content_data, axis=1)
content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
if content_sr != SR:
content_tensor = torchaudio.functional.resample(content_tensor, content_sr, SR)
content_tensor = content_tensor / (torch.max(torch.abs(content_tensor)) + 1e-6) * 0.95
content_full_np = content_tensor.squeeze().numpy()
        # --- Reference input ---
if isinstance(reference_wav, tuple):
ref_sr, ref_data = reference_wav if isinstance(reference_wav[0], int) else (reference_wav[1], reference_wav[0])
else:
ref_sr, ref_data = reference_wav
if len(ref_data.shape) > 1: ref_data = np.mean(ref_data, axis=1)
ref_tensor = torch.FloatTensor(ref_data).unsqueeze(0)
if ref_sr != SR:
ref_tensor = torchaudio.functional.resample(ref_tensor, ref_sr, SR)
ref_tensor = ref_tensor / (torch.max(torch.abs(ref_tensor)) + 1e-6) * 0.95
if ref_tensor.shape[1] > SR * 20: ref_tensor = ref_tensor[:, :SR * 20]
save_audio_pcm16(ref_tensor, temp_reference_path, SR)
pipeline = get_pipeline()
        # --- Advanced chunking ---
print(f"[{session_id}] Finding best energy split points (Zero-Crossing)...")
split_points = find_advanced_split_points(content_full_np, SR)
print(f"[{session_id}] Split into {len(split_points)-1} chunks.")
final_output = []
        PADDING_SAMPLES = int(2.5 * SR)  # a little extra padding for safety
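        # Each chunk is rendered with up to 2.5 s of left context so the model
        # can condition on the preceding audio; the regenerated prefix is then
        # trimmed off again (trim_amount) before stitching.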
for i in range(len(split_points) - 1):
start = split_points[i]
end = split_points[i+1]
read_start = max(0, start - PADDING_SAMPLES)
read_end = end
chunk_input = content_full_np[read_start:read_end]
save_audio_pcm16(torch.FloatTensor(chunk_input).unsqueeze(0), temp_content_path, SR)
try:
gen = pipeline.inference_fm(
src_wav_path=temp_content_path,
timbre_ref_wav_path=temp_reference_path,
flow_matching_steps=32,
)
if torch.isnan(gen).any(): gen = torch.nan_to_num(gen, nan=0.0)
gen_np = gen.detach().cpu().squeeze().numpy()
trim_amount = start - read_start
if len(gen_np) > trim_amount:
valid_audio = gen_np[trim_amount:]
                    # --- Stitching ---
                    if len(final_output) > 0:
                        # If the cut did not land on silence (a forced cut), a
                        # slightly longer crossfade helps hide any sudden change
                        # in timbre.
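                        # linspace(1, 0) and linspace(0, 1) form an equal-gain
                        # crossfade: the two gains sum to 1 at every sample, so
                        # the level stays constant across the seam.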
fade_len = int(0.03 * SR) # 30ms standard
if len(final_output[-1]) > fade_len and len(valid_audio) > fade_len:
fade_out = np.linspace(1, 0, fade_len)
fade_in = np.linspace(0, 1, fade_len)
prev_tail = final_output[-1][-fade_len:]
curr_head = valid_audio[:fade_len]
mixed = (prev_tail * fade_out) + (curr_head * fade_in)
final_output[-1][-fade_len:] = mixed
valid_audio = valid_audio[fade_len:]
final_output.append(valid_audio)
except Exception as e:
print(f"Error segment {i}: {e}")
                # Fill the gap with silence so the overall timing stays intact
final_output.append(np.zeros(end - start))
if len(final_output) > 0:
full_audio = np.concatenate(final_output)
else:
full_audio = np.zeros(SR)
save_audio_pcm16(full_audio, output_path, SR)
return output_path
finally:
if os.path.exists(temp_content_path): os.remove(temp_content_path)
if os.path.exists(temp_reference_path): os.remove(temp_reference_path)
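# --- 3. Gradio UI ---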
with gr.Blocks(title="Vevo-Timbre (Pro Logic)") as demo:
gr.Markdown("## Vevo-Timbre: Zero-Shot Voice Conversion")
gr.Markdown("Robust Splitting: Uses Minimum Energy + Zero Crossing detection to handle fast speech without glitches.")
with gr.Row():
with gr.Column():
timbre_content = gr.Audio(label="Source Audio", type="numpy")
timbre_reference = gr.Audio(label="Target Timbre", type="numpy")
timbre_button = gr.Button("Generate", variant="primary")
with gr.Column():
timbre_output = gr.Audio(label="Result")
timbre_button.click(vevo_timbre, inputs=[timbre_content, timbre_reference], outputs=timbre_output)
demo.launch()