import os
import re
import sys
import math

import torch
import parselmouth
import numba as nb
import numpy as np

from scipy.signal import medfilt
from librosa import yin, pyin, piptrack

sys.path.append(os.getcwd())

from infer.lib.predictors.CREPE.filter import mean, median
from infer.lib.predictors.WORLD.SWIPE import swipe, stonemask
from infer.lib.variables import config, configs, logger, translations
from infer.lib.utils import autotune_f0, proposal_f0_up_key, circular_write


@nb.jit(nopython=True)
def post_process(
    tf0,
    f0,
    f0_up_key,
    manual_x_pad,
    f0_mel_min,
    f0_mel_max,
    manual_f0 = None
):
    """Transpose an F0 curve and quantise it to integer bins in [1, 255].

    The logic is kept token-for-token because the function is compiled by
    numba in nopython mode.

    Args:
        tf0: Multiplier used to turn ``manual_x_pad`` and the span of
            ``manual_f0[:, 0]`` into frame indices (``calculator`` passes
            ``sample_rate // window``).
        f0: F0 array; modified in place by the transposition.
        f0_up_key: Transposition in semitones (``f0 *= 2 ** (key / 12)``).
        manual_x_pad: Offset, multiplied by ``tf0``, where the manual
            curve is written into ``f0``.
        f0_mel_min: Lower bound of the quantisation range, on the same
            ``1127 * log(1 + f / 700)`` scale as ``f0_mel``.
        f0_mel_max: Upper bound of the quantisation range (same scale).
        manual_f0: Optional 2-D array; column 0 is used as the x
            coordinate and column 1 as the value of the replacement curve.

    Returns:
        Tuple ``(coarse, f0)``: the int32 quantised curve and the
        transposed (and possibly overwritten) ``f0``.
    """
    f0 *= pow(2, f0_up_key / 12)

    if manual_f0 is not None:
        # NOTE(review): the x coordinates are scaled by a literal 100 rather
        # than tf0, and the sample count is cast to int16 (wraps above
        # 32767) -- left untouched here, confirm both are intended.
        replace_f0 = np.interp(
            list(
                range(
                    np.round(
                        (manual_f0[:, 0].max() - manual_f0[:, 0].min()) * tf0 + 1
                    ).astype(np.int16)
                )
            ),
            manual_f0[:, 0] * 100,
            manual_f0[:, 1]
        )
        # Overwrite the target window, truncating the replacement curve if
        # the window runs past the end of f0.
        f0[
            manual_x_pad * tf0 : manual_x_pad * tf0 + len(replace_f0)
        ] = replace_f0[
            :f0[
                manual_x_pad * tf0 : manual_x_pad * tf0 + len(replace_f0)
            ].shape[0]
        ]

    f0_mel = 1127 * np.log(1 + f0 / 700)
    # Only strictly positive entries are rescaled; everything that ends up
    # at or below 1 is pinned to bin 1, everything above 255 to bin 255.
    f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1
    f0_mel[f0_mel <= 1] = 1
    f0_mel[f0_mel > 255] = 255

    return np.rint(f0_mel).astype(np.int32), f0


def realtime_post_process(
    f0,
    pitch,
    pitchf,
    f0_up_key = 0,
    f0_mel_min = 50.0,
    f0_mel_max = 1100.0
):
    """Torch counterpart of ``post_process`` for the streaming path.

    Args:
        f0: Torch tensor of F0 values; transposed in place.
        pitch: Optional buffer that receives the coarse pitch through
            ``circular_write``.
        pitchf: Optional buffer that receives the transposed ``f0``
            through ``circular_write``.
        f0_up_key: Transposition in semitones.
        f0_mel_min: Lower bound of the quantisation range.
        f0_mel_max: Upper bound of the quantisation range.

    Returns:
        Tuple ``(pitch, pitchf)``, each with a new leading dimension of
        size 1. When either buffer is ``None`` the freshly computed
        tensors are returned instead of the buffers.
    """
    f0 *= 2 ** (f0_up_key / 12)

    f0_mel = 1127.0 * (1.0 + f0 / 700.0).log()
    # Rescale and clamp to [1, 255] in place, then round in place.
    f0_mel = torch.clip(
        (f0_mel - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1,
        1,
        255,
        out=f0_mel
    )
    f0_coarse = torch.round(f0_mel, out=f0_mel).long()

    if pitch is not None and pitchf is not None:
        circular_write(f0_coarse, pitch)
        circular_write(f0, pitchf)
    else:
        pitch = f0_coarse
        pitchf = f0

    return pitch.unsqueeze(0), pitchf.unsqueeze(0)


class Generator:
    """Dispatches F0 estimation to one of several predictors.

    Predictor objects (CREPE, FCPE, RMVPE, DJCM) are created lazily on
    first use and cached as instance attributes. When ``predictor_onnx``
    and ``delete_predictor_onnx`` are both true, the cached predictor is
    deleted again after every call.
    """

    def __init__(
        self,
        sample_rate = 16000,
        hop_length = 160,
        f0_min = 50,
        f0_max = 1100,
        alpha = 0.5,
        is_half = False,
        device = "cpu",
        predictor_onnx = False,
        delete_predictor_onnx = True
    ):
        """Store the configuration; no predictor is loaded here.

        Args:
            sample_rate: Sample rate forwarded to the predictors.
            hop_length: Hop length forwarded to mangio-crepe, FCPE and
                the librosa estimators.
            f0_min: Lower F0 bound forwarded to the predictors.
            f0_max: Upper F0 bound forwarded to the predictors.
            alpha: Controls the per-method weights in ``get_f0_hybrid``.
            is_half: Forwarded to RMVPE and DJCM.
            device: Torch device used for tensors and predictors.
            predictor_onnx: Select ``.onnx`` predictor files instead of
                ``.pt`` / ``.pth``.
            delete_predictor_onnx: Drop an ONNX predictor after each call.
        """
        self.sample_rate = sample_rate
        self.hop_length = hop_length
        self.f0_min = f0_min
        self.f0_max = f0_max
        self.is_half = is_half
        self.device = device
        self.providers = config.providers
        self.predictor_onnx = predictor_onnx
        self.delete_predictor_onnx = delete_predictor_onnx
        # Samples per output frame; used to derive p_len and tf0.
        self.window = 160
        self.batch_size = 512
        self.alpha = alpha
        # Reference frequencies handed to autotune_f0.
        self.ref_freqs = [
            49.00, 51.91, 55.00, 58.27, 61.74, 65.41, 69.30, 73.42, 77.78,
            82.41, 87.31, 92.50, 98.00, 103.83, 110.00, 116.54, 123.47,
            130.81, 138.59, 146.83, 155.56, 164.81, 174.61, 185.00, 196.00,
            207.65, 220.00, 233.08, 246.94, 261.63, 277.18, 293.66, 311.13,
            329.63, 349.23, 369.99, 392.00, 415.30, 440.00, 466.16, 493.88,
            523.25, 554.37, 587.33, 622.25, 659.25, 698.46, 739.99, 783.99,
            830.61, 880.00, 932.33, 987.77, 1046.50
        ]

    @staticmethod
    def _odd_radius(filter_radius):
        """Return ``filter_radius`` bumped to the next odd number if even."""
        return filter_radius if filter_radius % 2 != 0 else filter_radius + 1

    def _mel_bounds(self):
        """Return ``(f0_min, f0_max)`` mapped through ``1127 * log(1 + f / 700)``."""
        return (
            1127 * math.log(1 + self.f0_min / 700),
            1127 * math.log(1 + self.f0_max / 700)
        )

    def calculator(
        self,
        x_pad,
        f0_method,
        x,
        f0_up_key = 0,
        p_len = None,
        filter_radius = 3,
        f0_autotune = False,
        f0_autotune_strength = 1,
        manual_f0 = None,
        proposal_pitch = False,
        proposal_pitch_threshold = 255.0
    ):
        """Estimate F0 for ``x`` and return ``post_process``'s result.

        Args:
            x_pad: Passed to ``post_process`` as ``manual_x_pad``.
            f0_method: Method string; routed to ``get_f0_hybrid`` when it
                contains ``"hybrid"``, otherwise to ``compute_f0``.
            x: Audio samples (``x.shape[0]`` is used as the sample count).
            f0_up_key: Transposition in semitones.
            p_len: Number of output frames; defaults to
                ``x.shape[0] // self.window``.
            filter_radius: Forwarded to the predictor after being made odd.
            f0_autotune: Apply ``autotune_f0`` to the estimate.
            f0_autotune_strength: Strength forwarded to ``autotune_f0``.
            manual_f0: Optional manual curve forwarded to ``post_process``.
            proposal_pitch: Add the key suggested by
                ``proposal_f0_up_key`` to ``f0_up_key``.
            proposal_pitch_threshold: Forwarded to ``proposal_f0_up_key``.

        Returns:
            Tuple ``(coarse, f0)`` as produced by ``post_process``.
        """
        if p_len is None:
            p_len = x.shape[0] // self.window

        is_hybrid = "hybrid" in f0_method
        if is_hybrid:
            logger.debug(translations["hybrid_calc"].format(f0_method=f0_method))

        compute_fn = self.get_f0_hybrid if is_hybrid else self.compute_f0
        f0 = compute_fn(f0_method, x, p_len, self._odd_radius(filter_radius))

        if proposal_pitch:
            up_key = proposal_f0_up_key(
                f0,
                proposal_pitch_threshold,
                configs["limit_f0"]
            )
            logger.debug(translations["proposal_f0"].format(up_key=up_key))
            f0_up_key += up_key

        if f0_autotune:
            logger.debug(translations["startautotune"])
            f0 = autotune_f0(self.ref_freqs, f0, f0_autotune_strength)

        f0_mel_min, f0_mel_max = self._mel_bounds()

        return post_process(
            self.sample_rate // self.window,
            f0,
            f0_up_key,
            x_pad,
            f0_mel_min,
            f0_mel_max,
            manual_f0
        )

    def realtime_calculator(
        self,
        audio,
        f0_method,
        pitch,
        pitchf,
        f0_up_key = 0,
        filter_radius = 3,
        f0_autotune = False,
        f0_autotune_strength = 1,
        proposal_pitch = False,
        proposal_pitch_threshold = 255.0
    ):
        """Streaming variant of ``calculator``.

        Args:
            audio: Audio samples as a numpy array or a torch tensor
                (tensors are moved to CPU and converted to numpy).
            f0_method: Method string forwarded to ``compute_f0``.
            pitch: Optional coarse-pitch buffer, see
                ``realtime_post_process``.
            pitchf: Optional F0 buffer, see ``realtime_post_process``.
            f0_up_key: Transposition in semitones.
            filter_radius: Forwarded to the predictor after being made odd.
            f0_autotune: Apply ``autotune_f0`` to the estimate.
            f0_autotune_strength: Strength forwarded to ``autotune_f0``.
            proposal_pitch: Add the key suggested by
                ``proposal_f0_up_key`` to ``f0_up_key``.
            proposal_pitch_threshold: Forwarded to ``proposal_f0_up_key``.

        Returns:
            Tuple ``(pitch, pitchf)`` from ``realtime_post_process``.
        """
        if torch.is_tensor(audio):
            audio = audio.cpu().numpy()

        p_len = audio.shape[0] // self.window
        f0 = self.compute_f0(
            f0_method,
            audio,
            p_len,
            self._odd_radius(filter_radius)
        )

        if f0_autotune:
            f0 = autotune_f0(self.ref_freqs, f0, f0_autotune_strength)

        if proposal_pitch:
            up_key = proposal_f0_up_key(
                f0,
                proposal_pitch_threshold,
                configs["limit_f0"]
            )
            f0_up_key += up_key

        # The quantisation bounds must be on the same 1127 * log(1 + f / 700)
        # scale as f0_mel, exactly as calculator() does; passing the raw
        # f0_min / f0_max made the streaming path quantise differently.
        f0_mel_min, f0_mel_max = self._mel_bounds()

        return realtime_post_process(
            torch.from_numpy(f0).float().to(self.device),
            pitch,
            pitchf,
            f0_up_key,
            f0_mel_min,
            f0_mel_max
        )

    def _resize_f0(self, x, target_len):
        """Linearly resample ``x`` to ``target_len`` entries.

        ``x`` is returned unchanged when it already has the requested
        length. Otherwise values below 0.001 are turned into NaN before
        interpolating, and NaNs in the result are replaced by 0.
        """
        if len(x) == target_len:
            return x

        source = np.array(x)
        source[source < 0.001] = np.nan

        return np.nan_to_num(
            np.interp(
                np.arange(0, len(source) * target_len, len(source)) / target_len,
                np.arange(0, len(source)),
                source
            )
        )

    def compute_f0(self, f0_method, x, p_len, filter_radius):
        """Run the single predictor selected by ``f0_method``.

        The method string is matched by substring / dash-separated parts,
        in the order of the branches below. A ``medfilt(kernel_size=5)``
        pass is applied when the string contains ``"medfilt"`` or
        ``"svs"``.

        Raises:
            ValueError: If no branch matches ``f0_method``.
        """
        parts = f0_method.split("-")

        if "pm" in f0_method:
            # A bare "pm" has no mode suffix; fall back to get_f0_pm's
            # default instead of raising IndexError.
            f0 = self.get_f0_pm(
                x,
                p_len,
                filter_radius=filter_radius,
                mode=parts[1] if len(parts) > 1 else "ac"
            )
        elif parts[0] in ["harvest", "dio"]:
            # NOTE(review): get_f0_pyworld is not defined in the part of the
            # file visible here -- confirm it exists elsewhere.
            f0 = self.get_f0_pyworld(
                x,
                p_len,
                filter_radius,
                parts[0],
                use_stonemask="stonemask" in f0_method
            )
        elif "crepe" in f0_method:
            # "mangio-crepe-<model>" vs "crepe-<model>"; a missing model
            # suffix falls back to the callee default "full".
            if parts[0] == "mangio":
                f0 = self.get_f0_mangio_crepe(
                    x,
                    p_len,
                    parts[2] if len(parts) > 2 else "full"
                )
            else:
                f0 = self.get_f0_crepe(
                    x,
                    p_len,
                    parts[1] if len(parts) > 1 else "full",
                    filter_radius=filter_radius
                )
        elif "fcpe" in f0_method:
            f0 = self.get_f0_fcpe(
                x,
                p_len,
                legacy="legacy" in f0_method and "previous" not in f0_method,
                previous="previous" in f0_method,
                filter_radius=filter_radius
            )
        elif "rmvpe" in f0_method:
            f0 = self.get_f0_rmvpe(
                x,
                p_len,
                clipping="clipping" in f0_method,
                filter_radius=filter_radius,
                hpa="hpa" in f0_method,
                previous="previous" in f0_method
            )
        elif f0_method in ["yin", "pyin", "piptrack"]:
            f0 = self.get_f0_librosa(
                x,
                p_len,
                mode=f0_method,
                filter_radius=filter_radius
            )
        elif "djcm" in f0_method:
            f0 = self.get_f0_djcm(
                x,
                p_len,
                clipping="clipping" in f0_method,
                svs="svs" in f0_method,
                filter_radius=filter_radius
            )
        else:
            raise ValueError(translations["option_not_valid"])

        # Some predictors return a tuple; only the first element is used.
        if isinstance(f0, tuple):
            f0 = f0[0]

        if "medfilt" in f0_method or "svs" in f0_method:
            f0 = medfilt(f0, kernel_size=5)

        return f0

    def get_f0_hybrid(self, methods_str, x, p_len, filter_radius):
        """Blend several predictors named as ``hybrid[a+b+...]``.

        Each listed method is run through ``compute_f0`` and resized to
        ``p_len``. Frames where at least one method is positive receive
        the weighted geometric mean (weights derived from ``self.alpha``
        and the method's position in the list); all other frames stay 0.

        Raises:
            ValueError: If ``methods_str`` has no ``hybrid[...]`` part.
        """
        match = re.search(r"hybrid\[(.+)\]", methods_str)
        if match is None:
            # Previously this fell through to an unbound local `methods`.
            raise ValueError(translations["option_not_valid"])

        methods = [method.strip() for method in match.group(1).split("+")]
        n = len(methods)

        f0_stack = [
            self._resize_f0(
                self.compute_f0(method, x, p_len, filter_radius),
                p_len
            )
            for method in methods
        ]

        f0_mix = np.zeros(p_len)

        if not f0_stack:
            return f0_mix

        if len(f0_stack) == 1:
            return f0_stack[0]

        weights = (1 - np.abs(np.arange(n) / (n - 1) - (1 - self.alpha))) ** 2
        weights /= weights.sum()

        stacked = np.vstack(f0_stack)
        voiced_mask = np.any(stacked > 0, axis=0)

        # NOTE(review): a method reporting 0 on a frame another method marks
        # as voiced still contributes log(1e-6) to the mean -- confirm this
        # is the intended blending behaviour.
        f0_mix[voiced_mask] = np.exp(
            np.nansum(
                np.log(stacked + 1e-6) * weights[:, None],
                axis=0
            )[voiced_mask]
        )

        return f0_mix

    def get_f0_pm(self, x, p_len, filter_radius=3, mode="ac"):
        """Estimate F0 with parselmouth (``ac``, ``cc`` or ``shs``).

        Unknown modes fall back to ``to_pitch_ac``. The result is
        zero-padded on both sides when it is shorter than ``p_len``.
        """
        time_step = self.window / self.sample_rate * 1000 / 1000
        pm = parselmouth.Sound(x, self.sample_rate)

        pm_fn = {
            "ac": pm.to_pitch_ac,
            "cc": pm.to_pitch_cc,
            "shs": pm.to_pitch_shs
        }.get(mode, pm.to_pitch_ac)

        # to_pitch_shs takes a different set of keyword arguments.
        if mode != "shs":
            pitch = pm_fn(
                time_step=time_step,
                voicing_threshold=filter_radius / 10 * 2,
                pitch_floor=self.f0_min,
                pitch_ceiling=self.f0_max
            )
        else:
            pitch = pm_fn(
                time_step=time_step,
                minimum_pitch=self.f0_min,
                maximum_frequency_component=self.f0_max
            )

        f0 = pitch.selected_array["frequency"]
        pad_size = (p_len - len(f0) + 1) // 2

        if pad_size > 0 or p_len - len(f0) - pad_size > 0:
            f0 = np.pad(
                f0,
                [[pad_size, p_len - len(f0) - pad_size]],
                mode="constant"
            )

        return f0

    def get_f0_mangio_crepe(self, x, p_len, model="full"):
        """Estimate F0 with the CREPE wrapper in mangio configuration.

        The audio is cast to float32 and scaled by the 0.999 quantile of
        its absolute value before inference; the result is resized to
        ``p_len``.
        """
        if not hasattr(self, "mangio_crepe"):
            from infer.lib.predictors.CREPE.CREPE import CREPE

            self.mangio_crepe = CREPE(
                os.path.join(
                    configs["predictors_path"],
                    f"crepe_{model}.{'onnx' if self.predictor_onnx else 'pth'}"
                ),
                model_size=model,
                hop_length=self.hop_length,
                batch_size=self.hop_length * 2,
                f0_min=self.f0_min,
                f0_max=self.f0_max,
                device=self.device,
                sample_rate=self.sample_rate,
                providers=self.providers,
                onnx=self.predictor_onnx,
                return_periodicity=False
            )

        x = x.astype(np.float32)
        # Guard against silent input: a zero quantile would turn the whole
        # signal into NaN/inf.
        peak = np.quantile(np.abs(x), 0.999)
        if peak > 0:
            x /= peak

        audio = torch.from_numpy(x).to(self.device, copy=True).unsqueeze(dim=0)
        if audio.ndim == 2 and audio.shape[0] > 1:
            audio = audio.mean(dim=0, keepdim=True).detach()

        f0 = self.mangio_crepe.compute_f0(audio.detach(), pad=True)

        if self.predictor_onnx and self.delete_predictor_onnx:
            del self.mangio_crepe.model, self.mangio_crepe

        return self._resize_f0(f0.squeeze(0).cpu().float().numpy(), p_len)

    def get_f0_crepe(self, x, p_len, model="full", filter_radius=3):
        """Estimate F0 with CREPE, smoothed and gated by periodicity.

        F0 is passed through ``mean`` and periodicity through ``median``
        (both with ``filter_radius``); frames with periodicity below 0.1
        are set to 0 before resizing to ``p_len``.
        """
        if not hasattr(self, "crepe"):
            from infer.lib.predictors.CREPE.CREPE import CREPE

            self.crepe = CREPE(
                os.path.join(
                    configs["predictors_path"],
                    f"crepe_{model}.{'onnx' if self.predictor_onnx else 'pth'}"
                ),
                model_size=model,
                hop_length=self.window,
                batch_size=self.batch_size,
                f0_min=self.f0_min,
                f0_max=self.f0_max,
                device=self.device,
                sample_rate=self.sample_rate,
                providers=self.providers,
                onnx=self.predictor_onnx,
                return_periodicity=True
            )

        f0, pd = self.crepe.compute_f0(
            torch.tensor(np.copy(x))[None].float(),
            pad=True
        )

        if self.predictor_onnx and self.delete_predictor_onnx:
            del self.crepe.model, self.crepe

        f0, pd = mean(f0, filter_radius), median(pd, filter_radius)
        f0[pd < 0.1] = 0

        return self._resize_f0(f0[0].cpu().numpy(), p_len)

    def get_f0_fcpe(self, x, p_len, legacy=False, previous=False, filter_radius=3):
        """Estimate F0 with FCPE.

        The checkpoint is ``fcpe_legacy`` when ``legacy``, ``fcpe`` when
        ``previous``, otherwise ``ddsp_200k``. The threshold is
        ``filter_radius / 100`` for legacy and ``filter_radius / 1000 * 2``
        otherwise.
        """
        if not hasattr(self, "fcpe"):
            from infer.lib.predictors.FCPE.FCPE import FCPE

            if legacy:
                model_name = "fcpe_legacy"
                threshold = filter_radius / 100
            else:
                model_name = "fcpe" if previous else "ddsp_200k"
                threshold = filter_radius / 1000 * 2

            self.fcpe = FCPE(
                configs,
                os.path.join(
                    configs["predictors_path"],
                    model_name + (".onnx" if self.predictor_onnx else ".pt")
                ),
                hop_length=self.hop_length,
                f0_min=self.f0_min,
                f0_max=self.f0_max,
                dtype=torch.float32,
                device=self.device,
                sample_rate=self.sample_rate,
                threshold=threshold,
                providers=self.providers,
                onnx=self.predictor_onnx,
                legacy=legacy
            )

        f0 = self.fcpe.compute_f0(x, p_len)

        if self.predictor_onnx and self.delete_predictor_onnx:
            del self.fcpe.fcpe.model, self.fcpe

        return f0

    def get_f0_rmvpe(self, x, p_len, clipping=False, filter_radius=3, hpa=False, previous=False):
        """Estimate F0 with RMVPE (optionally the HPA checkpoints).

        ``filter_radius / 100`` is used as the ``thred`` argument. With
        ``clipping`` the ``infer_from_audio_with_pitch`` entry point is
        used and receives ``f0_min`` / ``f0_max``.
        """
        if not hasattr(self, "rmvpe"):
            from infer.lib.predictors.RMVPE.RMVPE import RMVPE

            if hpa:
                model_name = "hpa-rmvpe-76000" if previous else "hpa-rmvpe-112000"
            else:
                model_name = "rmvpe"

            self.rmvpe = RMVPE(
                os.path.join(
                    configs["predictors_path"],
                    model_name + (".onnx" if self.predictor_onnx else ".pt")
                ),
                is_half=self.is_half,
                device=self.device,
                onnx=self.predictor_onnx,
                providers=self.providers,
                hpa=hpa
            )

        filter_radius = filter_radius / 100

        if clipping:
            f0 = self.rmvpe.infer_from_audio_with_pitch(
                x,
                thred=filter_radius,
                f0_min=self.f0_min,
                f0_max=self.f0_max
            )
        else:
            f0 = self.rmvpe.infer_from_audio(x, thred=filter_radius)

        if self.predictor_onnx and self.delete_predictor_onnx:
            del self.rmvpe.model, self.rmvpe

        return self._resize_f0(f0, p_len)

    def get_f0_librosa(self, x, p_len, mode="yin", filter_radius=3):
        """Estimate F0 with librosa's ``yin``, ``pyin`` or ``piptrack``.

        For ``piptrack`` the pitch of the strongest bin per frame is
        taken, with ``filter_radius / 10`` as threshold. The result is
        resized to ``p_len``.
        """
        if mode != "piptrack":
            self.if_yin = mode == "yin"
            self.yin = yin if self.if_yin else pyin

            f0 = self.yin(
                x.astype(np.float32),
                sr=self.sample_rate,
                fmin=self.f0_min,
                fmax=self.f0_max,
                hop_length=self.hop_length
            )

            if not self.if_yin:
                # pyin returns a tuple; its F0 track may contain NaN, which
                # _resize_f0 would pass through untouched when the length
                # already matches p_len.
                f0 = np.nan_to_num(f0[0])
        else:
            pitches, magnitudes = piptrack(
                y=x.astype(np.float32),
                sr=self.sample_rate,
                fmin=self.f0_min,
                fmax=self.f0_max,
                hop_length=self.hop_length,
                threshold=filter_radius / 10
            )

            max_indexes = np.argmax(magnitudes, axis=0)
            f0 = pitches[max_indexes, range(magnitudes.shape[1])]

        return self._resize_f0(f0, p_len)

    def get_f0_djcm(self, x, p_len, clipping=False, svs=False, filter_radius=3):
        """Estimate F0 with DJCM (``djcm-svs`` checkpoint when ``svs``).

        ``filter_radius / 10`` is used as the ``thred`` argument. With
        ``clipping`` the ``infer_from_audio_with_pitch`` entry point is
        used and receives ``f0_min`` / ``f0_max``.
        """
        if not hasattr(self, "djcm"):
            # Every other predictor lives under infer.lib.predictors; try
            # that location first and keep the original path as a fallback.
            try:
                from infer.lib.predictors.DJCM.DJCM import DJCM
            except ImportError:
                from main.library.predictors.DJCM.DJCM import DJCM

            self.djcm = DJCM(
                os.path.join(
                    configs["predictors_path"],
                    ("djcm-svs" if svs else "djcm")
                    + (".onnx" if self.predictor_onnx else ".pt")
                ),
                is_half=self.is_half,
                device=self.device,
                onnx=self.predictor_onnx,
                svs=svs,
                providers=self.providers
            )

        filter_radius /= 10

        if clipping:
            f0 = self.djcm.infer_from_audio_with_pitch(
                x,
                thred=filter_radius,
                f0_min=self.f0_min,
                f0_max=self.f0_max
            )
        else:
            f0 = self.djcm.infer_from_audio(x, thred=filter_radius)

        if self.predictor_onnx and self.delete_predictor_onnx:
            del self.djcm.model, self.djcm

        return self._resize_f0(f0, p_len)