# tcid / app.py
# Last commit: "Hook it up to the data-source" (2018d03) by Ákos Hadnagy
# (file-viewer residue kept for reference: raw / history / blame, 27.7 kB)
import matplotlib.pyplot as plt
import matplotlib
import numpy as np
import pandas as pd
import gradio as gr
import threading
import time
from datetime import datetime
from data import get_data
# Matplotlib defaults: never warn about the number of open figures, and paint
# figures, axes and saved images on a black background.
matplotlib.rcParams.update({
    'figure.max_open_warning': 0,
    'figure.facecolor': '#000000',
    'axes.facecolor': '#000000',
    'savefig.facecolor': '#000000',
})
plt.ioff()  # Interactive mode off

# Module-level state shared by the plotting functions and the UI callbacks.
df = pd.DataFrame()
available_models = []
last_update_time = None
def load_data():
    """Fetch the results table and publish it through the module globals.

    Rebinds ``df``, ``available_models`` (the frame's index as a list) and
    ``last_update_time``. If fetching the data or reading its index raises,
    the error is printed and the globals keep their previous values.

    Returns:
        True on success, False if any exception was raised.
    """
    global df, available_models, last_update_time

    def stamp():
        # Timestamp prefix used by every log line of this function.
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    try:
        print(f"[{stamp()}] Loading data...")
        fresh_df = get_data()
        fresh_models = fresh_df.index.tolist()
        # Publish the new state only after both steps above succeeded.
        df, available_models = fresh_df, fresh_models
        last_update_time = datetime.now()
        print(f"[{stamp()}] Data loaded successfully: {len(available_models)} models")
        suffix = '...' if len(available_models) > 5 else ''
        print(f"Models: {available_models[:5]}{suffix}")
    except Exception as exc:
        print(f"[{stamp()}] Error loading data: {exc}")
        return False
    return True
def schedule_data_reload():
    """Start a self-rearming background timer that reloads the data.

    The first reload fires 900 seconds after this call; each reload then arms
    the next one. Timers are daemon threads, so they never keep the process
    alive on their own.
    """
    interval_seconds = 900.0  # 15 minutes

    def arm(callback):
        # One-shot daemon timer; re-armed by the callback itself.
        pending = threading.Timer(interval_seconds, callback)
        pending.daemon = True
        pending.start()

    def reload_and_rearm():
        load_data()
        arm(reload_and_rearm)
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Next data reload scheduled in 15 minutes")

    arm(reload_and_rearm)
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Data auto-reload scheduled every 15 minutes")
# Initial load at import time. If it fails, publish a fixed list of model
# names so the model selector still has entries (testing fallback).
if not load_data():
    print("WARNING: Failed to load data! Adding fallback models.")
    available_models = ["auto", "bert", "clip", "llama", "t5"]

# Begin the periodic background reload.
schedule_data_reload()
def generate_underlined_line(text: str) -> str:
    """Return *text* and a same-length row of ``─`` characters, each newline-terminated."""
    underline = "─" * len(text)
    return f"{text}\n{underline}\n"
def _stat_count(row, column: str) -> int:
    """Return ``row[column]`` as an int, treating a missing column or NaN as 0."""
    value = row.get(column, 0)
    return int(value) if pd.notna(value) else 0


def _message_figure(message: str) -> plt.Figure:
    """Build a black placeholder figure showing one centred grey message."""
    fig, ax = plt.subplots(figsize=(10, 8), facecolor='#000000')
    ax.set_facecolor('#000000')
    ax.text(0.5, 0.5, message,
            horizontalalignment='center', verticalalignment='center',
            transform=ax.transAxes, fontsize=16, color='#888888',
            fontfamily='monospace', weight='normal')
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.axis('off')
    # Drop the figure from pyplot's global registry so repeated calls do not
    # accumulate figures; the returned Figure object can still be rendered.
    plt.close(fig)
    return fig


def plot_model_stats(model_name: str) -> tuple[plt.Figure, str, str]:
    """Draw side-by-side AMD / NVIDIA pie charts of one model's test results.

    Only ``passed`` and ``failed`` (multi-GPU + single-GPU failures) carry
    data; ``skipped`` and ``error`` are always 0 here and so never drawn.

    Args:
        model_name: Label of the model's row in the module-level ``df``.

    Returns:
        ``(figure, amd_failure_text, nvidia_failure_text)``. When the model
        has no row, both texts are ``"No data available"``; when every count
        is zero, both texts are empty strings. In both cases the figure is a
        placeholder message.
    """
    # Snapshot the global once: the background reload may rebind ``df``
    # between the membership test and the row lookup.
    data = df
    if data.empty or model_name not in data.index:
        return (_message_figure(f'No data available for {model_name}'),
                "No data available", "No data available")

    row = data.loc[model_name]
    success_amd = _stat_count(row, 'success_amd')
    success_nvidia = _stat_count(row, 'success_nvidia')
    failed_multi_amd = _stat_count(row, 'failed_multi_no_amd')
    failed_multi_nvidia = _stat_count(row, 'failed_multi_no_nvidia')
    failed_single_amd = _stat_count(row, 'failed_single_no_amd')
    failed_single_nvidia = _stat_count(row, 'failed_single_no_nvidia')

    colors = {
        'passed': '#4CAF50',   # Medium green
        'failed': '#E53E3E',   # Red
        'skipped': '#FFD54F',  # Medium yellow
        'error': '#8B0000',    # Dark red
    }
    amd_stats = {
        'passed': success_amd,
        'failed': failed_multi_amd + failed_single_amd,
        'skipped': 0,  # Not available in this dataset
        'error': 0,    # Not available in this dataset
    }
    nvidia_stats = {
        'passed': success_nvidia,
        'failed': failed_multi_nvidia + failed_single_nvidia,
        'skipped': 0,  # Not available in this dataset
        'error': 0,    # Not available in this dataset
    }

    # Zero-valued categories would only add empty wedges and labels.
    amd_filtered = {k: v for k, v in amd_stats.items() if v > 0}
    nvidia_filtered = {k: v for k, v in nvidia_stats.items() if v > 0}
    if not amd_filtered and not nvidia_filtered:
        return _message_figure('No test results available'), "", ""

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 9), facecolor='#000000')
    ax1.set_facecolor('#000000')
    ax2.set_facecolor('#000000')

    def create_pie_chart(ax, device_label, filtered_stats):
        if not filtered_stats:
            ax.text(0.5, 0.5, 'No test results',
                    horizontalalignment='center', verticalalignment='center',
                    transform=ax.transAxes, fontsize=14, color='#888888',
                    fontfamily='monospace', weight='normal')
            # NOTE(review): this title is bold while the populated chart's
            # title below is normal weight; kept as in the original.
            ax.set_title(device_label,
                         fontsize=28, weight='bold', pad=2, color='#FFFFFF',
                         fontfamily='monospace')
            ax.axis('off')
            return

        values = list(filtered_stats.values())
        total = sum(values)
        wedges, texts, autotexts = ax.pie(
            values,
            labels=[label.lower() for label in filtered_stats],
            colors=[colors[category] for category in filtered_stats],
            # Matplotlib hands autopct a percentage derived from float32
            # fractions; round (not truncate) to recover the integer count,
            # otherwise e.g. 7 of 10 is shown as 6.
            autopct=lambda pct: f'{int(round(float(pct) / 100 * total))}',
            startangle=90,
            explode=None,
            shadow=False,
            wedgeprops=dict(edgecolor='#1a1a1a', linewidth=0.5),
            textprops={'fontsize': 12, 'weight': 'normal', 'color': '#CCCCCC',
                       'fontfamily': 'monospace'},
        )
        # Count labels inside the wedges: black and bold for contrast.
        for autotext in autotexts:
            autotext.set_color('#000000')
            autotext.set_weight('bold')
            autotext.set_fontsize(14)
            autotext.set_fontfamily('monospace')
        # Category labels outside the wedges.
        for text in texts:
            text.set_color('#AAAAAA')
            text.set_weight('normal')
            text.set_fontsize(13)
            text.set_fontfamily('monospace')
        ax.set_title(device_label,
                     fontsize=28, weight='normal', pad=2, color='#FFFFFF',
                     fontfamily='monospace')

    create_pie_chart(ax1, "amd", amd_filtered)
    create_pie_chart(ax2, "nvidia", nvidia_filtered)

    # Thin vertical divider between the two charts, stopping below the titles.
    line_x = 0.5
    fig.add_artist(plt.Line2D([line_x, line_x], [0.0, 0.85],
                              color='#333333', linewidth=1, alpha=0.5,
                              transform=fig.transFigure))
    fig.suptitle(f'{model_name.lower()}',
                 fontsize=32, weight='bold', color='#CCCCCC',
                 fontfamily='monospace', y=1)

    # Use the figure's own methods instead of pyplot's "current figure",
    # which is global state shared with other threads drawing plots.
    fig.tight_layout()
    fig.subplots_adjust(top=0.85, wspace=0.4)

    failures_amd = row.get('failures_amd', {})
    failures_nvidia = row.get('failures_nvidia', {})
    amd_failed_info = extract_failure_info(failures_amd, 'AMD', failed_multi_amd, failed_single_amd)
    nvidia_failed_info = extract_failure_info(failures_nvidia, 'NVIDIA', failed_multi_nvidia, failed_single_nvidia)

    # Release the figure from pyplot's registry (see _message_figure).
    plt.close(fig)
    return fig, amd_failed_info, nvidia_failed_info
def _failures_missing(failures_obj) -> bool:
    """Return True when *failures_obj* carries no failure details.

    Containers and strings are judged by emptiness only. Everything else is
    missing when it is falsy or NA. ``pd.isna`` is deliberately not applied to
    lists: it would return an element-wise array with no single truth value.
    """
    if failures_obj is None:
        return True
    if isinstance(failures_obj, (dict, list, tuple, set, str)):
        return not failures_obj
    try:
        return bool(pd.isna(failures_obj)) or not failures_obj
    except (TypeError, ValueError):
        # Array-likes have an ambiguous truth value; treat them as present.
        return False


def _format_failure_section(title: str, entries) -> list[str]:
    """Format one failure category as an underlined title plus its entries.

    A list is rendered as a numbered list capped at 10 items, followed by an
    "... and N more" line when truncated. Any other value is rendered with
    ``str()``.
    """
    lines = [generate_underlined_line(title)]
    if isinstance(entries, list):
        for position, failure in enumerate(entries[:10], start=1):
            if isinstance(failure, dict):
                # Prefer the most specific field available; fall back to the
                # dict's string form.
                failure = failure.get('line', failure.get('test', failure.get('name', str(failure))))
            lines.append(f" {position}. {failure}")
        if len(entries) > 10:
            lines.append(f"... and {len(entries) - 10} more")
    else:
        lines.append(str(entries))
    return lines


def extract_failure_info(failures_obj, device: str, multi_count: int, single_count: int) -> str:
    """Build the human-readable failure report for one device.

    Args:
        failures_obj: Failure details. A dict may hold ``'multi'`` and/or
            ``'single'`` entries; any other value contributes no details.
        device: Device name used in the messages (e.g. ``'AMD'``).
        multi_count: Number of multi-GPU failures.
        single_count: Number of single-GPU failures.

    Returns:
        A newline-joined report: an optional counts summary followed by the
        optional multi/single detail sections, or a one-line message when
        there is nothing to report or the details cannot be processed.
    """
    if _failures_missing(failures_obj) and multi_count == 0 and single_count == 0:
        return f"No failures on {device}"

    info_lines = []
    if multi_count > 0 or single_count > 0:
        info_lines.append(generate_underlined_line(f"Failure Summary for {device}:"))
        if multi_count > 0:
            info_lines.append(f"Multi GPU failures: {multi_count}")
        if single_count > 0:
            info_lines.append(f"Single GPU failures: {single_count}")
        info_lines.append("")

    # Detail extraction is best-effort: the payload's shape is not validated,
    # so any error degrades to a counts-only message rather than a crash.
    try:
        if isinstance(failures_obj, dict):
            if failures_obj.get('multi'):
                info_lines.extend(_format_failure_section("Multi GPU failure details:", failures_obj['multi']))
                info_lines.append("")  # blank separator before the single-GPU section
            if failures_obj.get('single'):
                info_lines.extend(_format_failure_section("Single GPU failure details:", failures_obj['single']))
        return "\n".join(info_lines) if info_lines else f"No detailed failure info for {device}"
    except Exception as e:
        if multi_count > 0 or single_count > 0:
            return f"Failures detected on {device} (Multi: {multi_count}, Single: {single_count})\nDetails unavailable: {str(e)}"
        return f"Error processing failure info for {device}: {str(e)}"
def _summary_count(row, column: str) -> int:
    """Return ``row[column]`` as an int, treating a missing column or NaN as 0."""
    value = row.get(column, 0)
    return int(value) if pd.notna(value) else 0


def _summary_placeholder() -> plt.Figure:
    """Build the black 'No data available' placeholder figure."""
    fig, ax = plt.subplots(figsize=(16, 8), facecolor='#000000')
    ax.set_facecolor('#000000')
    ax.text(0.5, 0.5, 'No data available',
            horizontalalignment='center', verticalalignment='center',
            transform=ax.transAxes, fontsize=20, color='#888888',
            fontfamily='monospace', weight='normal')
    ax.axis('off')
    # Drop the figure from pyplot's global registry so repeated calls do not
    # accumulate figures; the returned Figure object can still be rendered.
    plt.close(fig)
    return fig


def _draw_summary_bar(ax, y, device_label, stats, colors, left_0, bar_length):
    """Draw one device's label and stacked horizontal bar at height *y*.

    Segment widths are each category's share of the device total, scaled to
    *bar_length*. Nothing is drawn when the total is zero.
    """
    total = sum(stats.values())
    if total <= 0:
        return
    ax.text(left_0 - 2, y, device_label,
            ha='right', va='center', color='#CCCCCC',
            fontsize=18, fontfamily='monospace', fontweight='normal')
    left = left_0
    for category in ['passed', 'failed', 'skipped', 'error']:
        count = stats[category]
        if count <= 0:
            continue
        width = count / total * bar_length
        ax.barh(y, width, left=left, height=0.405,
                color=colors[category], alpha=0.9)
        # Only label segments wide enough to hold the number.
        if width > 4:
            ax.text(left + width / 2, y, str(count),
                    ha='center', va='center', color='black',
                    fontweight='bold', fontsize=12, fontfamily='monospace')
        left += width


def create_summary_page() -> plt.Figure:
    """Create the summary figure: one AMD and one NVIDIA result bar per model.

    Models listed in ``available_models`` that have no row in ``df``, or whose
    counts are all zero, are skipped. The figure height is derived from the
    number of models actually drawn.

    Returns:
        The summary figure, or a 'No data available' placeholder when ``df``
        is empty or no model has any results.
    """
    # Snapshot the global once: the background reload may rebind ``df``.
    data = df
    if data.empty:
        return _summary_placeholder()

    # First pass: collect the rows that will be drawn, so the figure can be
    # sized for them rather than for every known model.
    visible_rows = []
    for model_name in available_models:
        if model_name not in data.index:
            continue
        row = data.loc[model_name]
        amd_stats = {
            'passed': _summary_count(row, 'success_amd'),
            'failed': (_summary_count(row, 'failed_multi_no_amd')
                       + _summary_count(row, 'failed_single_no_amd')),
            'skipped': 0,
            'error': 0,
        }
        nvidia_stats = {
            'passed': _summary_count(row, 'success_nvidia'),
            'failed': (_summary_count(row, 'failed_multi_no_nvidia')
                       + _summary_count(row, 'failed_single_no_nvidia')),
            'skipped': 0,
            'error': 0,
        }
        if sum(amd_stats.values()) == 0 and sum(nvidia_stats.values()) == 0:
            continue
        visible_rows.append((model_name, amd_stats, nvidia_stats))

    if not visible_rows:
        return _summary_placeholder()

    fig, ax = plt.subplots(figsize=(16, len(visible_rows) * 2.5 + 2), facecolor='#000000')
    ax.set_facecolor('#000000')
    colors = {
        'passed': '#4CAF50',
        'failed': '#E53E3E',
        'skipped': '#FFD54F',
        'error': '#8B0000',
    }
    left_0 = 8       # x where bars start; device labels sit to the left of it
    bar_length = 92  # full bar width in axis units (x axis spans 0..100)
    max_y = 0
    for position, (model_name, amd_stats, nvidia_stats) in enumerate(visible_rows):
        y_base = (2.2 + position) * 1.8
        y_amd_bar = y_base + 0.45
        y_nvidia_bar = y_base + 0.97
        max_y = max(max_y, y_nvidia_bar + 0.5)
        # Model name centred above its AMD bar.
        ax.text(bar_length / 2 + left_0, y_base, f"{model_name.lower()}",
                ha='center', va='center', color='#FFFFFF',
                fontsize=20, fontfamily='monospace', fontweight='bold')
        _draw_summary_bar(ax, y_amd_bar, "amd", amd_stats, colors, left_0, bar_length)
        _draw_summary_bar(ax, y_nvidia_bar, "nvidia", nvidia_stats, colors, left_0, bar_length)

    # Hide every axis decoration; y is inverted so the first model is on top.
    ax.set_xlim(0, 100)
    ax.set_ylim(-0.5, max_y)
    ax.set_xlabel('')
    ax.set_ylabel('')
    for side in ('bottom', 'left', 'top', 'right'):
        ax.spines[side].set_visible(False)
    ax.set_xticks([])
    ax.set_yticks([])
    ax.yaxis.set_inverted(True)

    # Use the figure's own methods instead of pyplot's "current figure",
    # which is global state shared with other threads drawing plots.
    fig.tight_layout()
    fig.subplots_adjust(left=0.02, right=0.98, top=0.98, bottom=0.02)
    plt.close(fig)  # release from pyplot's registry (see _summary_placeholder)
    return fig
def load_css() -> str:
    """Return the contents of ``styles.css`` from the working directory.

    The file is read as UTF-8 rather than the platform default encoding, so
    non-ASCII characters in the stylesheet decode the same way everywhere.
    If the file does not exist, a warning is printed and a minimal dark
    stylesheet is returned instead.
    """
    try:
        with open("styles.css", "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        print("Warning: styles.css not found, using minimal default styles")
        return "body { background: #000; color: #fff; }"
def get_status_text():
    """Return the sidebar status markdown, showing the last update time if any."""
    if last_update_time:
        return f"📊 **Updated:** {last_update_time.strftime('%H:%M')}\n\n*Auto-refresh: 15min*"
    return f"📊 **Loading...**\n\n*Auto-refresh: 15min*"


def get_ci_links():
    """Return markdown with the CI job links found in the current data.

    Rows are scanned until a 'multi' and a 'single' link has been found for
    both AMD and NVIDIA. Only dict-valued ``job_link_amd`` /
    ``job_link_nvidia`` cells are read; cells of any other type (NaN, lists,
    strings) are ignored instead of being passed through ``pd.notna``, which
    raises on list-like values.
    """
    try:
        data = df  # snapshot: the background reload may rebind the global
        if data is None or data.empty:
            return "🔗 **CI Jobs:** *Loading...*"

        found = {
            'amd': {'multi': None, 'single': None},
            'nvidia': {'multi': None, 'single': None},
        }
        for _, row in data.iterrows():
            for device, device_links in found.items():
                raw_links = row.get(f'job_link_{device}')
                if not isinstance(raw_links, dict):
                    continue
                for kind in ('multi', 'single'):
                    # The first link seen for each slot wins.
                    if not device_links[kind] and kind in raw_links:
                        device_links[kind] = raw_links[kind]
            if all(link for device_links in found.values() for link in device_links.values()):
                break

        amd, nvidia = found['amd'], found['nvidia']
        links_md = "🔗 **CI Jobs:**\n\n"
        if amd['multi'] or amd['single']:
            links_md += "**AMD:**\n"
            if amd['multi']:
                links_md += f"• [Multi GPU]({amd['multi']})\n"
            if amd['single']:
                links_md += f"• [Single GPU]({amd['single']})\n"
            links_md += "\n"
        if nvidia['multi'] or nvidia['single']:
            links_md += "**NVIDIA:**\n"
            if nvidia['multi']:
                links_md += f"• [Multi GPU]({nvidia['multi']})\n"
            if nvidia['single']:
                links_md += f"• [Single GPU]({nvidia['single']})\n"
        if not (amd['multi'] or amd['single'] or nvidia['multi'] or nvidia['single']):
            links_md += "*No links available*"
        return links_md
    except Exception as e:
        # UI callback boundary: report the problem instead of breaking the page.
        print(f"Error getting CI links: {e}")
        return "🔗 **CI Jobs:** *Error loading links*"


def _resolve_model_name(selected_model):
    """Map a dropdown choice back to the matching label in ``df.index``.

    The dropdown lists lower-cased names, so an exact match is tried first
    and a case-insensitive match second. The input is returned unchanged
    when nothing matches.
    """
    data = df
    if selected_model in data.index:
        return selected_model
    wanted = str(selected_model).lower()
    for candidate in data.index:
        if str(candidate).lower() == wanted:
            return candidate
    return selected_model


def show_model_details(selected_model):
    """Dropdown handler: plot and failure texts for the selected model."""
    return plot_model_stats(_resolve_model_name(selected_model))


def show_summary_and_update_links():
    """Show summary page and update CI links."""
    return create_summary_page(), get_ci_links()


def refresh_data_and_status():
    """Manual data refresh triggered by user.

    The outputs are rebuilt from the current globals whether or not the
    reload succeeded; after a failed reload they show the previous data.
    """
    load_data()
    return create_summary_page(), get_status_text(), get_ci_links()


# Gradio interface: sidebar (status, refresh, links, model selector) plus a
# main area that shows either the summary plot or one model's detail view.
with gr.Blocks(title="Model Test Results Dashboard", css=load_css()) as demo:
    with gr.Row():
        # Sidebar for model selection
        with gr.Column(scale=1, elem_classes=["sidebar"]):
            gr.Markdown("# 🤖 TCID")
            gr.Markdown("**Transformer CI Dashboard**\n\n*Result overview by model and hardware*\n")

            # Data status indicator
            status_text = get_status_text()
            status_display = gr.Markdown(status_text)

            # Manual refresh button
            refresh_button = gr.Button(
                "🔄 refresh data",
                variant="secondary",
                size="sm",
                elem_classes=["refresh-button"]
            )

            # CI job links, filled in when the page loads
            ci_links_display = gr.Markdown("🔗 **CI Jobs:** *Loading...*")

            # Summary button
            summary_button = gr.Button(
                "summary\n📊",
                variant="primary",
                size="lg",
                elem_classes=["summary-button"]
            )

            # Model selector dropdown.
            # NOTE(review): the choices and the count are fixed when the UI is
            # built; a later data refresh does not update them.
            gr.Markdown(f"**Select Model ({len(available_models)}):**")
            model_choices = [model.lower() for model in available_models] if available_models else ["auto", "bert", "clip", "llama"]
            model_dropdown = gr.Dropdown(
                choices=model_choices,
                value=model_choices[0] if model_choices else "auto",
                label="Choose Model",
                interactive=True,
                allow_custom_value=False
            )

        # Main content area
        with gr.Column(scale=4, elem_classes=["main-content"]):
            # Summary display (default view)
            summary_display = gr.Plot(
                value=create_summary_page(),
                label="",
                format="png",
                elem_classes=["plot-container"],
                visible=True
            )

            # Detailed view components (hidden by default)
            with gr.Column(visible=False, elem_classes=["detail-view"]) as detail_view:
                plot_output = gr.Plot(
                    label="",
                    format="png",
                    elem_classes=["plot-container"]
                )
                # Two failed-tests panels side by side: AMD left, NVIDIA right
                with gr.Row():
                    with gr.Column(scale=1):
                        amd_failed_tests_output = gr.Textbox(
                            value="",
                            lines=8,
                            max_lines=8,
                            interactive=False,
                            container=False,
                            elem_classes=["failed-tests"]
                        )
                    with gr.Column(scale=1):
                        nvidia_failed_tests_output = gr.Textbox(
                            value="",
                            lines=8,
                            max_lines=8,
                            interactive=False,
                            container=False,
                            elem_classes=["failed-tests"]
                        )

    # Selecting a model renders its detail view, then swaps it in.
    model_dropdown.change(
        fn=show_model_details,
        inputs=[model_dropdown],
        outputs=[plot_output, amd_failed_tests_output, nvidia_failed_tests_output]
    ).then(
        fn=lambda: [gr.update(visible=False), gr.update(visible=True)],
        outputs=[summary_display, detail_view]
    )

    # The summary button redraws the summary, then swaps it back in.
    summary_button.click(
        fn=show_summary_and_update_links,
        outputs=[summary_display, ci_links_display]
    ).then(
        fn=lambda: [gr.update(visible=True), gr.update(visible=False)],
        outputs=[summary_display, detail_view]
    )

    # The refresh button reloads the data and returns to the summary view.
    refresh_button.click(
        fn=refresh_data_and_status,
        outputs=[summary_display, status_display, ci_links_display]
    ).then(
        fn=lambda: [gr.update(visible=True), gr.update(visible=False)],
        outputs=[summary_display, detail_view]
    )

    # Auto-update CI links when the interface loads
    demo.load(
        fn=get_ci_links,
        outputs=[ci_links_display]
    )
# Launch the dashboard only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()