Spaces:
Running
Running
| import logging | |
| import time | |
| from typing import List, Literal | |
| import librosa | |
| from funasr import AutoModel | |
| from resampy.core import resample | |
| from tqdm.auto import tqdm | |
| import torch | |
| from corrector.Corrector import Corrector | |
| from transcriber.TranscribeResult import TranscribeResult | |
| logger = logging.getLogger(__name__) | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
class AutoTranscriber:
    """
    Transcriber that uses FunASR's AutoModel for VAD and ASR.

    Pipeline: load audio -> FSMN VAD segmentation -> SenseVoiceSmall ASR per
    segment -> optional simplified-to-traditional Chinese conversion.
    """

    def __init__(
        self,
        corrector: Literal["opencc", "bert", None] = None,
        with_punct: bool = True,
        offset_in_seconds: float = -0.25,
        max_length_seconds: float = 5,
        sr: int = 16000,
    ):
        """
        Args:
            corrector: Post-processing corrector to apply ("opencc", "bert",
                or None to skip correction).
            with_punct: Passed to the ASR pass as ``use_itn`` (inverse text
                normalization, i.e. punctuation/number formatting).
            offset_in_seconds: Constant shift applied to segment timestamps.
            max_length_seconds: Maximum single VAD segment length, in seconds.
            sr: Sample rate to load audio at; audio is resampled to 16 kHz
                if it differs.
        """
        self.corrector = corrector
        self.with_punct = with_punct
        self.sr = sr
        self.offset_in_seconds = offset_in_seconds
        self.max_length_seconds = max_length_seconds

        # FunASR expects the segment cap in milliseconds.
        self.vad_model = AutoModel(
            model="fsmn-vad",
            device=device,
            max_single_segment_time=self.max_length_seconds * 1000,
        )
        self.asr_model = AutoModel(
            model="iic/SenseVoiceSmall",
            vad_model=None,  # We'll handle VAD separately
            punc_model=None,
            ban_emo_unks=True,
            device=device,
        )

    def transcribe(
        self,
        audio_file: str,
    ) -> List[TranscribeResult]:
        """
        Transcribe audio file to text with timestamps.

        Args:
            audio_file (str): Path to audio file

        Returns:
            List[TranscribeResult]: List of transcription results,
            one per VAD segment (segments with empty ASR output are skipped).
        """
        # Load and preprocess audio; the VAD/ASR models expect 16 kHz mono.
        speech, sr = librosa.load(audio_file, sr=self.sr)
        if sr != 16_000:
            speech = resample(speech, sr, 16_000, filter="kaiser_best", parallel=True)

        # Get VAD segments
        logger.info("Segmenting speech...")
        vad_started = time.time()
        vad_results = self.vad_model.generate(input=speech, disable_pbar=True)
        logger.info("VAD took %.2f seconds", time.time() - vad_started)

        if not vad_results or not vad_results[0]["value"]:
            return []
        # Each entry is [start_ms, end_ms].
        vad_segments = vad_results[0]["value"]

        # Process each segment
        results = []
        asr_started = time.time()
        for segment in tqdm(vad_segments, desc="Transcribing"):
            # Convert ms to samples: 16 samples per ms at 16 kHz.
            start_sample = int(segment[0] * 16)
            end_sample = int(segment[1] * 16)
            segment_audio = speech[start_sample:end_sample]

            # Get ASR results for segment
            asr_result = self.asr_model.generate(
                input=segment_audio, language="yue", use_itn=self.with_punct
            )
            if not asr_result:
                continue

            # BUG FIX: the original reused the name ``start_time`` here,
            # clobbering the wall-clock timer started above — the
            # "ASR took" log then reported a meaningless value.
            seg_start = max(0, segment[0] / 1000.0 + self.offset_in_seconds)
            seg_end = segment[1] / 1000.0 + self.offset_in_seconds

            # Convert ASR result to TranscribeResult format (ms -> seconds).
            results.append(
                TranscribeResult(
                    text=asr_result[0]["text"],
                    start_time=seg_start,
                    end_time=seg_end,
                )
            )
        logger.info("ASR took %.2f seconds", time.time() - asr_started)

        # Apply Chinese conversion if needed
        conv_started = time.time()
        results = self._convert_to_traditional_chinese(results)
        logger.info("Conversion took %.2f seconds", time.time() - conv_started)
        return results

    def _convert_to_traditional_chinese(
        self, results: List[TranscribeResult]
    ) -> List[TranscribeResult]:
        """Convert simplified Chinese to traditional Chinese, in place.

        Returns the same list; each result's ``text`` is corrected when a
        corrector is configured. No-op when ``self.corrector`` is None or
        ``results`` is empty.
        """
        if not results or not self.corrector:
            return results
        corrector = Corrector(self.corrector)
        if self.corrector == "bert":
            # BERT correction runs per sentence; show progress.
            for result in tqdm(
                results, total=len(results), desc="Converting to Traditional Chinese"
            ):
                result.text = corrector.correct(result.text)
        elif self.corrector == "opencc":
            # OpenCC is cheaper on one big string: join everything with a
            # special delimiter that won't appear in Chinese text, convert
            # once, then split back into the individual results.
            delimiter = "|||"
            combined_text = delimiter.join(result.text for result in results)
            converted_text = corrector.correct(combined_text)
            converted_parts = converted_text.split(delimiter)
            for result, converted in zip(results, converted_parts):
                result.text = converted
        return results