| """Audio processing utilities for CSM-1B TTS API.""" | |
| import logging | |
| import numpy as np | |
| import torch | |
| from scipy import signal | |
| logger = logging.getLogger(__name__) | |
def remove_long_silences(
    audio: torch.Tensor,
    sample_rate: int,
    min_speech_energy: float = 0.015,
    max_silence_sec: float = 0.4,
    keep_silence_sec: float = 0.1,
) -> torch.Tensor:
    """
    Remove uncomfortably long silences from audio while preserving natural pauses.

    Args:
        audio: Audio tensor
        sample_rate: Sample rate in Hz
        min_speech_energy: Minimum RMS energy to consider as speech
        max_silence_sec: Maximum silence duration to keep, in seconds
        keep_silence_sec: Amount of silence to keep at speech boundaries, in seconds

    Returns:
        Audio with long silences removed
    """
    # Convert to numpy for processing
    audio_np = audio.cpu().numpy()

    # Frame size and hop length for short-time energy analysis
    frame_size = int(0.02 * sample_rate)  # 20 ms frames
    hop_length = int(0.01 * sample_rate)  # 10 ms hop

    # Slice the signal into overlapping frames
    frames = []
    for i in range(0, len(audio_np) - frame_size + 1, hop_length):
        frames.append(audio_np[i:i + frame_size])
    if len(frames) < 2:  # Audio too short for analysis
        return audio
    frames = np.array(frames)

    # Root-mean-square energy per frame
    frame_energy = np.sqrt(np.mean(frames ** 2, axis=1))

    # Adaptive threshold: a percentile adapts to the audio's overall level,
    # with min_speech_energy as a floor
    energy_threshold = max(
        min_speech_energy,
        np.percentile(frame_energy, 10),
    )
    # Identify speech frames
    is_speech = frame_energy > energy_threshold

    # Convert frame decisions into (start, end) sample ranges of speech.
    # Compare silence runs in frames rather than samples: truncation in
    # int(max_silence_sec * sample_rate / hop_length) can cover slightly
    # fewer samples than max_silence_sec * sample_rate, which would make
    # a sample-based test unreachable at some sample rates.
    max_silence_frames = int(max_silence_sec * sample_rate / hop_length)
    speech_segments = []
    in_speech = False
    speech_start = 0
    for i in range(len(is_speech)):
        if is_speech[i] and not in_speech:
            # Start of speech; back up by keep_silence_sec
            in_speech = True
            speech_start = max(0, i * hop_length - int(keep_silence_sec * sample_rate))
        elif not is_speech[i] and in_speech:
            # Potential end of speech: look ahead to check whether the
            # silence persists for at least max_silence_frames
            silence_length = 0
            for j in range(i, min(len(is_speech), i + max_silence_frames)):
                if not is_speech[j]:
                    silence_length += 1
                else:
                    break
            if silence_length >= max_silence_frames:
                # Long enough silence: close the segment, padded by keep_silence_sec
                in_speech = False
                speech_end = min(len(audio_np), i * hop_length + int(keep_silence_sec * sample_rate))
                speech_segments.append((speech_start, speech_end))

    # Handle the case where the audio ends during speech
    if in_speech:
        speech_segments.append((speech_start, len(audio_np)))

    if not speech_segments:
        logger.warning("No speech segments detected, returning original audio")
        return audio
    # Stitch speech segments back together with controlled silence durations
    result = []

    # Keep a short lead-in silence (up to 100 ms) before the first segment
    if speech_segments[0][0] > 0:
        silence_samples = min(int(0.1 * sample_rate), speech_segments[0][0])
        if silence_samples > 0:
            result.append(audio_np[speech_segments[0][0] - silence_samples:speech_segments[0][0]])

    # Process each speech segment
    for i, (start, end) in enumerate(speech_segments):
        # Add this speech segment
        result.append(audio_np[start:end])
        # Add a controlled silence between segments
        if i < len(speech_segments) - 1:
            next_start = speech_segments[i + 1][0]
            available_silence = next_start - end
            if available_silence > 0:
                # Keep the actual gap if short, otherwise cap it at max_silence_sec
                silence_duration = min(available_silence, int(max_silence_sec * sample_rate))
                # Take the first portion of the gap, which is usually cleaner
                result.append(audio_np[end:end + silence_duration])

    # Combine all parts
    processed_audio = np.concatenate(result)

    # Log the results
    original_duration = len(audio_np) / sample_rate
    processed_duration = len(processed_audio) / sample_rate
    logger.info(
        f"Silence removal: {original_duration:.2f}s -> {processed_duration:.2f}s "
        f"({processed_duration / original_duration * 100:.1f}%)"
    )

    # Return as a tensor on the original device with the original dtype
    return torch.tensor(processed_audio, device=audio.device, dtype=audio.dtype)
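

def _demo_remove_long_silences() -> None:
    """Minimal usage sketch for remove_long_silences.

    The synthetic tone-gap-tone signal and the 24 kHz rate are
    illustrative assumptions, not part of the API above.
    """
    sr = 24_000
    t = torch.arange(sr) / sr
    tone = 0.3 * torch.sin(2 * torch.pi * 220.0 * t)  # 1 s of "speech"
    gap = torch.zeros(sr)  # 1 s of silence, well over max_silence_sec
    speech = torch.cat([tone, gap, tone])
    trimmed = remove_long_silences(speech, sr)
    # The 1 s gap should be capped near max_silence_sec (0.4 s by default)
    assert trimmed.shape[0] < speech.shape[0]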


def create_high_shelf_filter(audio, sample_rate, frequency=4000, gain_db=3.0):
    """
    Apply a high-shelf filter that boosts frequencies above the given frequency.

    Args:
        audio: Audio numpy array
        sample_rate: Sample rate in Hz
        frequency: Shelf corner frequency in Hz
        gain_db: Gain in dB for frequencies above the shelf

    Returns:
        Filtered audio
    """
    # Shelf filters use A = 10^(dB/40): the biquad's high-frequency gain is
    # then A^2 = 10^(dB/20), the intended linear gain. Using 10^(dB/20)
    # here would double the boost in dB.
    gain = 10 ** (gain_db / 40.0)

    # Normalized frequency (0 to 1, where 1 is the Nyquist frequency)
    normalized_freq = 2.0 * frequency / sample_rate

    # Second-order (biquad) high-shelf filter, following the Audio EQ
    # Cookbook formulation with shelf slope S = 0.5
    alpha = np.sin(np.pi * normalized_freq) / 2 * np.sqrt((gain + 1 / gain) * (1 / 0.5 - 1) + 2)
    cos_w0 = np.cos(np.pi * normalized_freq)
    b0 = gain * ((gain + 1) + (gain - 1) * cos_w0 + 2 * np.sqrt(gain) * alpha)
    b1 = -2 * gain * ((gain - 1) + (gain + 1) * cos_w0)
    b2 = gain * ((gain + 1) + (gain - 1) * cos_w0 - 2 * np.sqrt(gain) * alpha)
    a0 = (gain + 1) - (gain - 1) * cos_w0 + 2 * np.sqrt(gain) * alpha
    a1 = 2 * ((gain - 1) - (gain + 1) * cos_w0)
    a2 = (gain + 1) - (gain - 1) * cos_w0 - 2 * np.sqrt(gain) * alpha

    # Normalize coefficients so that a0 = 1
    b = np.array([b0, b1, b2]) / a0
    a = np.array([1.0, a1 / a0, a2 / a0])

    # Apply the filter
    return signal.lfilter(b, a, audio)
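

def _demo_high_shelf_gain() -> None:
    """Minimal sketch checking the shelf boost empirically.

    Filters one tone below and one above the shelf corner and compares
    RMS gain; the test frequencies and the 24 kHz rate are illustrative
    assumptions.
    """
    sr = 24_000
    t = np.arange(sr) / sr
    low = np.sin(2 * np.pi * 200.0 * t)    # well below the 4 kHz shelf
    high = np.sin(2 * np.pi * 8000.0 * t)  # well above the shelf

    def rms_gain(x):
        y = create_high_shelf_filter(x, sr, frequency=4000, gain_db=3.0)
        return np.sqrt(np.mean(y ** 2)) / np.sqrt(np.mean(x ** 2))

    # The low band should pass roughly unchanged (~0 dB); the high band
    # should approach +3 dB, a linear gain of about 1.41
    assert rms_gain(low) < rms_gain(high)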


def enhance_audio_quality(audio: torch.Tensor, sample_rate: int) -> torch.Tensor:
    """
    Enhance audio quality by applying various processing techniques.

    Args:
        audio: Audio tensor
        sample_rate: Sample rate in Hz

    Returns:
        Enhanced audio tensor
    """
    try:
        audio_np = audio.cpu().numpy()

        # Remove DC offset
        audio_np = audio_np - np.mean(audio_np)

        # Apply light compression to improve perceived loudness:
        # reduce peaks above the threshold while leaving quieter parts alone
        threshold = 0.5
        ratio = 1.5
        attack = 0.01   # per-sample smoothing factor when gain is falling
        release = 0.1   # per-sample smoothing factor when gain is rising

        # Simple sample-by-sample compressor with one-pole gain smoothing
        gain = np.ones_like(audio_np)
        for i in range(1, len(audio_np)):
            level = abs(audio_np[i])
            if level > threshold:
                # Compressed output level, converted to a gain factor
                gain[i] = (threshold + (level - threshold) / ratio) / level
            else:
                gain[i] = 1.0
            # Smooth gain changes to avoid audible pumping
            gain[i] = gain[i - 1] + (gain[i] - gain[i - 1]) * (attack if gain[i] < gain[i - 1] else release)
        audio_np = audio_np * gain

        # High-shelf filter to enhance speech clarity:
        # boost frequencies above 4000 Hz by 3 dB
        audio_np = create_high_shelf_filter(audio_np, sample_rate, frequency=4000, gain_db=3.0)

        # Normalize to prevent clipping
        max_val = np.max(np.abs(audio_np))
        if max_val > 0:
            audio_np = audio_np * 0.95 / max_val

        return torch.tensor(audio_np, device=audio.device, dtype=audio.dtype)
    except Exception as e:
        logger.warning(f"Audio quality enhancement failed: {e}")
        return audio
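

def _demo_postprocess_pipeline() -> None:
    """Minimal sketch of how these utilities might be chained after TTS.

    `generated` stands in for model output here, and the 24 kHz rate is
    an assumption in this sketch, not something the functions require.
    """
    sr = 24_000
    generated = torch.randn(3 * sr) * 0.1  # placeholder for model output
    audio = remove_long_silences(generated, sr)
    audio = enhance_audio_quality(audio, sr)
    # `audio` can now be resampled/encoded for the API response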