# RVC - infer/lib/predictors/Generator.py
import os
import re
import sys
import math
import torch
import parselmouth
import numba as nb
import numpy as np
from scipy.signal import medfilt
from librosa import yin, pyin, piptrack
sys.path.append(os.getcwd())
from infer.lib.predictors.CREPE.filter import mean, median
from infer.lib.predictors.WORLD.SWIPE import swipe, stonemask
from infer.lib.variables import config, configs, logger, translations
from infer.lib.utils import autotune_f0, proposal_f0_up_key, circular_write
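
# post_process: shift f0 by `f0_up_key` semitones, optionally splice a
# user-supplied curve (`manual_f0`, columns [time in s, frequency in Hz])
# over the computed one, then quantize to 1..255 coarse mel bins alongside
# the raw Hz contour. `tf0` is the number of f0 frames per second; the
# function is compiled with numba for speed.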
@nb.jit(nopython=True)
def post_process(
tf0,
f0,
f0_up_key,
manual_x_pad,
f0_mel_min,
f0_mel_max,
manual_f0 = None
):
f0 *= pow(2, f0_up_key / 12)
if manual_f0 is not None:
replace_f0 = np.interp(
list(
range(
np.round(
(manual_f0[:, 0].max() - manual_f0[:, 0].min()) * tf0 + 1
).astype(np.int16)
)
),
manual_f0[:, 0] * 100,
manual_f0[:, 1]
)
f0[
manual_x_pad * tf0 : manual_x_pad * tf0 + len(replace_f0)
] = replace_f0[
:f0[
manual_x_pad * tf0 : manual_x_pad * tf0 + len(replace_f0)
].shape[0]
]
f0_mel = 1127 * np.log(1 + f0 / 700)
f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1
f0_mel[f0_mel <= 1] = 1
f0_mel[f0_mel > 255] = 255
return np.rint(f0_mel).astype(np.int32), f0
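
# Torch-based streaming variant of post_process: same semitone shift and
# mel quantization, but when rolling `pitch`/`pitchf` buffers are supplied,
# the new frames are written into them via circular_write (a rolling-buffer
# update, judging by its name) instead of allocating fresh tensors.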
def realtime_post_process(
f0,
pitch,
pitchf,
f0_up_key = 0,
f0_mel_min = 50.0,
f0_mel_max = 1100.0
):
f0 *= 2 ** (f0_up_key / 12)
f0_mel = 1127.0 * (1.0 + f0 / 700.0).log()
f0_mel = torch.clip((f0_mel - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1, 1, 255, out=f0_mel)
f0_coarse = torch.round(f0_mel, out=f0_mel).long()
if pitch is not None and pitchf is not None:
circular_write(f0_coarse, pitch)
circular_write(f0, pitchf)
else:
pitch = f0_coarse
pitchf = f0
return pitch.unsqueeze(0), pitchf.unsqueeze(0)
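
# Generator wraps the supported pitch predictors (parselmouth/Praat, WORLD
# harvest/dio, CREPE, FCPE, RMVPE, DJCM, librosa yin/pyin/piptrack) behind a
# single string-keyed interface and adds hybrid blending, pitch proposal,
# autotune and the post-processing above.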
class Generator:
def __init__(
self,
sample_rate = 16000,
hop_length = 160,
f0_min = 50,
f0_max = 1100,
alpha = 0.5,
is_half = False,
device = "cpu",
predictor_onnx = False,
delete_predictor_onnx = True
):
self.sample_rate = sample_rate
self.hop_length = hop_length
self.f0_min = f0_min
self.f0_max = f0_max
self.is_half = is_half
self.device = device
self.providers = config.providers
self.predictor_onnx = predictor_onnx
self.delete_predictor_onnx = delete_predictor_onnx
self.window = 160
self.batch_size = 512
self.alpha = alpha
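        # Equal-tempered note frequencies from G1 (49.00 Hz) up to C6
        # (1046.50 Hz); autotune_f0 snaps the extracted contour toward the
        # nearest of these when f0_autotune is enabled.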
self.ref_freqs = [
49.00,
51.91,
55.00,
58.27,
61.74,
65.41,
69.30,
73.42,
77.78,
82.41,
87.31,
92.50,
98.00,
103.83,
110.00,
116.54,
123.47,
130.81,
138.59,
146.83,
155.56,
164.81,
174.61,
185.00,
196.00,
207.65,
220.00,
233.08,
246.94,
261.63,
277.18,
293.66,
311.13,
329.63,
349.23,
369.99,
392.00,
415.30,
440.00,
466.16,
493.88,
523.25,
554.37,
587.33,
622.25,
659.25,
698.46,
739.99,
783.99,
830.61,
880.00,
932.33,
987.77,
1046.50
]
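    # Offline pipeline: extract f0 (single method or "hybrid[...]"), let
    # proposal_f0_up_key suggest an extra transpose when proposal_pitch is
    # set, optionally autotune, then run post_process to obtain the
    # (coarse mel bins, f0 in Hz) pair.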
def calculator(
self,
x_pad,
f0_method,
x,
f0_up_key = 0,
p_len = None,
filter_radius = 3,
f0_autotune = False,
f0_autotune_strength = 1,
manual_f0 = None,
proposal_pitch = False,
proposal_pitch_threshold = 255.0
):
if p_len is None: p_len = x.shape[0] // self.window
if "hybrid" in f0_method: logger.debug(translations["hybrid_calc"].format(f0_method=f0_method))
compute_fn = (
self.get_f0_hybrid if "hybrid" in f0_method else self.compute_f0
)
f0 = compute_fn(
f0_method,
x,
p_len,
filter_radius if filter_radius % 2 != 0 else filter_radius + 1
)
if proposal_pitch:
up_key = proposal_f0_up_key(
f0,
proposal_pitch_threshold,
configs["limit_f0"]
)
logger.debug(translations["proposal_f0"].format(up_key=up_key))
f0_up_key += up_key
if f0_autotune:
logger.debug(translations["startautotune"])
f0 = autotune_f0(
self.ref_freqs,
f0,
f0_autotune_strength
)
return post_process(
self.sample_rate // self.window,
f0,
f0_up_key,
x_pad,
1127 * math.log(1 + self.f0_min / 700),
1127 * math.log(1 + self.f0_max / 700),
manual_f0
)
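    # Streaming counterpart of calculator: same extraction, autotune and
    # pitch-proposal steps, but the result is returned as torch tensors and
    # written into the rolling pitch/pitchf buffers when they are provided.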
def realtime_calculator(
self,
audio,
f0_method,
pitch,
pitchf,
f0_up_key = 0,
filter_radius = 3,
f0_autotune = False,
f0_autotune_strength = 1,
proposal_pitch = False,
proposal_pitch_threshold = 255.0
):
if torch.is_tensor(audio): audio = audio.cpu().numpy()
p_len = audio.shape[0] // self.window
f0 = self.compute_f0(
f0_method,
audio,
p_len,
filter_radius if filter_radius % 2 != 0 else filter_radius + 1
)
if f0_autotune:
f0 = autotune_f0(
self.ref_freqs,
f0,
f0_autotune_strength
)
if proposal_pitch:
up_key = proposal_f0_up_key(
f0,
proposal_pitch_threshold,
configs["limit_f0"]
)
f0_up_key += up_key
return realtime_post_process(
torch.from_numpy(f0).float().to(self.device),
pitch,
pitchf,
f0_up_key,
self.f0_min,
self.f0_max
)
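    # Linearly resample an f0 contour to target_len frames. Values below
    # 0.001 Hz are marked NaN first so that unvoiced gaps are not smeared
    # into neighbouring voiced frames, then NaNs are zeroed again.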
def _resize_f0(self, x, target_len):
if len(x) == target_len: return x
source = np.array(x)
source[source < 0.001] = np.nan
return np.nan_to_num(
np.interp(
np.arange(0, len(source) * target_len, len(source)) / target_len,
np.arange(0, len(source)),
source
)
)
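    # Dispatch on the method string. Suffixes select variants, e.g.
    # "mangio-crepe-<size>", "harvest-stonemask", "rmvpe-clipping" or
    # "fcpe-legacy"; a "medfilt" (or "svs") marker additionally applies a
    # 5-tap median filter to the result.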
def compute_f0(self, f0_method, x, p_len, filter_radius):
if "pm" in f0_method:
f0 = self.get_f0_pm(
x,
p_len,
filter_radius=filter_radius,
mode=f0_method.split("-")[1]
)
elif f0_method.split("-")[0] in ["harvest", "dio"]:
f0 = self.get_f0_pyworld(
x,
p_len,
filter_radius,
f0_method.split("-")[0],
use_stonemask="stonemask" in f0_method
)
elif "crepe" in f0_method:
split_f0 = f0_method.split("-")
f0 = (
self.get_f0_mangio_crepe(
x,
p_len,
split_f0[2]
)
) if split_f0[0] == "mangio" else (
self.get_f0_crepe(
x,
p_len,
split_f0[1],
filter_radius=filter_radius
)
)
elif "fcpe" in f0_method:
f0 = self.get_f0_fcpe(
x,
p_len,
legacy="legacy" in f0_method and "previous" not in f0_method,
previous="previous" in f0_method,
filter_radius=filter_radius
)
elif "rmvpe" in f0_method:
f0 = self.get_f0_rmvpe(
x,
p_len,
clipping="clipping" in f0_method,
filter_radius=filter_radius,
hpa="hpa" in f0_method,
previous="previous" in f0_method
)
elif f0_method in ["yin", "pyin", "piptrack"]:
f0 = self.get_f0_librosa(
x,
p_len,
mode=f0_method,
filter_radius=filter_radius
)
elif "djcm" in f0_method:
f0 = self.get_f0_djcm(
x,
p_len,
clipping="clipping" in f0_method,
svs="svs" in f0_method,
filter_radius=filter_radius
)
else:
raise ValueError(translations["option_not_valid"])
if isinstance(f0, tuple): f0 = f0[0]
if "medfilt" in f0_method or "svs" in f0_method: f0 = medfilt(f0, kernel_size=5)
return f0
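    # Blend the methods listed in "hybrid[a+b+...]": every contour is
    # resized to p_len, weighted with a squared triangular window whose peak
    # position depends on alpha, and merged as a log-domain (geometric) mean
    # over frames where at least one method reports voicing.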
def get_f0_hybrid(self, methods_str, x, p_len, filter_radius):
methods_str = re.search(r"hybrid\[(.+)\]", methods_str)
if methods_str:
methods = [
method.strip()
for method in methods_str.group(1).split("+")
]
n = len(methods)
f0_stack = []
for method in methods:
f0_stack.append(
self._resize_f0(
self.compute_f0(
method,
x,
p_len,
filter_radius
),
p_len
)
)
f0_mix = np.zeros(p_len)
if not f0_stack: return f0_mix
if len(f0_stack) == 1: return f0_stack[0]
weights = (1 - np.abs(np.arange(n) / (n - 1) - (1 - self.alpha))) ** 2
weights /= weights.sum()
stacked = np.vstack(f0_stack)
voiced_mask = np.any(stacked > 0, axis=0)
f0_mix[voiced_mask] = np.exp(
np.nansum(
np.log(stacked + 1e-6) * weights[:, None], axis=0
)[voiced_mask]
)
return f0_mix
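    # Praat pitch via parselmouth: "ac" autocorrelation, "cc"
    # cross-correlation or "shs" sub-harmonic summation; the contour is
    # zero-padded on both sides to reach p_len frames.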
def get_f0_pm(self, x, p_len, filter_radius=3, mode="ac"):
        time_step = self.window / self.sample_rate  # frame step in seconds (e.g. 160 / 16000 = 0.01 s)
pm = parselmouth.Sound(
x,
self.sample_rate
)
pm_fn = {
"ac": pm.to_pitch_ac,
"cc": pm.to_pitch_cc,
"shs": pm.to_pitch_shs
}.get(mode, pm.to_pitch_ac)
pitch = (
pm_fn(
time_step=time_step,
voicing_threshold=filter_radius / 10 * 2,
pitch_floor=self.f0_min,
pitch_ceiling=self.f0_max
)
) if mode != "shs" else (
pm_fn(
time_step=time_step,
minimum_pitch=self.f0_min,
maximum_frequency_component=self.f0_max
)
)
f0 = pitch.selected_array["frequency"]
pad_size = (p_len - len(f0) + 1) // 2
if pad_size > 0 or p_len - len(f0) - pad_size > 0:
f0 = np.pad(
f0,
[[pad_size, p_len - len(f0) - pad_size]],
mode="constant"
)
return f0
def get_f0_mangio_crepe(self, x, p_len, model="full"):
if not hasattr(self, "mangio_crepe"):
from infer.lib.predictors.CREPE.CREPE import CREPE
self.mangio_crepe = CREPE(
os.path.join(
configs["predictors_path"],
f"crepe_{model}.{'onnx' if self.predictor_onnx else 'pth'}"
),
model_size=model,
hop_length=self.hop_length,
batch_size=self.hop_length * 2,
f0_min=self.f0_min,
f0_max=self.f0_max,
device=self.device,
sample_rate=self.sample_rate,
providers=self.providers,
onnx=self.predictor_onnx,
return_periodicity=False
)
x = x.astype(np.float32)
x /= np.quantile(np.abs(x), 0.999)
audio = torch.from_numpy(x).to(self.device, copy=True).unsqueeze(dim=0)
if audio.ndim == 2 and audio.shape[0] > 1: audio = audio.mean(dim=0, keepdim=True).detach()
f0 = self.mangio_crepe.compute_f0(audio.detach(), pad=True)
if self.predictor_onnx and self.delete_predictor_onnx: del self.mangio_crepe.model, self.mangio_crepe
return self._resize_f0(f0.squeeze(0).cpu().float().numpy(), p_len)
def get_f0_crepe(self, x, p_len, model="full", filter_radius=3):
if not hasattr(self, "crepe"):
from infer.lib.predictors.CREPE.CREPE import CREPE
self.crepe = CREPE(
os.path.join(
configs["predictors_path"],
f"crepe_{model}.{'onnx' if self.predictor_onnx else 'pth'}"
),
model_size=model,
hop_length=self.window,
batch_size=self.batch_size,
f0_min=self.f0_min,
f0_max=self.f0_max,
device=self.device,
sample_rate=self.sample_rate,
providers=self.providers,
onnx=self.predictor_onnx,
return_periodicity=True
)
f0, pd = self.crepe.compute_f0(torch.tensor(np.copy(x))[None].float(), pad=True)
if self.predictor_onnx and self.delete_predictor_onnx: del self.crepe.model, self.crepe
f0, pd = mean(f0, filter_radius), median(pd, filter_radius)
f0[pd < 0.1] = 0
return self._resize_f0(f0[0].cpu().numpy(), p_len)
def get_f0_fcpe(self, x, p_len, legacy=False, previous=False, filter_radius=3):
if not hasattr(self, "fcpe"):
from infer.lib.predictors.FCPE.FCPE import FCPE
self.fcpe = FCPE(
configs,
os.path.join(
configs["predictors_path"],
(
"fcpe_legacy"
if legacy else
("fcpe" if previous else "ddsp_200k")
) + (".onnx" if self.predictor_onnx else ".pt")
),
hop_length=self.hop_length,
f0_min=self.f0_min,
f0_max=self.f0_max,
dtype=torch.float32,
device=self.device,
sample_rate=self.sample_rate,
threshold=(
filter_radius / 100
) if legacy else (
filter_radius / 1000 * 2
),
providers=self.providers,
onnx=self.predictor_onnx,
legacy=legacy
)
f0 = self.fcpe.compute_f0(x, p_len)
if self.predictor_onnx and self.delete_predictor_onnx: del self.fcpe.fcpe.model, self.fcpe
return f0
def get_f0_rmvpe(self, x, p_len, clipping=False, filter_radius=3, hpa=False, previous=False):
if not hasattr(self, "rmvpe"):
from infer.lib.predictors.RMVPE.RMVPE import RMVPE
self.rmvpe = RMVPE(
os.path.join(
configs["predictors_path"],
(
(
"hpa-rmvpe-76000"
if previous else
"hpa-rmvpe-112000"
) if hpa else "rmvpe"
) + (".onnx" if self.predictor_onnx else ".pt")
),
is_half=self.is_half,
device=self.device,
onnx=self.predictor_onnx,
providers=self.providers,
hpa=hpa
)
filter_radius = filter_radius / 100
f0 = (
self.rmvpe.infer_from_audio_with_pitch(
x,
thred=filter_radius,
f0_min=self.f0_min,
f0_max=self.f0_max
)
) if clipping else (
self.rmvpe.infer_from_audio(
x,
thred=filter_radius
)
)
if self.predictor_onnx and self.delete_predictor_onnx: del self.rmvpe.model, self.rmvpe
return self._resize_f0(f0, p_len)
def get_f0_librosa(self, x, p_len, mode="yin", filter_radius=3):
        if mode != "piptrack":
            is_yin = mode == "yin"
            f0 = (yin if is_yin else pyin)(
                x.astype(np.float32),
                sr=self.sample_rate,
                fmin=self.f0_min,
                fmax=self.f0_max,
                hop_length=self.hop_length
            )
            # pyin returns (f0, voiced_flag, voiced_probability)
            if not is_yin: f0 = f0[0]
else:
pitches, magnitudes = piptrack(
y=x.astype(np.float32),
sr=self.sample_rate,
fmin=self.f0_min,
fmax=self.f0_max,
hop_length=self.hop_length,
threshold=filter_radius / 10
)
max_indexes = np.argmax(magnitudes, axis=0)
f0 = pitches[max_indexes, range(magnitudes.shape[1])]
return self._resize_f0(f0, p_len)
def get_f0_djcm(self, x, p_len, clipping=False, svs=False, filter_radius=3):
if not hasattr(self, "djcm"):
            from infer.lib.predictors.DJCM.DJCM import DJCM
self.djcm = DJCM(
os.path.join(
configs["predictors_path"],
(
"djcm-svs"
if svs else
"djcm"
) + (".onnx" if self.predictor_onnx else ".pt")
),
is_half=self.is_half,
device=self.device,
onnx=self.predictor_onnx,
svs=svs,
providers=self.providers
)
filter_radius /= 10
f0 = (
self.djcm.infer_from_audio_with_pitch(
x,
thred=filter_radius,
f0_min=self.f0_min,
f0_max=self.f0_max
)
) if clipping else (
self.djcm.infer_from_audio(
x,
thred=filter_radius
)
)
if self.predictor_onnx and self.delete_predictor_onnx: del self.djcm.model, self.djcm
return self._resize_f0(f0, p_len)
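
# Minimal usage sketch (hypothetical values; the real callers live in the
# project's inference pipeline, which supplies padding and buffers):
#
#   generator = Generator(sample_rate=16000, hop_length=160, device="cpu")
#   audio = np.zeros(32000, dtype=np.float32)  # 2 s of 16 kHz audio
#   coarse, f0 = generator.calculator(x_pad=1, f0_method="rmvpe", x=audio)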