#!/usr/bin/env python3
"""
WhisperKit Performance Regression Detection Script

This script detects significant per-model performance regressions by:
- Tracking the best (lowest) WER for each model
- Tracking the best (highest) speed and tokens per second for each model
- Comparing all configurations against those best baselines
- Alerting via Slack if any configuration deviates by more than 20%
"""
import json
import os
import statistics
from collections import defaultdict
from typing import Dict, List, Optional
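
# The performance data file is expected to be JSON Lines: one JSON object per
# line. An illustrative record (field names taken from the code below; the
# values are hypothetical):
#   {"commit_hash": "abc1234", "model": "openai_whisper-tiny",
#    "device": "iPhone 15 Pro", "os": "iOS 17.4",
#    "average_wer": 12.5, "speed": 25.3, "tokens_per_second": 48.7}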

def load_performance_data(file_path: str, commit_hash: Optional[str] = None) -> List[Dict]:
    """Load performance data from a JSON Lines file, optionally filtering by commit hash."""
    data = []
    try:
        with open(file_path, "r") as f:
            for line in f:
                try:
                    item = json.loads(line.strip())
                    if commit_hash is None or item.get("commit_hash") == commit_hash:
                        data.append(item)
                except json.JSONDecodeError:
                    # Skip malformed lines rather than failing the whole run
                    continue
    except FileNotFoundError:
        print(f"Warning: Performance data file not found: {file_path}")
        return []
    return data

def calculate_wer_statistics(wer_values: List[float]) -> Dict[str, float]:
    """Calculate WER statistics for a list of values."""
    if not wer_values:
        return {"mean": 0, "median": 0, "min": 0, "max": 0, "std": 0}
    return {
        "mean": statistics.mean(wer_values),
        "median": statistics.median(wer_values),
        "min": min(wer_values),
        "max": max(wer_values),
        "std": statistics.stdev(wer_values) if len(wer_values) > 1 else 0,
    }
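
# Illustrative example (hypothetical values):
#   calculate_wer_statistics([10.0, 12.0, 11.0])
#   -> {"mean": 11.0, "median": 11.0, "min": 10.0, "max": 12.0, "std": 1.0}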

def detect_device_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
    """
    Detect WER regressions for devices in the current release.
    Compares current data points against the historical best for each model+device combination.
    Returns a list of regression alerts.
    """
    regressions = []
    # Build the historical best (lowest) WER for each model+device combination
    historical_best = {}
    best_configs = {}
    for entry in all_historical_data:
        key = (entry["model"], entry["device"])
        if key not in historical_best or entry["average_wer"] < historical_best[key]:
            historical_best[key] = entry["average_wer"]
            best_configs[key] = entry
    # Check each current data point against the historical best
    for entry in current_data:
        key = (entry["model"], entry["device"])
        if key not in historical_best:
            continue  # No historical data for this combination
        best_wer = historical_best[key]
        best_config = best_configs[key]
        current_wer = entry["average_wer"]
        if best_wer > 0:  # Avoid division by zero
            pct_diff = (current_wer - best_wer) / best_wer * 100
            # Only flag if current is significantly worse than the historical best
            if pct_diff > threshold:
                regressions.append({
                    "type": "device_wer_discrepancy",
                    "metric": "WER",
                    "model": entry["model"],
                    "device": entry["device"],
                    "os": entry["os"],
                    "current_value": round(current_wer, 2),
                    "best_value": round(best_wer, 2),
                    "best_device": best_config["device"],
                    "best_os": best_config["os"],
                    "percentage_diff": round(pct_diff, 1),
                })
    return regressions
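
# Worked example (hypothetical numbers): with a historical best WER of 10.0 and
# a current WER of 12.5, pct_diff = (12.5 - 10.0) / 10.0 * 100 = 25.0, which
# exceeds the default 20.0 threshold, so the configuration is flagged.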

def detect_os_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
    """
    Detect WER regressions for OS versions in the current release.
    Compares current data points against the historical best for each model+OS combination.
    Returns a list of regression alerts.
    """
    regressions = []
    # Build the historical best (lowest) WER for each model+OS combination
    historical_best = {}
    best_configs = {}
    for entry in all_historical_data:
        key = (entry["model"], entry["os"])
        if key not in historical_best or entry["average_wer"] < historical_best[key]:
            historical_best[key] = entry["average_wer"]
            best_configs[key] = entry
    # Check each current data point against the historical best
    for entry in current_data:
        key = (entry["model"], entry["os"])
        if key not in historical_best:
            continue  # No historical data for this combination
        best_wer = historical_best[key]
        best_config = best_configs[key]
        current_wer = entry["average_wer"]
        if best_wer > 0:  # Avoid division by zero
            pct_diff = (current_wer - best_wer) / best_wer * 100
            # Only flag if current is significantly worse than the historical best
            if pct_diff > threshold:
                regressions.append({
                    "type": "os_wer_discrepancy",
                    "metric": "WER",
                    "model": entry["model"],
                    "device": entry["device"],
                    "os": entry["os"],
                    "current_value": round(current_wer, 2),
                    "best_value": round(best_wer, 2),
                    "best_device": best_config["device"],
                    "best_os": best_config["os"],
                    "percentage_diff": round(pct_diff, 1),
                })
    return regressions

def detect_release_regressions(current_data: List[Dict], previous_data: List[Dict],
                               threshold: float = 20.0) -> List[Dict]:
    """
    Detect WER regressions in the current release for each model.
    Compares current WER against the best (lowest) historical WER for that model.
    Returns a list of regression alerts.
    """
    regressions = []
    if not previous_data:
        print("No previous release data available for comparison")
        return regressions
    # Group by model
    model_current = defaultdict(list)
    model_historical = defaultdict(list)
    for entry in current_data:
        model_current[entry["model"]].append(entry)
    for entry in previous_data:
        model_historical[entry["model"]].append(entry)
    # Check each model
    for model in model_current.keys():
        if model not in model_historical:
            continue  # No historical data for this model
        # Find the best historical WER for this model
        best_historical_wer = min(entry["average_wer"] for entry in model_historical[model])
        best_config = next(e for e in model_historical[model] if e["average_wer"] == best_historical_wer)
        # Check each current configuration against the best historical value
        for current_entry in model_current[model]:
            current_wer = current_entry["average_wer"]
            if best_historical_wer > 0:  # Avoid division by zero
                pct_change = (current_wer - best_historical_wer) / best_historical_wer * 100
                # Only flag significant WER increases (regressions)
                if pct_change > threshold:
                    regressions.append({
                        "type": "release_wer_regression",
                        "metric": "WER",
                        "model": model,
                        "device": current_entry["device"],
                        "os": current_entry["os"],
                        "current_value": round(current_wer, 2),
                        "best_historical_value": round(best_historical_wer, 2),
                        "best_device": best_config["device"],
                        "best_os": best_config["os"],
                        "percentage_increase": round(pct_change, 1),
                    })
    return regressions

def detect_speed_device_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
    """
    Detect speed regressions for devices in the current release.
    Compares current data points against the historical best for each model+device combination.
    Returns a list of regression alerts.
    """
    regressions = []
    # Build the historical best (highest) speed for each model+device combination
    historical_best = {}
    best_configs = {}
    for entry in all_historical_data:
        if "speed" not in entry:
            continue
        key = (entry["model"], entry["device"])
        if key not in historical_best or entry["speed"] > historical_best[key]:
            historical_best[key] = entry["speed"]
            best_configs[key] = entry
    # Check each current data point against the historical best
    for entry in current_data:
        if "speed" not in entry:
            continue
        key = (entry["model"], entry["device"])
        if key not in historical_best:
            continue  # No historical data for this combination
        best_speed = historical_best[key]
        best_config = best_configs[key]
        current_speed = entry["speed"]
        if best_speed > 0:  # Avoid division by zero
            pct_diff = (best_speed - current_speed) / best_speed * 100
            # Only flag if current is significantly slower than the historical best
            if pct_diff > threshold:
                regressions.append({
                    "type": "device_speed_discrepancy",
                    "metric": "Speed",
                    "model": entry["model"],
                    "device": entry["device"],
                    "os": entry["os"],
                    "current_value": round(current_speed, 2),
                    "best_value": round(best_speed, 2),
                    "best_device": best_config["device"],
                    "best_os": best_config["os"],
                    "percentage_diff": round(pct_diff, 1),
                })
    return regressions
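
# Worked example (hypothetical numbers): with a historical best speed of 20.0x
# and a current speed of 15.0x, pct_diff = (20.0 - 15.0) / 20.0 * 100 = 25.0,
# which exceeds the 20.0 threshold. Note the numerator is flipped relative to
# the WER checks: for speed, lower values are worse.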

def detect_speed_os_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
    """
    Detect speed regressions for OS versions in the current release.
    Compares current data points against the historical best for each model+OS combination.
    Returns a list of regression alerts.
    """
    regressions = []
    # Build the historical best (highest) speed for each model+OS combination
    historical_best = {}
    best_configs = {}
    for entry in all_historical_data:
        if "speed" not in entry:
            continue
        key = (entry["model"], entry["os"])
        if key not in historical_best or entry["speed"] > historical_best[key]:
            historical_best[key] = entry["speed"]
            best_configs[key] = entry
    # Check each current data point against the historical best
    for entry in current_data:
        if "speed" not in entry:
            continue
        key = (entry["model"], entry["os"])
        if key not in historical_best:
            continue  # No historical data for this combination
        best_speed = historical_best[key]
        best_config = best_configs[key]
        current_speed = entry["speed"]
        if best_speed > 0:  # Avoid division by zero
            pct_diff = (best_speed - current_speed) / best_speed * 100
            # Only flag if current is significantly slower than the historical best
            if pct_diff > threshold:
                regressions.append({
                    "type": "os_speed_discrepancy",
                    "metric": "Speed",
                    "model": entry["model"],
                    "device": entry["device"],
                    "os": entry["os"],
                    "current_value": round(current_speed, 2),
                    "best_value": round(best_speed, 2),
                    "best_device": best_config["device"],
                    "best_os": best_config["os"],
                    "percentage_diff": round(pct_diff, 1),
                })
    return regressions

def detect_speed_release_regressions(current_data: List[Dict], previous_data: List[Dict],
                                     threshold: float = 20.0) -> List[Dict]:
    """
    Detect speed regressions in the current release for each model.
    Compares current speed against the best (highest) historical speed for that model.
    Returns a list of regression alerts.
    """
    regressions = []
    if not previous_data:
        return regressions
    # Group by model
    model_current = defaultdict(list)
    model_historical = defaultdict(list)
    for entry in current_data:
        if "speed" in entry:
            model_current[entry["model"]].append(entry)
    for entry in previous_data:
        if "speed" in entry:
            model_historical[entry["model"]].append(entry)
    # Check each model
    for model in model_current.keys():
        if model not in model_historical:
            continue  # No historical data for this model
        # Find the best historical speed for this model
        best_historical_speed = max(entry["speed"] for entry in model_historical[model])
        best_config = next(e for e in model_historical[model] if e["speed"] == best_historical_speed)
        # Check each current configuration against the best historical value
        for current_entry in model_current[model]:
            current_speed = current_entry["speed"]
            if best_historical_speed > 0:  # Avoid division by zero
                pct_change = (best_historical_speed - current_speed) / best_historical_speed * 100
                # Only flag significant speed decreases (regressions)
                if pct_change > threshold:
                    regressions.append({
                        "type": "release_speed_regression",
                        "metric": "Speed",
                        "model": model,
                        "device": current_entry["device"],
                        "os": current_entry["os"],
                        "current_value": round(current_speed, 2),
                        "best_historical_value": round(best_historical_speed, 2),
                        "best_device": best_config["device"],
                        "best_os": best_config["os"],
                        "percentage_decrease": round(pct_change, 1),
                    })
    return regressions

def detect_tokens_device_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
    """
    Detect tokens-per-second regressions for devices in the current release.
    Compares current data points against the historical best for each model+device combination.
    Returns a list of regression alerts.
    """
    regressions = []
    # Build the historical best (highest) tokens/sec for each model+device combination
    historical_best = {}
    best_configs = {}
    for entry in all_historical_data:
        if "tokens_per_second" not in entry:
            continue
        key = (entry["model"], entry["device"])
        if key not in historical_best or entry["tokens_per_second"] > historical_best[key]:
            historical_best[key] = entry["tokens_per_second"]
            best_configs[key] = entry
    # Check each current data point against the historical best
    for entry in current_data:
        if "tokens_per_second" not in entry:
            continue
        key = (entry["model"], entry["device"])
        if key not in historical_best:
            continue  # No historical data for this combination
        best_tokens = historical_best[key]
        best_config = best_configs[key]
        current_tokens = entry["tokens_per_second"]
        if best_tokens > 0:  # Avoid division by zero
            pct_diff = (best_tokens - current_tokens) / best_tokens * 100
            # Only flag if current is significantly slower than the historical best
            if pct_diff > threshold:
                regressions.append({
                    "type": "device_tokens_discrepancy",
                    "metric": "Tokens/Second",
                    "model": entry["model"],
                    "device": entry["device"],
                    "os": entry["os"],
                    "current_value": round(current_tokens, 2),
                    "best_value": round(best_tokens, 2),
                    "best_device": best_config["device"],
                    "best_os": best_config["os"],
                    "percentage_diff": round(pct_diff, 1),
                })
    return regressions

def detect_tokens_os_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
    """
    Detect tokens-per-second regressions for OS versions in the current release.
    Compares current data points against the historical best for each model+OS combination.
    Returns a list of regression alerts.
    """
    regressions = []
    # Build the historical best (highest) tokens/sec for each model+OS combination
    historical_best = {}
    best_configs = {}
    for entry in all_historical_data:
        if "tokens_per_second" not in entry:
            continue
        key = (entry["model"], entry["os"])
        if key not in historical_best or entry["tokens_per_second"] > historical_best[key]:
            historical_best[key] = entry["tokens_per_second"]
            best_configs[key] = entry
    # Check each current data point against the historical best
    for entry in current_data:
        if "tokens_per_second" not in entry:
            continue
        key = (entry["model"], entry["os"])
        if key not in historical_best:
            continue  # No historical data for this combination
        best_tokens = historical_best[key]
        best_config = best_configs[key]
        current_tokens = entry["tokens_per_second"]
        if best_tokens > 0:  # Avoid division by zero
            pct_diff = (best_tokens - current_tokens) / best_tokens * 100
            # Only flag if current is significantly slower than the historical best
            if pct_diff > threshold:
                regressions.append({
                    "type": "os_tokens_discrepancy",
                    "metric": "Tokens/Second",
                    "model": entry["model"],
                    "device": entry["device"],
                    "os": entry["os"],
                    "current_value": round(current_tokens, 2),
                    "best_value": round(best_tokens, 2),
                    "best_device": best_config["device"],
                    "best_os": best_config["os"],
                    "percentage_diff": round(pct_diff, 1),
                })
    return regressions

def detect_tokens_release_regressions(current_data: List[Dict], previous_data: List[Dict],
                                      threshold: float = 20.0) -> List[Dict]:
    """
    Detect tokens-per-second regressions in the current release for each model.
    Compares current tokens/sec against the best (highest) historical tokens/sec for that model.
    Returns a list of regression alerts.
    """
    regressions = []
    if not previous_data:
        return regressions
    # Group by model
    model_current = defaultdict(list)
    model_historical = defaultdict(list)
    for entry in current_data:
        if "tokens_per_second" in entry:
            model_current[entry["model"]].append(entry)
    for entry in previous_data:
        if "tokens_per_second" in entry:
            model_historical[entry["model"]].append(entry)
    # Check each model
    for model in model_current.keys():
        if model not in model_historical:
            continue  # No historical data for this model
        # Find the best historical tokens/sec for this model
        best_historical_tokens = max(entry["tokens_per_second"] for entry in model_historical[model])
        best_config = next(e for e in model_historical[model] if e["tokens_per_second"] == best_historical_tokens)
        # Check each current configuration against the best historical value
        for current_entry in model_current[model]:
            current_tokens = current_entry["tokens_per_second"]
            if best_historical_tokens > 0:  # Avoid division by zero
                pct_change = (best_historical_tokens - current_tokens) / best_historical_tokens * 100
                # Only flag significant tokens/sec decreases (regressions)
                if pct_change > threshold:
                    regressions.append({
                        "type": "release_tokens_regression",
                        "metric": "Tokens/Second",
                        "model": model,
                        "device": current_entry["device"],
                        "os": current_entry["os"],
                        "current_value": round(current_tokens, 2),
                        "best_historical_value": round(best_historical_tokens, 2),
                        "best_device": best_config["device"],
                        "best_os": best_config["os"],
                        "percentage_decrease": round(pct_change, 1),
                    })
    return regressions

def generate_slack_message(regressions: List[Dict]) -> Optional[Dict]:
    """Generate a Slack message payload for performance regression alerts."""
    if not regressions:
        return None
    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "⚠️ WhisperKit Performance Regression Alert",
                "emoji": True
            }
        },
        {
            "type": "context",
            "elements": [
                {
                    "type": "mrkdwn",
                    "text": f"*Detected {len(regressions)} significant performance regression(s)*"
                }
            ]
        },
        {"type": "divider"}
    ]
    # Group regressions by type
    wer_device = [r for r in regressions if r["type"] == "device_wer_discrepancy"]
    wer_os = [r for r in regressions if r["type"] == "os_wer_discrepancy"]
    wer_release = [r for r in regressions if r["type"] == "release_wer_regression"]
    speed_device = [r for r in regressions if r["type"] == "device_speed_discrepancy"]
    speed_os = [r for r in regressions if r["type"] == "os_speed_discrepancy"]
    speed_release = [r for r in regressions if r["type"] == "release_speed_regression"]
    tokens_device = [r for r in regressions if r["type"] == "device_tokens_discrepancy"]
    tokens_os = [r for r in regressions if r["type"] == "os_tokens_discrepancy"]
    tokens_release = [r for r in regressions if r["type"] == "release_tokens_regression"]
    # WER regressions
    if wer_device:
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "*WER Device Discrepancies:*"
            }
        })
        for regression in wer_device:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{regression['model']}*\n"
                            f"• {regression['device']}: {regression['current_value']}% WER\n"
                            f"• Best: {regression['best_value']}% WER ({regression['best_device']} on {regression['best_os']})\n"
                            f"• Deviation: +{regression['percentage_diff']}%"
                }
            })
    if wer_os:
        if wer_device:
            blocks.append({"type": "divider"})
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "*WER OS Version Discrepancies:*"
            }
        })
        for regression in wer_os:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{regression['model']}*\n"
                            f"• {regression['os']}: {regression['current_value']}% WER\n"
                            f"• Best: {regression['best_value']}% WER ({regression['best_device']} on {regression['best_os']})\n"
                            f"• Deviation: +{regression['percentage_diff']}%"
                }
            })
    if wer_release:
        if wer_device or wer_os:
            blocks.append({"type": "divider"})
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "*WER Release-to-Release Regressions:*"
            }
        })
        for regression in wer_release:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{regression['model']}* on {regression['device']} ({regression['os']})\n"
                            f"• Current: {regression['current_value']}% WER\n"
                            f"• Best Historical: {regression['best_historical_value']}% WER ({regression['best_device']} on {regression['best_os']})\n"
                            f"• Increase: +{regression['percentage_increase']}%"
                }
            })
    # Speed regressions
    if speed_device:
        if wer_device or wer_os or wer_release:
            blocks.append({"type": "divider"})
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "*Speed Device Discrepancies:*"
            }
        })
        for regression in speed_device:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{regression['model']}*\n"
                            f"• {regression['device']}: {regression['current_value']}x speed\n"
                            f"• Best: {regression['best_value']}x speed ({regression['best_device']} on {regression['best_os']})\n"
                            f"• Slower by: {regression['percentage_diff']}%"
                }
            })
    if speed_os:
        if any([wer_device, wer_os, wer_release, speed_device]):
            blocks.append({"type": "divider"})
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "*Speed OS Version Discrepancies:*"
            }
        })
        for regression in speed_os:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{regression['model']}*\n"
                            f"• {regression['os']}: {regression['current_value']}x speed\n"
                            f"• Best: {regression['best_value']}x speed ({regression['best_device']} on {regression['best_os']})\n"
                            f"• Slower by: {regression['percentage_diff']}%"
                }
            })
    if speed_release:
        if any([wer_device, wer_os, wer_release, speed_device, speed_os]):
            blocks.append({"type": "divider"})
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "*Speed Release-to-Release Regressions:*"
            }
        })
        for regression in speed_release:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{regression['model']}* on {regression['device']} ({regression['os']})\n"
                            f"• Current: {regression['current_value']}x speed\n"
                            f"• Best Historical: {regression['best_historical_value']}x speed ({regression['best_device']} on {regression['best_os']})\n"
                            f"• Slower by: {regression['percentage_decrease']}%"
                }
            })
    # Tokens-per-second regressions
    if tokens_device:
        if any([wer_device, wer_os, wer_release, speed_device, speed_os, speed_release]):
            blocks.append({"type": "divider"})
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "*Tokens/Second Device Discrepancies:*"
            }
        })
        for regression in tokens_device:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{regression['model']}*\n"
                            f"• {regression['device']}: {regression['current_value']} tokens/sec\n"
                            f"• Best: {regression['best_value']} tokens/sec ({regression['best_device']} on {regression['best_os']})\n"
                            f"• Slower by: {regression['percentage_diff']}%"
                }
            })
    if tokens_os:
        if any([wer_device, wer_os, wer_release, speed_device, speed_os, speed_release, tokens_device]):
            blocks.append({"type": "divider"})
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "*Tokens/Second OS Version Discrepancies:*"
            }
        })
        for regression in tokens_os:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{regression['model']}*\n"
                            f"• {regression['os']}: {regression['current_value']} tokens/sec\n"
                            f"• Best: {regression['best_value']} tokens/sec ({regression['best_device']} on {regression['best_os']})\n"
                            f"• Slower by: {regression['percentage_diff']}%"
                }
            })
    if tokens_release:
        if any([wer_device, wer_os, wer_release, speed_device, speed_os, speed_release, tokens_device, tokens_os]):
            blocks.append({"type": "divider"})
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "*Tokens/Second Release-to-Release Regressions:*"
            }
        })
        for regression in tokens_release:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{regression['model']}* on {regression['device']} ({regression['os']})\n"
                            f"• Current: {regression['current_value']} tokens/sec\n"
                            f"• Best Historical: {regression['best_historical_value']} tokens/sec ({regression['best_device']} on {regression['best_os']})\n"
                            f"• Slower by: {regression['percentage_decrease']}%"
                }
            })
    return {"blocks": blocks}

def check_performance_regressions():
    """Main function to check for performance regressions and generate alerts."""
    # Load version data to get commit hashes
    try:
        with open("dashboard_data/version.json", "r") as f:
            version_data = json.load(f)
    except FileNotFoundError:
        print("Error: version.json not found")
        return
    releases = version_data.get("releases", [])
    if not releases:
        print("No release data available for comparison")
        return
    # Get the current and previous commit hashes
    current_commit = releases[-1]
    previous_commit = releases[-2] if len(releases) >= 2 else None
    print(f"Checking performance regressions for current commit: {current_commit}")
    if previous_commit:
        print(f"Comparing against previous commit: {previous_commit}")
    # Load performance data: all historical data for cross-version analysis,
    # plus the slices for the current and previous commits
    all_historical_data = load_performance_data("dashboard_data/performance_data.json")
    current_data = load_performance_data("dashboard_data/performance_data.json", current_commit)
    previous_data = load_performance_data("dashboard_data/performance_data.json", previous_commit) if previous_commit else []
    print(f"Loaded {len(current_data)} current data points, {len(previous_data)} previous data points")
    print(f"Loaded {len(all_historical_data)} total historical data points for cross-version analysis")
    all_regressions = []
    # WER checks
    print("\n=== Checking WER Regressions ===")
    device_regressions = detect_device_regressions(current_data, all_historical_data, threshold=20.0)
    all_regressions.extend(device_regressions)
    print(f"Found {len(device_regressions)} WER device discrepancies")
    os_regressions = detect_os_regressions(current_data, all_historical_data, threshold=20.0)
    all_regressions.extend(os_regressions)
    print(f"Found {len(os_regressions)} WER OS discrepancies")
    release_regressions = detect_release_regressions(current_data, previous_data, threshold=20.0)
    all_regressions.extend(release_regressions)
    print(f"Found {len(release_regressions)} WER release regressions")
    # Speed checks
    print("\n=== Checking Speed Regressions ===")
    speed_device_regressions = detect_speed_device_regressions(current_data, all_historical_data, threshold=20.0)
    all_regressions.extend(speed_device_regressions)
    print(f"Found {len(speed_device_regressions)} speed device discrepancies")
    speed_os_regressions = detect_speed_os_regressions(current_data, all_historical_data, threshold=20.0)
    all_regressions.extend(speed_os_regressions)
    print(f"Found {len(speed_os_regressions)} speed OS discrepancies")
    speed_release_regressions = detect_speed_release_regressions(current_data, previous_data, threshold=20.0)
    all_regressions.extend(speed_release_regressions)
    print(f"Found {len(speed_release_regressions)} speed release regressions")
    # Tokens-per-second checks
    print("\n=== Checking Tokens/Second Regressions ===")
    tokens_device_regressions = detect_tokens_device_regressions(current_data, all_historical_data, threshold=20.0)
    all_regressions.extend(tokens_device_regressions)
    print(f"Found {len(tokens_device_regressions)} tokens/sec device discrepancies")
    tokens_os_regressions = detect_tokens_os_regressions(current_data, all_historical_data, threshold=20.0)
    all_regressions.extend(tokens_os_regressions)
    print(f"Found {len(tokens_os_regressions)} tokens/sec OS discrepancies")
    tokens_release_regressions = detect_tokens_release_regressions(current_data, previous_data, threshold=20.0)
    all_regressions.extend(tokens_release_regressions)
    print(f"Found {len(tokens_release_regressions)} tokens/sec release regressions")
    # Generate outputs for GitHub Actions
    github_output = os.getenv("GITHUB_OUTPUT")
    if github_output:
        with open(github_output, "a") as f:
            print(f"has_performance_regressions={'true' if all_regressions else 'false'}", file=f)
            print(f"performance_regression_count={len(all_regressions)}", file=f)
            if all_regressions:
                slack_payload = generate_slack_message(all_regressions)
                if slack_payload:
                    f.write("performance_regression_slack_payload<<EOF\n")
                    json.dump(slack_payload, f, indent=2)
                    f.write("\nEOF\n")
    # Print a summary for debugging
    if all_regressions:
        print(f"\n⚠️ ALERT: Found {len(all_regressions)} performance regressions!")
        for regression in all_regressions:
            print(f"  - {regression['type']}: {regression.get('model', 'N/A')}")
    else:
        print("\n✅ No significant performance regressions detected")

if __name__ == "__main__":
    check_performance_regressions()