from pathlib import Path

import ffmpeg
import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import streamlit as st
import tensorflow as tf
from huggingface_hub import hf_hub_download

# ========= App title =========
st.title("Speaker Verification - Demo")
st.markdown(
    """
**This demo was prepared as part of an ML project on speaker verification.**

Full documentation: [github.com/JakubMk/speaker_verification_project](https://github.com/JakubMk/speaker_verification_project)

**How it works:**

1. **Load the model.**
2. **Upload audio files** or **record** short speech samples.
3. **Test the model** by clicking **“Verify Speaker”**.
"""
)

# ========= Session state =========
if "load_model_button" not in st.session_state:
    st.session_state.load_model_button = False
if "audio_left" not in st.session_state:
    st.session_state.audio_left = None
if "audio_right" not in st.session_state:
    st.session_state.audio_right = None

# ========= UI: choose model =========
model_df = pd.DataFrame({"first column": ["verification_model_resnet34_512dim"]})
option = st.selectbox("Choose a model to test:", model_df["first column"])
st.button("Load the model", on_click=lambda: st.session_state.update(load_model_button=True))

# ========= Helpers =========
FS = 16000  # target sample rate [Hz]
WT = 48560  # window length in samples (~3 s at 16 kHz)

EXT2FMT = {
    "wav": "wav",
    "mp3": "mp3",
    "ogg": "ogg",
    "aac": "aac",
    "m4a": "mp4",
}


def infer_input_format(name: str) -> str | None:
    """Maps a file name extension to an ffmpeg input format, if known."""
    if name and "." in name:
        ext = name.rsplit(".", 1)[-1].lower()
        return EXT2FMT.get(ext)
    return None


@st.cache_data(show_spinner=False)
def bytes_to_pcm16k_mono(data: bytes, in_format: str | None) -> np.ndarray:
    """
    Converts the input audio (any supported container) to raw PCM 16 kHz mono 16-bit LE
    and returns it as float32 in the range [-1, 1]. Cached by (bytes, format).
""" stream = ( ffmpeg .input("pipe:0", **({"format": in_format} if in_format else {})) .output("pipe:1", format="s16le", acodec="pcm_s16le", ar=str(FS), ac=1) .global_args("-hide_banner") ) out, err = ffmpeg.run(stream, capture_stdout=True, capture_stderr=True, input=data) audio = np.frombuffer(out, dtype=" np.ndarray | None: rec = st.audio_input(label) if not rec: return None try: audio_np = bytes_to_pcm16k_mono(rec.getvalue(), in_format="wav") return audio_np except ffmpeg.Error as e: st.error("FFmpeg failed while processing recording.") st.code(e.stderr.decode("utf-8", "ignore")) return None def handle_upload(label: str, key: str) -> np.ndarray | None: file = st.file_uploader( label, type=["wav", "m4a", "aac", "mp3", "ogg", "webm", "flac"], key=key, ) if not file: return None in_fmt = infer_input_format(file.name) try: audio_np = bytes_to_pcm16k_mono(file.getvalue(), in_fmt) return audio_np except ffmpeg.Error as e: st.error("FFmpeg failed while converting uploaded file.") st.code(e.stderr.decode("utf-8", "ignore")) return None def delta(x): """Computes first-order derivative along time axis.""" return x[:, 1:] - x[:, :-1] def array_to_spectrogram(audio_np: np.ndarray, audio_in_samples: int = 48560, window_length: int = 400, step_length: int = 160, fft_length: int = 1023 ) -> tf.Tensor: audio = tf.convert_to_tensor(audio_np, dtype=tf.float32) audio_length = audio_np.size random_int = tf.random.uniform(shape=(), minval=0, maxval=(audio_length-audio_in_samples), dtype=tf.int32) stft = tf.signal.stft(audio[random_int:(random_int+audio_in_samples)], frame_length=window_length, frame_step=step_length, fft_length=fft_length) spectrogram = tf.abs(stft) spectrogram = tf.transpose(spectrogram) # shape: (freq, time) spectrogram = tf.math.log1p(spectrogram) spectrogram_delta = delta(spectrogram) spectrogram_delta2 = delta(spectrogram_delta) return tf.stack([spectrogram[:, :-2], spectrogram_delta[:, :-1], spectrogram_delta2], axis=-1) # shape: (freq, time, 3) @st.cache_data(show_spinner=True) def verify_speakers(model, audio_left, audio_right, margin): spec_left = array_to_spectrogram(audio_left)[tf.newaxis, ...] spec_right = array_to_spectrogram(audio_right)[tf.newaxis, ...] 
    emb_left = _model.predict(spec_left, verbose=0)
    emb_right = _model.predict(spec_right, verbose=0)
    # Dot product of the embeddings (cosine similarity, assuming L2-normalized embeddings).
    cosine_similarity = tf.linalg.matmul(emb_left, emb_right, transpose_b=True)
    cosine_similarity = float(cosine_similarity.numpy().squeeze())
    if cosine_similarity >= margin:
        st.success("Both utterances belong to the same speaker.")
    else:
        st.warning("The utterances are from different speakers.")
    st.caption(f"Cosine similarity: {cosine_similarity:.4f}, margin: {margin:.4f}")


# ========= Load model =========
model = None
if st.session_state.load_model_button:
    try:
        model, model_path, model_bytes = load_model_from_hub(
            repo_id="2pift/sv-resnet34-keras",
            filename="best_model.keras",
            revision="v1.0.0",
        )
        st.success("Model loaded. You can upload audio files or record utterances.")
        st.download_button(
            "(Optional) Download the model file",
            data=model_bytes,
            file_name="verification_model_resnet34_512dim.keras",
        )
    except Exception as e:
        st.error(f"Error loading model: {e}")

# ========= Two columns =========
left_column, right_column = st.columns(2)

with left_column:
    st.subheader("Voice Sample 1")
    record_left = st.checkbox("Record first voice sample", key="chk_record_left")
    if record_left:
        audio_left = handle_record("Record (left)")
    else:
        audio_left = handle_upload("Upload left audio", key="file_left")
    if audio_left is not None:
        st.session_state.audio_left = audio_left
        fig = plot_waveform(audio_left, FS, "Left audio waveform")
        st.pyplot(fig, width="stretch")
        st.caption(f"Samples: {audio_left.size} • Duration: {audio_left.size / FS:.2f}s")

with right_column:
    st.subheader("Voice Sample 2")
    record_right = st.checkbox("Record second voice sample", key="chk_record_right")
    if record_right:
        audio_right = handle_record("Record (right)")
    else:
        audio_right = handle_upload("Upload right audio", key="file_right")
    if audio_right is not None:
        st.session_state.audio_right = audio_right
        fig = plot_waveform(audio_right, FS, "Right audio waveform")
        st.pyplot(fig, width="stretch")
        st.caption(f"Samples: {audio_right.size} • Duration: {audio_right.size / FS:.2f}s")

if audio_left is not None and audio_right is not None:
    margin = st.slider("Selected margin:", -1.0, 1.0, 0.26, 0.01)
    verify_button = st.button("Verify Speaker")
    if verify_button:
        if model is None:
            st.error("Load the model first.")
        else:
            try:
                verify_speakers(model, audio_left, audio_right, margin)
            except Exception as e:
                st.error(f"Error during verification: {e}")
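
# A minimal way to try the demo locally, assuming this script is saved as app.py,
# the ffmpeg binary is on the PATH, and the dependencies are installed from PyPI
# (package and file names here are illustrative, not taken from the project):
#
#   pip install streamlit ffmpeg-python tensorflow keras huggingface_hub matplotlib pandas
#   streamlit run app.py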