Sada / app.py
Opera8's picture
Update app.py
a6cd2a1 verified
raw
history blame
12.7 kB
import os
import sys
import importlib.util
import site
import json
import torch
import gradio as gr
import torchaudio
import numpy as np
from huggingface_hub import snapshot_download, hf_hub_download
import subprocess
import re
import spaces
import uuid
import soundfile as sf
# منابع ضروری
downloaded_resources = {
"configs": False,
"tokenizer_vq8192": False,
"fmt_Vq8192ToMels": False,
"vocoder": False
}
def install_espeak():
try:
result = subprocess.run(["which", "espeak-ng"], capture_output=True, text=True)
if result.returncode != 0:
print("Installing espeak-ng...")
subprocess.run(["apt-get", "update"], check=True)
subprocess.run(["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"], check=True)
except Exception as e:
print(f"Error installing espeak-ng: {e}")
install_espeak()
def patch_langsegment_init():
try:
spec = importlib.util.find_spec("LangSegment")
if spec is None or spec.origin is None: return
init_path = os.path.join(os.path.dirname(spec.origin), '__init__.py')
if not os.path.exists(init_path):
for site_pkg_path in site.getsitepackages():
potential_path = os.path.join(site_pkg_path, 'LangSegment', '__init__.py')
if os.path.exists(potential_path):
init_path = potential_path
break
else: return
with open(init_path, 'r') as f: lines = f.readlines()
modified = False
new_lines = []
target_line_prefix = "from .LangSegment import"
for line in lines:
if line.strip().startswith(target_line_prefix) and ('setLangfilters' in line or 'getLangfilters' in line):
mod_line = line.replace(',setLangfilters', '').replace(',getLangfilters', '')
mod_line = mod_line.replace('setLangfilters,', '').replace('getLangfilters,', '').rstrip(',')
new_lines.append(mod_line + '\n')
modified = True
else:
new_lines.append(line)
if modified:
with open(init_path, 'w') as f: f.writelines(new_lines)
try:
import LangSegment
importlib.reload(LangSegment)
except: pass
except: pass
patch_langsegment_init()
if not os.path.exists("Amphion"):
subprocess.run(["git", "clone", "https://github.com/open-mmlab/Amphion.git"])
os.chdir("Amphion")
else:
if not os.getcwd().endswith("Amphion"):
os.chdir("Amphion")
if os.path.dirname(os.path.abspath("Amphion")) not in sys.path:
sys.path.append(os.path.dirname(os.path.abspath("Amphion")))
os.makedirs("wav", exist_ok=True)
os.makedirs("ckpts/Vevo", exist_ok=True)
from models.vc.vevo.vevo_utils import VevoInferencePipeline
def save_audio_pcm16(waveform, output_path, sample_rate=24000):
try:
if isinstance(waveform, torch.Tensor):
waveform = waveform.detach().cpu()
if waveform.dim() == 2 and waveform.shape[0] == 1:
waveform = waveform.squeeze(0)
waveform = waveform.numpy()
sf.write(output_path, waveform, sample_rate, subtype='PCM_16')
except Exception as e:
print(f"Save error: {e}")
raise e
def setup_configs():
if downloaded_resources["configs"]: return
config_path = "models/vc/vevo/config"
os.makedirs(config_path, exist_ok=True)
config_files = ["Vq8192ToMels.json", "Vocoder.json"]
for file in config_files:
file_path = f"{config_path}/{file}"
if not os.path.exists(file_path):
try:
file_data = hf_hub_download(repo_id="amphion/Vevo", filename=f"config/{file}", repo_type="model")
subprocess.run(["cp", file_data, file_path])
except Exception as e: print(f"Error downloading config {file}: {e}")
downloaded_resources["configs"] = True
setup_configs()
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")
inference_pipelines = {}
def preload_all_resources():
print("Preloading resources...")
setup_configs()
global downloaded_content_style_tokenizer_path, downloaded_fmt_path, downloaded_vocoder_path
if not downloaded_resources["tokenizer_vq8192"]:
local_dir = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["tokenizer/vq8192/*"])
downloaded_content_style_tokenizer_path = local_dir
downloaded_resources["tokenizer_vq8192"] = True
if not downloaded_resources["fmt_Vq8192ToMels"]:
local_dir = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vq8192ToMels/*"])
downloaded_fmt_path = local_dir
downloaded_resources["fmt_Vq8192ToMels"] = True
if not downloaded_resources["vocoder"]:
local_dir = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vocoder/*"])
downloaded_vocoder_path = local_dir
downloaded_resources["vocoder"] = True
print("Resources ready.")
downloaded_content_style_tokenizer_path = None
downloaded_fmt_path = None
downloaded_vocoder_path = None
preload_all_resources()
def get_pipeline():
if "timbre" in inference_pipelines:
return inference_pipelines["timbre"]
pipeline = VevoInferencePipeline(
content_style_tokenizer_ckpt_path=os.path.join(downloaded_content_style_tokenizer_path, "tokenizer/vq8192"),
fmt_cfg_path="./models/vc/vevo/config/Vq8192ToMels.json",
fmt_ckpt_path=os.path.join(downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels"),
vocoder_cfg_path="./models/vc/vevo/config/Vocoder.json",
vocoder_ckpt_path=os.path.join(downloaded_vocoder_path, "acoustic_modeling/Vocoder"),
device=device,
)
inference_pipelines["timbre"] = pipeline
return pipeline
@spaces.GPU()
def vevo_timbre(content_wav, reference_wav):
session_id = str(uuid.uuid4())[:8]
temp_content_path = f"wav/c_{session_id}.wav"
temp_reference_path = f"wav/r_{session_id}.wav"
output_path = f"wav/out_{session_id}.wav"
if content_wav is None or reference_wav is None:
raise ValueError("Please upload audio files")
try:
# --- آماده سازی Content ---
if isinstance(content_wav, tuple):
content_sr, content_data = content_wav if isinstance(content_wav[0], int) else (content_wav[1], content_wav[0])
else:
content_sr, content_data = content_wav
if len(content_data.shape) > 1 and content_data.shape[1] > 1:
content_data = np.mean(content_data, axis=1)
content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
if content_sr != 24000:
content_tensor = torchaudio.functional.resample(content_tensor, content_sr, 24000)
content_sr = 24000
# نرمال سازی
content_tensor = content_tensor / (torch.max(torch.abs(content_tensor)) + 1e-6) * 0.95
# --- آماده سازی Reference ---
if isinstance(reference_wav, tuple):
ref_sr, ref_data = reference_wav if isinstance(reference_wav[0], int) else (reference_wav[1], reference_wav[0])
else:
ref_sr, ref_data = reference_wav
if len(ref_data.shape) > 1 and ref_data.shape[1] > 1:
ref_data = np.mean(ref_data, axis=1)
ref_tensor = torch.FloatTensor(ref_data).unsqueeze(0)
if ref_sr != 24000:
ref_tensor = torchaudio.functional.resample(ref_tensor, ref_sr, 24000)
ref_sr = 24000
# تنظیم لول رفرنس
ref_max = torch.max(torch.abs(ref_tensor)) + 1e-6
ref_tensor = ref_tensor / ref_max * 0.95
if ref_tensor.shape[1] > 24000 * 20:
ref_tensor = ref_tensor[:, :24000 * 20]
save_audio_pcm16(ref_tensor, temp_reference_path, ref_sr)
# --- منطق Chunking (اصلاح شده: همپوشانی کوتاه) ---
pipeline = get_pipeline()
SR = 24000
CHUNK_LEN = 10 * SR # 10 ثانیه اصلی
# تغییر مهم: کاهش همپوشانی به 0.1 ثانیه (100 میلی ثانیه)
# این باعث می‌شود اکو از بین برود ولی اتصال همچنان نرم باشد
OVERLAP = int(0.1 * SR)
INPUT_SIZE = CHUNK_LEN + OVERLAP
total_samples = content_tensor.shape[1]
print(f"[{session_id}] Processing (High Quality 64 Steps)...")
final_parts = []
overlap_buffer = None
for start in range(0, total_samples, CHUNK_LEN):
end_input = min(start + INPUT_SIZE, total_samples)
current_input_chunk = content_tensor[:, start:end_input]
save_audio_pcm16(current_input_chunk, temp_content_path, SR)
try:
gen = pipeline.inference_fm(
src_wav_path=temp_content_path,
timbre_ref_wav_path=temp_reference_path,
flow_matching_steps=64, # کیفیت بالا
)
if torch.isnan(gen).any(): gen = torch.nan_to_num(gen, nan=0.0)
if gen.dim() == 1: gen = gen.unsqueeze(0)
gen = gen.cpu().squeeze(0).numpy()
current_len = len(gen)
if overlap_buffer is not None:
mix_len = len(overlap_buffer)
if current_len < mix_len:
mix_len = current_len
overlap_buffer = overlap_buffer[:mix_len]
head_to_mix = gen[:mix_len]
body_rest = gen[mix_len:]
# میکس سریع (Fast Cross-Fade)
alpha = np.linspace(0, 1, mix_len)
blended_segment = (overlap_buffer * (1 - alpha)) + (head_to_mix * alpha)
final_parts.append(blended_segment)
if len(body_rest) > OVERLAP:
pure_body = body_rest[:-OVERLAP]
final_parts.append(pure_body)
overlap_buffer = body_rest[-OVERLAP:]
else:
final_parts.append(body_rest)
overlap_buffer = None
else:
if current_len > OVERLAP:
final_parts.append(gen[:-OVERLAP])
overlap_buffer = gen[-OVERLAP:]
else:
final_parts.append(gen)
overlap_buffer = None
except Exception as e:
print(f"Error in chunk: {e}")
missing_len = end_input - start
if overlap_buffer is not None:
missing_len -= len(overlap_buffer)
final_parts.append(overlap_buffer)
overlap_buffer = None
final_parts.append(np.zeros(max(0, missing_len)))
if overlap_buffer is not None:
final_parts.append(overlap_buffer)
if len(final_parts) > 0:
full_audio = np.concatenate(final_parts)
else:
full_audio = np.zeros(24000)
save_audio_pcm16(full_audio, output_path, SR)
return output_path
finally:
if os.path.exists(temp_content_path): os.remove(temp_content_path)
if os.path.exists(temp_reference_path): os.remove(temp_reference_path)
with gr.Blocks(title="Vevo-Timbre (No Echo)") as demo:
gr.Markdown("## Vevo-Timbre: Zero-Shot Voice Conversion")
gr.Markdown("نسخه اصلاح شده: حذف اکو در نقاط اتصال + کیفیت بالای ۶۴ مرحله‌ای.")
with gr.Row():
with gr.Column():
timbre_content = gr.Audio(label="Source Audio", type="numpy")
timbre_reference = gr.Audio(label="Target Timbre", type="numpy")
timbre_button = gr.Button("Generate", variant="primary")
with gr.Column():
timbre_output = gr.Audio(label="Result")
timbre_button.click(vevo_timbre, inputs=[timbre_content, timbre_reference], outputs=timbre_output)
demo.launch()