Spaces:
Runtime error
Runtime error
| import soundfile as sf | |
| import datetime | |
| from pyctcdecode import BeamSearchDecoderCTC | |
| import torch | |
| import json | |
| import os | |
| import time | |
| import gc | |
| import gradio as gr | |
| import librosa | |
| from transformers import Wav2Vec2ForCTC, Wav2Vec2ProcessorWithLM, AutoModelForSeq2SeqLM, AutoTokenizer, AutoProcessor | |
| from huggingface_hub import hf_hub_download | |
| from torchaudio.models.decoder import ctc_decoder | |
| from numba import cuda | |
# Load the pretrained MMS acoustic model and its matching processor.
model = Wav2Vec2ForCTC.from_pretrained("facebook/mms-1b-all")
processor = AutoProcessor.from_pretrained("facebook/mms-1b-all")

# Hub repository holding the language-model decoding assets.
_CCLM_REPO_ID = "facebook/mms-cclms"


def _download_cclm_asset(repo_path):
    """Download one decoding asset and return its local file path.

    Args:
        repo_path: "<subfolder>/<filename>" path inside the assets
            repository, as stored in the decoding config.

    Returns:
        Whatever ``hf_hub_download`` returns for that file.
    """
    subfolder, filename = repo_path.rsplit("/", 1)
    return hf_hub_download(
        repo_id=_CCLM_REPO_ID,
        filename=filename,
        subfolder=subfolder,
    )


lm_decoding_configfile = hf_hub_download(
    repo_id=_CCLM_REPO_ID,
    filename="decoding_config.json",
    subfolder="mms-1b-all",
)
# Read as UTF-8 explicitly so parsing does not depend on the host locale.
with open(lm_decoding_configfile, encoding="utf-8") as f:
    lm_decoding_config = json.load(f)

# Select the language-model decoding settings for Amharic ("amh").
decoding_config = lm_decoding_config["amh"]
lm_file = _download_cclm_asset(decoding_config["lmfile"])
token_file = _download_cclm_asset(decoding_config["tokensfile"])

# NOTE(review): lexicon_file is fetched when the config names one, but the
# decoder below is built with the local "./vocab_correct_cleaned.txt" lexicon
# instead -- confirm that this is intentional.
lexicon_file = None
if decoding_config["lexiconfile"] is not None:
    lexicon_file = _download_cclm_asset(decoding_config["lexiconfile"])

# Beam-search CTC decoder; LM weight, word score and silence score come from
# the per-language decoding config.
beam_search_decoder = ctc_decoder(
    lexicon="./vocab_correct_cleaned.txt",
    tokens=token_file,
    lm=lm_file,
    nbest=1,
    beam_size=500,
    beam_size_token=50,
    lm_weight=float(decoding_config["lmweight"]),
    word_score=float(decoding_config["wordscore"]),
    sil_score=float(decoding_config["silweight"]),
    blank_token="<s>",
)
| #Define Functions | |
# Convert a time offset into an .sbv timestamp
def format_time(seconds):
    """Format a non-negative offset in seconds as ``h:mm:ss.mmm``.

    The fractional part is always written as three millisecond digits after
    a period, so the result never contains a comma. Callers can therefore
    join a start and an end timestamp with "," without ambiguity. Hours are
    not wrapped at 24, so long recordings stay in ``h:mm:ss.mmm`` form.

    Args:
        seconds: offset from the start of the media, as an int or float.

    Returns:
        The formatted timestamp string, e.g. ``"0:01:30.500"``.

    Raises:
        ValueError: if ``seconds`` is negative.
    """
    if seconds < 0:
        raise ValueError(f"seconds must be non-negative, got {seconds!r}")
    # Work in whole milliseconds to avoid float artefacts in the output.
    total_millis = int(round(seconds * 1000))
    total_seconds, millis = divmod(total_millis, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours}:{minutes:02d}:{secs:02d}.{millis:03d}"
# Convert video/audio into a 16 kHz WAV file
def preprocessAudio(audioFile):
    """Resample the uploaded media to 16 kHz and write ./audioToConvert.wav.

    Args:
        audioFile: either an object exposing the source path as ``.name``
            or the path itself.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero
            status, so a failed conversion is not silently followed by
            transcription of a missing or stale WAV file.
    """
    # Function-scope import keeps this block self-contained.
    import subprocess

    source_path = getattr(audioFile, "name", audioFile)
    # An argument list (no shell) keeps spaces and shell metacharacters in
    # the uploaded file name from being interpreted as commands.
    subprocess.run(
        [
            "ffmpeg",
            "-y",
            "-i",
            str(source_path),
            "-ar",
            "16000",
            "./audioToConvert.wav",
        ],
        check=True,
    )
# Decode one batch of audio segments into text
def _transcribe_batch(batch, device):
    """Run the acoustic model and the beam-search decoder on one batch.

    Args:
        batch: list of audio segments sampled at 16 kHz.
        device: torch device string the model has been moved to.

    Returns:
        A list with one transcription string per segment, in input order.
    """
    # padding=True lets segments of unequal length share one tensor; the
    # last block of a file is normally shorter than the others.
    inputs = processor(
        batch, sampling_rate=16_000, return_tensors="pt", padding=True
    )
    inputs = inputs.to(device)
    with torch.no_grad():
        logits = model(**inputs).logits
    beam_search_result = beam_search_decoder(logits.to("cpu"))
    # nbest=1 in the decoder config, so take the first hypothesis of each.
    texts = [
        " ".join(hypotheses[0].words).strip()
        for hypotheses in beam_search_result
    ]
    # Free memory before the next batch is built.
    del inputs
    del logits
    torch.cuda.empty_cache()
    gc.collect()
    return texts


# Append one subtitle cue per transcription
def _write_cues(sbv_file, transcriptions, encoding_start, block_size):
    """Write consecutive cues of ``block_size`` seconds and return the new offset."""
    for transcription in transcriptions:
        print(transcription)
        encoding_end = encoding_start + block_size
        formatted_start = format_time(encoding_start)
        formatted_end = format_time(encoding_end)
        sbv_file.write(f"{formatted_start},{formatted_end}\n")
        sbv_file.write(f"{transcription}\n\n")
        encoding_start = encoding_end
    return encoding_start


# Transcribe!!!
def Transcribe(file):
    """Transcribe an uploaded Amharic recording into an .sbv subtitle file.

    The upload is converted to a 16 kHz WAV, streamed in 30-second blocks,
    transcribed in batches, and written out as one subtitle cue per block.

    Args:
        file: the uploaded file, passed straight to ``preprocessAudio``.

    Returns:
        The path of the subtitle file that was written ("./subtitle.sbv").
    """
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    start_time = time.time()
    model.load_adapter("amh")
    processor.tokenizer.set_target_lang("amh")
    preprocessAudio(file)

    # 30 frames of 16000 samples each, i.e. 30 seconds at 16 kHz.
    block_size = 30
    # Number of segments sent through the model per forward pass.
    batch_size = 11
    subtitle_path = "./subtitle.sbv"

    stream = librosa.stream(
        "./audioToConvert.wav",
        block_length=block_size,
        frame_length=16000,
        hop_length=16000,
    )
    model.to(device)
    print(f"Model loaded to {device}: Entering transcription phase")

    # Running start offset (in seconds) of the next cue.
    encoding_start = 0
    batch = []

    # UTF-8 is required for Amharic text regardless of the host locale; the
    # context manager closes the file even if transcription fails midway.
    with open(subtitle_path, "w", encoding="utf-8") as sbv_file:
        for speech_segment in stream:
            # NOTE(review): mixdown kept from the original; confirm the axis
            # order if the stream is ever opened with more than one channel.
            if len(speech_segment.shape) > 1:
                speech_segment = speech_segment[:, 0] + speech_segment[:, 1]
            batch.append(speech_segment)
            if len(batch) == batch_size:
                encoding_start = _write_cues(
                    sbv_file,
                    _transcribe_batch(batch, device),
                    encoding_start,
                    block_size,
                )
                batch = []

        # Trailing partial batch: one cue per segment actually present,
        # rather than assuming a full batch of results.
        if batch:
            encoding_start = _write_cues(
                sbv_file,
                _transcribe_batch(batch, device),
                encoding_start,
                block_size,
            )

    end_time = time.time()
    print(f"The script ran for {end_time - start_time} seconds.")
    return subtitle_path
# Build the web front end: one uploaded file in, one subtitle file out.
_upload_component = gr.File(label="Upload an audio file of Amharic content")
_download_component = gr.File(label="Download .sbv transcription")
demo = gr.Interface(
    fn=Transcribe,
    inputs=_upload_component,
    outputs=_download_component,
    title="Amharic Audio Transcription",
    description="This application uses Meta MMS and a custom kenLM model to transcribe Amharic Audio files of arbitrary length into .sbv files. Upload an Amharic audio file and get your transcription! \n(Note: This is only a rough implementation of Meta's MMS for audio transcription, you should manually edit files after transcription has completed.)",
)
# Start serving the interface.
demo.launch()