Spaces:
Running
Running
#!/usr/bin/env python3
import logging
import random
import subprocess
import soundfile as sf
import gradio as gr
import numpy as np
import sherpa_onnx
from huggingface_hub import hf_hub_download

# All audio is decoded/resampled to 16 kHz mono before VAD processing;
# this rate is shared by the ffmpeg command and the VAD config below.
sample_rate = 16000
def _get_nn_model_filename(
    repo_id: str,
    filename: str,
    subfolder: str = "exp",
) -> str:
    """Download *filename* from the Hugging Face repo *repo_id* and return
    the local cached path (downloads once, then reuses the cache)."""
    return hf_hub_download(
        repo_id=repo_id,
        filename=filename,
        subfolder=subfolder,
    )
def get_vad() -> sherpa_onnx.VoiceActivityDetector:
    """Create a silero-VAD-backed voice activity detector.

    Fetches the silero_vad.onnx model from the HF hub, then configures
    the detector for 16 kHz input with a 180-second internal buffer.
    """
    model_path = _get_nn_model_filename(
        repo_id="csukuangfj/vad",
        filename="silero_vad.onnx",
        subfolder=".",
    )

    cfg = sherpa_onnx.VadModelConfig()
    cfg.sample_rate = sample_rate
    cfg.silero_vad.model = model_path
    cfg.silero_vad.threshold = 0.5
    cfg.silero_vad.min_silence_duration = 0.1
    cfg.silero_vad.min_speech_duration = 0.25
    cfg.silero_vad.max_speech_duration = 20  # seconds

    return sherpa_onnx.VoiceActivityDetector(cfg, buffer_size_in_seconds=180)
def build_html_output(s: str, style: str = "result_item_success"):
    """Wrap message *s* in the app's standard result markup.

    *style* selects the status CSS class (success vs. error colouring,
    see the `css` string at module level).
    """
    return (
        "\n"
        "    <div class='result'>\n"
        f"      <div class='result_item {style}'>\n"
        f"        {s}\n"
        "      </div>\n"
        "    </div>\n"
        "    "
    )
def process_uploaded_audio_file(
    in_filename: str,
):
    """Gradio click handler for the audio-upload tab.

    Returns exactly two values — (output wav path, HTML status) — to match
    the two output components wired up for this button
    (output_audio, output_info_audio).
    """
    logging.warning(f"Processing audio {in_filename}")
    if in_filename is None or in_filename == "":
        # Fix: the error branch previously returned 4 values, but this
        # handler feeds only 2 gradio output components; the extra values
        # made the error path inconsistent with the success path.
        return (
            "",
            build_html_output(
                "Please first upload a file and then click " 'the button "Submit"',
                "result_item_error",
            ),
        )

    return process_file(in_filename)
def process_uploaded_video_file(
    in_filename: str,
):
    """Gradio click handler for the video-upload tab.

    Returns exactly two values — (output path, HTML status) — to match
    the two output components wired up for this button
    (output_video, output_info_video).
    """
    # Fix: a second, duplicate "Processing uploaded video file" log after
    # the empty-input check has been removed; one log per call is enough.
    logging.warning(f"Processing video {in_filename}")
    if in_filename is None or in_filename == "":
        # Fix: the error branch previously returned 4 values, but this
        # handler feeds only 2 gradio output components; the extra values
        # made the error path inconsistent with the success path.
        return (
            "",
            build_html_output(
                "Please first upload a file and then click " 'the button "Submit"',
                "result_item_error",
            ),
        )

    return process_file(in_filename)
def process_file(filename: str):
    """Extract speech-only audio from *filename* using voice activity detection.

    Decodes the input (any format ffmpeg understands, audio or video) to
    16 kHz mono 16-bit PCM streamed over a pipe, feeds the samples through
    the silero VAD in 512-sample windows, concatenates every detected
    speech segment, and writes the result to a randomly-suffixed .wav file.

    Returns a 2-tuple: (output wav path, HTML status string).
    """
    vad = get_vad()

    ffmpeg_cmd = [
        "ffmpeg",
        "-i",
        filename,
        "-f",
        "s16le",
        "-acodec",
        "pcm_s16le",
        "-ac",
        "1",
        "-ar",
        str(sample_rate),
        "-",
    ]

    process = subprocess.Popen(
        ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
    )

    frames_per_read = int(sample_rate * 100)  # read ~100 seconds per chunk

    window_size = 512  # samples per VAD feed

    buffer = []
    all_samples = []

    is_last = False
    while True:
        # *2 because int16_t has two bytes
        data = process.stdout.read(frames_per_read * 2)
        if not data:
            if is_last:
                break
            # EOF: append one second of silence so a trailing speech
            # segment gets closed, then loop once more and flush.
            is_last = True
            data = np.zeros(sample_rate, dtype=np.int16)

        samples = np.frombuffer(data, dtype=np.int16)
        samples = samples.astype(np.float32) / 32768

        buffer = np.concatenate([buffer, samples])
        while len(buffer) > window_size:
            vad.accept_waveform(buffer[:window_size])
            buffer = buffer[window_size:]

        if is_last:
            vad.flush()

        while not vad.empty():
            all_samples.extend(vad.front.samples)
            vad.pop()

    # Fix: reap the ffmpeg child and close its pipe; previously the
    # process was never waited on, leaking a zombie and an fd per call.
    process.stdout.close()
    process.wait()

    suffix = random.randint(1000, 10000)
    out_filename = f"(unknown)-{suffix}.wav"

    speech_samples = np.array(all_samples, dtype=np.float32)
    sf.write(out_filename, speech_samples, samplerate=sample_rate)

    return (
        out_filename,
        build_html_output(
            "Done! Please download the generated .wav file", "result_item_success"
        ),
    )
# CSS for the HTML status boxes produced by build_html_output().
css = """
.result {display:flex;flex-direction:column}
.result_item {padding:15px;margin-bottom:8px;border-radius:15px;width:100%}
.result_item_success {background-color:mediumaquamarine;color:white;align-self:start}
.result_item_error {background-color:#ff7070;color:white;align-self:start}
"""

demo = gr.Blocks(css=css)

with demo:
    gr.Markdown("Remove non-speeches")
    with gr.Tabs():
        # Tab 1: extract speech from an uploaded audio file.
        with gr.TabItem("Upload audio from disk (音频)"):
            uploaded_audio_file = gr.Audio(
                sources=["upload"],  # Choose between "microphone", "upload"
                type="filepath",
                label="Upload audio from disk",
            )
            upload_audio_button = gr.Button("Submit")
            output_audio = gr.Audio(label="Output")
            output_info_audio = gr.HTML(label="Info")

        # Tab 2: extract speech from the audio track of an uploaded video.
        with gr.TabItem("Upload video from disk (视频)"):
            uploaded_video_file = gr.Video(
                sources=["upload"],
                label="Upload from disk",
                show_share_button=True,
            )
            upload_video_button = gr.Button("Submit")
            output_video = gr.Video(label="Output")
            output_info_video = gr.HTML(label="Info")

    # Wire each Submit button to its handler; each handler returns
    # values for the (output, info) component pair of its tab.
    upload_video_button.click(
        process_uploaded_video_file,
        inputs=[
            uploaded_video_file,
        ],
        outputs=[
            output_video,
            output_info_video,
        ],
    )

    upload_audio_button.click(
        process_uploaded_audio_file,
        inputs=[
            uploaded_audio_file,
        ],
        outputs=[
            output_audio,
            output_info_audio,
        ],
    )

if __name__ == "__main__":
    formatter = "%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s"
    logging.basicConfig(format=formatter, level=logging.WARNING)

    demo.launch(share=True)