""" MCP Tool Implementations for TraceMind Implements: - 5 MCP Tools: analyze_leaderboard, debug_trace, estimate_cost, compare_runs, get_dataset - 3 MCP Resources: leaderboard data, trace data, cost data - 3 MCP Prompts: analysis prompts, debug prompts, optimization prompts With Gradio's native MCP support (mcp_server=True), these are automatically exposed based on decorators (@gr.mcp.tool, @gr.mcp.resource, @gr.mcp.prompt), docstrings, and type hints. """ import os import json from typing import Optional from datasets import load_dataset import pandas as pd from datetime import datetime, timedelta import gradio as gr from gemini_client import GeminiClient @gr.mcp.tool() async def analyze_leaderboard( leaderboard_repo: str = "kshitijthakkar/smoltrace-leaderboard", metric_focus: str = "overall", time_range: str = "last_week", top_n: int = 5, hf_token: Optional[str] = None, gemini_api_key: Optional[str] = None ) -> str: """ Answer questions about the leaderboard with AI-powered analysis and insights. USE THIS TOOL when you need to: - Answer questions like "Which model is leading?", "What's the best model for cost?" - Get intelligent insights about top performers and trends - Compare models and understand trade-offs - Get recommendations based on leaderboard data DO NOT use the leaderboard:// resource for questions - use this tool instead! The resource only returns raw JSON data without any analysis. This tool uses Google Gemini 2.5 Pro to provide intelligent analysis of agent evaluation results, including top performers, trends, cost/performance trade-offs, and actionable recommendations. Args: leaderboard_repo (str): HuggingFace dataset repository containing leaderboard data. Default: "kshitijthakkar/smoltrace-leaderboard" metric_focus (str): Primary metric to focus analysis on. Options: "overall", "accuracy", "cost", "latency", "co2". Default: "overall" time_range (str): Time range for analysis. Options: "last_week", "last_month", "all_time". Default: "last_week" top_n (int): Number of top models to highlight in analysis. Must be between 3 and 10. Default: 5 hf_token (Optional[str]): HuggingFace token for dataset access. If None, uses HF_TOKEN environment variable. gemini_api_key (Optional[str]): Google Gemini API key. If None, uses GEMINI_API_KEY environment variable. Returns: str: Markdown-formatted analysis with top performers, insights, trade-offs, and recommendations """ try: # Initialize Gemini client with provided key or from environment gemini_client = GeminiClient(api_key=gemini_api_key) if gemini_api_key else GeminiClient() # Load leaderboard data from HuggingFace print(f"Loading leaderboard from {leaderboard_repo}...") # Use user-provided token or fall back to environment variable token = hf_token if hf_token else os.getenv("HF_TOKEN") ds = load_dataset(leaderboard_repo, split="train", token=token) df = pd.DataFrame(ds) # Filter by time range if time_range != "all_time": df['timestamp'] = pd.to_datetime(df['timestamp']) now = datetime.now() if time_range == "last_week": cutoff = now - timedelta(days=7) elif time_range == "last_month": cutoff = now - timedelta(days=30) df = df[df['timestamp'] >= cutoff] # Sort by metric metric_column_map = { "overall": "success_rate", "accuracy": "success_rate", "cost": "total_cost_usd", "latency": "avg_duration_ms", "co2": "co2_emissions_g" } sort_column = metric_column_map.get(metric_focus, "success_rate") ascending = metric_focus in ["cost", "latency", "co2"] # Lower is better for these df_sorted = df.sort_values(sort_column, ascending=ascending) # Get top N top_models = df_sorted.head(top_n) # Prepare data summary for Gemini analysis_data = { "total_evaluations": len(df), "time_range": time_range, "metric_focus": metric_focus, "top_models": top_models[[ "model", "agent_type", "provider", "success_rate", "total_cost_usd", "avg_duration_ms", "co2_emissions_g", "submitted_by" ]].to_dict('records'), "summary_stats": { "avg_success_rate": float(df['success_rate'].mean()), "avg_cost": float(df['total_cost_usd'].mean()), "avg_duration_ms": float(df['avg_duration_ms'].mean()), "total_co2_g": float(df['co2_emissions_g'].sum()), "models_tested": df['model'].nunique(), "unique_submitters": df['submitted_by'].nunique() } } # Get AI analysis from Gemini result = await gemini_client.analyze_with_context( data=analysis_data, analysis_type="leaderboard", specific_question=f"Focus on {metric_focus} performance. What are the key insights?" ) return result except Exception as e: return f"❌ **Error analyzing leaderboard**: {str(e)}\n\nPlease check:\n- Repository name is correct\n- You have access to the dataset\n- HF_TOKEN is set correctly" @gr.mcp.tool() async def debug_trace( trace_id: str, traces_repo: str, question: str = "Analyze this trace and explain what happened", hf_token: Optional[str] = None, gemini_api_key: Optional[str] = None ) -> str: """ Answer questions about agent traces with AI-powered debugging and analysis. USE THIS TOOL when you need to: - Answer questions like "Why did this fail?", "What took the most time?", "Why was X called?" - Debug agent execution traces and understand what happened - Identify bottlenecks and performance issues - Get explanations about agent behavior DO NOT use the trace:// resource for questions - use this tool instead! The resource only returns raw OTEL JSON data without any analysis. This tool uses Google Gemini 2.5 Pro to analyze OpenTelemetry trace data and provide intelligent debugging insights, step-by-step breakdowns, and answers to specific questions about execution flow. Args: trace_id (str): Unique identifier for the trace to analyze (e.g., "trace_abc123") traces_repo (str): HuggingFace dataset repository containing trace data (e.g., "username/agent-traces-model-timestamp") question (str): Specific question about the trace. Default: "Analyze this trace and explain what happened" hf_token (Optional[str]): HuggingFace token for dataset access. If None, uses HF_TOKEN environment variable. gemini_api_key (Optional[str]): Google Gemini API key. If None, uses GEMINI_API_KEY environment variable. Returns: str: Markdown-formatted debug analysis with step-by-step breakdown, timing information, and answer to the question """ try: # Initialize Gemini client with provided key or from environment gemini_client = GeminiClient(api_key=gemini_api_key) if gemini_api_key else GeminiClient() # Load traces dataset print(f"Loading traces from {traces_repo}...") # Use user-provided token or fall back to environment variable token = hf_token if hf_token else os.getenv("HF_TOKEN") ds = load_dataset(traces_repo, split="train", token=token) df = pd.DataFrame(ds) # Find the specific trace trace_data = df[df['trace_id'] == trace_id] if len(trace_data) == 0: return f"❌ **Trace not found**: No trace with ID `{trace_id}` in repository `{traces_repo}`" trace_row = trace_data.iloc[0] # Parse spans (OpenTelemetry format) spans = trace_row['spans'] if isinstance(spans, str): import json spans = json.loads(spans) # Helper function to handle different OTEL timestamp field formats def get_timestamp(span, field): """Get timestamp handling multiple OTEL formats""" # Try different field name variations for key in [field, f"{field}UnixNano", f"{field}_unix_nano", "timeUnixNano"]: if key in span: return span[key] return 0 # Build trace analysis data start_time = get_timestamp(spans[0], 'startTime') end_time = get_timestamp(spans[-1], 'endTime') trace_analysis = { "trace_id": trace_id, "run_id": trace_row.get('run_id', 'unknown'), "total_duration_ms": (end_time - start_time) / 1_000_000 if end_time > start_time else 0, "num_spans": len(spans), "spans": [] } # Process each span for span in spans: span_start = get_timestamp(span, 'startTime') span_end = get_timestamp(span, 'endTime') span_info = { "name": span.get('name', 'Unknown'), "kind": span.get('kind', 'INTERNAL'), "duration_ms": (span_end - span_start) / 1_000_000 if span_end > span_start else 0, "attributes": span.get('attributes', {}), "status": span.get('status', {}).get('code', 'UNKNOWN') } trace_analysis["spans"].append(span_info) # Get AI analysis from Gemini result = await gemini_client.analyze_with_context( data=trace_analysis, analysis_type="trace", specific_question=question ) return result except Exception as e: return f"❌ **Error debugging trace**: {str(e)}\n\nPlease check:\n- Trace ID is correct\n- Repository name is correct\n- You have access to the dataset" @gr.mcp.tool() async def estimate_cost( model: str, agent_type: str, num_tests: int = 100, hardware: str = "auto", gemini_api_key: Optional[str] = None ) -> str: """ Answer questions about evaluation costs with AI-powered estimates and recommendations. USE THIS TOOL when you need to: - Answer questions like "How much will this cost?", "What's the cheapest option?" - Get cost predictions for running evaluations - Compare costs between different models or hardware - Get optimization recommendations to reduce costs DO NOT use the cost:// resource for estimates - use this tool instead! The resource only returns raw pricing tables without calculations. This tool uses Google Gemini 2.5 Pro to calculate LLM API costs, HuggingFace Jobs compute costs, CO2 emissions, and provide intelligent cost breakdowns with optimization recommendations. Args: model (str): Model identifier in litellm format (e.g., "openai/gpt-4", "meta-llama/Llama-3.1-8B") agent_type (str): Type of agent capabilities to test. Options: "tool", "code", "both" num_tests (int): Number of test cases to run. Must be between 10 and 1000. Default: 100 hardware (str): Hardware type for HuggingFace Jobs. Options: "auto", "cpu", "gpu_a10", "gpu_h200". Default: "auto" gemini_api_key (Optional[str]): Google Gemini API key. If None, uses GEMINI_API_KEY environment variable. Returns: str: Markdown-formatted cost estimate with breakdown of LLM costs, HF Jobs costs, duration, CO2 emissions, and optimization tips """ try: # Initialize Gemini client with provided key or from environment gemini_client = GeminiClient(api_key=gemini_api_key) if gemini_api_key else GeminiClient() # Determine if API or local model is_api_model = any(provider in model.lower() for provider in ["openai", "anthropic", "google", "cohere"]) # Auto-select hardware if hardware == "auto": hardware = "cpu" if is_api_model else "gpu_a10" # Cost data (simplified estimates) llm_costs = { "openai/gpt-4": {"input": 0.03, "output": 0.06}, # per 1K tokens "openai/gpt-3.5-turbo": {"input": 0.0015, "output": 0.002}, "anthropic/claude-3-opus": {"input": 0.015, "output": 0.075}, "anthropic/claude-3-sonnet": {"input": 0.003, "output": 0.015}, "meta-llama/Llama-3.1-8B": {"input": 0, "output": 0}, # Local model "default": {"input": 0.001, "output": 0.002} } hf_jobs_costs = { "cpu": 0.60, # per hour "gpu_a10": 1.10, # per hour "gpu_h200": 4.50 # per hour } # Get model costs model_cost = llm_costs.get(model, llm_costs["default"]) # Estimate token usage per test # Tool agent: ~200 tokens input, ~150 output # Code agent: ~300 tokens input, ~400 output # Both: ~400 tokens input, ~500 output token_estimates = { "tool": {"input": 200, "output": 150}, "code": {"input": 300, "output": 400}, "both": {"input": 400, "output": 500} } tokens_per_test = token_estimates[agent_type] # Calculate LLM costs llm_cost_per_test = ( (tokens_per_test["input"] / 1000) * model_cost["input"] + (tokens_per_test["output"] / 1000) * model_cost["output"] ) total_llm_cost = llm_cost_per_test * num_tests # Estimate duration (seconds per test) if is_api_model: duration_per_test = 3.0 # API models are fast else: duration_per_test = 8.0 # Local models slower but depends on GPU total_duration_hours = (duration_per_test * num_tests) / 3600 # Calculate HF Jobs costs jobs_hourly_rate = hf_jobs_costs.get(hardware, hf_jobs_costs["cpu"]) total_jobs_cost = total_duration_hours * jobs_hourly_rate # Estimate CO2 (rough estimates) co2_per_hour = { "cpu": 0.05, # kg CO2 "gpu_a10": 0.15, "gpu_h200": 0.30 } total_co2_kg = total_duration_hours * co2_per_hour.get(hardware, 0.05) # Prepare estimate data estimate_data = { "model": model, "agent_type": agent_type, "num_tests": num_tests, "hardware": hardware, "is_api_model": is_api_model, "estimates": { "llm_cost_usd": round(total_llm_cost, 4), "llm_cost_per_test": round(llm_cost_per_test, 4), "jobs_cost_usd": round(total_jobs_cost, 4), "total_cost_usd": round(total_llm_cost + total_jobs_cost, 4), "duration_hours": round(total_duration_hours, 2), "duration_per_test_seconds": round(duration_per_test, 2), "co2_emissions_kg": round(total_co2_kg, 3), "tokens_per_test": tokens_per_test } } # Get AI analysis from Gemini result = await gemini_client.analyze_with_context( data=estimate_data, analysis_type="cost_estimate", specific_question="Provide cost breakdown and optimization recommendations" ) return result except Exception as e: return f"❌ **Error estimating cost**: {str(e)}" @gr.mcp.tool() async def compare_runs( run_id_1: str, run_id_2: str, leaderboard_repo: str = "kshitijthakkar/smoltrace-leaderboard", comparison_focus: str = "comprehensive", hf_token: Optional[str] = None, gemini_api_key: Optional[str] = None ) -> str: """ Compare two evaluation runs and generate AI-powered comparative analysis. This tool fetches data for two evaluation runs from the leaderboard and uses Google Gemini 2.5 Pro to provide intelligent comparison across multiple dimensions: success rate, cost efficiency, speed, environmental impact, and use case recommendations. Args: run_id_1 (str): First run ID to compare run_id_2 (str): Second run ID to compare leaderboard_repo (str): HuggingFace dataset repository containing leaderboard data. Default: "kshitijthakkar/smoltrace-leaderboard" comparison_focus (str): Focus area for comparison. Options: "comprehensive", "cost", "performance", "eco_friendly". Default: "comprehensive" hf_token (Optional[str]): HuggingFace token for dataset access. If None, uses HF_TOKEN environment variable. gemini_api_key (Optional[str]): Google Gemini API key. If None, uses GEMINI_API_KEY environment variable. Returns: str: Markdown-formatted comparative analysis with winner for each category, trade-offs, and use case recommendations """ try: # Initialize Gemini client with provided key or from environment gemini_client = GeminiClient(api_key=gemini_api_key) if gemini_api_key else GeminiClient() # Load leaderboard data # Use user-provided token or fall back to environment variable token = hf_token if hf_token else os.getenv("HF_TOKEN") dataset = load_dataset(leaderboard_repo, split="train", token=token) df = pd.DataFrame(dataset) # Find the two runs run1 = df[df['run_id'] == run_id_1] run2 = df[df['run_id'] == run_id_2] if run1.empty: return f"❌ **Error**: Run ID '{run_id_1}' not found in leaderboard" if run2.empty: return f"❌ **Error**: Run ID '{run_id_2}' not found in leaderboard" run1_data = run1.iloc[0].to_dict() run2_data = run2.iloc[0].to_dict() # Build comparison context for Gemini comparison_data = { "run_1": { "run_id": run1_data.get('run_id'), "model": run1_data.get('model'), "agent_type": run1_data.get('agent_type'), "success_rate": run1_data.get('success_rate'), "total_tests": run1_data.get('total_tests'), "successful_tests": run1_data.get('successful_tests'), "avg_duration_ms": run1_data.get('avg_duration_ms'), "total_cost_usd": run1_data.get('total_cost_usd'), "avg_cost_per_test_usd": run1_data.get('avg_cost_per_test_usd'), "co2_emissions_g": run1_data.get('co2_emissions_g'), "gpu_utilization_avg": run1_data.get('gpu_utilization_avg'), "total_tokens": run1_data.get('total_tokens'), "provider": run1_data.get('provider'), "job_type": run1_data.get('job_type'), "timestamp": run1_data.get('timestamp') }, "run_2": { "run_id": run2_data.get('run_id'), "model": run2_data.get('model'), "agent_type": run2_data.get('agent_type'), "success_rate": run2_data.get('success_rate'), "total_tests": run2_data.get('total_tests'), "successful_tests": run2_data.get('successful_tests'), "avg_duration_ms": run2_data.get('avg_duration_ms'), "total_cost_usd": run2_data.get('total_cost_usd'), "avg_cost_per_test_usd": run2_data.get('avg_cost_per_test_usd'), "co2_emissions_g": run2_data.get('co2_emissions_g'), "gpu_utilization_avg": run2_data.get('gpu_utilization_avg'), "total_tokens": run2_data.get('total_tokens'), "provider": run2_data.get('provider'), "job_type": run2_data.get('job_type'), "timestamp": run2_data.get('timestamp') }, "comparison_focus": comparison_focus } # Create comparison prompt based on focus if comparison_focus == "comprehensive": prompt = f""" You are analyzing a comparison between two agent evaluation runs. Provide a comprehensive analysis covering all aspects. **Run 1 ({comparison_data['run_1']['model']}):** {json.dumps(comparison_data['run_1'], indent=2)} **Run 2 ({comparison_data['run_2']['model']}):** {json.dumps(comparison_data['run_2'], indent=2)} Please provide a detailed comparison in the following format: ## 📊 Head-to-Head Comparison ### 🎯 Accuracy Winner [Which run has better success rate and by how much? Explain significance] ### ⚡ Speed Winner [Which run is faster and by how much? Include average duration comparison] ### 💰 Cost Winner [Which run is more cost-effective? Compare total cost AND cost per test] ### 🌱 Eco-Friendly Winner [Which run has lower CO2 emissions? Calculate the difference] ### 🔧 GPU Efficiency Winner (if applicable) [For GPU jobs, which has better utilization? Explain implications] ## 📈 Performance Summary ### Run 1 Strengths - [List 3-4 key strengths] ### Run 2 Strengths - [List 3-4 key strengths] ## 💡 Use Case Recommendations ### When to Choose Run 1 ({comparison_data['run_1']['model']}) [Specific scenarios where Run 1 is the better choice] ### When to Choose Run 2 ({comparison_data['run_2']['model']}) [Specific scenarios where Run 2 is the better choice] ## ⚖️ Overall Recommendation [Based on the analysis, provide a balanced recommendation considering different priorities] Be specific with numbers and percentages. Make the comparison actionable and insightful. """ elif comparison_focus == "cost": prompt = f""" Compare these two evaluation runs focusing specifically on cost efficiency: **Run 1:** {json.dumps(comparison_data['run_1'], indent=2)} **Run 2:** {json.dumps(comparison_data['run_2'], indent=2)} Provide detailed cost analysis: 1. Which run has lower total cost and by what percentage? 2. Cost per test comparison - which is more efficient? 3. Calculate cost per successful test (accounting for failures) 4. Token usage efficiency - cost per 1000 tokens 5. ROI analysis - is higher cost justified by better accuracy? 6. Scaling implications - at 1000 tests, what would each cost? Provide actionable cost optimization recommendations. """ elif comparison_focus == "performance": prompt = f""" Compare these two evaluation runs focusing on performance (speed + accuracy): **Run 1:** {json.dumps(comparison_data['run_1'], indent=2)} **Run 2:** {json.dumps(comparison_data['run_2'], indent=2)} Analyze: 1. Success rate difference - statistical significance? 2. Speed comparison - average duration per test 3. Which delivers faster results without sacrificing accuracy? 4. Throughput analysis - tests per minute 5. Quality vs Speed trade-off assessment 6. GPU utilization efficiency (if applicable) Recommend which run offers best performance for production workloads. """ elif comparison_focus == "eco_friendly": prompt = f""" Compare these two evaluation runs focusing on environmental impact: **Run 1:** {json.dumps(comparison_data['run_1'], indent=2)} **Run 2:** {json.dumps(comparison_data['run_2'], indent=2)} Analyze: 1. CO2 emissions comparison - which is greener? 2. Emissions per test and per successful test 3. GPU vs API model environmental trade-offs 4. Energy efficiency based on duration and GPU utilization 5. Emissions reduction if scaled to 10,000 tests 6. Carbon offset cost comparison Provide eco-conscious recommendations for sustainable AI deployment. """ # Get AI analysis from Gemini analysis = await gemini_client.analyze_with_context( comparison_data, analysis_type="comparison", specific_question=prompt ) return analysis except Exception as e: return f"❌ **Error comparing runs**: {str(e)}" @gr.mcp.tool() async def analyze_results( results_repo: str, analysis_focus: str = "comprehensive", max_rows: int = 100, hf_token: Optional[str] = None, gemini_api_key: Optional[str] = None ) -> str: """ Analyze detailed test results and provide optimization recommendations. USE THIS TOOL when you need to: - Understand why tests are failing and get recommendations - Identify performance bottlenecks in specific test cases - Find cost optimization opportunities - Get insights about tool usage patterns - Analyze which types of tasks work well vs poorly This tool analyzes individual test case results (not aggregate leaderboard data) and uses Google Gemini 2.5 Pro to provide actionable optimization recommendations. Args: results_repo (str): HuggingFace dataset repository containing results (e.g., "username/smoltrace-results-gpt4-20251114") analysis_focus (str): Focus area. Options: "failures", "performance", "cost", "comprehensive". Default: "comprehensive" max_rows (int): Maximum test cases to analyze. Default: 100. Range: 10-500 hf_token (Optional[str]): HuggingFace token for dataset access. If None, uses HF_TOKEN environment variable. gemini_api_key (Optional[str]): Google Gemini API key. If None, uses GEMINI_API_KEY environment variable. Returns: str: Markdown-formatted analysis with failure patterns, performance insights, cost analysis, and optimization recommendations """ try: # Initialize Gemini client gemini_client = GeminiClient(api_key=gemini_api_key) if gemini_api_key else GeminiClient() # Load results dataset print(f"Loading results from {results_repo}...") token = hf_token if hf_token else os.getenv("HF_TOKEN") ds = load_dataset(results_repo, split="train", token=token) df = pd.DataFrame(ds) if df.empty: return "❌ **Error**: Results dataset is empty" # Limit rows max_rows = max(10, min(500, max_rows)) df_sample = df.head(max_rows) # Calculate statistics total_tests = len(df_sample) successful = df_sample[df_sample['success'] == True] failed = df_sample[df_sample['success'] == False] success_rate = (len(successful) / total_tests * 100) if total_tests > 0 else 0 # Analyze by category/difficulty category_stats = {} if 'category' in df_sample.columns: category_stats = df_sample.groupby('category').agg({ 'success': ['count', 'sum', 'mean'], 'execution_time_ms': 'mean', 'cost_usd': 'sum' }).to_dict() difficulty_stats = {} if 'difficulty' in df_sample.columns: difficulty_stats = df_sample.groupby('difficulty').agg({ 'success': ['count', 'sum', 'mean'], 'execution_time_ms': 'mean' }).to_dict() # Find slowest tests slowest_tests = df_sample.nlargest(5, 'execution_time_ms')[ ['task_id', 'prompt', 'execution_time_ms', 'success', 'cost_usd'] ].to_dict('records') # Find most expensive tests if 'cost_usd' in df_sample.columns: most_expensive = df_sample.nlargest(5, 'cost_usd')[ ['task_id', 'prompt', 'cost_usd', 'total_tokens', 'success'] ].to_dict('records') else: most_expensive = [] # Analyze failures failure_analysis = [] if len(failed) > 0: # Get sample of failures failure_sample = failed.head(10)[ ['task_id', 'prompt', 'error', 'error_type', 'tool_called', 'expected_tool'] ].to_dict('records') # Count error types if 'error_type' in failed.columns: error_type_counts = failed['error_type'].value_counts().to_dict() else: error_type_counts = {} failure_analysis = { "total_failures": len(failed), "failure_rate": (len(failed) / total_tests * 100), "error_type_counts": error_type_counts, "sample_failures": failure_sample } # Prepare data for Gemini analysis analysis_data = { "results_repo": results_repo, "total_tests_analyzed": total_tests, "overall_stats": { "success_rate": round(success_rate, 2), "successful_tests": len(successful), "failed_tests": len(failed), "avg_execution_time_ms": float(df_sample['execution_time_ms'].mean()), "total_cost_usd": float(df_sample['cost_usd'].sum()) if 'cost_usd' in df_sample.columns else 0, "avg_tokens_per_test": float(df_sample['total_tokens'].mean()) if 'total_tokens' in df_sample.columns else 0 }, "category_performance": category_stats, "difficulty_performance": difficulty_stats, "slowest_tests": slowest_tests, "most_expensive_tests": most_expensive, "failure_analysis": failure_analysis, "analysis_focus": analysis_focus } # Create focus-specific prompt focus_prompts = { "failures": "Focus specifically on failure patterns. Analyze why tests are failing, identify common error types, and provide actionable recommendations to improve success rate.", "performance": "Focus on performance optimization. Analyze execution times, identify bottlenecks, and recommend ways to speed up test execution.", "cost": "Focus on cost optimization. Analyze token usage and costs, identify expensive tests, and recommend ways to reduce evaluation costs.", "comprehensive": "Provide comprehensive analysis covering failures, performance, cost, and overall optimization opportunities." } specific_question = focus_prompts.get(analysis_focus, focus_prompts["comprehensive"]) # Get AI analysis result = await gemini_client.analyze_with_context( data=analysis_data, analysis_type="results", specific_question=specific_question ) return result except Exception as e: return f"❌ **Error analyzing results**: {str(e)}\n\nPlease check:\n- Repository name is correct (should be smoltrace-results-*)\n- You have access to the dataset\n- HF_TOKEN is set correctly" @gr.mcp.tool() async def get_dataset( dataset_repo: str, max_rows: int = 50, hf_token: Optional[str] = None ) -> str: """ Load SMOLTRACE datasets from HuggingFace and return as JSON. This tool loads datasets with the "smoltrace-" prefix and returns the raw data as JSON. Use this to access: - Leaderboard data (kshitijthakkar/smoltrace-leaderboard) - Results datasets (e.g., username/smoltrace-results-*) - Traces datasets (e.g., username/smoltrace-traces-*) - Metrics datasets (e.g., username/smoltrace-metrics-*) - Any other smoltrace-prefixed evaluation dataset If you don't know which dataset to load, first load the leaderboard to see the dataset references in the results_dataset, traces_dataset, metrics_dataset, and dataset_used fields. Args: dataset_repo (str): HuggingFace dataset repository path with "smoltrace-" prefix (e.g., "kshitijthakkar/smoltrace-leaderboard") max_rows (int): Maximum number of rows to return. Default: 50. Range: 1-200 hf_token (Optional[str]): HuggingFace token for dataset access. If None, uses HF_TOKEN environment variable. Returns: str: JSON object with dataset data and metadata """ try: # Validate dataset has smoltrace- prefix if "smoltrace-" not in dataset_repo: return json.dumps({ "dataset_repo": dataset_repo, "error": "Only datasets with 'smoltrace-' prefix are allowed. Please use smoltrace-leaderboard or other smoltrace-* datasets.", "data": [] }, indent=2) # Load dataset from HuggingFace # Use user-provided token or fall back to environment variable token = hf_token if hf_token else os.getenv("HF_TOKEN") dataset = load_dataset(dataset_repo, split="train", token=token) df = pd.DataFrame(dataset) if df.empty: return json.dumps({ "dataset_repo": dataset_repo, "error": "Dataset is empty", "total_rows": 0, "data": [] }, indent=2) # Get total row count before limiting total_rows = len(df) # Limit rows to avoid overwhelming the context max_rows = max(1, min(200, max_rows)) # Sort by timestamp if available (newest first) if "timestamp" in df.columns: df = df.sort_values("timestamp", ascending=False) df_limited = df.head(max_rows) # Convert to list of dictionaries data = df_limited.to_dict(orient="records") # Build response with metadata result = { "dataset_repo": dataset_repo, "total_rows": total_rows, "rows_returned": len(data), "columns": list(df.columns), "data": data } return json.dumps(result, indent=2, default=str) except Exception as e: return json.dumps({ "dataset_repo": dataset_repo, "error": f"Failed to load dataset: {str(e)}", "data": [] }, indent=2) # ============================================================================ # MCP RESOURCES - Expose data for retrieval by MCP clients # ============================================================================ @gr.mcp.resource("leaderboard://{repo}") def get_leaderboard_data(repo: str = "kshitijthakkar/smoltrace-leaderboard", hf_token: Optional[str] = None) -> str: """ [RAW DATA ONLY] Get raw leaderboard data in JSON format - NO analysis or insights. ⚠️ DO NOT USE THIS for questions like "Which model is leading?" or "What's the best model?" Instead, use the analyze_leaderboard TOOL which provides AI-powered insights. This resource is ONLY for: - Getting raw JSON data when you need to process it yourself - Low-level data access for custom analysis - Direct dataset retrieval without AI interpretation For questions, insights, recommendations, or analysis → use analyze_leaderboard tool instead! Args: repo (str): HuggingFace dataset repository name. Default: "kshitijthakkar/smoltrace-leaderboard" hf_token (Optional[str]): HuggingFace token for dataset access. If None, uses HF_TOKEN environment variable. Returns: str: Raw JSON string containing all evaluation runs without any analysis """ try: # Use user-provided token or fall back to environment variable token = hf_token if hf_token else os.getenv("HF_TOKEN") ds = load_dataset(repo, split="train", token=token) df = pd.DataFrame(ds) # Convert to JSON with proper formatting data = df.to_dict('records') return json.dumps({ "total_runs": len(data), "repository": repo, "data": data }, indent=2) except Exception as e: return json.dumps({ "error": str(e), "repository": repo }) @gr.mcp.resource("trace://{trace_id}/{repo}") def get_trace_data(trace_id: str, repo: str, hf_token: Optional[str] = None) -> str: """ [RAW DATA ONLY] Get raw OpenTelemetry trace data in JSON format - NO analysis. ⚠️ DO NOT USE THIS for questions like "Why did this fail?" or "What took the most time?" Instead, use the debug_trace TOOL which provides AI-powered debugging and insights. This resource is ONLY for: - Getting raw OTEL span data when you need to process it yourself - Low-level trace access for custom analysis - Direct dataset retrieval without AI interpretation For debugging, questions, or analysis → use debug_trace tool instead! Args: trace_id (str): Unique identifier for the trace (e.g., "trace_abc123") repo (str): HuggingFace dataset repository containing traces (e.g., "username/agent-traces-model") hf_token (Optional[str]): HuggingFace token for dataset access. If None, uses HF_TOKEN environment variable. Returns: str: Raw JSON string containing OpenTelemetry spans without any analysis """ try: # Use user-provided token or fall back to environment variable token = hf_token if hf_token else os.getenv("HF_TOKEN") ds = load_dataset(repo, split="train", token=token) df = pd.DataFrame(ds) # Find specific trace trace_data = df[df['trace_id'] == trace_id] if len(trace_data) == 0: return json.dumps({ "error": f"Trace {trace_id} not found", "trace_id": trace_id, "repository": repo }) trace_row = trace_data.iloc[0] # Parse spans if they're stored as string spans = trace_row['spans'] if isinstance(spans, str): spans = json.loads(spans) return json.dumps({ "trace_id": trace_id, "repository": repo, "run_id": trace_row.get('run_id', 'unknown'), "spans": spans }, indent=2) except Exception as e: return json.dumps({ "error": str(e), "trace_id": trace_id, "repository": repo }) @gr.mcp.resource("cost://model/{model_name}") def get_cost_data(model_name: str) -> str: """ [RAW DATA ONLY] Get raw pricing data for a model in JSON format - NO estimates or analysis. ⚠️ DO NOT USE THIS for questions like "How much will this cost?" or "What's the best value?" Instead, use the estimate_cost TOOL which provides AI-powered cost estimates and recommendations. This resource is ONLY for: - Getting raw pricing tables when you need to process them yourself - Looking up base rates for models and hardware - Direct price data retrieval without calculations For cost estimates, predictions, or recommendations → use estimate_cost tool instead! Args: model_name (str): Model identifier (e.g., "openai/gpt-4", "meta-llama/Llama-3.1-8B") Returns: str: Raw JSON string with pricing rates without any cost estimation """ # Cost database llm_costs = { "openai/gpt-4": { "input_per_1k_tokens": 0.03, "output_per_1k_tokens": 0.06, "type": "api", "provider": "openai" }, "openai/gpt-3.5-turbo": { "input_per_1k_tokens": 0.0015, "output_per_1k_tokens": 0.002, "type": "api", "provider": "openai" }, "anthropic/claude-3-opus": { "input_per_1k_tokens": 0.015, "output_per_1k_tokens": 0.075, "type": "api", "provider": "anthropic" }, "anthropic/claude-3-sonnet": { "input_per_1k_tokens": 0.003, "output_per_1k_tokens": 0.015, "type": "api", "provider": "anthropic" }, "meta-llama/Llama-3.1-8B": { "input_per_1k_tokens": 0, "output_per_1k_tokens": 0, "type": "local", "provider": "meta", "requires_gpu": True, "recommended_hardware": "gpu_a10" } } hardware_costs = { "cpu": {"hourly_rate_usd": 0.60, "type": "cpu"}, "gpu_a10": {"hourly_rate_usd": 1.10, "type": "gpu", "model": "A10"}, "gpu_h200": {"hourly_rate_usd": 4.50, "type": "gpu", "model": "H200"} } model_cost = llm_costs.get(model_name) if model_cost: return json.dumps({ "model": model_name, "cost_data": model_cost, "hardware_options": hardware_costs, "currency": "USD" }, indent=2) else: return json.dumps({ "model": model_name, "error": "Model not found in cost database", "available_models": list(llm_costs.keys()), "hardware_options": hardware_costs }, indent=2) # ============================================================================ # MCP PROMPTS - Reusable prompt templates for common workflows # ============================================================================ @gr.mcp.prompt() def analysis_prompt( analysis_type: str = "leaderboard", focus_area: str = "overall", detail_level: str = "detailed" ) -> str: """ Generate a prompt template for analyzing agent evaluation data. This prompt helps standardize analysis requests across different evaluation data types and focus areas. Args: analysis_type (str): Type of analysis. Options: "leaderboard", "trace", "cost". Default: "leaderboard" focus_area (str): What to focus on. Options: "overall", "performance", "cost", "efficiency". Default: "overall" detail_level (str): Level of detail. Options: "summary", "detailed", "comprehensive". Default: "detailed" Returns: str: Formatted prompt template for analysis """ templates = { "leaderboard": { "overall": "Analyze the agent evaluation leaderboard data comprehensively. Identify top performers across all metrics (accuracy, cost, latency, CO2), explain trade-offs between different approaches, and provide actionable recommendations for model selection.", "performance": "Focus on performance metrics in the leaderboard. Compare success rates and accuracy across different models and agent types. Identify which configurations achieve the highest success rates and explain why.", "cost": "Analyze cost efficiency in the leaderboard. Compare costs across different models and identify the best cost-performance ratios. Recommend the most cost-effective configurations for different use cases.", "efficiency": "Evaluate efficiency metrics including latency, GPU utilization, and CO2 emissions. Identify the most efficient models and explain how to optimize for speed while maintaining quality." }, "trace": { "overall": "Analyze this agent execution trace comprehensively. Explain the sequence of operations, identify any bottlenecks or inefficiencies, and suggest optimizations.", "performance": "Focus on performance aspects of this trace. Identify which steps took the most time, explain why, and suggest ways to improve execution speed.", "cost": "Analyze the cost implications of this trace execution. Break down token usage and API calls, calculate costs, and suggest ways to reduce expenses.", "efficiency": "Evaluate the efficiency of this trace. Identify redundant operations, suggest ways to optimize the execution flow, and recommend best practices." }, "cost": { "overall": "Analyze the cost estimation comprehensively. Break down LLM API costs, infrastructure costs, and provide optimization recommendations.", "performance": "Focus on the cost-performance trade-off. Compare different hardware options and explain which provides the best value.", "cost": "Deep dive into cost breakdown. Explain each cost component in detail and provide specific recommendations for cost reduction.", "efficiency": "Analyze cost efficiency. Compare different model configurations and recommend the most cost-effective approach for the given use case." } } detail_prefixes = { "summary": "Provide a brief, high-level summary. ", "detailed": "Provide a detailed analysis with specific insights. ", "comprehensive": "Provide a comprehensive, in-depth analysis with detailed recommendations. " } prefix = detail_prefixes.get(detail_level, detail_prefixes["detailed"]) template = templates.get(analysis_type, {}).get(focus_area, templates["leaderboard"]["overall"]) return f"{prefix}{template}" @gr.mcp.prompt() def debug_prompt( debug_type: str = "error", context: str = "agent_execution" ) -> str: """ Generate a prompt template for debugging agent traces. This prompt helps standardize debugging requests for different types of issues and contexts. Args: debug_type (str): Type of debugging. Options: "error", "performance", "behavior", "optimization". Default: "error" context (str): Execution context. Options: "agent_execution", "tool_calling", "llm_reasoning". Default: "agent_execution" Returns: str: Formatted prompt template for debugging """ templates = { "error": { "agent_execution": "Debug this agent execution trace to identify why it failed. Analyze each step in the execution flow, identify where the error occurred, explain the root cause, and suggest how to fix it.", "tool_calling": "Debug this tool calling sequence. Identify which tool call failed or produced unexpected results, explain why it happened, and suggest corrections.", "llm_reasoning": "Debug the LLM reasoning in this trace. Analyze the prompts and responses, identify where the reasoning went wrong, and suggest improvements to the prompts or approach." }, "performance": { "agent_execution": "Analyze this trace for performance issues. Identify bottlenecks, measure time spent in each component, and recommend optimizations to improve execution speed.", "tool_calling": "Analyze tool calling performance. Identify which tools are slow, explain why, and suggest ways to optimize tool execution or caching.", "llm_reasoning": "Analyze LLM reasoning efficiency. Identify unnecessary calls, redundant reasoning steps, and suggest ways to streamline the reasoning process." }, "behavior": { "agent_execution": "Analyze the agent's behavior in this trace. Explain why the agent made certain decisions, whether the behavior is expected, and suggest improvements if needed.", "tool_calling": "Analyze tool selection behavior. Explain why certain tools were called, whether the choices were optimal, and suggest alternative approaches if applicable.", "llm_reasoning": "Analyze the LLM's reasoning patterns. Explain the logic flow, identify any unexpected reasoning, and suggest how to guide the model toward better decisions." }, "optimization": { "agent_execution": "Analyze this trace for optimization opportunities. Identify redundant operations, suggest caching strategies, and recommend ways to reduce costs and improve efficiency.", "tool_calling": "Optimize tool usage in this trace. Suggest ways to reduce tool calls, batch operations, or use more efficient alternatives.", "llm_reasoning": "Optimize LLM usage. Suggest ways to reduce token usage, improve prompt efficiency, and achieve the same results with lower costs." } } template = templates.get(debug_type, {}).get(context, templates["error"]["agent_execution"]) return template @gr.mcp.prompt() def optimization_prompt( optimization_goal: str = "cost", constraints: str = "maintain_quality" ) -> str: """ Generate a prompt template for optimization recommendations. This prompt helps standardize optimization requests for different goals and constraints. Args: optimization_goal (str): What to optimize. Options: "cost", "speed", "quality", "efficiency". Default: "cost" constraints (str): Constraints to consider. Options: "maintain_quality", "maintain_speed", "no_constraints". Default: "maintain_quality" Returns: str: Formatted prompt template for optimization """ templates = { "cost": { "maintain_quality": "Analyze this evaluation setup and recommend cost optimizations while maintaining quality. Consider cheaper models, optimized prompts, caching strategies, and hardware selection. Quantify potential savings.", "maintain_speed": "Recommend cost optimizations while maintaining execution speed. Consider model alternatives, batch processing, and infrastructure choices that reduce costs without adding latency.", "no_constraints": "Recommend aggressive cost optimizations. Identify all opportunities to reduce expenses, even if it means trade-offs in quality or speed. Prioritize maximum cost reduction." }, "speed": { "maintain_quality": "Recommend speed optimizations while maintaining quality. Consider parallel execution, caching, faster models with similar accuracy, and infrastructure upgrades. Quantify potential speedups.", "maintain_cost": "Recommend speed optimizations within the current cost budget. Suggest configuration changes, caching strategies, and optimizations that don't increase expenses.", "no_constraints": "Recommend aggressive speed optimizations. Identify all opportunities to reduce latency, even if it increases costs. Prioritize maximum performance." }, "quality": { "maintain_cost": "Recommend quality improvements within the current cost budget. Suggest better prompts, model configurations, and strategies that improve accuracy without increasing expenses.", "maintain_speed": "Recommend quality improvements while maintaining execution speed. Suggest prompt improvements, reasoning enhancements, and configurations that improve accuracy without adding latency.", "no_constraints": "Recommend quality improvements without budget constraints. Suggest the best models, optimal configurations, and strategies to maximize accuracy and success rates." }, "efficiency": { "maintain_quality": "Recommend overall efficiency improvements. Optimize for the best cost-speed-quality balance. Identify waste, suggest streamlined processes, and provide holistic optimization strategies.", "maintain_cost": "Recommend efficiency improvements within budget. Focus on reducing waste, optimizing resource usage, and getting better results with the same cost.", "maintain_speed": "Recommend efficiency improvements maintaining speed. Reduce unnecessary operations, optimize resource usage, and improve output quality without adding latency." } } # Handle constraint variations if constraints == "maintain_quality" and optimization_goal == "speed": constraints = "maintain_quality" # Use existing template elif constraints == "maintain_speed" and optimization_goal == "cost": constraints = "maintain_speed" # Use existing template template = templates.get(optimization_goal, {}).get(constraints, templates["cost"]["maintain_quality"]) return template