# coding: utf-8
__author__ = 'Roman Solovyev (ZFTurbo): https://github.com/ZFTurbo/'
import numpy as np
import torch
import torch.nn as nn
import yaml
import librosa
import torch.nn.functional as F
from ml_collections import ConfigDict
from omegaconf import OmegaConf
from tqdm.auto import tqdm
from typing import Dict, List, Tuple, Any, Optional, Union
def load_config(model_type: str, config_path: str) -> Any:
"""
Load the configuration from the specified path based on the model type.
Parameters:
----------
model_type : str
The type of model to load (e.g., 'htdemucs', 'mdx23c', etc.).
config_path : str
The path to the YAML or OmegaConf configuration file.
Returns:
-------
config : Any
The loaded configuration, which can be in different formats (e.g., OmegaConf or ConfigDict).
Raises:
------
FileNotFoundError:
If the configuration file at `config_path` is not found.
ValueError:
If there is an error loading the configuration file.
"""
try:
with open(config_path, 'r') as f:
if model_type == 'htdemucs':
config = OmegaConf.load(config_path)
else:
config = ConfigDict(yaml.load(f, Loader=yaml.FullLoader))
return config
except FileNotFoundError:
raise FileNotFoundError(f"Configuration file not found at {config_path}")
except Exception as e:
raise ValueError(f"Error loading configuration: {e}")
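# Usage sketch for load_config (illustrative only; the config path below is a hypothetical
# placeholder and must point to a real YAML file for your model):
#
#   config = load_config('mdx23c', 'configs/config_vocals_mdx23c.yaml')
#   print(config.audio.chunk_size, config.inference.num_overlap)
#
# For 'htdemucs' the same call returns an OmegaConf object instead of a ConfigDict; nested
# fields are accessed via attribute access in both cases.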
def get_model_from_config(model_type: str, config_path: str) -> Tuple[nn.Module, Any]:
"""
Load the model specified by the model type and configuration file.
Parameters:
----------
model_type : str
The type of model to load (e.g., 'mdx23c', 'htdemucs', 'scnet', etc.).
config_path : str
The path to the configuration file (YAML or OmegaConf format).
Returns:
-------
    model : nn.Module
        The initialized model for the given `model_type`.
config : Any
The configuration used to initialize the model. This could be in different formats
depending on the model type (e.g., OmegaConf, ConfigDict).
Raises:
------
ValueError:
If the `model_type` is unknown or an error occurs during model initialization.
"""
config = load_config(model_type, config_path)
if model_type == 'mdx23c':
from models.mdx23c_tfc_tdf_v3 import TFC_TDF_net
model = TFC_TDF_net(config)
elif model_type == 'htdemucs':
from models.demucs4ht import get_model
model = get_model(config)
elif model_type == 'segm_models':
from models.segm_models import Segm_Models_Net
model = Segm_Models_Net(config)
elif model_type == 'torchseg':
from models.torchseg_models import Torchseg_Net
model = Torchseg_Net(config)
elif model_type == 'mel_band_roformer':
from models.bs_roformer import MelBandRoformer
model = MelBandRoformer(**dict(config.model))
elif model_type == 'bs_roformer':
if hasattr(config.model, 'use_shared_bias'):
from models.bs_roformer.bs_roformer_sw import BSRoformer_SW
model = BSRoformer_SW(**dict(config.model))
else:
from models.bs_roformer import BSRoformer
model = BSRoformer(**dict(config.model))
elif model_type == 'swin_upernet':
from models.upernet_swin_transformers import Swin_UperNet_Model
model = Swin_UperNet_Model(config)
elif model_type == 'bandit':
from models.bandit.core.model import MultiMaskMultiSourceBandSplitRNNSimple
model = MultiMaskMultiSourceBandSplitRNNSimple(**config.model)
elif model_type == 'bandit_v2':
from models.bandit_v2.bandit import Bandit
model = Bandit(**config.kwargs)
elif model_type == 'scnet_unofficial':
from models.scnet_unofficial import SCNet
model = SCNet(**config.model)
elif model_type == 'scnet':
from models.scnet import SCNet
model = SCNet(**config.model)
elif model_type == 'apollo':
from models.look2hear.models import BaseModel
model = BaseModel.apollo(**config.model)
elif model_type == 'bs_mamba2':
from models.ts_bs_mamba2 import Separator
model = Separator(**config.model)
else:
raise ValueError(f"Unknown model type: {model_type}")
return model, config
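# Usage sketch for get_model_from_config (illustrative; the config path and checkpoint below
# are hypothetical placeholders, not files shipped with this module):
#
#   model, config = get_model_from_config('bs_roformer', 'configs/config_bs_roformer.yaml')
#   state_dict = torch.load('checkpoints/model.ckpt', map_location='cpu')
#   model.load_state_dict(state_dict)
#   model = model.to('cuda').eval()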
def _getWindowingArray(window_size: int, fade_size: int) -> torch.Tensor:
"""
Generate a windowing array with a linear fade-in at the beginning and a fade-out at the end.
This function creates a window of size `window_size` where the first `fade_size` elements
linearly increase from 0 to 1 (fade-in) and the last `fade_size` elements linearly decrease
from 1 to 0 (fade-out). The middle part of the window is filled with ones.
Parameters:
----------
window_size : int
The total size of the window.
fade_size : int
The size of the fade-in and fade-out regions.
Returns:
-------
torch.Tensor
A tensor of shape (window_size,) containing the generated windowing array.
Example:
-------
If `window_size=10` and `fade_size=3`, the output will be:
tensor([0.0000, 0.5000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 0.5000, 0.0000])
"""
fadein = torch.linspace(0, 1, fade_size)
fadeout = torch.linspace(1, 0, fade_size)
window = torch.ones(window_size)
window[-fade_size:] = fadeout
window[:fade_size] = fadein
return window
def demix(
config: ConfigDict,
model: torch.nn.Module,
mix: torch.Tensor,
device: torch.device,
model_type: str,
pbar: bool = False
) -> Union[Dict[str, np.ndarray], np.ndarray]:
"""
Unified function for audio source separation with support for multiple processing modes.
    This function separates audio into its constituent sources using either a generic
    overlap-add pipeline or Demucs-specific logic. It supports batch processing and
    overlapping window-based chunking for efficient, artifact-free separation.
Parameters:
----------
config : ConfigDict
Configuration object containing audio and inference settings.
model : torch.nn.Module
The trained model used for audio source separation.
mix : torch.Tensor
Input audio tensor with shape (channels, time).
device : torch.device
The computation device (CPU or CUDA).
    model_type : str
        The model type (e.g., 'htdemucs', 'mdx23c'). The value 'htdemucs' selects the
        Demucs-specific processing path; any other value uses the generic overlap-add path.
pbar : bool, optional
If True, displays a progress bar during chunk processing. Default is False.
Returns:
-------
Union[Dict[str, np.ndarray], np.ndarray]
        - A dictionary mapping target instruments to separated audio sources (the usual case).
        - A bare numpy array of the separated sources when running in Demucs mode with a single instrument.
"""
mix = torch.tensor(mix, dtype=torch.float32)
if model_type == 'htdemucs':
mode = 'demucs'
else:
mode = 'generic'
# Define processing parameters based on the mode
if mode == 'demucs':
chunk_size = config.training.samplerate * config.training.segment
num_instruments = len(config.training.instruments)
num_overlap = config.inference.num_overlap
step = chunk_size // num_overlap
else:
chunk_size = config.audio.chunk_size
num_instruments = len(prefer_target_instrument(config))
num_overlap = config.inference.num_overlap
fade_size = chunk_size // 10
step = chunk_size // num_overlap
border = chunk_size - step
length_init = mix.shape[-1]
windowing_array = _getWindowingArray(chunk_size, fade_size)
# Add padding for generic mode to handle edge artifacts
if length_init > 2 * border and border > 0:
mix = nn.functional.pad(mix, (border, border), mode="reflect")
batch_size = config.inference.batch_size
use_amp = getattr(config.training, 'use_amp', True) # Works for both OmegaConf and ConfigDict
with torch.cuda.amp.autocast(enabled=use_amp):
with torch.inference_mode():
# Initialize result and counter tensors
req_shape = (num_instruments,) + mix.shape
result = torch.zeros(req_shape, dtype=torch.float32)
counter = torch.zeros(req_shape, dtype=torch.float32)
i = 0
batch_data = []
batch_locations = []
progress_bar = tqdm(
                total=mix.shape[1], desc="Processing audio chunks", leave=False
) if pbar else None
while i < mix.shape[1]:
# Extract chunk and apply padding if necessary
part = mix[:, i:i + chunk_size].to(device)
chunk_len = part.shape[-1]
if mode == "generic" and chunk_len > chunk_size // 2:
pad_mode = "reflect"
else:
pad_mode = "constant"
part = nn.functional.pad(part, (0, chunk_size - chunk_len), mode=pad_mode, value=0)
batch_data.append(part)
batch_locations.append((i, chunk_len))
i += step
# Process batch if it's full or the end is reached
if len(batch_data) >= batch_size or i >= mix.shape[1]:
arr = torch.stack(batch_data, dim=0)
x = model(arr)
if mode == "generic":
window = windowing_array.clone() # fix for clicks issue with batch_size=1
if i - step == 0: # First audio chunk, no fadein
window[:fade_size] = 1
elif i >= mix.shape[1]: # Last audio chunk, no fadeout
window[-fade_size:] = 1
for j, (start, seg_len) in enumerate(batch_locations):
if mode == "generic":
result[..., start:start + seg_len] += x[j, ..., :seg_len].cpu() * window[..., :seg_len]
counter[..., start:start + seg_len] += window[..., :seg_len]
else:
result[..., start:start + seg_len] += x[j, ..., :seg_len].cpu()
counter[..., start:start + seg_len] += 1.0
batch_data.clear()
batch_locations.clear()
if progress_bar:
progress_bar.update(step)
if progress_bar:
progress_bar.close()
# Compute final estimated sources
estimated_sources = result / counter
estimated_sources = estimated_sources.cpu().numpy()
np.nan_to_num(estimated_sources, copy=False, nan=0.0)
# Remove padding for generic mode
if mode == "generic":
if length_init > 2 * border and border > 0:
estimated_sources = estimated_sources[..., border:-border]
# Return the result as a dictionary or a single array
if mode == "demucs":
instruments = config.training.instruments
else:
instruments = prefer_target_instrument(config)
ret_data = {k: v for k, v in zip(instruments, estimated_sources)}
if mode == "demucs" and num_instruments <= 1:
return estimated_sources
else:
return ret_data
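# End-to-end sketch of how demix is typically called (illustrative; 'mixture.wav' and the
# config/model objects are assumed to come from librosa and get_model_from_config):
#
#   mix, sr = librosa.load('mixture.wav', sr=44100, mono=False)   # shape (channels, time)
#   waveforms = demix(config, model, mix, torch.device('cuda'), model_type='mdx23c')
#   vocals = waveforms['vocals']   # numpy array, same length as the input mix
#
# Overlapping chunks are weighted by the fade window and divided by the per-sample counter,
# so the overlap-add reconstruction stays at unity gain across chunk boundaries.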
def demix_demucs(config, model, mix, device, model_type, pbar=False):
    """
    Variant of `demix` that applies the fade-in/fade-out window in the Demucs-specific mode
    as well as in the generic mode. It takes the same arguments as `demix` and returns either
    a dictionary mapping instrument names to separated sources, or a single numpy array when
    running in Demucs mode with a single instrument.
    """
mix = torch.tensor(mix, dtype=torch.float32)
if model_type == 'htdemucs':
mode = 'demucs'
else:
mode = 'generic'
if mode == 'demucs':
chunk_size = config.training.samplerate * config.training.segment
num_instruments = len(config.training.instruments)
num_overlap = config.inference.num_overlap
step = chunk_size // num_overlap
        fade_size = chunk_size // 10  # fade size for the windowing function
        windowing_array = _getWindowingArray(chunk_size, fade_size)  # build the fade window
else:
chunk_size = config.audio.chunk_size
num_instruments = len(prefer_target_instrument(config))
num_overlap = config.inference.num_overlap
fade_size = chunk_size // 10
step = chunk_size // num_overlap
border = chunk_size - step
length_init = mix.shape[-1]
windowing_array = _getWindowingArray(chunk_size, fade_size)
if length_init > 2 * border and border > 0:
mix = nn.functional.pad(mix, (border, border), mode="reflect")
batch_size = config.inference.batch_size
use_amp = getattr(config.training, 'use_amp', True)
with torch.cuda.amp.autocast(enabled=use_amp):
with torch.inference_mode():
req_shape = (num_instruments,) + mix.shape
result = torch.zeros(req_shape, dtype=torch.float32)
counter = torch.zeros(req_shape, dtype=torch.float32)
i = 0
batch_data = []
batch_locations = []
            progress_bar = tqdm(total=mix.shape[1], desc="Processing audio chunks", leave=False) if pbar else None
while i < mix.shape[1]:
part = mix[:, i:i + chunk_size].to(device)
chunk_len = part.shape[-1]
pad_mode = "reflect" if chunk_len > chunk_size // 2 else "constant"
part = nn.functional.pad(part, (0, chunk_size - chunk_len), mode=pad_mode, value=0)
batch_data.append(part)
batch_locations.append((i, chunk_len))
i += step
if len(batch_data) >= batch_size or i >= mix.shape[1]:
arr = torch.stack(batch_data, dim=0)
x = model(arr)
window = windowing_array.clone()
                    if i - step == 0:  # First audio chunk, no fade-in
window[:fade_size] = 1
                    elif i >= mix.shape[1]:  # Last audio chunk, no fade-out
window[-fade_size:] = 1
for j, (start, seg_len) in enumerate(batch_locations):
result[..., start:start + seg_len] += x[j, ..., :seg_len].cpu() * window[..., :seg_len]
counter[..., start:start + seg_len] += window[..., :seg_len]
batch_data.clear()
batch_locations.clear()
if progress_bar:
progress_bar.update(step)
if progress_bar:
progress_bar.close()
estimated_sources = result / counter
estimated_sources = estimated_sources.cpu().numpy()
np.nan_to_num(estimated_sources, copy=False, nan=0.0)
if mode == "demucs" and num_instruments <= 1:
return estimated_sources
else:
instruments = config.training.instruments
return {k: v for k, v in zip(instruments, estimated_sources)}
def sdr(references: np.ndarray, estimates: np.ndarray) -> np.ndarray:
"""
Compute Signal-to-Distortion Ratio (SDR) for one or more audio tracks.
SDR is a measure of how well the predicted source (estimate) matches the reference source.
It is calculated as the ratio of the energy of the reference signal to the energy of the error (difference between reference and estimate).
Return SDR in decibels (dB)
Parameters:
----------
references : np.ndarray
A 3D numpy array of shape (num_sources, num_channels, num_samples), where num_sources is the number of sources,
num_channels is the number of channels (e.g., 1 for mono, 2 for stereo), and num_samples is the length of the audio signal.
estimates : np.ndarray
A 3D numpy array of shape (num_sources, num_channels, num_samples) representing the estimated sources.
Returns:
-------
np.ndarray
A 1D numpy array containing the SDR values for each source.
"""
eps = 1e-8 # to avoid numerical errors
num = np.sum(np.square(references), axis=(1, 2))
den = np.sum(np.square(references - estimates), axis=(1, 2))
num += eps
den += eps
return 10 * np.log10(num / den)
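# Quick numeric check of sdr (illustrative): an estimate at half the reference amplitude leaves
# an error with a quarter of the reference energy, so the SDR is 10 * log10(4) ≈ 6.02 dB.
#
#   ref = np.ones((1, 2, 1000), dtype=np.float32)   # 1 source, stereo, 1000 samples
#   est = 0.5 * ref
#   sdr(ref, est)                                    # -> array([~6.02])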
def si_sdr(reference: np.ndarray, estimate: np.ndarray) -> float:
"""
    Compute Scale-Invariant Signal-to-Distortion Ratio (SI-SDR) for a single audio track.
    SI-SDR is a variant of the SDR metric that is invariant to the scaling of the estimate relative to the reference.
    It is calculated by optimally scaling the reference to match the estimate and then computing the SDR against the scaled reference.
Parameters:
----------
    reference : np.ndarray
        A 2D numpy array of shape (num_channels, num_samples), where num_channels is the number of channels
        (e.g., 1 for mono, 2 for stereo) and num_samples is the length of the audio signal.
    estimate : np.ndarray
        A 2D numpy array of shape (num_channels, num_samples) representing the estimated source.
Returns:
-------
float
The SI-SDR value for the source. It is a scalar representing the Signal-to-Distortion Ratio in decibels (dB).
"""
eps = 1e-8 # To avoid numerical errors
scale = np.sum(estimate * reference + eps, axis=(0, 1)) / np.sum(reference ** 2 + eps, axis=(0, 1))
    scale = np.expand_dims(scale, axis=(0, 1))  # Reshape to (1, 1) so it broadcasts over (channels, samples)
reference = reference * scale
si_sdr = np.mean(10 * np.log10(
np.sum(reference ** 2, axis=(0, 1)) / (np.sum((reference - estimate) ** 2, axis=(0, 1)) + eps) + eps))
return si_sdr
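# Unlike sdr above, si_sdr is scale-invariant (illustrative sketch): rescaling the estimate does
# not change the score, because the reference is first rescaled by the optimal factor.
#
#   ref = np.random.randn(2, 1000).astype(np.float32)
#   si_sdr(ref, 0.5 * ref)               # very large value (limited only by eps), while
#   sdr(ref[None], (0.5 * ref)[None])    # is only ~6 dB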
def L1Freq_metric(
reference: np.ndarray,
estimate: np.ndarray,
fft_size: int = 2048,
hop_size: int = 1024,
device: str = 'cpu'
) -> float:
"""
Compute the L1 Frequency Metric between the reference and estimated audio signals.
This metric compares the magnitude spectrograms of the reference and estimated audio signals
using the Short-Time Fourier Transform (STFT) and calculates the L1 loss between them. The result
is scaled to the range [0, 100] where a higher value indicates better performance.
Parameters:
----------
reference : np.ndarray
A 2D numpy array of shape (num_channels, num_samples) representing the reference (ground truth) audio signal.
estimate : np.ndarray
A 2D numpy array of shape (num_channels, num_samples) representing the estimated (predicted) audio signal.
fft_size : int, optional
The size of the FFT (Short-Time Fourier Transform). Default is 2048.
hop_size : int, optional
The hop size between STFT frames. Default is 1024.
device : str, optional
The device to run the computation on ('cpu' or 'cuda'). Default is 'cpu'.
Returns:
-------
float
The L1 Frequency Metric in the range [0, 100], where higher values indicate better performance.
"""
reference = torch.from_numpy(reference).to(device)
estimate = torch.from_numpy(estimate).to(device)
reference_stft = torch.stft(reference, fft_size, hop_size, return_complex=True)
estimated_stft = torch.stft(estimate, fft_size, hop_size, return_complex=True)
reference_mag = torch.abs(reference_stft)
estimate_mag = torch.abs(estimated_stft)
loss = 10 * F.l1_loss(estimate_mag, reference_mag)
ret = 100 / (1. + float(loss.cpu().numpy()))
return ret
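# Sanity check for L1Freq_metric (illustrative): identical signals give an L1 spectrogram loss
# of 0, so the metric reaches its maximum of 100; larger spectral differences push it toward 0.
#
#   ref = np.random.randn(2, 44100).astype(np.float32)
#   L1Freq_metric(ref, ref.copy())   # -> 100.0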
def LogWMSE_metric(
reference: np.ndarray,
estimate: np.ndarray,
mixture: np.ndarray,
device: str = 'cpu',
) -> float:
"""
Calculate the Log-WMSE (Logarithmic Weighted Mean Squared Error) between the reference, estimate, and mixture signals.
This metric evaluates the quality of the estimated signal compared to the reference signal in the
context of audio source separation. The result is given in logarithmic scale, which helps in evaluating
signals with large amplitude differences.
Parameters:
----------
reference : np.ndarray
The ground truth audio signal of shape (channels, time), where channels is the number of audio channels
(e.g., 1 for mono, 2 for stereo) and time is the length of the audio in samples.
estimate : np.ndarray
The estimated audio signal of shape (channels, time).
mixture : np.ndarray
The mixed audio signal of shape (channels, time).
device : str, optional
The device to run the computation on, either 'cpu' or 'cuda'. Default is 'cpu'.
Returns:
-------
float
The Log-WMSE value, which quantifies the difference between the reference and estimated signal on a logarithmic scale.
"""
from torch_log_wmse import LogWMSE
log_wmse = LogWMSE(
audio_length=reference.shape[-1] / 44100, # audio length in seconds
sample_rate=44100, # sample rate of 44100 Hz
return_as_loss=False, # return as loss (False means return as metric)
bypass_filter=False, # bypass frequency filtering (False means apply filter)
)
reference = torch.from_numpy(reference).unsqueeze(0).unsqueeze(0).to(device)
estimate = torch.from_numpy(estimate).unsqueeze(0).unsqueeze(0).to(device)
mixture = torch.from_numpy(mixture).unsqueeze(0).to(device)
res = log_wmse(mixture, reference, estimate)
return float(res.cpu().numpy())
def AuraSTFT_metric(
reference: np.ndarray,
estimate: np.ndarray,
device: str = 'cpu',
) -> float:
"""
Calculate the AuraSTFT metric, which evaluates the spectral difference between the reference and estimated
audio signals using Short-Time Fourier Transform (STFT) loss.
The AuraSTFT metric computes the STFT loss in both logarithmic and linear magnitudes, and it is commonly used
to assess the quality of audio separation tasks. The result is returned as a value scaled to the range [0, 100].
Parameters:
----------
reference : np.ndarray
The ground truth audio signal of shape (channels, time), where channels is the number of audio channels
(e.g., 1 for mono, 2 for stereo) and time is the length of the audio in samples.
estimate : np.ndarray
The estimated audio signal of shape (channels, time).
device : str, optional
The device to run the computation on, either 'cpu' or 'cuda'. Default is 'cpu'.
Returns:
-------
float
The AuraSTFT metric value, scaled to the range [0, 100], which quantifies the difference between
the reference and estimated signal in the spectral domain.
"""
from auraloss.freq import STFTLoss
stft_loss = STFTLoss(
w_log_mag=1.0, # weight for log magnitude
w_lin_mag=0.0, # weight for linear magnitude
        w_sc=1.0,  # weight for spectral convergence
device=device,
)
reference = torch.from_numpy(reference).unsqueeze(0).to(device)
estimate = torch.from_numpy(estimate).unsqueeze(0).to(device)
res = 100 / (1. + 10 * stft_loss(reference, estimate))
return float(res.cpu().numpy())
def AuraMRSTFT_metric(
reference: np.ndarray,
estimate: np.ndarray,
device: str = 'cpu',
) -> float:
"""
Calculate the AuraMRSTFT metric, which evaluates the spectral difference between the reference and estimated
audio signals using Multi-Resolution Short-Time Fourier Transform (STFT) loss.
The AuraMRSTFT metric uses multi-resolution STFT analysis, which allows better representation of both
low- and high-frequency components in the audio signals. The result is returned as a value scaled to the range [0, 100].
Parameters:
----------
reference : np.ndarray
The ground truth audio signal of shape (channels, time), where channels is the number of audio channels
(e.g., 1 for mono, 2 for stereo) and time is the length of the audio in samples.
estimate : np.ndarray
The estimated audio signal of shape (channels, time).
device : str, optional
The device to run the computation on, either 'cpu' or 'cuda'. Default is 'cpu'.
Returns:
-------
float
The AuraMRSTFT metric value, scaled to the range [0, 100], which quantifies the difference between
the reference and estimated signal in the multi-resolution spectral domain.
"""
from auraloss.freq import MultiResolutionSTFTLoss
mrstft_loss = MultiResolutionSTFTLoss(
fft_sizes=[1024, 2048, 4096],
hop_sizes=[256, 512, 1024],
win_lengths=[1024, 2048, 4096],
scale="mel", # mel scale for frequency resolution
n_bins=128, # number of bins for mel scale
sample_rate=44100,
perceptual_weighting=True, # apply perceptual weighting
device=device
)
reference = torch.from_numpy(reference).unsqueeze(0).float().to(device)
estimate = torch.from_numpy(estimate).unsqueeze(0).float().to(device)
res = 100 / (1. + 10 * mrstft_loss(reference, estimate))
return float(res.cpu().numpy())
def bleed_full(
reference: np.ndarray,
estimate: np.ndarray,
sr: int = 44100,
n_fft: int = 4096,
hop_length: int = 1024,
n_mels: int = 512,
device: str = 'cpu',
) -> Tuple[float, float]:
"""
Calculate the 'bleed' and 'fullness' metrics between a reference and an estimated audio signal.
    The 'bleedless' score penalizes energy that appears in the estimate but not in the reference
    (bleed from other sources), while the 'fullness' score penalizes energy from the reference that
    is missing in the estimate. Both are computed on dB-scaled mel spectrograms.
Parameters:
----------
reference : np.ndarray
The reference audio signal, shape (channels, time), where channels is the number of audio channels
(e.g., 1 for mono, 2 for stereo) and time is the length of the audio in samples.
estimate : np.ndarray
The estimated audio signal, shape (channels, time).
sr : int, optional
The sample rate of the audio signals. Default is 44100 Hz.
n_fft : int, optional
The FFT size used to compute the STFT. Default is 4096.
hop_length : int, optional
The hop length for STFT computation. Default is 1024.
n_mels : int, optional
The number of mel frequency bins. Default is 512.
device : str, optional
The device for computation, either 'cpu' or 'cuda'. Default is 'cpu'.
Returns:
-------
tuple
A tuple containing two values:
        - `bleedless` (float): higher values mean less extra energy (bleed) present in the estimate but not in the reference.
        - `fullness` (float): higher values mean less energy from the reference is missing in the estimate.
"""
from torchaudio.transforms import AmplitudeToDB
reference = torch.from_numpy(reference).float().to(device)
estimate = torch.from_numpy(estimate).float().to(device)
window = torch.hann_window(n_fft).to(device)
# Compute STFTs with the Hann window
D1 = torch.abs(torch.stft(reference, n_fft=n_fft, hop_length=hop_length, window=window, return_complex=True,
pad_mode="constant"))
D2 = torch.abs(torch.stft(estimate, n_fft=n_fft, hop_length=hop_length, window=window, return_complex=True,
pad_mode="constant"))
mel_basis = librosa.filters.mel(sr=sr, n_fft=n_fft, n_mels=n_mels)
mel_filter_bank = torch.from_numpy(mel_basis).to(device)
S1_mel = torch.matmul(mel_filter_bank, D1)
S2_mel = torch.matmul(mel_filter_bank, D2)
S1_db = AmplitudeToDB(stype="magnitude", top_db=80)(S1_mel)
S2_db = AmplitudeToDB(stype="magnitude", top_db=80)(S2_mel)
diff = S2_db - S1_db
positive_diff = diff[diff > 0]
negative_diff = diff[diff < 0]
average_positive = torch.mean(positive_diff) if positive_diff.numel() > 0 else torch.tensor(0.0).to(device)
average_negative = torch.mean(negative_diff) if negative_diff.numel() > 0 else torch.tensor(0.0).to(device)
bleedless = 100 * 1 / (average_positive + 1)
fullness = 100 * 1 / (-average_negative + 1)
return bleedless.cpu().numpy(), fullness.cpu().numpy()
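# Interpretation aid for bleed_full (illustrative): a perfect estimate has no positive and no
# negative mel-dB difference, so both scores reach their maximum of 100.
#
#   ref = np.random.randn(2, 44100).astype(np.float32)
#   bleedless, fullness = bleed_full(ref, ref.copy())   # -> (100.0, 100.0)
#
# Extra energy in the estimate (bleed from other sources) lowers `bleedless`; missing energy
# relative to the reference lowers `fullness`.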
def get_metrics(
metrics: List[str],
reference: np.ndarray,
estimate: np.ndarray,
mix: np.ndarray,
device: str = 'cpu',
) -> Dict[str, float]:
"""
Calculate a list of metrics to evaluate the performance of audio source separation models.
The function computes the specified metrics based on the reference, estimate, and mixture.
Parameters:
----------
metrics : List[str]
A list of metric names to compute (e.g., ['sdr', 'si_sdr', 'l1_freq']).
reference : np.ndarray
The reference audio (true signal) with shape (channels, length).
estimate : np.ndarray
The estimated audio (predicted signal) with shape (channels, length).
mix : np.ndarray
The mixed audio signal with shape (channels, length).
device : str, optional, default='cpu'
The device ('cpu' or 'cuda') to perform the calculations on.
Returns:
-------
Dict[str, float]
A dictionary containing the computed metric values.
"""
result = dict()
# Adjust the length to be the same across all inputs
min_length = min(reference.shape[1], estimate.shape[1])
reference = reference[..., :min_length]
estimate = estimate[..., :min_length]
mix = mix[..., :min_length]
if 'sdr' in metrics:
references = np.expand_dims(reference, axis=0)
estimates = np.expand_dims(estimate, axis=0)
result['sdr'] = sdr(references, estimates)[0]
if 'si_sdr' in metrics:
result['si_sdr'] = si_sdr(reference, estimate)
if 'l1_freq' in metrics:
result['l1_freq'] = L1Freq_metric(reference, estimate, device=device)
if 'log_wmse' in metrics:
result['log_wmse'] = LogWMSE_metric(reference, estimate, mix, device)
if 'aura_stft' in metrics:
result['aura_stft'] = AuraSTFT_metric(reference, estimate, device)
if 'aura_mrstft' in metrics:
result['aura_mrstft'] = AuraMRSTFT_metric(reference, estimate, device)
if 'bleedless' in metrics or 'fullness' in metrics:
bleedless, fullness = bleed_full(reference, estimate, device=device)
if 'bleedless' in metrics:
result['bleedless'] = bleedless
if 'fullness' in metrics:
result['fullness'] = fullness
return result
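# Usage sketch for get_metrics (illustrative; the variable names below are hypothetical and
# stand for the separated stem produced by demix and the corresponding ground-truth stem):
#
#   scores = get_metrics(
#       ['sdr', 'si_sdr', 'l1_freq'],
#       reference=true_vocals,    # (channels, time) numpy array
#       estimate=pred_vocals,     # (channels, time) numpy array
#       mix=mixture,              # (channels, time) numpy array
#       device='cpu',
#   )
#   # -> {'sdr': ..., 'si_sdr': ..., 'l1_freq': ...}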
def prefer_target_instrument(config: ConfigDict) -> List[str]:
"""
Return the list of target instruments based on the configuration.
If a specific target instrument is specified in the configuration,
it returns a list with that instrument. Otherwise, it returns the list of instruments.
Parameters:
----------
config : ConfigDict
Configuration object containing the list of instruments or the target instrument.
Returns:
-------
List[str]
A list of target instruments.
"""
if config.training.get('target_instrument'):
return [config.training.target_instrument]
else:
return config.training.instruments
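# Behaviour sketch for prefer_target_instrument (illustrative config values):
#
#   config.training.instruments = ['vocals', 'other']
#   config.training.target_instrument = 'vocals'
#   prefer_target_instrument(config)   # -> ['vocals']
#   config.training.target_instrument = None
#   prefer_target_instrument(config)   # -> ['vocals', 'other']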
def prefer_target_instrument_test(config: ConfigDict, selected_instruments: Optional[List[str]] = None) -> List[str]:
"""
Return the list of target instruments based on the configuration and selected instruments.
If selected_instruments is specified, returns the intersection with available instruments.
Otherwise, if a target instrument is specified, returns it, else returns all instruments.
Parameters:
----------
config : ConfigDict
Configuration object containing the list of instruments or the target instrument.
selected_instruments : Optional[List[str]]
List of instruments to select (optional)
Returns:
-------
List[str]
A list of target instruments.
"""
available_instruments = config.training.instruments
if selected_instruments is not None:
# Return only selected instruments that are available
return [instr for instr in selected_instruments if instr in available_instruments]
elif config.training.get('target_instrument'):
# Default behavior if no selection - return target instrument
return [config.training.target_instrument]
else:
# If no target and no selection, return all instruments
return available_instruments
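# Behaviour sketch for prefer_target_instrument_test (illustrative config values):
#
#   config.training.instruments = ['vocals', 'bass', 'drums', 'other']
#   prefer_target_instrument_test(config, selected_instruments=['vocals', 'piano'])
#   # -> ['vocals']  ('piano' is dropped because it is not among the available instruments)
#   prefer_target_instrument_test(config)   # no selection: behaves like prefer_target_instrument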