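# Feature-extraction worker: encodes the 16 kHz training wavs with a
# HuBERT/ContentVec embedder and writes one .npy feature file per input wav.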
import json
import os
import sys
import traceback

import librosa
import numpy as np
import soundfile as sf
import torch
import torch.nn.functional as F
from torch import nn
from fairseq import checkpoint_utils
from transformers import HubertModel

# Let MPS fall back to CPU for unsupported ops and disable the MPS
# high-watermark memory limit.
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
os.environ["PYTORCH_MPS_HIGH_WATERMARK_RATIO"] = "0.0"

# Positional arguments. The short form omits the GPU index; the long form
# pins the worker to one GPU via CUDA_VISIBLE_DEVICES. Note that
# sys.argv[7] (model_path) and sys.argv[8] (sample_embedding) are read
# further below, which assumes the long form.
device = sys.argv[1]
n_part = int(sys.argv[2])
i_part = int(sys.argv[3])
if len(sys.argv) == 6:
    exp_dir = sys.argv[4]
    version = sys.argv[5]
else:
    i_gpu = sys.argv[4]
    exp_dir = sys.argv[5]
    os.environ["CUDA_VISIBLE_DEVICES"] = str(i_gpu)
    version = sys.argv[6]
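
# Example invocation in the long form (the script filename and paths are
# illustrative, not taken from this file):
#   python extract_feature_print.py cuda:0 2 0 0 logs/my-exp v2 hubert_base.pt hubert_base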

# Prefer CUDA, then Apple MPS, overriding the device passed on the CLI.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
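
# Per-sample-rate training configs, loaded from the "configs" directory.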
version_config_paths = [
    "32k.json",
    "40k.json",
    "48k.json",
    "48k_v2.json",
    "32k_v2.json",
]


class Config:
    """Device and precision bookkeeping for the extraction pipeline."""

    def __init__(self):
        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.is_half = self.device != "cpu"
        self.gpu_name = (
            torch.cuda.get_device_name(int(self.device.split(":")[-1]))
            if self.device.startswith("cuda")
            else None
        )
        self.json_config = self.load_config_json()
        self.gpu_mem = None
        self.x_pad, self.x_query, self.x_center, self.x_max = self.device_config()

    def load_config_json(self) -> dict:
        configs = {}
        for config_file in version_config_paths:
            config_path = os.path.join("configs", config_file)
            with open(config_path, "r") as f:
                configs[config_file] = json.load(f)
        return configs

    def has_mps(self) -> bool:
        return torch.backends.mps.is_available()

    def has_xpu(self) -> bool:
        return hasattr(torch, "xpu") and torch.xpu.is_available()

    def set_precision(self, precision):
        if precision not in ("fp32", "fp16"):
            raise ValueError("Invalid precision type. Must be 'fp32' or 'fp16'.")

        fp16_run_value = precision == "fp16"
        preprocess_target_version = "3.7" if precision == "fp16" else "3.0"
        preprocess_path = os.path.join(
            os.path.dirname(__file__),
            os.pardir,
            "preprocess.py",
        )

        # Toggle fp16_run in every per-sample-rate config file.
        for config_path in version_config_paths:
            full_config_path = os.path.join("configs", config_path)
            try:
                with open(full_config_path, "r") as f:
                    config = json.load(f)
                config["train"]["fp16_run"] = fp16_run_value
                with open(full_config_path, "w") as f:
                    json.dump(config, f, indent=4)
            except FileNotFoundError:
                print(f"File not found: {full_config_path}")

        # Swap the hard-coded version string in preprocess.py so it matches
        # the selected precision.
        if os.path.exists(preprocess_path):
            with open(preprocess_path, "r") as f:
                preprocess_content = f.read()
            preprocess_content = preprocess_content.replace(
                "3.0" if precision == "fp16" else "3.7", preprocess_target_version
            )
            with open(preprocess_path, "w") as f:
                f.write(preprocess_content)

        return f"Overwrote preprocess.py and the config files to use {precision}."

    def get_precision(self):
        if not version_config_paths:
            raise FileNotFoundError("No configuration paths provided.")

        full_config_path = os.path.join("configs", version_config_paths[0])
        try:
            with open(full_config_path, "r") as f:
                config = json.load(f)
            fp16_run_value = config["train"].get("fp16_run", False)
            return "fp16" if fp16_run_value else "fp32"
        except FileNotFoundError:
            print(f"File not found: {full_config_path}")
            return None

    def device_config(self) -> tuple:
        if self.device.startswith("cuda"):
            self.set_cuda_config()
        elif self.has_mps():
            self.device = "mps"
            self.is_half = False
            self.set_precision("fp32")
        else:
            self.device = "cpu"
            self.is_half = False
            self.set_precision("fp32")

        # Window sizes consumed by the downstream pipeline (x_pad, x_query,
        # x_center, x_max); shrink them when GPU memory is 4 GB or less.
        x_pad, x_query, x_center, x_max = (
            (3, 10, 60, 65) if self.is_half else (1, 6, 38, 41)
        )
        if self.gpu_mem is not None and self.gpu_mem <= 4:
            x_pad, x_query, x_center, x_max = (1, 5, 30, 32)

        return x_pad, x_query, x_center, x_max

    def set_cuda_config(self):
        i_device = int(self.device.split(":")[-1])
        self.gpu_name = torch.cuda.get_device_name(i_device)
        # Force fp32 on GPUs with poor fp16 support (16-series, Pascal,
        # etc.), but never downgrade a V100.
        low_end_gpus = ["16", "P40", "P10", "1060", "1070", "1080"]
        if (
            any(gpu in self.gpu_name for gpu in low_end_gpus)
            and "V100" not in self.gpu_name.upper()
        ):
            self.is_half = False
            self.set_precision("fp32")

        self.gpu_mem = torch.cuda.get_device_properties(i_device).total_memory // (
            1024**3
        )


config = Config()


def load_audio(file, sample_rate):
    """Load an audio file as a mono float array at the given sample rate."""
    try:
        # Drop stray quotes and whitespace that sometimes wrap pasted paths.
        file = file.strip(' "\n')
        audio, sr = sf.read(file)
        if len(audio.shape) > 1:
            audio = librosa.to_mono(audio.T)
        if sr != sample_rate:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=sample_rate)
    except Exception as error:
        raise RuntimeError(f"An error occurred loading the audio: {error}")

    return audio.flatten()
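

# Custom embedders are loaded below via from_pretrained("Custom/"), i.e. a
# local directory in Hugging Face format.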
class HubertModelWithFinalProj(HubertModel):
    """HuBERT with the final projection layer applied for v1 (256-dim) features."""

    def __init__(self, config):
        super().__init__(config)
        self.final_proj = nn.Linear(config.hidden_size, config.classifier_proj_size)


f = open("%s/extract_f0_feature.log" % exp_dir, "a+")


def printt(strr):
    """Print to stdout and append to the experiment's extraction log."""
    print(strr)
    f.write("%s\n" % strr)
    f.flush()


printt(sys.argv)
model_path = sys.argv[7]
Custom_Embed = False
sample_embedding = sys.argv[8]
# A model_path ending in "Custom" selects a bundled embedder; map the chosen
# sample embedding to its checkpoint file.
if os.path.split(model_path)[-1] == "Custom":
    if sample_embedding == "hubert_base":
        model_path = "hubert_base.pt"
        Custom_Embed = True
    elif sample_embedding == "contentvec_base":
        model_path = "contentvec_base.pt"
        Custom_Embed = True
    elif sample_embedding == "hubert_base_japanese":
        model_path = "japanese_hubert_base.pt"
        Custom_Embed = True

printt(exp_dir)
wavPath = "%s/1_16k_wavs" % exp_dir
# v1 models train on 256-dim features, v2 models on 768-dim features.
outPath = (
    "%s/3_feature256" % exp_dir if version == "v1" else "%s/3_feature768" % exp_dir
)
os.makedirs(outPath, exist_ok=True)


def readwave(wav_path, normalize=False):
    """Read a 16 kHz wav as a (1, n_samples) float tensor, optionally layer-normed."""
    wav, sr = sf.read(wav_path)
    assert sr == 16000
    if not Custom_Embed:
        feats = torch.from_numpy(wav).float()
    else:
        feats = torch.from_numpy(load_audio(wav_path, sr)).to(dtype).to(device)
    if feats.dim() == 2:  # stereo: average the channels
        feats = feats.mean(-1)
    assert feats.dim() == 1, feats.dim()
    if normalize:
        with torch.no_grad():
            feats = F.layer_norm(feats, feats.shape)
    feats = feats.view(1, -1)
    return feats
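

# Load the embedder: the stock models come from a fairseq checkpoint, while
# the "Custom" path loads a Hugging Face-format model from Custom/.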
printt("load model(s) from {}".format(model_path))

if not os.access(model_path, os.F_OK):
    printt(
        "Error: Extracting is shut down because %s does not exist, you may download it from https://huggingface.co/lj1995/VoiceConversionWebUI/tree/main"
        % model_path
    )
    exit(0)
models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task(
    [model_path],
    suffix="",
)
if not Custom_Embed:
    model = models[0]
    if device not in ["mps", "cpu"]:
        model = model.half()  # fp16 inference on CUDA
else:
    dtype = torch.float16 if config.is_half and "cuda" in device else torch.float32
    model = HubertModelWithFinalProj.from_pretrained("Custom/").to(dtype).to(device)
model = model.to(device)
printt("move model to %s" % device)
model.eval()
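
# Shard the sorted file list so each of the n_part workers handles a
# disjoint slice, then extract features for every wav not yet processed.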
todo = sorted(list(os.listdir(wavPath)))[i_part::n_part]
n = max(1, len(todo) // 10)  # log progress roughly ten times per run
if len(todo) == 0:
    printt("no-feature-todo")
else:
    printt("all-feature-%s" % len(todo))
    for idx, file in enumerate(todo):
        try:
            if file.endswith(".wav"):
                wav_path = "%s/%s" % (wavPath, file)
                out_path = "%s/%s" % (outPath, file.replace(".wav", ".npy"))

                if os.path.exists(out_path):
                    continue

                feats = readwave(wav_path, normalize=saved_cfg.task.normalize)
                padding_mask = torch.BoolTensor(feats.shape).fill_(False)
                # Input dict for the fairseq models; the custom (HF) embedder
                # consumes `feats` directly below.
                inputs = {
                    "source": feats.half().to(device)
                    if device not in ["mps", "cpu"]
                    else feats.to(device),
                    "padding_mask": padding_mask.to(device),
                    # v1 reads HuBERT layer 9; v2 reads layer 12.
                    "output_layer": 9 if version == "v1" else 12,
                }
                with torch.no_grad():
                    if not Custom_Embed:
                        logits = model.extract_features(**inputs)
                        feats = (
                            model.final_proj(logits[0])
                            if version == "v1"
                            else logits[0]
                        )
                    else:
                        feats = model(feats)["last_hidden_state"]
                        feats = (
                            model.final_proj(feats[0]).unsqueeze(0)
                            if version == "v1"
                            else feats
                        )

                feats = feats.squeeze(0).float().cpu().numpy()
                if np.isnan(feats).sum() == 0:
                    np.save(out_path, feats, allow_pickle=False)
                else:
                    printt("%s-contains nan" % file)
                if idx % n == 0:
                    printt("now-%s,all-%s,%s,%s" % (idx, len(todo), file, feats.shape))
        except Exception:
            printt(traceback.format_exc())
    printt("all-feature-done")