| """Metrics computation for voice model evaluation.""" | |
| import torch | |
| import numpy as np | |
| from typing import List, Dict, Any | |
| import logging | |
| import time | |
| logger = logging.getLogger(__name__) | |


class MetricCalculator:
    """
    Calculates various metrics for voice model evaluation.

    Includes word error rate, audio quality metrics, and latency measurements.
    """

    def __init__(self):
        """Initialize metric calculator."""
        self.metrics_cache = {}

    def compute_word_error_rate(
        self,
        predictions: List[str],
        references: List[str],
    ) -> float:
        """
        Compute Word Error Rate (WER).

        WER = (substitutions + deletions + insertions) / number of reference words.

        Args:
            predictions: List of predicted transcriptions
            references: List of reference transcriptions

        Returns:
            Word error rate as a float
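
        Example (illustrative; the prediction is missing 3 of the 6
        reference words, so WER = 3 / 6):
            >>> calc = MetricCalculator()
            >>> calc.compute_word_error_rate(
            ...     ["the cat sat"], ["the cat sat on the mat"])
            0.5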
| """ | |
        if len(predictions) != len(references):
            raise ValueError("Predictions and references must have same length")

        total_words = 0
        total_errors = 0
        for pred, ref in zip(predictions, references):
            pred_words = pred.lower().split()
            ref_words = ref.lower().split()
            # Edit distance between the word sequences
            errors = self._levenshtein_distance(pred_words, ref_words)
            total_errors += errors
            total_words += len(ref_words)

        if total_words == 0:
            return 0.0
        return total_errors / total_words

    def compute_character_error_rate(
        self,
        predictions: List[str],
        references: List[str],
    ) -> float:
        """
        Compute Character Error Rate (CER): character-level edit distance
        divided by the number of reference characters.

        Args:
            predictions: List of predicted transcriptions
            references: List of reference transcriptions

        Returns:
            Character error rate as a float
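
        Example (illustrative; one substitution against 5 reference
        characters, so CER = 1 / 5):
            >>> MetricCalculator().compute_character_error_rate(
            ...     ["hallo"], ["hello"])
            0.2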
| """ | |
        if len(predictions) != len(references):
            raise ValueError("Predictions and references must have same length")

        total_chars = 0
        total_errors = 0
        for pred, ref in zip(predictions, references):
            pred_chars = list(pred.lower())
            ref_chars = list(ref.lower())
            errors = self._levenshtein_distance(pred_chars, ref_chars)
            total_errors += errors
            total_chars += len(ref_chars)

        if total_chars == 0:
            return 0.0
        return total_errors / total_chars

    def _levenshtein_distance(self, seq1: List, seq2: List) -> int:
        """
        Compute the Levenshtein (edit) distance between two sequences
        using dynamic programming.

        Args:
            seq1: First sequence
            seq2: Second sequence

        Returns:
            Minimum number of insertions, deletions, and substitutions
            needed to turn seq1 into seq2
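
        Example (the classic kitten/sitting pair: two substitutions and
        one insertion):
            >>> MetricCalculator()._levenshtein_distance(
            ...     list("kitten"), list("sitting"))
            3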
| """ | |
        m, n = len(seq1), len(seq2)
        dp = [[0] * (n + 1) for _ in range(m + 1)]

        # Base cases: transforming to or from an empty sequence
        for i in range(m + 1):
            dp[i][0] = i
        for j in range(n + 1):
            dp[0][j] = j

        for i in range(1, m + 1):
            for j in range(1, n + 1):
                if seq1[i - 1] == seq2[j - 1]:
                    dp[i][j] = dp[i - 1][j - 1]
                else:
                    dp[i][j] = 1 + min(
                        dp[i - 1][j],      # deletion
                        dp[i][j - 1],      # insertion
                        dp[i - 1][j - 1],  # substitution
                    )
        return dp[m][n]

    def compute_mel_cepstral_distortion(
        self,
        generated_audio: torch.Tensor,
        reference_audio: torch.Tensor,
    ) -> float:
        """
        Compute a proxy for Mel-Cepstral Distortion (MCD).

        This is a simplified stand-in for demonstration; a production
        implementation would extract MFCCs and compare cepstral frames.

        Args:
            generated_audio: Generated audio tensor
            reference_audio: Reference audio tensor

        Returns:
            MCD-like score (lower is better)
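
        Example (illustrative; a constant offset of 1.0 gives an RMSE of
        1.0, scaled by 10):
            >>> MetricCalculator().compute_mel_cepstral_distortion(
            ...     torch.zeros(8), torch.ones(8))
            10.0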
| """ | |
        # Simplified MCD computation; a real implementation would use
        # proper MFCC extraction.
        if generated_audio.shape != reference_audio.shape:
            # Truncate along the time axis so both signals align
            # (assumes any leading dimensions already match).
            min_len = min(generated_audio.shape[-1], reference_audio.shape[-1])
            generated_audio = generated_audio[..., :min_len]
            reference_audio = reference_audio[..., :min_len]

        # Root mean squared difference as a proxy for MCD
        mse = torch.mean((generated_audio - reference_audio) ** 2).item()
        mcd = float(np.sqrt(mse)) * 10  # Scale to a typical MCD range
        return mcd

    def compute_perceptual_quality(
        self,
        generated_audio: torch.Tensor,
        reference_audio: torch.Tensor,
    ) -> float:
        """
        Compute a perceptual quality score (PESQ proxy).

        This is a simplified stand-in; a production implementation would
        use an actual PESQ library (e.g. the pesq package).

        Args:
            generated_audio: Generated audio tensor
            reference_audio: Reference audio tensor

        Returns:
            Quality score in [1, 5]; higher is better
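
        Example (illustrative; identical signals correlate perfectly, so
        the score maps to the top of the range):
            >>> t = torch.tensor([0.0, 1.0, 2.0, 3.0])
            >>> round(MetricCalculator().compute_perceptual_quality(t, t), 2)
            5.0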
| """ | |
        # Simplified quality metric; a real implementation would use the
        # pesq library.
        if generated_audio.shape != reference_audio.shape:
            min_len = min(generated_audio.shape[-1], reference_audio.shape[-1])
            generated_audio = generated_audio[..., :min_len]
            reference_audio = reference_audio[..., :min_len]

        # Pearson correlation as a proxy for perceptual similarity
        gen_flat = generated_audio.flatten()
        ref_flat = reference_audio.flatten()
        correlation = torch.corrcoef(torch.stack([gen_flat, ref_flat]))[0, 1].item()

        # Map correlation in [-1, 1] to a PESQ-like range [1, 5]
        quality = 3.0 + 2.0 * correlation
        return max(1.0, min(5.0, quality))

    def measure_inference_latency(
        self,
        model_fn: Callable,
        input_data: torch.Tensor,
        num_runs: int = 10,
    ) -> Dict[str, float]:
        """
        Measure inference latency over repeated runs.

        Args:
            model_fn: Model inference function, called as model_fn(input_data)
            input_data: Input tensor
            num_runs: Number of timed runs to average over

        Returns:
            Dictionary with latency statistics in milliseconds
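
        Example (illustrative; timings vary by machine, so only the
        returned keys are checked here):
            >>> stats = MetricCalculator().measure_inference_latency(
            ...     lambda x: x * 2, torch.randn(4), num_runs=3)
            >>> sorted(stats)
            ['max_latency_ms', 'mean_latency_ms', 'min_latency_ms', 'std_latency_ms']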
| """ | |
        latencies = []

        # Warm-up run (excluded from the statistics)
        _ = model_fn(input_data)

        for _ in range(num_runs):
            start_time = time.perf_counter()
            _ = model_fn(input_data)
            end_time = time.perf_counter()
            latencies.append((end_time - start_time) * 1000)  # seconds -> ms

        return {
            'mean_latency_ms': float(np.mean(latencies)),
            'std_latency_ms': float(np.std(latencies)),
            'min_latency_ms': float(np.min(latencies)),
            'max_latency_ms': float(np.max(latencies)),
        }

    def compute_samples_per_second(
        self,
        num_samples: int,
        total_time_seconds: float,
    ) -> float:
        """
        Compute throughput in samples per second.

        Args:
            num_samples: Number of samples processed
            total_time_seconds: Total wall-clock time taken, in seconds

        Returns:
            Samples per second (0.0 if the elapsed time is not positive)
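
        Example (illustrative):
            >>> MetricCalculator().compute_samples_per_second(100, 4.0)
            25.0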
| """ | |
        if total_time_seconds <= 0:
            return 0.0
        return num_samples / total_time_seconds
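

if __name__ == "__main__":
    # Illustrative smoke test: exercises each metric with tiny synthetic
    # inputs. The values below are toy examples, not benchmark results.
    calc = MetricCalculator()

    print("WER:", calc.compute_word_error_rate(
        ["the cat sat"], ["the cat sat on the mat"]))
    print("CER:", calc.compute_character_error_rate(["hallo"], ["hello"]))
    print("Edit distance:", calc._levenshtein_distance(
        list("kitten"), list("sitting")))

    # Audio proxies on synthetic signals
    print("MCD proxy:", calc.compute_mel_cepstral_distortion(
        torch.zeros(16000), torch.ones(16000)))
    t = torch.randn(16000)
    print("Quality proxy:", calc.compute_perceptual_quality(t, t))

    # Performance measurements with a trivial stand-in for a model
    stats = calc.measure_inference_latency(
        lambda x: x * 2, torch.randn(1024), num_runs=5)
    print("Latency stats (ms):", stats)
    print("Throughput:", calc.compute_samples_per_second(100, 4.0), "samples/s")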