#!/usr/bin/env python3
"""
WhisperKit Performance Regression Detection Script
This script detects significant performance regressions per model by:
- Tracking the best (lowest) WER for each model
- Tracking the best (highest) speed and tokens per second for each model
- Comparing all configurations against those best baselines
- Alerting if any configuration deviates by > 20%
If any model shows discrepancy > 20%, it alerts via Slack.
"""
import json
import os
import statistics
from collections import defaultdict
from typing import Dict, List, Optional
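
# Worked example of the 20% threshold check used throughout this script
# (illustrative numbers, not taken from real benchmark data):
#   best historical WER = 5.0%, current WER = 6.5%
#   pct_diff = (6.5 - 5.0) / 5.0 * 100 = 30.0%  ->  exceeds 20.0%, so it is flagged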
def load_performance_data(file_path: str, commit_hash: Optional[str] = None) -> List[Dict]:
"""Load performance data from JSON file, optionally filtering by commit hash."""
data = []
try:
with open(file_path, "r") as f:
for line in f:
try:
item = json.loads(line.strip())
if commit_hash is None or item.get("commit_hash") == commit_hash:
data.append(item)
except json.JSONDecodeError:
continue
except FileNotFoundError:
print(f"Warning: Performance data file not found: {file_path}")
return []
return data
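
# Each line of performance_data.json is expected to be a standalone JSON object
# containing at least the fields referenced below. Illustrative example
# (field names come from this script; the values are made up):
#   {"model": "openai_whisper-tiny", "device": "iPhone 15 Pro", "os": "iOS 17.4",
#    "commit_hash": "abc1234", "average_wer": 5.2, "speed": 12.3,
#    "tokens_per_second": 45.6}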
def calculate_wer_statistics(wer_values: List[float]) -> Dict[str, float]:
"""Calculate WER statistics for a list of values."""
if not wer_values:
return {"mean": 0, "median": 0, "min": 0, "max": 0, "std": 0}
return {
"mean": statistics.mean(wer_values),
"median": statistics.median(wer_values),
"min": min(wer_values),
"max": max(wer_values),
"std": statistics.stdev(wer_values) if len(wer_values) > 1 else 0
}
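
# Illustrative example: calculate_wer_statistics([4.0, 5.0, 6.0]) returns
#   {"mean": 5.0, "median": 5.0, "min": 4.0, "max": 6.0, "std": 1.0}
# Note: this helper is not invoked elsewhere in this file.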
def detect_device_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
"""
Detect WER regressions for devices in current release.
Compares current data points against historical best for each model+device combination.
Returns list of regression alerts.
"""
regressions = []
# Build historical best WER for each model+device combination
historical_best = {}
best_configs = {}
for entry in all_historical_data:
key = (entry["model"], entry["device"])
if key not in historical_best:
historical_best[key] = entry["average_wer"]
best_configs[key] = entry
elif entry["average_wer"] < historical_best[key]:
historical_best[key] = entry["average_wer"]
best_configs[key] = entry
# Check each current data point against historical best
for entry in current_data:
key = (entry["model"], entry["device"])
if key not in historical_best:
continue # No historical data for this combination
best_wer = historical_best[key]
best_config = best_configs[key]
current_wer = entry["average_wer"]
if best_wer > 0: # Avoid division by zero
pct_diff = (current_wer - best_wer) / best_wer * 100
# Only flag if current is significantly worse than historical best
if pct_diff > threshold:
regressions.append({
"type": "device_wer_discrepancy",
"metric": "WER",
"model": entry["model"],
"device": entry["device"],
"os": entry["os"],
"current_value": round(current_wer, 2),
"best_value": round(best_wer, 2),
"best_device": best_config["device"],
"best_os": best_config["os"],
"percentage_diff": round(pct_diff, 1)
})
return regressions
def detect_os_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
"""
Detect WER regressions for OS versions in current release.
Compares current data points against historical best for each model+OS combination.
Returns list of regression alerts.
"""
regressions = []
# Build historical best WER for each model+OS combination
historical_best = {}
best_configs = {}
for entry in all_historical_data:
key = (entry["model"], entry["os"])
if key not in historical_best:
historical_best[key] = entry["average_wer"]
best_configs[key] = entry
elif entry["average_wer"] < historical_best[key]:
historical_best[key] = entry["average_wer"]
best_configs[key] = entry
# Check each current data point against historical best
for entry in current_data:
key = (entry["model"], entry["os"])
if key not in historical_best:
continue # No historical data for this combination
best_wer = historical_best[key]
best_config = best_configs[key]
current_wer = entry["average_wer"]
if best_wer > 0: # Avoid division by zero
pct_diff = (current_wer - best_wer) / best_wer * 100
# Only flag if current is significantly worse than historical best
if pct_diff > threshold:
regressions.append({
"type": "os_wer_discrepancy",
"metric": "WER",
"model": entry["model"],
"device": entry["device"],
"os": entry["os"],
"current_value": round(current_wer, 2),
"best_value": round(best_wer, 2),
"best_device": best_config["device"],
"best_os": best_config["os"],
"percentage_diff": round(pct_diff, 1)
})
return regressions
def detect_release_regressions(current_data: List[Dict], previous_data: List[Dict],
threshold: float = 20.0) -> List[Dict]:
"""
Detect WER regressions in current release for each model.
Compares current WER against the best (lowest) historical WER for that model.
Returns list of regression alerts.
"""
regressions = []
if not previous_data:
print("No previous release data available for comparison")
return regressions
    # Use the previous release's data as the historical baseline
    all_historical = previous_data
# Group by model
model_current = defaultdict(list)
model_historical = defaultdict(list)
for entry in current_data:
model_current[entry["model"]].append(entry)
for entry in all_historical:
model_historical[entry["model"]].append(entry)
# Check each model
for model in model_current.keys():
if model not in model_historical:
continue # No historical data for this model
# Find best historical WER for this model
best_historical_wer = min(entry["average_wer"] for entry in model_historical[model])
best_config = next(e for e in model_historical[model] if e["average_wer"] == best_historical_wer)
# Check each current configuration against best historical
for current_entry in model_current[model]:
current_wer = current_entry["average_wer"]
if best_historical_wer > 0: # Avoid division by zero
pct_change = (current_wer - best_historical_wer) / best_historical_wer * 100
# Only flag significant WER increases (regressions)
if pct_change > threshold:
regressions.append({
"type": "release_wer_regression",
"metric": "WER",
"model": model,
"device": current_entry["device"],
"os": current_entry["os"],
"current_value": round(current_wer, 2),
"best_historical_value": round(best_historical_wer, 2),
"best_device": best_config["device"],
"best_os": best_config["os"],
"percentage_increase": round(pct_change, 1)
})
return regressions
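
# Throughput metrics (speed and tokens/sec) are "higher is better", so the
# functions below mirror the WER formula and compute
#   pct_diff = (best - current) / best * 100
# Illustrative numbers: best speed 10.0x, current speed 7.5x ->
#   (10.0 - 7.5) / 10.0 * 100 = 25.0% slower, which exceeds the 20% threshold.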
def detect_speed_device_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
"""
Detect speed regressions for devices in current release.
Compares current data points against historical best for each model+device combination.
Returns list of regression alerts.
"""
regressions = []
# Build historical best speed for each model+device combination
historical_best = {}
best_configs = {}
for entry in all_historical_data:
if "speed" not in entry:
continue
key = (entry["model"], entry["device"])
if key not in historical_best:
historical_best[key] = entry["speed"]
best_configs[key] = entry
elif entry["speed"] > historical_best[key]:
historical_best[key] = entry["speed"]
best_configs[key] = entry
# Check each current data point against historical best
for entry in current_data:
if "speed" not in entry:
continue
key = (entry["model"], entry["device"])
if key not in historical_best:
continue # No historical data for this combination
best_speed = historical_best[key]
best_config = best_configs[key]
current_speed = entry["speed"]
if best_speed > 0: # Avoid division by zero
pct_diff = (best_speed - current_speed) / best_speed * 100
# Only flag if current is significantly slower than historical best
if pct_diff > threshold:
regressions.append({
"type": "device_speed_discrepancy",
"metric": "Speed",
"model": entry["model"],
"device": entry["device"],
"os": entry["os"],
"current_value": round(current_speed, 2),
"best_value": round(best_speed, 2),
"best_device": best_config["device"],
"best_os": best_config["os"],
"percentage_diff": round(pct_diff, 1)
})
return regressions
def detect_speed_os_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
"""
Detect speed regressions for OS versions in current release.
Compares current data points against historical best for each model+OS combination.
Returns list of regression alerts.
"""
regressions = []
# Build historical best speed for each model+OS combination
historical_best = {}
best_configs = {}
for entry in all_historical_data:
if "speed" not in entry:
continue
key = (entry["model"], entry["os"])
if key not in historical_best:
historical_best[key] = entry["speed"]
best_configs[key] = entry
elif entry["speed"] > historical_best[key]:
historical_best[key] = entry["speed"]
best_configs[key] = entry
# Check each current data point against historical best
for entry in current_data:
if "speed" not in entry:
continue
key = (entry["model"], entry["os"])
if key not in historical_best:
continue # No historical data for this combination
best_speed = historical_best[key]
best_config = best_configs[key]
current_speed = entry["speed"]
if best_speed > 0: # Avoid division by zero
pct_diff = (best_speed - current_speed) / best_speed * 100
# Only flag if current is significantly slower than historical best
if pct_diff > threshold:
regressions.append({
"type": "os_speed_discrepancy",
"metric": "Speed",
"model": entry["model"],
"device": entry["device"],
"os": entry["os"],
"current_value": round(current_speed, 2),
"best_value": round(best_speed, 2),
"best_device": best_config["device"],
"best_os": best_config["os"],
"percentage_diff": round(pct_diff, 1)
})
return regressions
def detect_speed_release_regressions(current_data: List[Dict], previous_data: List[Dict],
threshold: float = 20.0) -> List[Dict]:
"""
Detect speed regressions in current release for each model.
Compares current speed against the best (highest) historical speed for that model.
Returns list of regression alerts.
"""
regressions = []
if not previous_data:
return regressions
# Group by model
model_current = defaultdict(list)
model_historical = defaultdict(list)
for entry in current_data:
if "speed" in entry:
model_current[entry["model"]].append(entry)
for entry in previous_data:
if "speed" in entry:
model_historical[entry["model"]].append(entry)
# Check each model
for model in model_current.keys():
if model not in model_historical:
continue # No historical data for this model
# Find best historical speed for this model
best_historical_speed = max(entry["speed"] for entry in model_historical[model])
best_config = next(e for e in model_historical[model] if e["speed"] == best_historical_speed)
# Check each current configuration against best historical
for current_entry in model_current[model]:
current_speed = current_entry["speed"]
if best_historical_speed > 0: # Avoid division by zero
pct_change = (best_historical_speed - current_speed) / best_historical_speed * 100
# Only flag significant speed decreases (regressions)
if pct_change > threshold:
regressions.append({
"type": "release_speed_regression",
"metric": "Speed",
"model": model,
"device": current_entry["device"],
"os": current_entry["os"],
"current_value": round(current_speed, 2),
"best_historical_value": round(best_historical_speed, 2),
"best_device": best_config["device"],
"best_os": best_config["os"],
"percentage_decrease": round(pct_change, 1)
})
return regressions
def detect_tokens_device_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
"""
Detect tokens per second regressions for devices in current release.
Compares current data points against historical best for each model+device combination.
Returns list of regression alerts.
"""
regressions = []
# Build historical best tokens/sec for each model+device combination
historical_best = {}
best_configs = {}
for entry in all_historical_data:
if "tokens_per_second" not in entry:
continue
key = (entry["model"], entry["device"])
if key not in historical_best:
historical_best[key] = entry["tokens_per_second"]
best_configs[key] = entry
elif entry["tokens_per_second"] > historical_best[key]:
historical_best[key] = entry["tokens_per_second"]
best_configs[key] = entry
# Check each current data point against historical best
for entry in current_data:
if "tokens_per_second" not in entry:
continue
key = (entry["model"], entry["device"])
if key not in historical_best:
continue # No historical data for this combination
best_tokens = historical_best[key]
best_config = best_configs[key]
current_tokens = entry["tokens_per_second"]
if best_tokens > 0: # Avoid division by zero
pct_diff = (best_tokens - current_tokens) / best_tokens * 100
# Only flag if current is significantly slower than historical best
if pct_diff > threshold:
regressions.append({
"type": "device_tokens_discrepancy",
"metric": "Tokens/Second",
"model": entry["model"],
"device": entry["device"],
"os": entry["os"],
"current_value": round(current_tokens, 2),
"best_value": round(best_tokens, 2),
"best_device": best_config["device"],
"best_os": best_config["os"],
"percentage_diff": round(pct_diff, 1)
})
return regressions
def detect_tokens_os_regressions(current_data: List[Dict], all_historical_data: List[Dict], threshold: float = 20.0) -> List[Dict]:
"""
Detect tokens per second regressions for OS versions in current release.
Compares current data points against historical best for each model+OS combination.
Returns list of regression alerts.
"""
regressions = []
# Build historical best tokens/sec for each model+OS combination
historical_best = {}
best_configs = {}
for entry in all_historical_data:
if "tokens_per_second" not in entry:
continue
key = (entry["model"], entry["os"])
if key not in historical_best:
historical_best[key] = entry["tokens_per_second"]
best_configs[key] = entry
elif entry["tokens_per_second"] > historical_best[key]:
historical_best[key] = entry["tokens_per_second"]
best_configs[key] = entry
# Check each current data point against historical best
for entry in current_data:
if "tokens_per_second" not in entry:
continue
key = (entry["model"], entry["os"])
if key not in historical_best:
continue # No historical data for this combination
best_tokens = historical_best[key]
best_config = best_configs[key]
current_tokens = entry["tokens_per_second"]
if best_tokens > 0: # Avoid division by zero
pct_diff = (best_tokens - current_tokens) / best_tokens * 100
# Only flag if current is significantly slower than historical best
if pct_diff > threshold:
regressions.append({
"type": "os_tokens_discrepancy",
"metric": "Tokens/Second",
"model": entry["model"],
"device": entry["device"],
"os": entry["os"],
"current_value": round(current_tokens, 2),
"best_value": round(best_tokens, 2),
"best_device": best_config["device"],
"best_os": best_config["os"],
"percentage_diff": round(pct_diff, 1)
})
return regressions
def detect_tokens_release_regressions(current_data: List[Dict], previous_data: List[Dict],
threshold: float = 20.0) -> List[Dict]:
"""
Detect tokens per second regressions in current release for each model.
Compares current tokens/sec against the best (highest) historical tokens/sec for that model.
Returns list of regression alerts.
"""
regressions = []
if not previous_data:
return regressions
# Group by model
model_current = defaultdict(list)
model_historical = defaultdict(list)
for entry in current_data:
if "tokens_per_second" in entry:
model_current[entry["model"]].append(entry)
for entry in previous_data:
if "tokens_per_second" in entry:
model_historical[entry["model"]].append(entry)
# Check each model
for model in model_current.keys():
if model not in model_historical:
continue # No historical data for this model
# Find best historical tokens/sec for this model
best_historical_tokens = max(entry["tokens_per_second"] for entry in model_historical[model])
best_config = next(e for e in model_historical[model] if e["tokens_per_second"] == best_historical_tokens)
# Check each current configuration against best historical
for current_entry in model_current[model]:
current_tokens = current_entry["tokens_per_second"]
if best_historical_tokens > 0: # Avoid division by zero
pct_change = (best_historical_tokens - current_tokens) / best_historical_tokens * 100
# Only flag significant tokens/sec decreases (regressions)
if pct_change > threshold:
regressions.append({
"type": "release_tokens_regression",
"metric": "Tokens/Second",
"model": model,
"device": current_entry["device"],
"os": current_entry["os"],
"current_value": round(current_tokens, 2),
"best_historical_value": round(best_historical_tokens, 2),
"best_device": best_config["device"],
"best_os": best_config["os"],
"percentage_decrease": round(pct_change, 1)
})
return regressions
def generate_slack_message(regressions: List[Dict]) -> Optional[Dict]:
"""Generate Slack message payload for performance regression alerts."""
if not regressions:
return None
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "⚠️ WhisperKit Performance Regression Alert",
"emoji": True
}
},
{
"type": "context",
"elements": [
{
"type": "mrkdwn",
"text": f"*Detected {len(regressions)} significant performance regression(s)*"
}
]
},
{"type": "divider"}
]
# Group regressions by type
wer_device = [r for r in regressions if r["type"] == "device_wer_discrepancy"]
wer_os = [r for r in regressions if r["type"] == "os_wer_discrepancy"]
wer_release = [r for r in regressions if r["type"] == "release_wer_regression"]
speed_device = [r for r in regressions if r["type"] == "device_speed_discrepancy"]
speed_os = [r for r in regressions if r["type"] == "os_speed_discrepancy"]
speed_release = [r for r in regressions if r["type"] == "release_speed_regression"]
tokens_device = [r for r in regressions if r["type"] == "device_tokens_discrepancy"]
tokens_os = [r for r in regressions if r["type"] == "os_tokens_discrepancy"]
tokens_release = [r for r in regressions if r["type"] == "release_tokens_regression"]
# WER Regressions
if wer_device:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*WER Device Discrepancies:*"
}
})
for regression in wer_device:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{regression['model']}*\n"
f"• {regression['device']}: {regression['current_value']}% WER\n"
f"• Best: {regression['best_value']}% WER ({regression['best_device']} on {regression['best_os']})\n"
f"• Deviation: +{regression['percentage_diff']}%"
}
})
if wer_os:
if wer_device:
blocks.append({"type": "divider"})
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*WER OS Version Discrepancies:*"
}
})
for regression in wer_os:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{regression['model']}*\n"
f"• {regression['os']}: {regression['current_value']}% WER\n"
f"• Best: {regression['best_value']}% WER ({regression['best_device']} on {regression['best_os']})\n"
f"• Deviation: +{regression['percentage_diff']}%"
}
})
if wer_release:
if wer_device or wer_os:
blocks.append({"type": "divider"})
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*WER Release-to-Release Regressions:*"
}
})
for regression in wer_release:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{regression['model']}* on {regression['device']} ({regression['os']})\n"
f"• Current: {regression['current_value']}% WER\n"
f"• Best Historical: {regression['best_historical_value']}% WER ({regression['best_device']} on {regression['best_os']})\n"
f"• Increase: +{regression['percentage_increase']}%"
}
})
# Speed Regressions
if speed_device:
if wer_device or wer_os or wer_release:
blocks.append({"type": "divider"})
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Speed Device Discrepancies:*"
}
})
for regression in speed_device:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{regression['model']}*\n"
f"• {regression['device']}: {regression['current_value']}x speed\n"
f"• Best: {regression['best_value']}x speed ({regression['best_device']} on {regression['best_os']})\n"
f"• Slower by: {regression['percentage_diff']}%"
}
})
if speed_os:
if any([wer_device, wer_os, wer_release, speed_device]):
blocks.append({"type": "divider"})
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Speed OS Version Discrepancies:*"
}
})
for regression in speed_os:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{regression['model']}*\n"
f"• {regression['os']}: {regression['current_value']}x speed\n"
f"• Best: {regression['best_value']}x speed ({regression['best_device']} on {regression['best_os']})\n"
f"• Slower by: {regression['percentage_diff']}%"
}
})
if speed_release:
if any([wer_device, wer_os, wer_release, speed_device, speed_os]):
blocks.append({"type": "divider"})
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Speed Release-to-Release Regressions:*"
}
})
for regression in speed_release:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{regression['model']}* on {regression['device']} ({regression['os']})\n"
f"• Current: {regression['current_value']}x speed\n"
f"• Best Historical: {regression['best_historical_value']}x speed ({regression['best_device']} on {regression['best_os']})\n"
f"• Slower by: {regression.get('percentage_decrease', regression.get('percentage_increase', 0))}%"
}
})
# Tokens Per Second Regressions
if tokens_device:
if any([wer_device, wer_os, wer_release, speed_device, speed_os, speed_release]):
blocks.append({"type": "divider"})
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Tokens/Second Device Discrepancies:*"
}
})
for regression in tokens_device:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{regression['model']}*\n"
f"• {regression['device']}: {regression['current_value']} tokens/sec\n"
f"• Best: {regression['best_value']} tokens/sec ({regression['best_device']} on {regression['best_os']})\n"
f"• Slower by: {regression['percentage_diff']}%"
}
})
if tokens_os:
if any([wer_device, wer_os, wer_release, speed_device, speed_os, speed_release, tokens_device]):
blocks.append({"type": "divider"})
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Tokens/Second OS Version Discrepancies:*"
}
})
for regression in tokens_os:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{regression['model']}*\n"
f"• {regression['os']}: {regression['current_value']} tokens/sec\n"
f"• Best: {regression['best_value']} tokens/sec ({regression['best_device']} on {regression['best_os']})\n"
f"• Slower by: {regression['percentage_diff']}%"
}
})
if tokens_release:
if any([wer_device, wer_os, wer_release, speed_device, speed_os, speed_release, tokens_device, tokens_os]):
blocks.append({"type": "divider"})
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Tokens/Second Release-to-Release Regressions:*"
}
})
for regression in tokens_release:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{regression['model']}* on {regression['device']} ({regression['os']})\n"
f"• Current: {regression['current_value']} tokens/sec\n"
f"• Best Historical: {regression['best_historical_value']} tokens/sec ({regression['best_device']} on {regression['best_os']})\n"
f"• Slower by: {regression.get('percentage_decrease', regression.get('percentage_increase', 0))}%"
}
})
return {"blocks": blocks}
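
# The payload above follows Slack's Block Kit format. This script only emits it via
# GITHUB_OUTPUT; a minimal sketch of how a workflow step might deliver it (assuming
# an incoming-webhook URL in a SLACK_WEBHOOK_URL secret and the `requests` package,
# neither of which this script depends on):
#   import requests
#   requests.post(os.environ["SLACK_WEBHOOK_URL"], json=generate_slack_message(regressions))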
def check_performance_regressions():
"""Main function to check for performance regressions and generate alerts."""
# Load version data to get commit hashes
try:
with open("dashboard_data/version.json", "r") as f:
version_data = json.load(f)
except FileNotFoundError:
print("Error: version.json not found")
return
releases = version_data.get("releases", [])
    if not releases:
        print("No release data available")
        return
# Get current and previous commit hashes
    current_commit = releases[-1]
previous_commit = releases[-2] if len(releases) >= 2 else None
print(f"Checking performance regressions for current commit: {current_commit}")
if previous_commit:
print(f"Comparing against previous commit: {previous_commit}")
# Load performance data - get all historical data for cross-version analysis
all_historical_data = load_performance_data("dashboard_data/performance_data.json")
current_data = load_performance_data("dashboard_data/performance_data.json", current_commit)
previous_data = load_performance_data("dashboard_data/performance_data.json", previous_commit) if previous_commit else []
print(f"Loaded {len(current_data)} current data points, {len(previous_data)} previous data points")
print(f"Loaded {len(all_historical_data)} total historical data points for cross-version analysis")
all_regressions = []
# WER Checks
print("\n=== Checking WER Regressions ===")
device_regressions = detect_device_regressions(current_data, all_historical_data, threshold=20.0)
all_regressions.extend(device_regressions)
print(f"Found {len(device_regressions)} WER device discrepancies")
os_regressions = detect_os_regressions(current_data, all_historical_data, threshold=20.0)
all_regressions.extend(os_regressions)
print(f"Found {len(os_regressions)} WER OS discrepancies")
release_regressions = detect_release_regressions(current_data, previous_data, threshold=20.0)
all_regressions.extend(release_regressions)
print(f"Found {len(release_regressions)} WER release regressions")
# Speed Checks
print("\n=== Checking Speed Regressions ===")
speed_device_regressions = detect_speed_device_regressions(current_data, all_historical_data, threshold=20.0)
all_regressions.extend(speed_device_regressions)
print(f"Found {len(speed_device_regressions)} speed device discrepancies")
speed_os_regressions = detect_speed_os_regressions(current_data, all_historical_data, threshold=20.0)
all_regressions.extend(speed_os_regressions)
print(f"Found {len(speed_os_regressions)} speed OS discrepancies")
speed_release_regressions = detect_speed_release_regressions(current_data, previous_data, threshold=20.0)
all_regressions.extend(speed_release_regressions)
print(f"Found {len(speed_release_regressions)} speed release regressions")
# Tokens Per Second Checks
print("\n=== Checking Tokens/Second Regressions ===")
tokens_device_regressions = detect_tokens_device_regressions(current_data, all_historical_data, threshold=20.0)
all_regressions.extend(tokens_device_regressions)
print(f"Found {len(tokens_device_regressions)} tokens/sec device discrepancies")
tokens_os_regressions = detect_tokens_os_regressions(current_data, all_historical_data, threshold=20.0)
all_regressions.extend(tokens_os_regressions)
print(f"Found {len(tokens_os_regressions)} tokens/sec OS discrepancies")
tokens_release_regressions = detect_tokens_release_regressions(current_data, previous_data, threshold=20.0)
all_regressions.extend(tokens_release_regressions)
print(f"Found {len(tokens_release_regressions)} tokens/sec release regressions")
# Generate outputs
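    # GITHUB_OUTPUT points at the file GitHub Actions reads step outputs from.
    # Multi-line values (like the Slack payload JSON) use the heredoc-style
    # "name<<DELIMITER ... DELIMITER" syntax, hence the EOF markers written below.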
github_output = os.getenv("GITHUB_OUTPUT")
if github_output:
with open(github_output, "a") as f:
print(f"has_performance_regressions={'true' if all_regressions else 'false'}", file=f)
print(f"performance_regression_count={len(all_regressions)}", file=f)
if all_regressions:
slack_payload = generate_slack_message(all_regressions)
if slack_payload:
f.write("performance_regression_slack_payload<<EOF\n")
json.dump(slack_payload, f, indent=2)
f.write("\nEOF\n")
# Print summary for debugging
if all_regressions:
print(f"\n⚠️ ALERT: Found {len(all_regressions)} performance regressions!")
for regression in all_regressions:
print(f" - {regression['type']}: {regression.get('model', 'N/A')}")
else:
print("\n✅ No significant performance regressions detected")
if __name__ == "__main__":
check_performance_regressions()