"""Gradio dashboard ("TCID") showing per-model CI test results for AMD and NVIDIA.

The module loads a results DataFrame through ``data.get_data()``, refreshes it
periodically on a background timer, and renders either a summary page (one
stacked bar per model and device) or a per-model detail view (two pie charts
plus textual failure details).
"""

import threading
import time
from datetime import datetime

import gradio as gr
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from data import get_data

# Dark theme for every figure and silence the "too many open figures" warning.
matplotlib.rcParams['figure.max_open_warning'] = 0
matplotlib.rcParams['figure.facecolor'] = '#000000'
matplotlib.rcParams['axes.facecolor'] = '#000000'
matplotlib.rcParams['savefig.facecolor'] = '#000000'
# Non-interactive mode: figures are only rendered when they are serialised.
# NOTE: this does not by itself release figures; see _release_figure().
plt.ioff()

BACKGROUND_COLOR = '#000000'
FONT_FAMILY = 'monospace'
CATEGORIES = ('passed', 'failed', 'skipped', 'error')
CATEGORY_COLORS = {
    'passed': '#4CAF50',   # Medium green
    'failed': '#E53E3E',   # More red
    'skipped': '#FFD54F',  # Medium yellow
    'error': '#8B0000',    # Dark red
}
RELOAD_INTERVAL_SECONDS = 900.0  # 15 minutes
MAX_LISTED_FAILURES = 10

# Global state, rebound by load_data() (possibly from the reload timer thread).
# Readers should take a local snapshot (``data = df``) before using it.
df = pd.DataFrame()
available_models = []
last_update_time = None


def _log(message: str) -> None:
    """Print ``message`` prefixed with the current local timestamp."""
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {message}")


def load_data() -> bool:
    """Load data from the data source into the module-level globals.

    Returns:
        True when the data was fetched and the globals were updated, False when
        fetching failed (the previous globals are left untouched).
    """
    global df, available_models, last_update_time
    try:
        _log("Loading data...")
        new_df = get_data()
        new_models = new_df.index.tolist()
    except Exception as e:
        # Boundary handler: a failed refresh must never kill the UI callback
        # or the reload timer thread, so report it and keep the old data.
        _log(f"Error loading data: {e}")
        return False

    df = new_df
    available_models = new_models
    last_update_time = datetime.now()
    _log(f"Data loaded successfully: {len(available_models)} models")
    print(f"Models: {available_models[:5]}{'...' if len(available_models) > 5 else ''}")
    return True


def schedule_data_reload(interval_seconds: float = RELOAD_INTERVAL_SECONDS) -> None:
    """Reload the data every ``interval_seconds`` on a daemon timer thread.

    Args:
        interval_seconds: Delay between two reloads; defaults to 15 minutes.
    """
    minutes = interval_seconds / 60

    def start_timer() -> None:
        timer = threading.Timer(interval_seconds, reload_data)
        timer.daemon = True  # Dies when main thread dies
        timer.start()

    def reload_data() -> None:
        try:
            load_data()
        finally:
            # Always re-arm, otherwise one unexpected error stops auto-refresh.
            start_timer()
            _log(f"Next data reload scheduled in {minutes:g} minutes")

    start_timer()
    _log(f"Data auto-reload scheduled every {minutes:g} minutes")


# Load data once at startup
if not load_data():
    print("WARNING: Failed to load data! Adding fallback models.")
    available_models = ["auto", "bert", "clip", "llama", "t5"]  # Fallback models for testing

# Start the auto-reload scheduler
schedule_data_reload()


def generate_underlined_line(text: str) -> str:
    """Return ``text`` followed by a box-drawing underline of the same length."""
    return text + "\n" + "─" * len(text) + "\n"


def _release_figure(fig):
    """Unregister ``fig`` from pyplot and return it.

    pyplot keeps a reference to every figure it creates until it is closed, so
    without this each callback would leak one figure. The Figure object itself
    stays usable for ``savefig`` after being closed.
    """
    plt.close(fig)
    return fig


def _message_figure(message: str, figsize: tuple, fontsize: int):
    """Build an otherwise empty dark figure showing ``message`` in its centre."""
    fig, ax = plt.subplots(figsize=figsize, facecolor=BACKGROUND_COLOR)
    ax.set_facecolor(BACKGROUND_COLOR)
    ax.text(0.5, 0.5, message,
            horizontalalignment='center', verticalalignment='center',
            transform=ax.transAxes, fontsize=fontsize, color='#888888',
            fontfamily=FONT_FAMILY, weight='normal')
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.axis('off')
    return _release_figure(fig)


def _get_count(row, key: str) -> int:
    """Read ``row[key]`` as an int, treating a missing key or NaN as 0."""
    value = row.get(key, 0)
    return int(value) if pd.notna(value) else 0


def _read_device_counts(row, device: str) -> tuple[int, int, int]:
    """Return ``(passed, failed_multi, failed_single)`` for ``device`` ('amd'/'nvidia')."""
    return (
        _get_count(row, f'success_{device}'),
        _get_count(row, f'failed_multi_no_{device}'),
        _get_count(row, f'failed_single_no_{device}'),
    )


def _build_stats(passed: int, failed: int) -> dict[str, int]:
    """Build the per-category stats dict; skipped/error are not in this dataset."""
    return {'passed': passed, 'failed': failed, 'skipped': 0, 'error': 0}


def _resolve_model_name(data, model_name):
    """Map ``model_name`` to its key in ``data.index`` or return None.

    The dropdown offers lower-cased names, so fall back to a case-insensitive
    match when the exact name is not an index key.
    """
    if data.empty:
        return None
    if model_name in data.index:
        return model_name
    wanted = str(model_name).lower()
    for candidate in data.index:
        if str(candidate).lower() == wanted:
            return candidate
    return None


def _draw_device_pie(ax, device_label: str, filtered_stats: dict) -> None:
    """Draw one device's pie chart (or a 'No test results' note) on ``ax``."""
    if not filtered_stats:
        ax.text(0.5, 0.5, 'No test results',
                horizontalalignment='center', verticalalignment='center',
                transform=ax.transAxes, fontsize=14, color='#888888',
                fontfamily=FONT_FAMILY, weight='normal')
        ax.set_title(device_label, fontsize=28, weight='bold', pad=2,
                     color='#FFFFFF', fontfamily=FONT_FAMILY)
        ax.axis('off')
        return

    total = sum(filtered_stats.values())
    # Minimal pie chart - full pie, no donut effect
    wedges, texts, autotexts = ax.pie(
        list(filtered_stats.values()),
        labels=[label.lower() for label in filtered_stats],  # Lowercase for minimal look
        colors=[CATEGORY_COLORS[category] for category in filtered_stats],
        # matplotlib hands us a (float32-derived) percentage; round instead of
        # truncating so e.g. 0.9999 is shown as the real count 1, not 0.
        autopct=lambda pct: f'{int(round(float(pct) / 100 * total))}',
        startangle=90,
        explode=None,  # No separation
        shadow=False,
        wedgeprops=dict(edgecolor='#1a1a1a', linewidth=0.5),  # Minimal borders
        textprops={'fontsize': 12, 'weight': 'normal', 'color': '#CCCCCC',
                   'fontfamily': FONT_FAMILY},
    )

    # Count labels inside the wedges: black and bold for contrast
    for autotext in autotexts:
        autotext.set_color('#000000')
        autotext.set_weight('bold')
        autotext.set_fontsize(14)
        autotext.set_fontfamily(FONT_FAMILY)

    # Minimal category labels
    for text in texts:
        text.set_color('#AAAAAA')
        text.set_weight('normal')
        text.set_fontsize(13)
        text.set_fontfamily(FONT_FAMILY)

    # Device label closer to chart and bigger
    ax.set_title(device_label, fontsize=28, weight='normal', pad=2,
                 color='#FFFFFF', fontfamily=FONT_FAMILY)


def plot_model_stats(model_name: str) -> tuple[plt.Figure, str, str]:
    """Draw AMD and NVIDIA pie charts of a model's test results.

    Args:
        model_name: Model to look up in the global DataFrame index (matched
            exactly first, then case-insensitively).

    Returns:
        ``(figure, amd_failure_text, nvidia_failure_text)``. When the model is
        unknown both texts are "No data available"; when every count is zero
        both texts are empty.
    """
    data = df  # Snapshot: the reload thread may rebind the global at any time.
    index_key = _resolve_model_name(data, model_name)
    if index_key is None:
        fig = _message_figure(f'No data available for {model_name}',
                              figsize=(10, 8), fontsize=16)
        return fig, "No data available", "No data available"

    row = data.loc[index_key]
    success_amd, failed_multi_amd, failed_single_amd = _read_device_counts(row, 'amd')
    success_nvidia, failed_multi_nvidia, failed_single_nvidia = _read_device_counts(row, 'nvidia')

    amd_stats = _build_stats(success_amd, failed_multi_amd + failed_single_amd)
    nvidia_stats = _build_stats(success_nvidia, failed_multi_nvidia + failed_single_nvidia)

    # Filter out categories with 0 values for cleaner visualization
    amd_filtered = {k: v for k, v in amd_stats.items() if v > 0}
    nvidia_filtered = {k: v for k, v in nvidia_stats.items() if v > 0}

    if not amd_filtered and not nvidia_filtered:
        # All values are 0 - minimal empty state
        fig = _message_figure('No test results available', figsize=(10, 8), fontsize=16)
        return fig, "", ""

    # Two subplots side by side
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 9), facecolor=BACKGROUND_COLOR)
    ax1.set_facecolor(BACKGROUND_COLOR)
    ax2.set_facecolor(BACKGROUND_COLOR)

    _draw_device_pie(ax1, "amd", amd_filtered)
    _draw_device_pie(ax2, "nvidia", nvidia_filtered)

    # Subtle separation line between charts - stops at device labels level
    line_x = 0.5
    fig.add_artist(plt.Line2D([line_x, line_x], [0.0, 0.85],
                              color='#333333', linewidth=1, alpha=0.5,
                              transform=fig.transFigure))

    # Central shared title for model name
    fig.suptitle(f'{str(index_key).lower()}', fontsize=32, weight='bold',
                 color='#CCCCCC', fontfamily=FONT_FAMILY, y=1)

    # Use the figure's own methods: the pyplot equivalents act on the global
    # "current figure", which another concurrent callback may have replaced.
    fig.tight_layout()
    fig.subplots_adjust(top=0.85, wspace=0.4)  # wspace pads between the charts

    # Generate failure info directly from dataframe
    failures_amd = row.get('failures_amd', {})
    failures_nvidia = row.get('failures_nvidia', {})

    amd_failed_info = extract_failure_info(
        failures_amd, 'AMD', failed_multi_amd, failed_single_amd)
    nvidia_failed_info = extract_failure_info(
        failures_nvidia, 'NVIDIA', failed_multi_nvidia, failed_single_nvidia)

    return _release_figure(fig), amd_failed_info, nvidia_failed_info


def _is_missing(value) -> bool:
    """Return True for None, NaN, falsy scalars and empty containers.

    Containers are checked by emptiness only: ``pd.isna`` on a list returns an
    array whose truth value is ambiguous and would raise.
    """
    if isinstance(value, (dict, list, tuple, set, str)):
        return not value
    try:
        return bool(not value or pd.isna(value))
    except (TypeError, ValueError):
        # Array-likes have no single truth value; treat them as present.
        return False


def _describe_failure(failure) -> str:
    """Return the most specific description available for one failure entry."""
    if isinstance(failure, dict):
        # Prefer 'line', then 'test', then 'name'; else the dict's repr.
        for key in ('line', 'test', 'name'):
            if key in failure:
                return failure[key]
    return str(failure)


def _format_failure_details(title: str, failures) -> list[str]:
    """Format one failure category as an underlined title plus numbered entries."""
    lines = [generate_underlined_line(title)]
    if isinstance(failures, list):
        # Limit the listing; summarise the remainder in one line.
        for number, failure in enumerate(failures[:MAX_LISTED_FAILURES], start=1):
            lines.append(f" {number}. {_describe_failure(failure)}")
        if len(failures) > MAX_LISTED_FAILURES:
            lines.append(f"... and {len(failures) - MAX_LISTED_FAILURES} more")
    else:
        lines.append(str(failures))
    return lines


def extract_failure_info(failures_obj, device: str, multi_count: int, single_count: int) -> str:
    """Extract failure information from failures object.

    Args:
        failures_obj: Expected to be a dict with optional 'multi' / 'single'
            entries (lists of strings or dicts); anything else yields only the
            count summary.
        device: Device label used in the messages (e.g. 'AMD').
        multi_count: Number of multi-GPU failures.
        single_count: Number of single-GPU failures.

    Returns:
        A multi-line, human-readable report; never raises.
    """
    if _is_missing(failures_obj) and multi_count == 0 and single_count == 0:
        return f"No failures on {device}"

    info_lines = []

    # Add counts summary
    if multi_count > 0 or single_count > 0:
        info_lines.append(generate_underlined_line(f"Failure Summary for {device}:"))
        if multi_count > 0:
            info_lines.append(f"Multi GPU failures: {multi_count}")
        if single_count > 0:
            info_lines.append(f"Single GPU failures: {single_count}")
        info_lines.append("")

    # Try to extract detailed failure information
    try:
        if isinstance(failures_obj, dict):
            if failures_obj.get('multi'):
                info_lines.extend(_format_failure_details(
                    "Multi GPU failure details:", failures_obj['multi']))
                info_lines.append("")
            if failures_obj.get('single'):
                info_lines.extend(_format_failure_details(
                    "Single GPU failure details:", failures_obj['single']))

        return "\n".join(info_lines) if info_lines else f"No detailed failure info for {device}"
    except Exception as e:
        # Best effort: malformed failure payloads must not break the page.
        if multi_count > 0 or single_count > 0:
            return (f"Failures detected on {device} (Multi: {multi_count}, "
                    f"Single: {single_count})\nDetails unavailable: {str(e)}")
        return f"Error processing failure info for {device}: {str(e)}"


def _draw_stacked_bar(ax, y: float, label: str, stats: dict, left_0: float,
                      bar_length: float) -> None:
    """Draw one device's labelled, stacked horizontal bar at height ``y``.

    Nothing is drawn when the device has no results at all.
    """
    total = sum(stats.values())
    if total <= 0:
        return

    ax.text(left_0 - 2, y, label, ha='right', va='center', color='#CCCCCC',
            fontsize=18, fontfamily=FONT_FAMILY, fontweight='normal')

    left = left_0  # Bar starts after the label
    for category in CATEGORIES:
        count = stats[category]
        if count <= 0:
            continue
        width = count / total * bar_length
        ax.barh(y, width, left=left, height=0.405,
                color=CATEGORY_COLORS[category], alpha=0.9)
        if width > 4:  # Only annotate segments wide enough to hold the number
            ax.text(left + width / 2, y, str(count), ha='center', va='center',
                    color='black', fontweight='bold', fontsize=12,
                    fontfamily=FONT_FAMILY)
        left += width


def create_summary_page() -> plt.Figure:
    """Create a summary page with model names and both AMD/NVIDIA test stats bars.

    Returns:
        A figure with one block per model that has at least one result, or a
        'No data available' figure when the global DataFrame is empty.
    """
    # Snapshot both globals: the reload thread may rebind them at any time.
    data = df
    models = available_models

    if data.empty:
        return _message_figure('No data available', figsize=(16, 8), fontsize=20)

    fig, ax = plt.subplots(figsize=(16, len(models) * 2.5 + 2),
                           facecolor=BACKGROUND_COLOR)
    ax.set_facecolor(BACKGROUND_COLOR)

    left_0 = 8       # x where bars start (room for the device label)
    bar_length = 92  # bars span to x = 100
    visible_model_count = 0
    max_y = 0

    for model_name in models:
        if model_name not in data.index:
            continue

        row = data.loc[model_name]
        success_amd, multi_amd, single_amd = _read_device_counts(row, 'amd')
        success_nvidia, multi_nvidia, single_nvidia = _read_device_counts(row, 'nvidia')

        amd_stats = _build_stats(success_amd, multi_amd + single_amd)
        nvidia_stats = _build_stats(success_nvidia, multi_nvidia + single_nvidia)

        if sum(amd_stats.values()) == 0 and sum(nvidia_stats.values()) == 0:
            continue

        # Position for this model - use visible model count for spacing
        y_base = (2.2 + visible_model_count) * 1.8
        y_amd_bar = y_base + 0.45
        y_nvidia_bar = y_base + 0.97
        max_y = max(max_y, y_nvidia_bar + 0.5)

        # Model name centered above the AMD bar
        ax.text(bar_length / 2 + left_0, y_base, f"{model_name.lower()}",
                ha='center', va='center', color='#FFFFFF', fontsize=20,
                fontfamily=FONT_FAMILY, fontweight='bold')

        _draw_stacked_bar(ax, y_amd_bar, "amd", amd_stats, left_0, bar_length)
        _draw_stacked_bar(ax, y_nvidia_bar, "nvidia", nvidia_stats, left_0, bar_length)

        visible_model_count += 1

    # Style the axes to be completely invisible and span full width
    ax.set_xlim(0, 100)
    ax.set_ylim(-0.5, max_y)
    ax.set_xlabel('')
    ax.set_ylabel('')
    for side in ('bottom', 'left', 'top', 'right'):
        ax.spines[side].set_visible(False)
    ax.set_xticks([])
    ax.set_yticks([])
    ax.yaxis.set_inverted(True)  # First model at the top

    # Remove all margins to make bars span full width. Figure methods are used
    # because the pyplot equivalents act on the global "current figure".
    fig.tight_layout()
    fig.subplots_adjust(left=0.02, right=0.98, top=0.98, bottom=0.02)

    return _release_figure(fig)


def load_css() -> str:
    """Return the contents of ``styles.css``, or minimal dark styles if absent."""
    try:
        with open("styles.css", "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        print("Warning: styles.css not found, using minimal default styles")
        return "body { background: #000; color: #fff; }"


def get_status_text() -> str:
    """Get current status text with last update time."""
    if last_update_time:
        return (f"📊 **Updated:** {last_update_time.strftime('%H:%M')}"
                "\n\n*Auto-refresh: 15min*")
    return "📊 **Loading...**\n\n*Auto-refresh: 15min*"


def _find_job_links(data, device: str) -> tuple:
    """Return the first truthy ``(multi, single)`` job links found for ``device``.

    Scans the ``job_link_<device>`` column in index order; only dict cells are
    considered. Either element is None when no link was found.
    """
    column = f'job_link_{device}'
    multi_link = None
    single_link = None
    if column not in data.columns:
        return multi_link, single_link

    for raw in data[column]:
        if not isinstance(raw, dict):
            continue
        if 'multi' in raw and not multi_link:
            multi_link = raw['multi']
        if 'single' in raw and not single_link:
            single_link = raw['single']
        if multi_link and single_link:
            break
    return multi_link, single_link


def get_ci_links() -> str:
    """Get CI job links (as Markdown) from the most recent data."""
    try:
        data = df  # Snapshot: the reload thread may rebind the global.
        if data is None or data.empty:
            return "🔗 **CI Jobs:** *Loading...*"

        # Links should be the same for all models in a run, so the first
        # model providing each link wins.
        amd_multi_link, amd_single_link = _find_job_links(data, 'amd')
        nvidia_multi_link, nvidia_single_link = _find_job_links(data, 'nvidia')

        parts = ["🔗 **CI Jobs:**\n\n"]

        if amd_multi_link or amd_single_link:
            parts.append("**AMD:**\n")
            if amd_multi_link:
                parts.append(f"• [Multi GPU]({amd_multi_link})\n")
            if amd_single_link:
                parts.append(f"• [Single GPU]({amd_single_link})\n")
            parts.append("\n")

        if nvidia_multi_link or nvidia_single_link:
            parts.append("**NVIDIA:**\n")
            if nvidia_multi_link:
                parts.append(f"• [Multi GPU]({nvidia_multi_link})\n")
            if nvidia_single_link:
                parts.append(f"• [Single GPU]({nvidia_single_link})\n")

        if not (amd_multi_link or amd_single_link
                or nvidia_multi_link or nvidia_single_link):
            parts.append("*No links available*")

        return "".join(parts)
    except Exception as e:
        # UI callback boundary: show an error note instead of failing the page.
        print(f"Error getting CI links: {e}")
        return "🔗 **CI Jobs:** *Error loading links*"


def show_summary_and_update_links():
    """Show summary page and update CI links."""
    return create_summary_page(), get_ci_links()


def refresh_data_and_status():
    """Manual data refresh triggered by user.

    Whether or not the reload succeeds, the page is re-rendered from whatever
    data is currently loaded (a failed reload keeps the previous data).
    """
    load_data()
    return create_summary_page(), get_status_text(), get_ci_links()


def _show_summary_view():
    """Visibility updates for (summary_display, detail_view): summary shown."""
    return [gr.update(visible=True), gr.update(visible=False)]


def _show_detail_view():
    """Visibility updates for (summary_display, detail_view): detail shown."""
    return [gr.update(visible=False), gr.update(visible=True)]


# Create the Gradio interface with sidebar and dark theme
with gr.Blocks(title="Model Test Results Dashboard", css=load_css()) as demo:

    with gr.Row():
        # Sidebar for model selection
        with gr.Column(scale=1, elem_classes=["sidebar"]):
            gr.Markdown("# 🤖 TCID")
            gr.Markdown("**Transformer CI Dashboard**\n\n*Result overview by model and hardware*\n")

            # Data status indicator
            status_display = gr.Markdown(get_status_text())

            # Manual refresh button
            refresh_button = gr.Button(
                "🔄 refresh data",
                variant="secondary",
                size="sm",
                elem_classes=["refresh-button"]
            )

            # CI job links (filled in by demo.load below)
            ci_links_display = gr.Markdown("🔗 **CI Jobs:** *Loading...*")

            # Summary button at the top
            summary_button = gr.Button(
                "summary\n📊",
                variant="primary",
                size="lg",
                elem_classes=["summary-button"]
            )

            # Model selector dropdown - much better for long lists
            gr.Markdown(f"**Select Model ({len(available_models)}):**")
            model_choices = (
                [model.lower() for model in available_models]
                if available_models
                else ["auto", "bert", "clip", "llama"]
            )
            model_dropdown = gr.Dropdown(
                choices=model_choices,
                value=model_choices[0] if model_choices else "auto",
                label="Choose Model",
                interactive=True,
                allow_custom_value=False
            )

        # Main content area
        with gr.Column(scale=4, elem_classes=["main-content"]):
            # Summary display (default view)
            summary_display = gr.Plot(
                value=create_summary_page(),
                label="",
                format="png",
                elem_classes=["plot-container"],
                visible=True
            )

            # Detailed view components (hidden by default)
            with gr.Column(visible=False, elem_classes=["detail-view"]) as detail_view:
                plot_output = gr.Plot(
                    label="",
                    format="png",
                    elem_classes=["plot-container"]
                )

                # Two separate failed tests displays in a row layout
                with gr.Row():
                    with gr.Column(scale=1):
                        amd_failed_tests_output = gr.Textbox(
                            value="",
                            lines=8,
                            max_lines=8,
                            interactive=False,
                            container=False,
                            elem_classes=["failed-tests"]
                        )
                    with gr.Column(scale=1):
                        nvidia_failed_tests_output = gr.Textbox(
                            value="",
                            lines=8,
                            max_lines=8,
                            interactive=False,
                            container=False,
                            elem_classes=["failed-tests"]
                        )

    # Selecting a model renders its detail view, then reveals it
    model_dropdown.change(
        fn=plot_model_stats,
        inputs=[model_dropdown],
        outputs=[plot_output, amd_failed_tests_output, nvidia_failed_tests_output]
    ).then(
        fn=_show_detail_view,
        outputs=[summary_display, detail_view]
    )

    # Summary button: re-render the summary and links, then reveal the summary
    summary_button.click(
        fn=show_summary_and_update_links,
        outputs=[summary_display, ci_links_display]
    ).then(
        fn=_show_summary_view,
        outputs=[summary_display, detail_view]
    )

    # Refresh button: reload data, re-render everything, reveal the summary
    refresh_button.click(
        fn=refresh_data_and_status,
        outputs=[summary_display, status_display, ci_links_display]
    ).then(
        fn=_show_summary_view,
        outputs=[summary_display, detail_view]
    )

    # Auto-update CI links when the interface loads
    demo.load(
        fn=get_ci_links,
        outputs=[ci_links_display]
    )

if __name__ == "__main__":
    demo.launch()