import os
import sys
import importlib.util
import site
import json
import subprocess
import re
import shutil

import numpy as np
import torch
import torchaudio
import soundfile as sf
import gradio as gr
import spaces
from huggingface_hub import snapshot_download, hf_hub_download

# Track which Vevo resources have already been downloaded so each is fetched only once.
downloaded_resources = {
    "configs": False,
    "tokenizer_vq8192": False,
    "fmt_Vq8192ToMels": False,
    "vocoder": False,
}


def install_espeak():
    """Detect and install the espeak-ng dependency."""
    try:
        result = subprocess.run(["which", "espeak-ng"], capture_output=True, text=True)
        if result.returncode != 0:
            print("Installing espeak-ng...")
            subprocess.run(["apt-get", "update"], check=True)
            subprocess.run(["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"], check=True)
        else:
            print("espeak-ng is already installed.")
    except Exception as e:
        print(f"Error installing espeak-ng: {e}")


install_espeak()


def patch_langsegment_init():
    """Strip imports of setLangfilters/getLangfilters from LangSegment's __init__.py;
    some installed versions import these names even though they no longer exist."""
    try:
        spec = importlib.util.find_spec("LangSegment")
        if spec is None or spec.origin is None:
            return
        init_path = os.path.join(os.path.dirname(spec.origin), "__init__.py")
        if not os.path.exists(init_path):
            # Fall back to searching the site-packages directories.
            for site_pkg_path in site.getsitepackages():
                potential_path = os.path.join(site_pkg_path, "LangSegment", "__init__.py")
                if os.path.exists(potential_path):
                    init_path = potential_path
                    break
            else:
                return

        with open(init_path, "r") as f:
            lines = f.readlines()

        modified = False
        new_lines = []
        target_line_prefix = "from .LangSegment import"

        for line in lines:
            if line.strip().startswith(target_line_prefix) and ("setLangfilters" in line or "getLangfilters" in line):
                mod_line = line.replace(",setLangfilters", "").replace(",getLangfilters", "")
                # Strip the trailing newline before trimming a dangling comma, then re-add it.
                mod_line = mod_line.replace("setLangfilters,", "").replace("getLangfilters,", "").rstrip().rstrip(",")
                new_lines.append(mod_line + "\n")
                modified = True
            else:
                new_lines.append(line)

        if modified:
            with open(init_path, "w") as f:
                f.writelines(new_lines)
            try:
                import LangSegment
                importlib.reload(LangSegment)
            except Exception:
                pass

    except Exception as e:
        print(f"Error patching LangSegment: {e}")


patch_langsegment_init()
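
# For reference, the patch rewrites an import line like
#   from .LangSegment import LangSegment,getTexts,setLangfilters,getLangfilters
# into
#   from .LangSegment import LangSegment,getTexts
# (the exact exported names vary by LangSegment release; this is an illustrative example).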


# Clone the Amphion repo (which provides the Vevo pipeline) and run from its root.
if not os.path.exists("Amphion"):
    subprocess.run(["git", "clone", "https://github.com/open-mmlab/Amphion.git"])
    os.chdir("Amphion")
elif not os.getcwd().endswith("Amphion"):
    os.chdir("Amphion")

# The working directory is now the Amphion root; add it to sys.path so that
# models.vc.vevo resolves as a package.
if os.getcwd() not in sys.path:
    sys.path.append(os.getcwd())

os.makedirs("wav", exist_ok=True)
os.makedirs("ckpts/Vevo", exist_ok=True)

from models.vc.vevo.vevo_utils import VevoInferencePipeline


def my_save_audio(waveform, output_path, sample_rate=24000):
    """Save a waveform with soundfile, accepting torch tensors or numpy arrays."""
    try:
        if isinstance(waveform, torch.Tensor):
            waveform = waveform.detach().cpu()
            if waveform.dim() == 2 and waveform.shape[0] == 1:
                waveform = waveform.squeeze(0)
            waveform = waveform.numpy()

        sf.write(output_path, waveform, sample_rate)
        print(f"Audio saved successfully to {output_path}")
    except Exception as e:
        print(f"Failed to save audio with soundfile: {e}")
        raise


def setup_configs():
    """Download the Vevo config files for the acoustic model and vocoder."""
    if downloaded_resources["configs"]:
        return
    config_path = "models/vc/vevo/config"
    os.makedirs(config_path, exist_ok=True)
    config_files = ["Vq8192ToMels.json", "Vocoder.json"]

    for file in config_files:
        file_path = f"{config_path}/{file}"
        if not os.path.exists(file_path):
            try:
                file_data = hf_hub_download(repo_id="amphion/Vevo", filename=f"config/{file}", repo_type="model")
                # shutil.copy is portable, unlike shelling out to `cp`.
                shutil.copy(file_data, file_path)
            except Exception as e:
                print(f"Error downloading config {file}: {e}")
    downloaded_resources["configs"] = True


setup_configs()
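
# After setup, the configs live at:
#   models/vc/vevo/config/Vq8192ToMels.json
#   models/vc/vevo/config/Vocoder.json
# matching the fmt_cfg_path / vocoder_cfg_path used in get_pipeline() below.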


device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Cache of constructed inference pipelines, keyed by task name.
inference_pipelines = {}


# Local snapshot roots, filled in by preload_all_resources().
downloaded_content_style_tokenizer_path = None
downloaded_fmt_path = None
downloaded_vocoder_path = None


def preload_all_resources():
    """Download the vq8192 tokenizer, Vq8192ToMels acoustic model, and vocoder checkpoints up front."""
    print("Preloading Timbre resources...")
    setup_configs()

    global downloaded_content_style_tokenizer_path
    global downloaded_fmt_path
    global downloaded_vocoder_path

    if not downloaded_resources["tokenizer_vq8192"]:
        local_dir = snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=["tokenizer/vq8192/*"],
        )
        downloaded_content_style_tokenizer_path = local_dir
        downloaded_resources["tokenizer_vq8192"] = True

    if not downloaded_resources["fmt_Vq8192ToMels"]:
        local_dir = snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=["acoustic_modeling/Vq8192ToMels/*"],
        )
        downloaded_fmt_path = local_dir
        downloaded_resources["fmt_Vq8192ToMels"] = True

    if not downloaded_resources["vocoder"]:
        local_dir = snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=["acoustic_modeling/Vocoder/*"],
        )
        downloaded_vocoder_path = local_dir
        downloaded_resources["vocoder"] = True

    print("Timbre resources ready!")


preload_all_resources()
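
# snapshot_download returns the local snapshot root; with the allow_patterns above,
# the amphion/Vevo checkpoints land under
#   <root>/tokenizer/vq8192
#   <root>/acoustic_modeling/Vq8192ToMels
#   <root>/acoustic_modeling/Vocoder
# which is how get_pipeline() below joins its checkpoint paths.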


def get_pipeline():
    """Build the VevoInferencePipeline on first use, then reuse the cached instance."""
    if "timbre" in inference_pipelines:
        return inference_pipelines["timbre"]

    content_style_tokenizer_ckpt_path = os.path.join(downloaded_content_style_tokenizer_path, "tokenizer/vq8192")
    fmt_cfg_path = "./models/vc/vevo/config/Vq8192ToMels.json"
    fmt_ckpt_path = os.path.join(downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels")
    vocoder_cfg_path = "./models/vc/vevo/config/Vocoder.json"
    vocoder_ckpt_path = os.path.join(downloaded_vocoder_path, "acoustic_modeling/Vocoder")

    pipeline = VevoInferencePipeline(
        content_style_tokenizer_ckpt_path=content_style_tokenizer_ckpt_path,
        fmt_cfg_path=fmt_cfg_path,
        fmt_ckpt_path=fmt_ckpt_path,
        vocoder_cfg_path=vocoder_cfg_path,
        vocoder_ckpt_path=vocoder_ckpt_path,
        device=device,
    )

    inference_pipelines["timbre"] = pipeline
    return pipeline


@spaces.GPU()
def vevo_timbre(content_wav, reference_wav):
    temp_content_path = "wav/temp_content.wav"
    temp_reference_path = "wav/temp_reference.wav"
    output_path = "wav/output_vevotimbre.wav"

    if content_wav is None or reference_wav is None:
        raise ValueError("Please upload audio files")

    # Gradio's numpy audio type is a (sample_rate, data) tuple; tolerate the reversed order too.
    if isinstance(content_wav, tuple):
        content_sr, content_data = content_wav if isinstance(content_wav[0], int) else (content_wav[1], content_wav[0])
    else:
        content_sr, content_data = content_wav

    # Downmix stereo to mono.
    if len(content_data.shape) > 1 and content_data.shape[1] > 1:
        content_data = np.mean(content_data, axis=1)

    content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
    if content_sr != 24000:
        content_tensor = torchaudio.functional.resample(content_tensor, content_sr, 24000)
        content_sr = 24000

    # Peak-normalize to 0.95 to avoid clipping.
    content_tensor = content_tensor / (torch.max(torch.abs(content_tensor)) + 1e-6) * 0.95

    # Preprocess the timbre reference the same way.
    if isinstance(reference_wav, tuple):
        ref_sr, ref_data = reference_wav if isinstance(reference_wav[0], int) else (reference_wav[1], reference_wav[0])
    else:
        ref_sr, ref_data = reference_wav

    if len(ref_data.shape) > 1 and ref_data.shape[1] > 1:
        ref_data = np.mean(ref_data, axis=1)

    ref_tensor = torch.FloatTensor(ref_data).unsqueeze(0)
    if ref_sr != 24000:
        ref_tensor = torchaudio.functional.resample(ref_tensor, ref_sr, 24000)
        ref_sr = 24000

    ref_tensor = ref_tensor / (torch.max(torch.abs(ref_tensor)) + 1e-6) * 0.95

    # The timbre reference only needs a short sample: keep at most the first 20 seconds.
    if ref_tensor.shape[1] > 24000 * 20:
        ref_tensor = ref_tensor[:, :24000 * 20]

    sf.write(temp_reference_path, ref_tensor.squeeze().cpu().numpy(), ref_sr)

    print(f"Total Duration: {content_tensor.shape[1] / 24000:.2f}s")

    pipeline = get_pipeline()

    # Process the source in 15-second chunks (15 s * 24000 Hz = 360,000 samples each)
    # so arbitrarily long inputs stay within what the model handles well.
    CHUNK_DURATION = 15
    CHUNK_SAMPLES = CHUNK_DURATION * 24000
    total_samples = content_tensor.shape[1]

    generated_chunks = []

    for i in range(0, total_samples, CHUNK_SAMPLES):
        end = min(i + CHUNK_SAMPLES, total_samples)
        chunk = content_tensor[:, i:end]

        print(f"Processing Chunk: {i / 24000:.1f}s to {end / 24000:.1f}s")

        sf.write(temp_content_path, chunk.squeeze().cpu().numpy(), 24000)

        try:
            gen_chunk = pipeline.inference_fm(
                src_wav_path=temp_content_path,
                timbre_ref_wav_path=temp_reference_path,
                flow_matching_steps=32,
            )

            # Guard against numerical blow-ups from the flow-matching model.
            if torch.isnan(gen_chunk).any() or torch.isinf(gen_chunk).any():
                print("Warning: NaN/Inf in chunk, clamping...")
                gen_chunk = torch.nan_to_num(gen_chunk, nan=0.0, posinf=0.95, neginf=-0.95)

            if gen_chunk.dim() == 1:
                gen_chunk = gen_chunk.unsqueeze(0)
            generated_chunks.append(gen_chunk.cpu())

        except Exception as e:
            print(f"Error processing chunk starting at {i}: {e}")
            # Substitute same-length silence so the output timeline stays aligned.
            silence = torch.zeros_like(chunk)
            generated_chunks.append(silence)

    if not generated_chunks:
        raise ValueError("No audio generated")

    final_audio = torch.cat(generated_chunks, dim=1)

    print(f"Final Audio Duration: {final_audio.shape[1] / 24000:.2f}s")

    my_save_audio(final_audio, output_path=output_path)
    return output_path
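
# A minimal programmatic usage sketch (hypothetical file names; the Gradio UI
# normally supplies the (sample_rate, data) tuples):
#
#   data, sr = sf.read("source_speech.wav")
#   ref_data, ref_sr = sf.read("target_voice.wav")
#   result_path = vevo_timbre((sr, data), (ref_sr, ref_data))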


with gr.Blocks(title="Vevo-Timbre (Long Audio Fix)") as demo:
    gr.Markdown("## Vevo-Timbre: Zero-Shot Voice Conversion (Unlimited Length)")
    gr.Markdown("This version automatically splits long files into 15-second chunks and processes them one at a time, so the output audio does not degrade.")

    with gr.Row():
        with gr.Column():
            timbre_content = gr.Audio(label="Source Audio (any length is fine)", type="numpy")
            timbre_reference = gr.Audio(label="Target Timbre (only the first 20 seconds are used)", type="numpy")
            timbre_button = gr.Button("Generate", variant="primary")
        with gr.Column():
            timbre_output = gr.Audio(label="Result (final output)")

    timbre_button.click(vevo_timbre, inputs=[timbre_content, timbre_reference], outputs=timbre_output)

demo.launch()