from pathlib import Path

import ffmpeg
import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import streamlit as st
import tensorflow as tf
from huggingface_hub import hf_hub_download
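
# Note: "import ffmpeg" refers to the ffmpeg-python bindings, which shell out to the
# ffmpeg executable; that binary must be available on the system (e.g. listed in a
# Space's packages.txt or installed locally).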

# ========= App title =========
st.title("Speaker Verification - Demo")
st.markdown(
    """
**This demo was prepared as part of an ML project on speaker verification.**

Full documentation: [github.com/JakubMk/speaker_verification_project](https://github.com/JakubMk/speaker_verification_project)

**How it works:**
1. **Load the model.**
2. **Upload audio files** or **record** short speech samples.
3. **Test the model** by clicking **“Verify Speaker”**.
"""
)

# ========= Session state =========
if "load_model_button" not in st.session_state:
    st.session_state.load_model_button = False
if "audio_left" not in st.session_state:
    st.session_state.audio_left = None
if "audio_right" not in st.session_state:
    st.session_state.audio_right = None

# ========= UI: choose model =========
model_df = pd.DataFrame({"first column": ["verification_model_resnet34_512dim"]})
option = st.selectbox("Choose a model to test:", model_df["first column"])
st.button("Load the model", on_click=lambda: st.session_state.update(load_model_button=True))

# ========= Helpers =========
FS = 16000   # target sample rate [Hz]
WT = 48560   # window length in samples (~3.04 s at 16 kHz)

# Map file extensions to ffmpeg container/format names
EXT2FMT = {
    "wav": "wav",
    "mp3": "mp3",
    "ogg": "ogg",
    "aac": "aac",
    "m4a": "mp4",
    "webm": "webm",
    "flac": "flac",
}


def infer_input_format(name: str) -> str | None:
    """Guesses the ffmpeg input format from the file extension (None if unknown)."""
    if name and "." in name:
        ext = name.rsplit(".", 1)[-1].lower()
        return EXT2FMT.get(ext)
    return None

@st.cache_data(show_spinner=False)
def bytes_to_pcm16k_mono(data: bytes, in_format: str | None) -> np.ndarray:
    """
    Converts the input audio (any supported container) to raw PCM 16 kHz mono 16-bit LE
    and returns it as float32 in the range [-1, 1].
    Cached by (bytes, format), so reruns do not re-invoke ffmpeg for identical input.
    """
    stream = (
        ffmpeg
        .input("pipe:0", **({"format": in_format} if in_format else {}))
        .output("pipe:1", format="s16le", acodec="pcm_s16le", ar=str(FS), ac=1)
        .global_args("-hide_banner")
    )
    out, err = ffmpeg.run(stream, capture_stdout=True, capture_stderr=True, input=data)
    audio = np.frombuffer(out, dtype="<i2").astype(np.float32) / 32768.0
    if audio.size < WT:
        # Zero-pad symmetrically (centered) so the clip reaches the window length WT
        pad_left = (WT - audio.size) // 2
        pad_right = WT - audio.size - pad_left
        audio = np.pad(audio, (pad_left, pad_right), mode="constant")
    return audio

def plot_waveform(audio_np: np.ndarray, fs: int = FS, title: str = "Waveform"):
    t = np.arange(audio_np.size) / fs if audio_np.size else np.array([0, 1e-6])
    fig, ax = plt.subplots()
    ax.plot(t, audio_np)
    ax.set_title(title)
    ax.set_xlabel("Time [s]")
    ax.set_ylabel("Amplitude")
    ax.margins(x=0, y=0)
    if audio_np.size:
        ax.set_xlim(t[0], t[-1])
    return fig

@st.cache_resource
def load_model_from_hub(repo_id: str, filename: str, revision: str):
    """Downloads and loads a Keras model (cached resource – kept in memory across reruns)."""
    model_path = hf_hub_download(
        repo_id=repo_id,
        filename=filename,
        repo_type="model",
        revision=revision,
    )
    # Import local modules so their custom layers/losses are registered before deserialization
    import custom_models, custom_losses
    model = keras.models.load_model(model_path)
    if hasattr(model, "return_embedding"):
        model.return_embedding = True
    with open(model_path, "rb") as f:
        model_bytes = f.read()
    return model, model_path, model_bytes

def handle_record(label: str) -> np.ndarray | None:
    rec = st.audio_input(label)
    if not rec:
        return None
    try:
        audio_np = bytes_to_pcm16k_mono(rec.getvalue(), in_format="wav")
        return audio_np
    except ffmpeg.Error as e:
        st.error("FFmpeg failed while processing the recording.")
        st.code(e.stderr.decode("utf-8", "ignore"))
        return None

def handle_upload(label: str, key: str) -> np.ndarray | None:
    file = st.file_uploader(
        label,
        type=["wav", "m4a", "aac", "mp3", "ogg", "webm", "flac"],
        key=key,
    )
    if not file:
        return None
    in_fmt = infer_input_format(file.name)
    try:
        audio_np = bytes_to_pcm16k_mono(file.getvalue(), in_fmt)
        return audio_np
    except ffmpeg.Error as e:
        st.error("FFmpeg failed while converting the uploaded file.")
        st.code(e.stderr.decode("utf-8", "ignore"))
        return None

def delta(x):
    """First-order difference along the time axis (axis 1)."""
    return x[:, 1:] - x[:, :-1]


def array_to_spectrogram(audio_np: np.ndarray,
                         audio_in_samples: int = WT,
                         window_length: int = 400,
                         step_length: int = 160,
                         fft_length: int = 1023
                         ) -> tf.Tensor:
    """Crops a random window of `audio_in_samples` samples and builds a 3-channel log-spectrogram."""
    audio = tf.convert_to_tensor(audio_np, dtype=tf.float32)
    audio_length = audio_np.size
    # Random crop start; max(..., 1) keeps tf.random.uniform valid when the clip is exactly WT samples long
    max_offset = max(audio_length - audio_in_samples, 1)
    random_int = tf.random.uniform(shape=(), minval=0, maxval=max_offset, dtype=tf.int32)
    stft = tf.signal.stft(audio[random_int:(random_int + audio_in_samples)],
                          frame_length=window_length,
                          frame_step=step_length,
                          fft_length=fft_length)
    spectrogram = tf.abs(stft)
    spectrogram = tf.transpose(spectrogram)         # shape: (freq, time)
    spectrogram = tf.math.log1p(spectrogram)        # log-compressed magnitudes
    spectrogram_delta = delta(spectrogram)          # 1st-order time derivative
    spectrogram_delta2 = delta(spectrogram_delta)   # 2nd-order time derivative
    # Trim so all three channels share the same number of time steps
    return tf.stack([spectrogram[:, :-2],
                     spectrogram_delta[:, :-1],
                     spectrogram_delta2],
                    axis=-1)                        # shape: (freq, time, 3)
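
# Note on shapes: with the defaults above (48560-sample crop, frame_length=400,
# frame_step=160, fft_length=1023), tf.signal.stft produces 1 + (48560 - 400) // 160 = 302
# frames and 1023 // 2 + 1 = 512 frequency bins, so each stacked feature tensor fed to the
# model has shape (512, 300, 3) after the trimming in array_to_spectrogram.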

def verify_speakers(model, audio_left, audio_right, margin):
    """Embeds both utterances and compares them by cosine similarity against the margin."""
    spec_left = array_to_spectrogram(audio_left)[tf.newaxis, ...]
    spec_right = array_to_spectrogram(audio_right)[tf.newaxis, ...]
    emb_left = model.predict(spec_left, verbose=0)
    emb_right = model.predict(spec_right, verbose=0)
    # The dot product is treated as cosine similarity, assuming the model returns L2-normalized embeddings
    cosine_similarity = tf.linalg.matmul(emb_left, emb_right, transpose_b=True)
    cosine_similarity = float(cosine_similarity.numpy().squeeze())
    if cosine_similarity >= margin:
        st.success("Both utterances belong to the same speaker.")
    else:
        st.warning("The utterances are from different speakers.")
    st.caption(f"Cosine similarity: {cosine_similarity:.4f}, margin: {margin:.4f}")

# ========= Load model =========
if st.session_state.load_model_button:
    try:
        model, model_path, model_bytes = load_model_from_hub(
            repo_id="2pift/sv-resnet34-keras",
            filename="best_model.keras",
            revision="v1.0.0",
        )
        st.success("Model loaded — you can upload audio files or record utterances.")
        st.download_button(
            "(Optional) Download the model file",
            data=model_bytes,
            file_name="verification_model_resnet34_512dim.keras",
        )
    except Exception as e:
        st.error(f"Error loading model: {e}")

# ========= Two columns =========
left_column, right_column = st.columns(2)

with left_column:
    st.subheader("Voice Sample 1")
    record_left = st.checkbox("Record first voice sample", key="chk_record_left")
    if record_left:
        audio_left = handle_record("Record (left)")
    else:
        audio_left = handle_upload("Upload left audio", key="file_left")
    if audio_left is not None:
        st.session_state.audio_left = audio_left
        fig = plot_waveform(audio_left, FS, "Left audio waveform")
        st.pyplot(fig, width="stretch")
        st.caption(f"Samples: {audio_left.size} • Duration: {audio_left.size/FS:.2f}s")

with right_column:
    st.subheader("Voice Sample 2")
    record_right = st.checkbox("Record second voice sample", key="chk_record_right")
    if record_right:
        audio_right = handle_record("Record (right)")
    else:
        audio_right = handle_upload("Upload right audio", key="file_right")
    if audio_right is not None:
        st.session_state.audio_right = audio_right
        fig = plot_waveform(audio_right, FS, "Right audio waveform")
        st.pyplot(fig, width="stretch")
        st.caption(f"Samples: {audio_right.size} • Duration: {audio_right.size/FS:.2f}s")

if audio_left is not None and audio_right is not None:
    margin = st.slider("Selected margin:", -1.0, 1.0, 0.26, 0.01)
    verify_button = st.button("Verify Speaker")
    if verify_button:
        if not st.session_state.load_model_button:
            st.warning("Load the model first.")
        else:
            try:
                verify_speakers(model, audio_left, audio_right, margin)
            except Exception as e:
                st.error(f"Error during verification: {e}")
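
# ------------------------------------------------------------------------------
# Local usage sketch (assumptions, not part of the Space configuration): save this
# file as app.py alongside the local custom_models / custom_losses modules, install
# the imported packages, e.g.
#   pip install streamlit ffmpeg-python tensorflow keras huggingface_hub matplotlib pandas numpy
# and, with the system ffmpeg dependency noted above in place, start the app with:
#   streamlit run app.py
# ------------------------------------------------------------------------------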