|
|
import os
import re
import sys
import math
import torch
import parselmouth

import numba as nb
import numpy as np

from scipy.signal import medfilt
from librosa import yin, pyin, piptrack

sys.path.append(os.getcwd())

from infer.lib.predictors.CREPE.filter import mean, median
from infer.lib.predictors.WORLD.SWIPE import swipe, stonemask
from infer.lib.variables import config, configs, logger, translations
from infer.lib.utils import autotune_f0, proposal_f0_up_key, circular_write
|
|
@nb.jit(nopython=True)
def post_process(tf0, f0, f0_up_key, manual_x_pad, f0_mel_min, f0_mel_max, manual_f0=None):
    # Shift the contour by f0_up_key semitones.
    f0 *= pow(2, f0_up_key / 12)

    if manual_f0 is not None:
        # Interpolate the manually drawn f0 curve onto the frame grid and
        # overwrite the corresponding region of the computed contour.
        replace_f0 = np.interp(
            list(range(np.round((manual_f0[:, 0].max() - manual_f0[:, 0].min()) * tf0 + 1).astype(np.int16))),
            manual_f0[:, 0] * 100,
            manual_f0[:, 1]
        )

        start = manual_x_pad * tf0
        region = f0[start : start + len(replace_f0)]
        f0[start : start + len(replace_f0)] = replace_f0[: region.shape[0]]

    # Convert to mel scale and quantize into 255 coarse pitch bins.
    f0_mel = 1127 * np.log(1 + f0 / 700)
    f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1
    f0_mel[f0_mel <= 1] = 1
    f0_mel[f0_mel > 255] = 255

    return np.rint(f0_mel).astype(np.int32), f0
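
# A rough worked example of the coarse quantization above, assuming the default
# 50 Hz / 1100 Hz range (values are approximate):
#   f0_mel_min = 1127 * ln(1 + 50 / 700)   ~  77.8
#   f0_mel_max = 1127 * ln(1 + 1100 / 700) ~ 1064.4
#   f0 = 440 Hz  ->  f0_mel = 1127 * ln(1 + 440 / 700) ~ 549.6
#   coarse bin  = (549.6 - 77.8) * 254 / (1064.4 - 77.8) + 1 ~ 122
# so A4 lands a little below the middle of the 1..255 coarse pitch range.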
|
|
|
|
|
def realtime_post_process(f0, pitch, pitchf, f0_up_key=0, f0_mel_min=50.0, f0_mel_max=1100.0):
    # Shift the contour by f0_up_key semitones.
    f0 *= 2 ** (f0_up_key / 12)

    # Convert to mel scale and quantize into 255 coarse pitch bins.
    f0_mel = 1127.0 * (1.0 + f0 / 700.0).log()
    f0_mel = torch.clip((f0_mel - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1, 1, 255, out=f0_mel)
    f0_coarse = torch.round(f0_mel, out=f0_mel).long()

    if pitch is not None and pitchf is not None:
        # Reuse the cached buffers: shift in the newest frames.
        circular_write(f0_coarse, pitch)
        circular_write(f0, pitchf)
    else:
        pitch = f0_coarse
        pitchf = f0

    return pitch.unsqueeze(0), pitchf.unsqueeze(0)
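
# circular_write comes from infer.lib.utils and is not shown in this section.
# It is assumed to update the cached pitch buffers in place, conceptually
# something like:
#   pitch = torch.roll(pitch, -f0_coarse.shape[0]); pitch[-f0_coarse.shape[0]:] = f0_coarse
# i.e. the oldest frames are dropped and the newest appended, so the buffers
# keep a fixed length between realtime calls.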
|
|
|
|
|
class Generator:
    """Multi-backend f0 (pitch) extractor."""

    def __init__(
        self,
        sample_rate=16000,
        hop_length=160,
        f0_min=50,
        f0_max=1100,
        alpha=0.5,
        is_half=False,
        device="cpu",
        predictor_onnx=False,
        delete_predictor_onnx=True
    ):
        self.sample_rate = sample_rate
        self.hop_length = hop_length
        self.f0_min = f0_min
        self.f0_max = f0_max
        self.is_half = is_half
        self.device = device
        self.providers = config.providers
        self.predictor_onnx = predictor_onnx
        self.delete_predictor_onnx = delete_predictor_onnx
        self.window = 160
        self.batch_size = 512
        self.alpha = alpha
        # Reference note frequencies (equal temperament, G1 through C6) used when
        # snapping f0 toward the nearest note for autotune.
        self.ref_freqs = [
            49.00, 51.91, 55.00, 58.27, 61.74, 65.41, 69.30, 73.42, 77.78, 82.41,
            87.31, 92.50, 98.00, 103.83, 110.00, 116.54, 123.47, 130.81, 138.59, 146.83,
            155.56, 164.81, 174.61, 185.00, 196.00, 207.65, 220.00, 233.08, 246.94, 261.63,
            277.18, 293.66, 311.13, 329.63, 349.23, 369.99, 392.00, 415.30, 440.00, 466.16,
            493.88, 523.25, 554.37, 587.33, 622.25, 659.25, 698.46, 739.99, 783.99, 830.61,
            880.00, 932.33, 987.77, 1046.50
        ]
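
    # A minimal usage sketch (illustrative only; file paths, available methods and
    # model files depend on the surrounding project configuration):
    #   gen = Generator(sample_rate=16000, hop_length=160, device="cpu")
    #   pitch, pitchf = gen.calculator(x_pad=1, f0_method="rmvpe", x=audio_16k, f0_up_key=0)
    # where audio_16k is a mono float32 numpy array at 16 kHz; pitch holds the
    # coarse 1..255 bins and pitchf the continuous f0 curve in Hz.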
|
|
|
|
|
    def calculator(
        self,
        x_pad,
        f0_method,
        x,
        f0_up_key=0,
        p_len=None,
        filter_radius=3,
        f0_autotune=False,
        f0_autotune_strength=1,
        manual_f0=None,
        proposal_pitch=False,
        proposal_pitch_threshold=255.0
    ):
        if p_len is None: p_len = x.shape[0] // self.window
        if "hybrid" in f0_method: logger.debug(translations["hybrid_calc"].format(f0_method=f0_method))

        compute_fn = self.get_f0_hybrid if "hybrid" in f0_method else self.compute_f0

        # Force the smoothing radius to be odd before it is passed to the per-method filters.
        f0 = compute_fn(
            f0_method,
            x,
            p_len,
            filter_radius if filter_radius % 2 != 0 else filter_radius + 1
        )

        if proposal_pitch:
            up_key = proposal_f0_up_key(f0, proposal_pitch_threshold, configs["limit_f0"])
            logger.debug(translations["proposal_f0"].format(up_key=up_key))
            f0_up_key += up_key

        if f0_autotune:
            logger.debug(translations["startautotune"])
            f0 = autotune_f0(self.ref_freqs, f0, f0_autotune_strength)

        return post_process(
            self.sample_rate // self.window,
            f0,
            f0_up_key,
            x_pad,
            1127 * math.log(1 + self.f0_min / 700),
            1127 * math.log(1 + self.f0_max / 700),
            manual_f0
        )
|
    def realtime_calculator(
        self,
        audio,
        f0_method,
        pitch,
        pitchf,
        f0_up_key=0,
        filter_radius=3,
        f0_autotune=False,
        f0_autotune_strength=1,
        proposal_pitch=False,
        proposal_pitch_threshold=255.0
    ):
        if torch.is_tensor(audio): audio = audio.cpu().numpy()
        p_len = audio.shape[0] // self.window

        f0 = self.compute_f0(
            f0_method,
            audio,
            p_len,
            filter_radius if filter_radius % 2 != 0 else filter_radius + 1
        )

        if f0_autotune:
            f0 = autotune_f0(self.ref_freqs, f0, f0_autotune_strength)

        if proposal_pitch:
            up_key = proposal_f0_up_key(f0, proposal_pitch_threshold, configs["limit_f0"])
            f0_up_key += up_key

        return realtime_post_process(
            torch.from_numpy(f0).float().to(self.device),
            pitch,
            pitchf,
            f0_up_key,
            self.f0_min,
            self.f0_max
        )
|
    def _resize_f0(self, x, target_len):
        # Linearly resample the f0 contour to target_len frames. Unvoiced frames
        # (values below 0.001 Hz) are masked as NaN before interpolation and
        # restored to 0 afterwards.
        if len(x) == target_len: return x

        source = np.array(x)
        source[source < 0.001] = np.nan

        return np.nan_to_num(
            np.interp(
                np.arange(0, len(source) * target_len, len(source)) / target_len,
                np.arange(0, len(source)),
                source
            )
        )
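
    # For example, resizing [100, 110, 120] Hz to 6 frames yields roughly
    # [100, 105, 110, 115, 120, 120]. Masking unvoiced frames as NaN first keeps
    # the interpolation from producing artificial low pitches between voiced and
    # unvoiced regions; np.nan_to_num then maps them back to 0.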
|
|
|
|
|
    def compute_f0(self, f0_method, x, p_len, filter_radius):
        if "pm" in f0_method:
            f0 = self.get_f0_pm(x, p_len, filter_radius=filter_radius, mode=f0_method.split("-")[1])
        elif f0_method.split("-")[0] in ["harvest", "dio"]:
            f0 = self.get_f0_pyworld(x, p_len, filter_radius, f0_method.split("-")[0], use_stonemask="stonemask" in f0_method)
        elif "crepe" in f0_method:
            split_f0 = f0_method.split("-")
            f0 = (
                self.get_f0_mangio_crepe(x, p_len, split_f0[2])
                if split_f0[0] == "mangio" else
                self.get_f0_crepe(x, p_len, split_f0[1], filter_radius=filter_radius)
            )
        elif "fcpe" in f0_method:
            f0 = self.get_f0_fcpe(
                x,
                p_len,
                legacy="legacy" in f0_method and "previous" not in f0_method,
                previous="previous" in f0_method,
                filter_radius=filter_radius
            )
        elif "rmvpe" in f0_method:
            f0 = self.get_f0_rmvpe(
                x,
                p_len,
                clipping="clipping" in f0_method,
                filter_radius=filter_radius,
                hpa="hpa" in f0_method,
                previous="previous" in f0_method
            )
        elif f0_method in ["yin", "pyin", "piptrack"]:
            f0 = self.get_f0_librosa(x, p_len, mode=f0_method, filter_radius=filter_radius)
        elif "djcm" in f0_method:
            f0 = self.get_f0_djcm(
                x,
                p_len,
                clipping="clipping" in f0_method,
                svs="svs" in f0_method,
                filter_radius=filter_radius
            )
        else:
            raise ValueError(translations["option_not_valid"])

        # Some backends return (f0, extras); keep only the contour.
        if isinstance(f0, tuple): f0 = f0[0]
        if "medfilt" in f0_method or "svs" in f0_method: f0 = medfilt(f0, kernel_size=5)

        return f0
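
    # filter_radius is reinterpreted per backend below: the mean/median smoothing
    # window for CREPE, filter_radius / 100 as the RMVPE and legacy FCPE threshold,
    # filter_radius / 500 for the newer FCPE models, filter_radius / 10 as the
    # piptrack and DJCM thresholds, and filter_radius * 2 / 10 as the Praat
    # voicing threshold. The optional medfilt pass always uses a fixed kernel of 5.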
|
|
|
|
|
    def get_f0_hybrid(self, methods_str, x, p_len, filter_radius):
        match = re.search(r"hybrid\[(.+)\]", methods_str)
        methods = [method.strip() for method in match.group(1).split("+")] if match else []

        n = len(methods)
        f0_stack = [
            self._resize_f0(self.compute_f0(method, x, p_len, filter_radius), p_len)
            for method in methods
        ]

        f0_mix = np.zeros(p_len)

        if not f0_stack: return f0_mix
        if len(f0_stack) == 1: return f0_stack[0]

        # Blend the estimators with a weighted geometric mean; alpha controls how
        # strongly the weights lean toward the first or last method in the list.
        weights = (1 - np.abs(np.arange(n) / (n - 1) - (1 - self.alpha))) ** 2
        weights /= weights.sum()

        stacked = np.vstack(f0_stack)
        voiced_mask = np.any(stacked > 0, axis=0)

        f0_mix[voiced_mask] = np.exp(
            np.nansum(np.log(stacked + 1e-6) * weights[:, None], axis=0)[voiced_mask]
        )

        return f0_mix
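
    # For example, "hybrid[rmvpe+fcpe]" with alpha = 0.5 gives both estimators a
    # weight of 0.5 (a plain geometric mean of the two contours); with alpha = 0.8
    # the weights become roughly [0.94, 0.06], so alpha near 1 favors the first
    # listed method and alpha near 0 favors the last.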
|
|
|
|
|
    def get_f0_pm(self, x, p_len, filter_radius=3, mode="ac"):
        time_step = self.window / self.sample_rate  # frame hop in seconds

        pm = parselmouth.Sound(x, self.sample_rate)
        pm_fn = {
            "ac": pm.to_pitch_ac,
            "cc": pm.to_pitch_cc,
            "shs": pm.to_pitch_shs
        }.get(mode, pm.to_pitch_ac)

        if mode != "shs":
            pitch = pm_fn(
                time_step=time_step,
                voicing_threshold=filter_radius / 10 * 2,
                pitch_floor=self.f0_min,
                pitch_ceiling=self.f0_max
            )
        else:
            pitch = pm_fn(
                time_step=time_step,
                minimum_pitch=self.f0_min,
                maximum_frequency_component=self.f0_max
            )

        f0 = pitch.selected_array["frequency"]

        # Praat drops frames at the edges, so pad the contour back to p_len.
        pad_size = (p_len - len(f0) + 1) // 2
        if pad_size > 0 or p_len - len(f0) - pad_size > 0:
            f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant")

        return f0
|
    def get_f0_mangio_crepe(self, x, p_len, model="full"):
        if not hasattr(self, "mangio_crepe"):
            from infer.lib.predictors.CREPE.CREPE import CREPE

            self.mangio_crepe = CREPE(
                os.path.join(configs["predictors_path"], f"crepe_{model}.{'onnx' if self.predictor_onnx else 'pth'}"),
                model_size=model,
                hop_length=self.hop_length,
                batch_size=self.hop_length * 2,
                f0_min=self.f0_min,
                f0_max=self.f0_max,
                device=self.device,
                sample_rate=self.sample_rate,
                providers=self.providers,
                onnx=self.predictor_onnx,
                return_periodicity=False
            )

        x = x.astype(np.float32)
        x /= np.quantile(np.abs(x), 0.999)

        audio = torch.from_numpy(x).to(self.device, copy=True).unsqueeze(dim=0)
        if audio.ndim == 2 and audio.shape[0] > 1: audio = audio.mean(dim=0, keepdim=True).detach()

        f0 = self.mangio_crepe.compute_f0(audio.detach(), pad=True)
        if self.predictor_onnx and self.delete_predictor_onnx: del self.mangio_crepe.model, self.mangio_crepe

        return self._resize_f0(f0.squeeze(0).cpu().float().numpy(), p_len)
|
    def get_f0_crepe(self, x, p_len, model="full", filter_radius=3):
        if not hasattr(self, "crepe"):
            from infer.lib.predictors.CREPE.CREPE import CREPE

            self.crepe = CREPE(
                os.path.join(configs["predictors_path"], f"crepe_{model}.{'onnx' if self.predictor_onnx else 'pth'}"),
                model_size=model,
                hop_length=self.window,
                batch_size=self.batch_size,
                f0_min=self.f0_min,
                f0_max=self.f0_max,
                device=self.device,
                sample_rate=self.sample_rate,
                providers=self.providers,
                onnx=self.predictor_onnx,
                return_periodicity=True
            )

        f0, pd = self.crepe.compute_f0(torch.tensor(np.copy(x))[None].float(), pad=True)
        if self.predictor_onnx and self.delete_predictor_onnx: del self.crepe.model, self.crepe

        # Smooth pitch and periodicity, then mark low-periodicity frames as unvoiced.
        f0, pd = mean(f0, filter_radius), median(pd, filter_radius)
        f0[pd < 0.1] = 0

        return self._resize_f0(f0[0].cpu().numpy(), p_len)
|
    def get_f0_fcpe(self, x, p_len, legacy=False, previous=False, filter_radius=3):
        if not hasattr(self, "fcpe"):
            from infer.lib.predictors.FCPE.FCPE import FCPE

            self.fcpe = FCPE(
                configs,
                os.path.join(
                    configs["predictors_path"],
                    ("fcpe_legacy" if legacy else ("fcpe" if previous else "ddsp_200k")) + (".onnx" if self.predictor_onnx else ".pt")
                ),
                hop_length=self.hop_length,
                f0_min=self.f0_min,
                f0_max=self.f0_max,
                dtype=torch.float32,
                device=self.device,
                sample_rate=self.sample_rate,
                threshold=(filter_radius / 100) if legacy else (filter_radius / 1000 * 2),
                providers=self.providers,
                onnx=self.predictor_onnx,
                legacy=legacy
            )

        f0 = self.fcpe.compute_f0(x, p_len)
        if self.predictor_onnx and self.delete_predictor_onnx: del self.fcpe.fcpe.model, self.fcpe

        return f0
|
    def get_f0_rmvpe(self, x, p_len, clipping=False, filter_radius=3, hpa=False, previous=False):
        if not hasattr(self, "rmvpe"):
            from infer.lib.predictors.RMVPE.RMVPE import RMVPE

            self.rmvpe = RMVPE(
                os.path.join(
                    configs["predictors_path"],
                    (("hpa-rmvpe-76000" if previous else "hpa-rmvpe-112000") if hpa else "rmvpe") + (".onnx" if self.predictor_onnx else ".pt")
                ),
                is_half=self.is_half,
                device=self.device,
                onnx=self.predictor_onnx,
                providers=self.providers,
                hpa=hpa
            )

        filter_radius = filter_radius / 100

        f0 = (
            self.rmvpe.infer_from_audio_with_pitch(x, thred=filter_radius, f0_min=self.f0_min, f0_max=self.f0_max)
            if clipping else
            self.rmvpe.infer_from_audio(x, thred=filter_radius)
        )

        if self.predictor_onnx and self.delete_predictor_onnx: del self.rmvpe.model, self.rmvpe
        return self._resize_f0(f0, p_len)
|
    def get_f0_librosa(self, x, p_len, mode="yin", filter_radius=3):
        if mode != "piptrack":
            self.if_yin = mode == "yin"
            self.yin = yin if self.if_yin else pyin

            f0 = self.yin(
                x.astype(np.float32),
                sr=self.sample_rate,
                fmin=self.f0_min,
                fmax=self.f0_max,
                hop_length=self.hop_length
            )

            # pyin returns (f0, voiced_flag, voiced_prob); keep only the contour.
            if not self.if_yin: f0 = f0[0]
        else:
            pitches, magnitudes = piptrack(
                y=x.astype(np.float32),
                sr=self.sample_rate,
                fmin=self.f0_min,
                fmax=self.f0_max,
                hop_length=self.hop_length,
                threshold=filter_radius / 10
            )

            # Pick the strongest pitch candidate in each frame.
            max_indexes = np.argmax(magnitudes, axis=0)
            f0 = pitches[max_indexes, range(magnitudes.shape[1])]

        return self._resize_f0(f0, p_len)
|
    def get_f0_djcm(self, x, p_len, clipping=False, svs=False, filter_radius=3):
        if not hasattr(self, "djcm"):
            from infer.lib.predictors.DJCM.DJCM import DJCM

            self.djcm = DJCM(
                os.path.join(
                    configs["predictors_path"],
                    ("djcm-svs" if svs else "djcm") + (".onnx" if self.predictor_onnx else ".pt")
                ),
                is_half=self.is_half,
                device=self.device,
                onnx=self.predictor_onnx,
                svs=svs,
                providers=self.providers
            )

        filter_radius /= 10

        f0 = (
            self.djcm.infer_from_audio_with_pitch(x, thred=filter_radius, f0_min=self.f0_min, f0_max=self.f0_max)
            if clipping else
            self.djcm.infer_from_audio(x, thred=filter_radius)
        )

        if self.predictor_onnx and self.delete_predictor_onnx: del self.djcm.model, self.djcm
        return self._resize_f0(f0, p_len)
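
    # compute_f0 also dispatches "harvest"/"dio" to self.get_f0_pyworld, which is
    # not defined in this section; the real implementation presumably builds on the
    # swipe/stonemask helpers imported from infer.lib.predictors.WORLD.SWIPE above.
    # A minimal sketch of such a method, assuming the pyworld package instead:
    #   import pyworld
    #   def get_f0_pyworld(self, x, p_len, filter_radius, model="harvest", use_stonemask=False):
    #       extractor = pyworld.harvest if model == "harvest" else pyworld.dio
    #       f0, t = extractor(x.astype(np.float64), self.sample_rate,
    #                         f0_floor=self.f0_min, f0_ceil=self.f0_max,
    #                         frame_period=1000 * self.window / self.sample_rate)
    #       if use_stonemask: f0 = pyworld.stonemask(x.astype(np.float64), f0, t, self.sample_rate)
    #       if filter_radius > 2: f0 = medfilt(f0, filter_radius)
    #       return self._resize_f0(f0, p_len)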
|
|
|
|
|
|
|
|
|