#!/usr/bin/env python3 """ WhisperKit Performance Regression Detection Script This script detects significant performance regressions per model by: - Tracking the best (lowest) WER for each model - Tracking the best (highest) speed and tokens per second for each model - Comparing all configurations against those best baselines - Alerting if any configuration deviates by > 20% If any model shows discrepancy > 20%, it alerts via Slack. """ import json import os import statistics from collections import defaultdict from typing import Dict, List, Tuple, Optional import pandas as pd def load_performance_data(file_path: str, commit_hash: Optional[str] = None) -> List[Dict]: """Load performance data from JSON file, optionally filtering by commit hash.""" data = [] try: with open(file_path, "r") as f: for line in f: try: item = json.loads(line.strip()) if commit_hash is None or item.get("commit_hash") == commit_hash: data.append(item) except json.JSONDecodeError: continue except FileNotFoundError: print(f"Warning: Performance data file not found: {file_path}") return [] return data def calculate_wer_statistics(wer_values: List[float]) -> Dict[str, float]: """Calculate WER statistics for a list of values.""" if not wer_values: return {"mean": 0, "median": 0, "min": 0, "max": 0, "std": 0} return { "mean": statistics.mean(wer_values), "median": statistics.median(wer_values), "min": min(wer_values), "max": max(wer_values), "std": statistics.stdev(wer_values) if len(wer_values) > 1 else 0 } def detect_device_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]: """ Detect WER regressions for devices in current release. Compares current data points against historical best for each model+device combination. Returns list of regression alerts. """ regressions = [] # Build historical best WER for each model+device combination historical_best = {} best_configs = {} for entry in all_historical_data: key = (entry["model"], entry["device"]) if key not in historical_best: historical_best[key] = entry["average_wer"] best_configs[key] = entry elif entry["average_wer"] < historical_best[key]: historical_best[key] = entry["average_wer"] best_configs[key] = entry # Check each current data point against historical best for entry in current_data: key = (entry["model"], entry["device"]) if key not in historical_best: continue # No historical data for this combination best_wer = historical_best[key] best_config = best_configs[key] current_wer = entry["average_wer"] if best_wer > 0: # Avoid division by zero pct_diff = (current_wer - best_wer) / best_wer * 100 # Only flag if current is significantly worse than historical best if pct_diff > threshold: regressions.append({ "type": "device_wer_discrepancy", "metric": "WER", "model": entry["model"], "device": entry["device"], "os": entry["os"], "current_value": round(current_wer, 2), "best_value": round(best_wer, 2), "best_device": best_config["device"], "best_os": best_config["os"], "percentage_diff": round(pct_diff, 1) }) return regressions def detect_os_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]: """ Detect WER regressions for OS versions in current release. Compares current data points against historical best for each model+OS combination. Returns list of regression alerts. """ regressions = [] # Build historical best WER for each model+OS combination historical_best = {} best_configs = {} for entry in all_historical_data: key = (entry["model"], entry["os"]) if key not in historical_best: historical_best[key] = entry["average_wer"] best_configs[key] = entry elif entry["average_wer"] < historical_best[key]: historical_best[key] = entry["average_wer"] best_configs[key] = entry # Check each current data point against historical best for entry in current_data: key = (entry["model"], entry["os"]) if key not in historical_best: continue # No historical data for this combination best_wer = historical_best[key] best_config = best_configs[key] current_wer = entry["average_wer"] if best_wer > 0: # Avoid division by zero pct_diff = (current_wer - best_wer) / best_wer * 100 # Only flag if current is significantly worse than historical best if pct_diff > threshold: regressions.append({ "type": "os_wer_discrepancy", "metric": "WER", "model": entry["model"], "device": entry["device"], "os": entry["os"], "current_value": round(current_wer, 2), "best_value": round(best_wer, 2), "best_device": best_config["device"], "best_os": best_config["os"], "percentage_diff": round(pct_diff, 1) }) return regressions def detect_release_regressions(current_data: List[Dict], previous_data: List[Dict], threshold: float = 20.0) -> List[Dict]: """ Detect WER regressions in current release for each model. Compares current WER against the best (lowest) historical WER for that model. Returns list of regression alerts. """ regressions = [] if not previous_data: print("No previous release data available for comparison") return regressions # Combine all historical data all_historical = previous_data # Group by model model_current = defaultdict(list) model_historical = defaultdict(list) for entry in current_data: model_current[entry["model"]].append(entry) for entry in all_historical: model_historical[entry["model"]].append(entry) # Check each model for model in model_current.keys(): if model not in model_historical: continue # No historical data for this model # Find best historical WER for this model best_historical_wer = min(entry["average_wer"] for entry in model_historical[model]) best_config = next(e for e in model_historical[model] if e["average_wer"] == best_historical_wer) # Check each current configuration against best historical for current_entry in model_current[model]: current_wer = current_entry["average_wer"] if best_historical_wer > 0: # Avoid division by zero pct_change = (current_wer - best_historical_wer) / best_historical_wer * 100 # Only flag significant WER increases (regressions) if pct_change > threshold: regressions.append({ "type": "release_wer_regression", "metric": "WER", "model": model, "device": current_entry["device"], "os": current_entry["os"], "current_value": round(current_wer, 2), "best_historical_value": round(best_historical_wer, 2), "best_device": best_config["device"], "best_os": best_config["os"], "percentage_increase": round(pct_change, 1) }) return regressions def detect_speed_device_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]: """ Detect speed regressions for devices in current release. Compares current data points against historical best for each model+device combination. Returns list of regression alerts. """ regressions = [] # Build historical best speed for each model+device combination historical_best = {} best_configs = {} for entry in all_historical_data: if "speed" not in entry: continue key = (entry["model"], entry["device"]) if key not in historical_best: historical_best[key] = entry["speed"] best_configs[key] = entry elif entry["speed"] > historical_best[key]: historical_best[key] = entry["speed"] best_configs[key] = entry # Check each current data point against historical best for entry in current_data: if "speed" not in entry: continue key = (entry["model"], entry["device"]) if key not in historical_best: continue # No historical data for this combination best_speed = historical_best[key] best_config = best_configs[key] current_speed = entry["speed"] if best_speed > 0: # Avoid division by zero pct_diff = (best_speed - current_speed) / best_speed * 100 # Only flag if current is significantly slower than historical best if pct_diff > threshold: regressions.append({ "type": "device_speed_discrepancy", "metric": "Speed", "model": entry["model"], "device": entry["device"], "os": entry["os"], "current_value": round(current_speed, 2), "best_value": round(best_speed, 2), "best_device": best_config["device"], "best_os": best_config["os"], "percentage_diff": round(pct_diff, 1) }) return regressions def detect_speed_os_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]: """ Detect speed regressions for OS versions in current release. Compares current data points against historical best for each model+OS combination. Returns list of regression alerts. """ regressions = [] # Build historical best speed for each model+OS combination historical_best = {} best_configs = {} for entry in all_historical_data: if "speed" not in entry: continue key = (entry["model"], entry["os"]) if key not in historical_best: historical_best[key] = entry["speed"] best_configs[key] = entry elif entry["speed"] > historical_best[key]: historical_best[key] = entry["speed"] best_configs[key] = entry # Check each current data point against historical best for entry in current_data: if "speed" not in entry: continue key = (entry["model"], entry["os"]) if key not in historical_best: continue # No historical data for this combination best_speed = historical_best[key] best_config = best_configs[key] current_speed = entry["speed"] if best_speed > 0: # Avoid division by zero pct_diff = (best_speed - current_speed) / best_speed * 100 # Only flag if current is significantly slower than historical best if pct_diff > threshold: regressions.append({ "type": "os_speed_discrepancy", "metric": "Speed", "model": entry["model"], "device": entry["device"], "os": entry["os"], "current_value": round(current_speed, 2), "best_value": round(best_speed, 2), "best_device": best_config["device"], "best_os": best_config["os"], "percentage_diff": round(pct_diff, 1) }) return regressions def detect_speed_release_regressions(current_data: List[Dict], previous_data: List[Dict], threshold: float = 20.0) -> List[Dict]: """ Detect speed regressions in current release for each model. Compares current speed against the best (highest) historical speed for that model. Returns list of regression alerts. """ regressions = [] if not previous_data: return regressions # Group by model model_current = defaultdict(list) model_historical = defaultdict(list) for entry in current_data: if "speed" in entry: model_current[entry["model"]].append(entry) for entry in previous_data: if "speed" in entry: model_historical[entry["model"]].append(entry) # Check each model for model in model_current.keys(): if model not in model_historical: continue # No historical data for this model # Find best historical speed for this model best_historical_speed = max(entry["speed"] for entry in model_historical[model]) best_config = next(e for e in model_historical[model] if e["speed"] == best_historical_speed) # Check each current configuration against best historical for current_entry in model_current[model]: current_speed = current_entry["speed"] if best_historical_speed > 0: # Avoid division by zero pct_change = (best_historical_speed - current_speed) / best_historical_speed * 100 # Only flag significant speed decreases (regressions) if pct_change > threshold: regressions.append({ "type": "release_speed_regression", "metric": "Speed", "model": model, "device": current_entry["device"], "os": current_entry["os"], "current_value": round(current_speed, 2), "best_historical_value": round(best_historical_speed, 2), "best_device": best_config["device"], "best_os": best_config["os"], "percentage_decrease": round(pct_change, 1) }) return regressions def detect_tokens_device_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]: """ Detect tokens per second regressions for devices in current release. Compares current data points against historical best for each model+device combination. Returns list of regression alerts. """ regressions = [] # Build historical best tokens/sec for each model+device combination historical_best = {} best_configs = {} for entry in all_historical_data: if "tokens_per_second" not in entry: continue key = (entry["model"], entry["device"]) if key not in historical_best: historical_best[key] = entry["tokens_per_second"] best_configs[key] = entry elif entry["tokens_per_second"] > historical_best[key]: historical_best[key] = entry["tokens_per_second"] best_configs[key] = entry # Check each current data point against historical best for entry in current_data: if "tokens_per_second" not in entry: continue key = (entry["model"], entry["device"]) if key not in historical_best: continue # No historical data for this combination best_tokens = historical_best[key] best_config = best_configs[key] current_tokens = entry["tokens_per_second"] if best_tokens > 0: # Avoid division by zero pct_diff = (best_tokens - current_tokens) / best_tokens * 100 # Only flag if current is significantly slower than historical best if pct_diff > threshold: regressions.append({ "type": "device_tokens_discrepancy", "metric": "Tokens/Second", "model": entry["model"], "device": entry["device"], "os": entry["os"], "current_value": round(current_tokens, 2), "best_value": round(best_tokens, 2), "best_device": best_config["device"], "best_os": best_config["os"], "percentage_diff": round(pct_diff, 1) }) return regressions def detect_tokens_os_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]: """ Detect tokens per second regressions for OS versions in current release. Compares current data points against historical best for each model+OS combination. Returns list of regression alerts. """ regressions = [] # Build historical best tokens/sec for each model+OS combination historical_best = {} best_configs = {} for entry in all_historical_data: if "tokens_per_second" not in entry: continue key = (entry["model"], entry["os"]) if key not in historical_best: historical_best[key] = entry["tokens_per_second"] best_configs[key] = entry elif entry["tokens_per_second"] > historical_best[key]: historical_best[key] = entry["tokens_per_second"] best_configs[key] = entry # Check each current data point against historical best for entry in current_data: if "tokens_per_second" not in entry: continue key = (entry["model"], entry["os"]) if key not in historical_best: continue # No historical data for this combination best_tokens = historical_best[key] best_config = best_configs[key] current_tokens = entry["tokens_per_second"] if best_tokens > 0: # Avoid division by zero pct_diff = (best_tokens - current_tokens) / best_tokens * 100 # Only flag if current is significantly slower than historical best if pct_diff > threshold: regressions.append({ "type": "os_tokens_discrepancy", "metric": "Tokens/Second", "model": entry["model"], "device": entry["device"], "os": entry["os"], "current_value": round(current_tokens, 2), "best_value": round(best_tokens, 2), "best_device": best_config["device"], "best_os": best_config["os"], "percentage_diff": round(pct_diff, 1) }) return regressions def detect_tokens_release_regressions(current_data: List[Dict], previous_data: List[Dict], threshold: float = 20.0) -> List[Dict]: """ Detect tokens per second regressions in current release for each model. Compares current tokens/sec against the best (highest) historical tokens/sec for that model. Returns list of regression alerts. """ regressions = [] if not previous_data: return regressions # Group by model model_current = defaultdict(list) model_historical = defaultdict(list) for entry in current_data: if "tokens_per_second" in entry: model_current[entry["model"]].append(entry) for entry in previous_data: if "tokens_per_second" in entry: model_historical[entry["model"]].append(entry) # Check each model for model in model_current.keys(): if model not in model_historical: continue # No historical data for this model # Find best historical tokens/sec for this model best_historical_tokens = max(entry["tokens_per_second"] for entry in model_historical[model]) best_config = next(e for e in model_historical[model] if e["tokens_per_second"] == best_historical_tokens) # Check each current configuration against best historical for current_entry in model_current[model]: current_tokens = current_entry["tokens_per_second"] if best_historical_tokens > 0: # Avoid division by zero pct_change = (best_historical_tokens - current_tokens) / best_historical_tokens * 100 # Only flag significant tokens/sec decreases (regressions) if pct_change > threshold: regressions.append({ "type": "release_tokens_regression", "metric": "Tokens/Second", "model": model, "device": current_entry["device"], "os": current_entry["os"], "current_value": round(current_tokens, 2), "best_historical_value": round(best_historical_tokens, 2), "best_device": best_config["device"], "best_os": best_config["os"], "percentage_decrease": round(pct_change, 1) }) return regressions def generate_slack_message(regressions: List[Dict]) -> Dict: """Generate Slack message payload for performance regression alerts.""" if not regressions: return None blocks = [ { "type": "header", "text": { "type": "plain_text", "text": "⚠️ WhisperKit Performance Regression Alert", "emoji": True } }, { "type": "context", "elements": [ { "type": "mrkdwn", "text": f"*Detected {len(regressions)} significant performance regression(s)*" } ] }, {"type": "divider"} ] # Group regressions by type wer_device = [r for r in regressions if r["type"] == "device_wer_discrepancy"] wer_os = [r for r in regressions if r["type"] == "os_wer_discrepancy"] wer_release = [r for r in regressions if r["type"] == "release_wer_regression"] speed_device = [r for r in regressions if r["type"] == "device_speed_discrepancy"] speed_os = [r for r in regressions if r["type"] == "os_speed_discrepancy"] speed_release = [r for r in regressions if r["type"] == "release_speed_regression"] tokens_device = [r for r in regressions if r["type"] == "device_tokens_discrepancy"] tokens_os = [r for r in regressions if r["type"] == "os_tokens_discrepancy"] tokens_release = [r for r in regressions if r["type"] == "release_tokens_regression"] # WER Regressions if wer_device: blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": "*WER Device Discrepancies:*" } }) for regression in wer_device: blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": f"*{regression['model']}*\n" f"• {regression['device']}: {regression['current_value']}% WER\n" f"• Best: {regression['best_value']}% WER ({regression['best_device']} on {regression['best_os']})\n" f"• Deviation: +{regression['percentage_diff']}%" } }) if wer_os: if wer_device: blocks.append({"type": "divider"}) blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": "*WER OS Version Discrepancies:*" } }) for regression in wer_os: blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": f"*{regression['model']}*\n" f"• {regression['os']}: {regression['current_value']}% WER\n" f"• Best: {regression['best_value']}% WER ({regression['best_device']} on {regression['best_os']})\n" f"• Deviation: +{regression['percentage_diff']}%" } }) if wer_release: if wer_device or wer_os: blocks.append({"type": "divider"}) blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": "*WER Release-to-Release Regressions:*" } }) for regression in wer_release: blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": f"*{regression['model']}* on {regression['device']} ({regression['os']})\n" f"• Current: {regression['current_value']}% WER\n" f"• Best Historical: {regression['best_historical_value']}% WER ({regression['best_device']} on {regression['best_os']})\n" f"• Increase: +{regression['percentage_increase']}%" } }) # Speed Regressions if speed_device: if wer_device or wer_os or wer_release: blocks.append({"type": "divider"}) blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": "*Speed Device Discrepancies:*" } }) for regression in speed_device: blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": f"*{regression['model']}*\n" f"• {regression['device']}: {regression['current_value']}x speed\n" f"• Best: {regression['best_value']}x speed ({regression['best_device']} on {regression['best_os']})\n" f"• Slower by: {regression['percentage_diff']}%" } }) if speed_os: if any([wer_device, wer_os, wer_release, speed_device]): blocks.append({"type": "divider"}) blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": "*Speed OS Version Discrepancies:*" } }) for regression in speed_os: blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": f"*{regression['model']}*\n" f"• {regression['os']}: {regression['current_value']}x speed\n" f"• Best: {regression['best_value']}x speed ({regression['best_device']} on {regression['best_os']})\n" f"• Slower by: {regression['percentage_diff']}%" } }) if speed_release: if any([wer_device, wer_os, wer_release, speed_device, speed_os]): blocks.append({"type": "divider"}) blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": "*Speed Release-to-Release Regressions:*" } }) for regression in speed_release: blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": f"*{regression['model']}* on {regression['device']} ({regression['os']})\n" f"• Current: {regression['current_value']}x speed\n" f"• Best Historical: {regression['best_historical_value']}x speed ({regression['best_device']} on {regression['best_os']})\n" f"• Slower by: {regression.get('percentage_decrease', regression.get('percentage_increase', 0))}%" } }) # Tokens Per Second Regressions if tokens_device: if any([wer_device, wer_os, wer_release, speed_device, speed_os, speed_release]): blocks.append({"type": "divider"}) blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": "*Tokens/Second Device Discrepancies:*" } }) for regression in tokens_device: blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": f"*{regression['model']}*\n" f"• {regression['device']}: {regression['current_value']} tokens/sec\n" f"• Best: {regression['best_value']} tokens/sec ({regression['best_device']} on {regression['best_os']})\n" f"• Slower by: {regression['percentage_diff']}%" } }) if tokens_os: if any([wer_device, wer_os, wer_release, speed_device, speed_os, speed_release, tokens_device]): blocks.append({"type": "divider"}) blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": "*Tokens/Second OS Version Discrepancies:*" } }) for regression in tokens_os: blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": f"*{regression['model']}*\n" f"• {regression['os']}: {regression['current_value']} tokens/sec\n" f"• Best: {regression['best_value']} tokens/sec ({regression['best_device']} on {regression['best_os']})\n" f"• Slower by: {regression['percentage_diff']}%" } }) if tokens_release: if any([wer_device, wer_os, wer_release, speed_device, speed_os, speed_release, tokens_device, tokens_os]): blocks.append({"type": "divider"}) blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": "*Tokens/Second Release-to-Release Regressions:*" } }) for regression in tokens_release: blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": f"*{regression['model']}* on {regression['device']} ({regression['os']})\n" f"• Current: {regression['current_value']} tokens/sec\n" f"• Best Historical: {regression['best_historical_value']} tokens/sec ({regression['best_device']} on {regression['best_os']})\n" f"• Slower by: {regression.get('percentage_decrease', regression.get('percentage_increase', 0))}%" } }) return {"blocks": blocks} def check_performance_regressions(): """Main function to check for performance regressions and generate alerts.""" # Load version data to get commit hashes try: with open("dashboard_data/version.json", "r") as f: version_data = json.load(f) except FileNotFoundError: print("Error: version.json not found") return releases = version_data.get("releases", []) if len(releases) < 1: print("Not enough release data for comparison") return # Get current and previous commit hashes current_commit = releases[-1] if releases else None previous_commit = releases[-2] if len(releases) >= 2 else None print(f"Checking performance regressions for current commit: {current_commit}") if previous_commit: print(f"Comparing against previous commit: {previous_commit}") # Load performance data - get all historical data for cross-version analysis all_historical_data = load_performance_data("dashboard_data/performance_data.json") current_data = load_performance_data("dashboard_data/performance_data.json", current_commit) previous_data = load_performance_data("dashboard_data/performance_data.json", previous_commit) if previous_commit else [] print(f"Loaded {len(current_data)} current data points, {len(previous_data)} previous data points") print(f"Loaded {len(all_historical_data)} total historical data points for cross-version analysis") all_regressions = [] # WER Checks print("\n=== Checking WER Regressions ===") device_regressions = detect_device_regressions(current_data, all_historical_data, threshold=20.0) all_regressions.extend(device_regressions) print(f"Found {len(device_regressions)} WER device discrepancies") os_regressions = detect_os_regressions(current_data, all_historical_data, threshold=20.0) all_regressions.extend(os_regressions) print(f"Found {len(os_regressions)} WER OS discrepancies") release_regressions = detect_release_regressions(current_data, previous_data, threshold=20.0) all_regressions.extend(release_regressions) print(f"Found {len(release_regressions)} WER release regressions") # Speed Checks print("\n=== Checking Speed Regressions ===") speed_device_regressions = detect_speed_device_regressions(current_data, all_historical_data, threshold=20.0) all_regressions.extend(speed_device_regressions) print(f"Found {len(speed_device_regressions)} speed device discrepancies") speed_os_regressions = detect_speed_os_regressions(current_data, all_historical_data, threshold=20.0) all_regressions.extend(speed_os_regressions) print(f"Found {len(speed_os_regressions)} speed OS discrepancies") speed_release_regressions = detect_speed_release_regressions(current_data, previous_data, threshold=20.0) all_regressions.extend(speed_release_regressions) print(f"Found {len(speed_release_regressions)} speed release regressions") # Tokens Per Second Checks print("\n=== Checking Tokens/Second Regressions ===") tokens_device_regressions = detect_tokens_device_regressions(current_data, all_historical_data, threshold=20.0) all_regressions.extend(tokens_device_regressions) print(f"Found {len(tokens_device_regressions)} tokens/sec device discrepancies") tokens_os_regressions = detect_tokens_os_regressions(current_data, all_historical_data, threshold=20.0) all_regressions.extend(tokens_os_regressions) print(f"Found {len(tokens_os_regressions)} tokens/sec OS discrepancies") tokens_release_regressions = detect_tokens_release_regressions(current_data, previous_data, threshold=20.0) all_regressions.extend(tokens_release_regressions) print(f"Found {len(tokens_release_regressions)} tokens/sec release regressions") # Generate outputs github_output = os.getenv("GITHUB_OUTPUT") if github_output: with open(github_output, "a") as f: print(f"has_performance_regressions={'true' if all_regressions else 'false'}", file=f) print(f"performance_regression_count={len(all_regressions)}", file=f) if all_regressions: slack_payload = generate_slack_message(all_regressions) if slack_payload: f.write("performance_regression_slack_payload<