noblebarkrr's picture
test1
d0cd3b0
# coding: utf-8
__author__ = 'Roman Solovyev (ZFTurbo): https://github.com/ZFTurbo/'
import numpy as np
import torch
import torch.nn as nn
import yaml
import librosa
import torch.nn.functional as F
from ml_collections import ConfigDict
from omegaconf import OmegaConf
from tqdm.auto import tqdm
from typing import Dict, List, Tuple, Any, List, Optional
def load_config(model_type: str, config_path: str) -> Any:
"""
Load the configuration from the specified path based on the model type.
Parameters:
----------
model_type : str
The type of model to load (e.g., 'htdemucs', 'mdx23c', etc.).
config_path : str
The path to the YAML or OmegaConf configuration file.
Returns:
-------
config : Any
The loaded configuration, which can be in different formats (e.g., OmegaConf or ConfigDict).
Raises:
------
FileNotFoundError:
If the configuration file at `config_path` is not found.
ValueError:
If there is an error loading the configuration file.
"""
try:
with open(config_path, 'r') as f:
if model_type == 'htdemucs':
config = OmegaConf.load(config_path)
else:
config = ConfigDict(yaml.load(f, Loader=yaml.FullLoader))
return config
except FileNotFoundError:
raise FileNotFoundError(f"Configuration file not found at {config_path}")
except Exception as e:
raise ValueError(f"Error loading configuration: {e}")
def get_model_from_config(model_type: str, config_path: str) -> Tuple:
"""
Load the model specified by the model type and configuration file.
Parameters:
----------
model_type : str
The type of model to load (e.g., 'mdx23c', 'htdemucs', 'scnet', etc.).
config_path : str
The path to the configuration file (YAML or OmegaConf format).
Returns:
-------
model : nn.Module or None
The initialized model based on the `model_type`, or None if the model type is not recognized.
config : Any
The configuration used to initialize the model. This could be in different formats
depending on the model type (e.g., OmegaConf, ConfigDict).
Raises:
------
ValueError:
If the `model_type` is unknown or an error occurs during model initialization.
"""
config = load_config(model_type, config_path)
if model_type == 'mdx23c':
from models.mdx23c_tfc_tdf_v3 import TFC_TDF_net
model = TFC_TDF_net(config)
elif model_type == 'htdemucs':
from models.demucs4ht import get_model
model = get_model(config)
elif model_type == 'segm_models':
from models.segm_models import Segm_Models_Net
model = Segm_Models_Net(config)
elif model_type == 'torchseg':
from models.torchseg_models import Torchseg_Net
model = Torchseg_Net(config)
elif model_type == 'mel_band_roformer':
from models.bs_roformer import MelBandRoformer
model = MelBandRoformer(**dict(config.model))
elif model_type == 'bs_roformer':
if hasattr(config.model, 'use_shared_bias'):
from models.bs_roformer.bs_roformer_sw import BSRoformer_SW
model = BSRoformer_SW(**dict(config.model))
else:
from models.bs_roformer import BSRoformer
model = BSRoformer(**dict(config.model))
elif model_type == 'swin_upernet':
from models.upernet_swin_transformers import Swin_UperNet_Model
model = Swin_UperNet_Model(config)
elif model_type == 'bandit':
from models.bandit.core.model import MultiMaskMultiSourceBandSplitRNNSimple
model = MultiMaskMultiSourceBandSplitRNNSimple(**config.model)
elif model_type == 'bandit_v2':
from models.bandit_v2.bandit import Bandit
model = Bandit(**config.kwargs)
elif model_type == 'scnet_unofficial':
from models.scnet_unofficial import SCNet
model = SCNet(**config.model)
elif model_type == 'scnet':
from models.scnet import SCNet
model = SCNet(**config.model)
elif model_type == 'apollo':
from models.look2hear.models import BaseModel
model = BaseModel.apollo(**config.model)
elif model_type == 'bs_mamba2':
from models.ts_bs_mamba2 import Separator
model = Separator(**config.model)
else:
raise ValueError(f"Unknown model type: {model_type}")
return model, config
def _getWindowingArray(window_size: int, fade_size: int) -> torch.Tensor:
"""
Generate a windowing array with a linear fade-in at the beginning and a fade-out at the end.
This function creates a window of size `window_size` where the first `fade_size` elements
linearly increase from 0 to 1 (fade-in) and the last `fade_size` elements linearly decrease
from 1 to 0 (fade-out). The middle part of the window is filled with ones.
Parameters:
----------
window_size : int
The total size of the window.
fade_size : int
The size of the fade-in and fade-out regions.
Returns:
-------
torch.Tensor
A tensor of shape (window_size,) containing the generated windowing array.
Example:
-------
If `window_size=10` and `fade_size=3`, the output will be:
tensor([0.0000, 0.5000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 0.5000, 0.0000])
"""
fadein = torch.linspace(0, 1, fade_size)
fadeout = torch.linspace(1, 0, fade_size)
window = torch.ones(window_size)
window[-fade_size:] = fadeout
window[:fade_size] = fadein
return window
def demix(
config: ConfigDict,
model: torch.nn.Module,
mix: torch.Tensor,
device: torch.device,
model_type: str,
pbar: bool = False
) -> Tuple[List[Dict[str, np.ndarray]], np.ndarray]:
"""
Unified function for audio source separation with support for multiple processing modes.
This function separates audio into its constituent sources using either a generic custom logic
or a Demucs-specific logic. It supports batch processing and overlapping window-based chunking
for efficient and artifact-free separation.
Parameters:
----------
config : ConfigDict
Configuration object containing audio and inference settings.
model : torch.nn.Module
The trained model used for audio source separation.
mix : torch.Tensor
Input audio tensor with shape (channels, time).
device : torch.device
The computation device (CPU or CUDA).
model_type : str, optional
Processing mode:
- "demucs" for logic specific to the Demucs model.
Default is "generic".
pbar : bool, optional
If True, displays a progress bar during chunk processing. Default is False.
Returns:
-------
Union[Dict[str, np.ndarray], np.ndarray]
- A dictionary mapping target instruments to separated audio sources if multiple instruments are present.
- A numpy array of the separated source if only one instrument is present.
"""
mix = torch.tensor(mix, dtype=torch.float32)
if model_type == 'htdemucs':
mode = 'demucs'
else:
mode = 'generic'
# Define processing parameters based on the mode
if mode == 'demucs':
chunk_size = config.training.samplerate * config.training.segment
num_instruments = len(config.training.instruments)
num_overlap = config.inference.num_overlap
step = chunk_size // num_overlap
else:
chunk_size = config.audio.chunk_size
num_instruments = len(prefer_target_instrument(config))
num_overlap = config.inference.num_overlap
fade_size = chunk_size // 10
step = chunk_size // num_overlap
border = chunk_size - step
length_init = mix.shape[-1]
windowing_array = _getWindowingArray(chunk_size, fade_size)
# Add padding for generic mode to handle edge artifacts
if length_init > 2 * border and border > 0:
mix = nn.functional.pad(mix, (border, border), mode="reflect")
batch_size = config.inference.batch_size
use_amp = getattr(config.training, 'use_amp', True) # Works for both OmegaConf and ConfigDict
with torch.cuda.amp.autocast(enabled=use_amp):
with torch.inference_mode():
# Initialize result and counter tensors
req_shape = (num_instruments,) + mix.shape
result = torch.zeros(req_shape, dtype=torch.float32)
counter = torch.zeros(req_shape, dtype=torch.float32)
i = 0
batch_data = []
batch_locations = []
progress_bar = tqdm(
total=mix.shape[1], desc="Обработка аудио фрагментов", leave=False
) if pbar else None
while i < mix.shape[1]:
# Extract chunk and apply padding if necessary
part = mix[:, i:i + chunk_size].to(device)
chunk_len = part.shape[-1]
if mode == "generic" and chunk_len > chunk_size // 2:
pad_mode = "reflect"
else:
pad_mode = "constant"
part = nn.functional.pad(part, (0, chunk_size - chunk_len), mode=pad_mode, value=0)
batch_data.append(part)
batch_locations.append((i, chunk_len))
i += step
# Process batch if it's full or the end is reached
if len(batch_data) >= batch_size or i >= mix.shape[1]:
arr = torch.stack(batch_data, dim=0)
x = model(arr)
if mode == "generic":
window = windowing_array.clone() # fix for clicks issue with batch_size=1
if i - step == 0: # First audio chunk, no fadein
window[:fade_size] = 1
elif i >= mix.shape[1]: # Last audio chunk, no fadeout
window[-fade_size:] = 1
for j, (start, seg_len) in enumerate(batch_locations):
if mode == "generic":
result[..., start:start + seg_len] += x[j, ..., :seg_len].cpu() * window[..., :seg_len]
counter[..., start:start + seg_len] += window[..., :seg_len]
else:
result[..., start:start + seg_len] += x[j, ..., :seg_len].cpu()
counter[..., start:start + seg_len] += 1.0
batch_data.clear()
batch_locations.clear()
if progress_bar:
progress_bar.update(step)
if progress_bar:
progress_bar.close()
# Compute final estimated sources
estimated_sources = result / counter
estimated_sources = estimated_sources.cpu().numpy()
np.nan_to_num(estimated_sources, copy=False, nan=0.0)
# Remove padding for generic mode
if mode == "generic":
if length_init > 2 * border and border > 0:
estimated_sources = estimated_sources[..., border:-border]
# Return the result as a dictionary or a single array
if mode == "demucs":
instruments = config.training.instruments
else:
instruments = prefer_target_instrument(config)
ret_data = {k: v for k, v in zip(instruments, estimated_sources)}
if mode == "demucs" and num_instruments <= 1:
return estimated_sources
else:
return ret_data
def demix_demucs(config, model, mix, device, model_type, pbar=False):
mix = torch.tensor(mix, dtype=torch.float32)
if model_type == 'htdemucs':
mode = 'demucs'
else:
mode = 'generic'
if mode == 'demucs':
chunk_size = config.training.samplerate * config.training.segment
num_instruments = len(config.training.instruments)
num_overlap = config.inference.num_overlap
step = chunk_size // num_overlap
fade_size = chunk_size // 10 # Добавляем fade_size для оконной функции
windowing_array = _getWindowingArray(chunk_size, fade_size) # Создаём окно
else:
chunk_size = config.audio.chunk_size
num_instruments = len(prefer_target_instrument(config))
num_overlap = config.inference.num_overlap
fade_size = chunk_size // 10
step = chunk_size // num_overlap
border = chunk_size - step
length_init = mix.shape[-1]
windowing_array = _getWindowingArray(chunk_size, fade_size)
if length_init > 2 * border and border > 0:
mix = nn.functional.pad(mix, (border, border), mode="reflect")
batch_size = config.inference.batch_size
use_amp = getattr(config.training, 'use_amp', True)
with torch.cuda.amp.autocast(enabled=use_amp):
with torch.inference_mode():
req_shape = (num_instruments,) + mix.shape
result = torch.zeros(req_shape, dtype=torch.float32)
counter = torch.zeros(req_shape, dtype=torch.float32)
i = 0
batch_data = []
batch_locations = []
progress_bar = tqdm(total=mix.shape[1], desc="Обработка аудио фрагментов", leave=False) if pbar else None
while i < mix.shape[1]:
part = mix[:, i:i + chunk_size].to(device)
chunk_len = part.shape[-1]
pad_mode = "reflect" if chunk_len > chunk_size // 2 else "constant"
part = nn.functional.pad(part, (0, chunk_size - chunk_len), mode=pad_mode, value=0)
batch_data.append(part)
batch_locations.append((i, chunk_len))
i += step
if len(batch_data) >= batch_size or i >= mix.shape[1]:
arr = torch.stack(batch_data, dim=0)
x = model(arr)
window = windowing_array.clone()
if i - step == 0: # Первый чанк, без fade-in
window[:fade_size] = 1
elif i >= mix.shape[1]: # Последний чанк, без fade-out
window[-fade_size:] = 1
for j, (start, seg_len) in enumerate(batch_locations):
result[..., start:start + seg_len] += x[j, ..., :seg_len].cpu() * window[..., :seg_len]
counter[..., start:start + seg_len] += window[..., :seg_len]
batch_data.clear()
batch_locations.clear()
if progress_bar:
progress_bar.update(step)
if progress_bar:
progress_bar.close()
estimated_sources = result / counter
estimated_sources = estimated_sources.cpu().numpy()
np.nan_to_num(estimated_sources, copy=False, nan=0.0)
if mode == "demucs" and num_instruments <= 1:
return estimated_sources
else:
instruments = config.training.instruments
return {k: v for k, v in zip(instruments, estimated_sources)}
def sdr(references: np.ndarray, estimates: np.ndarray) -> np.ndarray:
"""
Compute Signal-to-Distortion Ratio (SDR) for one or more audio tracks.
SDR is a measure of how well the predicted source (estimate) matches the reference source.
It is calculated as the ratio of the energy of the reference signal to the energy of the error (difference between reference and estimate).
Return SDR in decibels (dB)
Parameters:
----------
references : np.ndarray
A 3D numpy array of shape (num_sources, num_channels, num_samples), where num_sources is the number of sources,
num_channels is the number of channels (e.g., 1 for mono, 2 for stereo), and num_samples is the length of the audio signal.
estimates : np.ndarray
A 3D numpy array of shape (num_sources, num_channels, num_samples) representing the estimated sources.
Returns:
-------
np.ndarray
A 1D numpy array containing the SDR values for each source.
"""
eps = 1e-8 # to avoid numerical errors
num = np.sum(np.square(references), axis=(1, 2))
den = np.sum(np.square(references - estimates), axis=(1, 2))
num += eps
den += eps
return 10 * np.log10(num / den)
def si_sdr(reference: np.ndarray, estimate: np.ndarray) -> float:
"""
Compute Scale-Invariant Signal-to-Distortion Ratio (SI-SDR) for one or more audio tracks.
SI-SDR is a variant of the SDR metric that is invariant to the scaling of the estimate relative to the reference.
It is calculated by scaling the estimate to match the reference signal and then computing the SDR.
Parameters:
----------
reference : np.ndarray
A 3D numpy array of shape (num_sources, num_channels, num_samples), where num_sources is the number of sources,
num_channels is the number of channels (e.g., 1 for mono, 2 for stereo), and num_samples is the length of the audio signal.
estimate : np.ndarray
A 3D numpy array of shape (num_sources, num_channels, num_samples) representing the estimated sources.
Returns:
-------
float
The SI-SDR value for the source. It is a scalar representing the Signal-to-Distortion Ratio in decibels (dB).
"""
eps = 1e-8 # To avoid numerical errors
scale = np.sum(estimate * reference + eps, axis=(0, 1)) / np.sum(reference ** 2 + eps, axis=(0, 1))
scale = np.expand_dims(scale, axis=(0, 1)) # Reshape to [num_sources, 1]
reference = reference * scale
si_sdr = np.mean(10 * np.log10(
np.sum(reference ** 2, axis=(0, 1)) / (np.sum((reference - estimate) ** 2, axis=(0, 1)) + eps) + eps))
return si_sdr
def L1Freq_metric(
reference: np.ndarray,
estimate: np.ndarray,
fft_size: int = 2048,
hop_size: int = 1024,
device: str = 'cpu'
) -> float:
"""
Compute the L1 Frequency Metric between the reference and estimated audio signals.
This metric compares the magnitude spectrograms of the reference and estimated audio signals
using the Short-Time Fourier Transform (STFT) and calculates the L1 loss between them. The result
is scaled to the range [0, 100] where a higher value indicates better performance.
Parameters:
----------
reference : np.ndarray
A 2D numpy array of shape (num_channels, num_samples) representing the reference (ground truth) audio signal.
estimate : np.ndarray
A 2D numpy array of shape (num_channels, num_samples) representing the estimated (predicted) audio signal.
fft_size : int, optional
The size of the FFT (Short-Time Fourier Transform). Default is 2048.
hop_size : int, optional
The hop size between STFT frames. Default is 1024.
device : str, optional
The device to run the computation on ('cpu' or 'cuda'). Default is 'cpu'.
Returns:
-------
float
The L1 Frequency Metric in the range [0, 100], where higher values indicate better performance.
"""
reference = torch.from_numpy(reference).to(device)
estimate = torch.from_numpy(estimate).to(device)
reference_stft = torch.stft(reference, fft_size, hop_size, return_complex=True)
estimated_stft = torch.stft(estimate, fft_size, hop_size, return_complex=True)
reference_mag = torch.abs(reference_stft)
estimate_mag = torch.abs(estimated_stft)
loss = 10 * F.l1_loss(estimate_mag, reference_mag)
ret = 100 / (1. + float(loss.cpu().numpy()))
return ret
def LogWMSE_metric(
reference: np.ndarray,
estimate: np.ndarray,
mixture: np.ndarray,
device: str = 'cpu',
) -> float:
"""
Calculate the Log-WMSE (Logarithmic Weighted Mean Squared Error) between the reference, estimate, and mixture signals.
This metric evaluates the quality of the estimated signal compared to the reference signal in the
context of audio source separation. The result is given in logarithmic scale, which helps in evaluating
signals with large amplitude differences.
Parameters:
----------
reference : np.ndarray
The ground truth audio signal of shape (channels, time), where channels is the number of audio channels
(e.g., 1 for mono, 2 for stereo) and time is the length of the audio in samples.
estimate : np.ndarray
The estimated audio signal of shape (channels, time).
mixture : np.ndarray
The mixed audio signal of shape (channels, time).
device : str, optional
The device to run the computation on, either 'cpu' or 'cuda'. Default is 'cpu'.
Returns:
-------
float
The Log-WMSE value, which quantifies the difference between the reference and estimated signal on a logarithmic scale.
"""
from torch_log_wmse import LogWMSE
log_wmse = LogWMSE(
audio_length=reference.shape[-1] / 44100, # audio length in seconds
sample_rate=44100, # sample rate of 44100 Hz
return_as_loss=False, # return as loss (False means return as metric)
bypass_filter=False, # bypass frequency filtering (False means apply filter)
)
reference = torch.from_numpy(reference).unsqueeze(0).unsqueeze(0).to(device)
estimate = torch.from_numpy(estimate).unsqueeze(0).unsqueeze(0).to(device)
mixture = torch.from_numpy(mixture).unsqueeze(0).to(device)
res = log_wmse(mixture, reference, estimate)
return float(res.cpu().numpy())
def AuraSTFT_metric(
reference: np.ndarray,
estimate: np.ndarray,
device: str = 'cpu',
) -> float:
"""
Calculate the AuraSTFT metric, which evaluates the spectral difference between the reference and estimated
audio signals using Short-Time Fourier Transform (STFT) loss.
The AuraSTFT metric computes the STFT loss in both logarithmic and linear magnitudes, and it is commonly used
to assess the quality of audio separation tasks. The result is returned as a value scaled to the range [0, 100].
Parameters:
----------
reference : np.ndarray
The ground truth audio signal of shape (channels, time), where channels is the number of audio channels
(e.g., 1 for mono, 2 for stereo) and time is the length of the audio in samples.
estimate : np.ndarray
The estimated audio signal of shape (channels, time).
device : str, optional
The device to run the computation on, either 'cpu' or 'cuda'. Default is 'cpu'.
Returns:
-------
float
The AuraSTFT metric value, scaled to the range [0, 100], which quantifies the difference between
the reference and estimated signal in the spectral domain.
"""
from auraloss.freq import STFTLoss
stft_loss = STFTLoss(
w_log_mag=1.0, # weight for log magnitude
w_lin_mag=0.0, # weight for linear magnitude
w_sc=1.0, # weight for spectral centroid
device=device,
)
reference = torch.from_numpy(reference).unsqueeze(0).to(device)
estimate = torch.from_numpy(estimate).unsqueeze(0).to(device)
res = 100 / (1. + 10 * stft_loss(reference, estimate))
return float(res.cpu().numpy())
def AuraMRSTFT_metric(
reference: np.ndarray,
estimate: np.ndarray,
device: str = 'cpu',
) -> float:
"""
Calculate the AuraMRSTFT metric, which evaluates the spectral difference between the reference and estimated
audio signals using Multi-Resolution Short-Time Fourier Transform (STFT) loss.
The AuraMRSTFT metric uses multi-resolution STFT analysis, which allows better representation of both
low- and high-frequency components in the audio signals. The result is returned as a value scaled to the range [0, 100].
Parameters:
----------
reference : np.ndarray
The ground truth audio signal of shape (channels, time), where channels is the number of audio channels
(e.g., 1 for mono, 2 for stereo) and time is the length of the audio in samples.
estimate : np.ndarray
The estimated audio signal of shape (channels, time).
device : str, optional
The device to run the computation on, either 'cpu' or 'cuda'. Default is 'cpu'.
Returns:
-------
float
The AuraMRSTFT metric value, scaled to the range [0, 100], which quantifies the difference between
the reference and estimated signal in the multi-resolution spectral domain.
"""
from auraloss.freq import MultiResolutionSTFTLoss
mrstft_loss = MultiResolutionSTFTLoss(
fft_sizes=[1024, 2048, 4096],
hop_sizes=[256, 512, 1024],
win_lengths=[1024, 2048, 4096],
scale="mel", # mel scale for frequency resolution
n_bins=128, # number of bins for mel scale
sample_rate=44100,
perceptual_weighting=True, # apply perceptual weighting
device=device
)
reference = torch.from_numpy(reference).unsqueeze(0).float().to(device)
estimate = torch.from_numpy(estimate).unsqueeze(0).float().to(device)
res = 100 / (1. + 10 * mrstft_loss(reference, estimate))
return float(res.cpu().numpy())
def bleed_full(
reference: np.ndarray,
estimate: np.ndarray,
sr: int = 44100,
n_fft: int = 4096,
hop_length: int = 1024,
n_mels: int = 512,
device: str = 'cpu',
) -> Tuple[float, float]:
"""
Calculate the 'bleed' and 'fullness' metrics between a reference and an estimated audio signal.
The 'bleed' metric measures how much the estimated signal bleeds into the reference signal,
while the 'fullness' metric measures how much the estimated signal retains its distinctiveness
in relation to the reference signal, both using mel spectrograms and decibel scaling.
Parameters:
----------
reference : np.ndarray
The reference audio signal, shape (channels, time), where channels is the number of audio channels
(e.g., 1 for mono, 2 for stereo) and time is the length of the audio in samples.
estimate : np.ndarray
The estimated audio signal, shape (channels, time).
sr : int, optional
The sample rate of the audio signals. Default is 44100 Hz.
n_fft : int, optional
The FFT size used to compute the STFT. Default is 4096.
hop_length : int, optional
The hop length for STFT computation. Default is 1024.
n_mels : int, optional
The number of mel frequency bins. Default is 512.
device : str, optional
The device for computation, either 'cpu' or 'cuda'. Default is 'cpu'.
Returns:
-------
tuple
A tuple containing two values:
- `bleedless` (float): A score indicating how much 'bleeding' the estimated signal has (higher is better).
- `fullness` (float): A score indicating how 'full' the estimated signal is (higher is better).
"""
from torchaudio.transforms import AmplitudeToDB
reference = torch.from_numpy(reference).float().to(device)
estimate = torch.from_numpy(estimate).float().to(device)
window = torch.hann_window(n_fft).to(device)
# Compute STFTs with the Hann window
D1 = torch.abs(torch.stft(reference, n_fft=n_fft, hop_length=hop_length, window=window, return_complex=True,
pad_mode="constant"))
D2 = torch.abs(torch.stft(estimate, n_fft=n_fft, hop_length=hop_length, window=window, return_complex=True,
pad_mode="constant"))
mel_basis = librosa.filters.mel(sr=sr, n_fft=n_fft, n_mels=n_mels)
mel_filter_bank = torch.from_numpy(mel_basis).to(device)
S1_mel = torch.matmul(mel_filter_bank, D1)
S2_mel = torch.matmul(mel_filter_bank, D2)
S1_db = AmplitudeToDB(stype="magnitude", top_db=80)(S1_mel)
S2_db = AmplitudeToDB(stype="magnitude", top_db=80)(S2_mel)
diff = S2_db - S1_db
positive_diff = diff[diff > 0]
negative_diff = diff[diff < 0]
average_positive = torch.mean(positive_diff) if positive_diff.numel() > 0 else torch.tensor(0.0).to(device)
average_negative = torch.mean(negative_diff) if negative_diff.numel() > 0 else torch.tensor(0.0).to(device)
bleedless = 100 * 1 / (average_positive + 1)
fullness = 100 * 1 / (-average_negative + 1)
return bleedless.cpu().numpy(), fullness.cpu().numpy()
def get_metrics(
metrics: List[str],
reference: np.ndarray,
estimate: np.ndarray,
mix: np.ndarray,
device: str = 'cpu',
) -> Dict[str, float]:
"""
Calculate a list of metrics to evaluate the performance of audio source separation models.
The function computes the specified metrics based on the reference, estimate, and mixture.
Parameters:
----------
metrics : List[str]
A list of metric names to compute (e.g., ['sdr', 'si_sdr', 'l1_freq']).
reference : np.ndarray
The reference audio (true signal) with shape (channels, length).
estimate : np.ndarray
The estimated audio (predicted signal) with shape (channels, length).
mix : np.ndarray
The mixed audio signal with shape (channels, length).
device : str, optional, default='cpu'
The device ('cpu' or 'cuda') to perform the calculations on.
Returns:
-------
Dict[str, float]
A dictionary containing the computed metric values.
"""
result = dict()
# Adjust the length to be the same across all inputs
min_length = min(reference.shape[1], estimate.shape[1])
reference = reference[..., :min_length]
estimate = estimate[..., :min_length]
mix = mix[..., :min_length]
if 'sdr' in metrics:
references = np.expand_dims(reference, axis=0)
estimates = np.expand_dims(estimate, axis=0)
result['sdr'] = sdr(references, estimates)[0]
if 'si_sdr' in metrics:
result['si_sdr'] = si_sdr(reference, estimate)
if 'l1_freq' in metrics:
result['l1_freq'] = L1Freq_metric(reference, estimate, device=device)
if 'log_wmse' in metrics:
result['log_wmse'] = LogWMSE_metric(reference, estimate, mix, device)
if 'aura_stft' in metrics:
result['aura_stft'] = AuraSTFT_metric(reference, estimate, device)
if 'aura_mrstft' in metrics:
result['aura_mrstft'] = AuraMRSTFT_metric(reference, estimate, device)
if 'bleedless' in metrics or 'fullness' in metrics:
bleedless, fullness = bleed_full(reference, estimate, device=device)
if 'bleedless' in metrics:
result['bleedless'] = bleedless
if 'fullness' in metrics:
result['fullness'] = fullness
return result
def prefer_target_instrument(config: ConfigDict) -> List[str]:
"""
Return the list of target instruments based on the configuration.
If a specific target instrument is specified in the configuration,
it returns a list with that instrument. Otherwise, it returns the list of instruments.
Parameters:
----------
config : ConfigDict
Configuration object containing the list of instruments or the target instrument.
Returns:
-------
List[str]
A list of target instruments.
"""
if config.training.get('target_instrument'):
return [config.training.target_instrument]
else:
return config.training.instruments
def prefer_target_instrument_test(config: ConfigDict, selected_instruments: Optional[List[str]] = None) -> List[str]:
"""
Return the list of target instruments based on the configuration and selected instruments.
If selected_instruments is specified, returns the intersection with available instruments.
Otherwise, if a target instrument is specified, returns it, else returns all instruments.
Parameters:
----------
config : ConfigDict
Configuration object containing the list of instruments or the target instrument.
selected_instruments : Optional[List[str]]
List of instruments to select (optional)
Returns:
-------
List[str]
A list of target instruments.
"""
available_instruments = config.training.instruments
if selected_instruments is not None:
# Return only selected instruments that are available
return [instr for instr in selected_instruments if instr in available_instruments]
elif config.training.get('target_instrument'):
# Default behavior if no selection - return target instrument
return [config.training.target_instrument]
else:
# If no target and no selection, return all instruments
return available_instruments