Spaces:
Sleeping
Sleeping
| """ | |
| LangChain + Custom Evaluator | |
| Combines LangChain for model management with custom evaluation metrics. | |
| """ | |
| import os | |
| import time | |
| import pandas as pd | |
| from typing import Dict, List, Any, Optional | |
| from pathlib import Path | |
| import duckdb | |
| import sqlglot | |
| from langchain_models import langchain_models_registry | |
| from custom_evaluator import custom_evaluator, EvaluationResult | |
| class LangChainEvaluator: | |
| """Integrated evaluator using LangChain and custom evaluation metrics.""" | |
| def __init__(self): | |
| self.models_registry = langchain_models_registry | |
| self.custom_evaluator = custom_evaluator | |
| def load_dataset(self, dataset_name: str) -> Dict[str, Any]: | |
| """Load dataset configuration and data.""" | |
| dataset_path = Path(f"tasks/{dataset_name}") | |
| if not dataset_path.exists(): | |
| raise ValueError(f"Dataset {dataset_name} not found") | |
| # Load schema | |
| schema_path = dataset_path / "schema.sql" | |
| with open(schema_path, 'r') as f: | |
| schema = f.read() | |
| # Load cases | |
| cases_path = dataset_path / "cases.yaml" | |
| import yaml | |
| with open(cases_path, 'r') as f: | |
| cases = yaml.safe_load(f) | |
| # Load data | |
| loader_path = dataset_path / "loader.py" | |
| db_path = f"{dataset_name}.duckdb" | |
| # Create database if it doesn't exist | |
| if not os.path.exists(db_path): | |
| self._create_database(loader_path, db_path) | |
| return { | |
| 'schema': schema, | |
| 'cases': cases.get('cases', []), # Extract the cases list from YAML | |
| 'db_path': db_path | |
| } | |
| def _create_database(self, loader_path: Path, db_path: str): | |
| """Create database using the loader script.""" | |
| try: | |
| # Import and run the loader | |
| import importlib.util | |
| spec = importlib.util.spec_from_file_location("loader", loader_path) | |
| loader_module = importlib.util.module_from_spec(spec) | |
| spec.loader.exec_module(loader_module) | |
| # Run the loader function | |
| if hasattr(loader_module, 'load_data'): | |
| loader_module.load_data(db_path) | |
| else: | |
| print(f"⚠️ No load_data function found in {loader_path}") | |
| except Exception as e: | |
| print(f"❌ Error creating database: {e}") | |
| def load_prompt_template(self, dialect: str) -> str: | |
| """Load prompt template for the given dialect.""" | |
| template_path = f"prompts/template_{dialect}.txt" | |
| if not os.path.exists(template_path): | |
| # Fallback to generic template | |
| template_path = "prompts/template_presto.txt" | |
| with open(template_path, 'r') as f: | |
| return f.read() | |
| def evaluate_models( | |
| self, | |
| dataset_name: str, | |
| dialect: str, | |
| case_id: str, | |
| model_names: List[str] | |
| ) -> List[EvaluationResult]: | |
| """Evaluate multiple models on a single case.""" | |
| # Load dataset | |
| dataset = self.load_dataset(dataset_name) | |
| # Find the case | |
| case = None | |
| for c in dataset['cases']: | |
| if c['id'] == case_id: | |
| case = c | |
| break | |
| if not case: | |
| raise ValueError(f"Case {case_id} not found in dataset {dataset_name}") | |
| # Load prompt template | |
| prompt_template = self.load_prompt_template(dialect) | |
| # Setup database connection | |
| db_conn = duckdb.connect(dataset['db_path']) | |
| results = [] | |
| for model_name in model_names: | |
| print(f"🔍 Evaluating {model_name} on {dataset_name}/{case_id} ({dialect})") | |
| # Get model configuration | |
| model_config = self.models_registry.get_model_config(model_name) | |
| if not model_config: | |
| print(f"⚠️ Model {model_name} not found, skipping") | |
| continue | |
| try: | |
| # Generate SQL using LangChain | |
| raw_sql, generated_sql = self.models_registry.generate_sql( | |
| model_config=model_config, | |
| prompt_template=prompt_template, | |
| schema=dataset['schema'], | |
| question=case['question'] | |
| ) | |
| # Get reference SQL for the dialect | |
| reference_sql = case['reference_sql'].get(dialect, case['reference_sql'].get('presto', '')) | |
| print(f"📝 LLM Raw Output: {raw_sql[:100]}...") | |
| print(f"📝 LLM Cleaned SQL: {generated_sql[:100]}...") | |
| print(f"📝 Human Reference SQL: {reference_sql[:100]}...") | |
| # Evaluate using custom evaluator | |
| result = self.custom_evaluator.evaluate_sql( | |
| model_name=model_name, | |
| dataset=dataset_name, | |
| case_id=case_id, | |
| dialect=dialect, | |
| question=case['question'], | |
| raw_sql=raw_sql, | |
| generated_sql=generated_sql, | |
| reference_sql=reference_sql, | |
| schema=dataset['schema'], | |
| db_conn=db_conn | |
| ) | |
| results.append(result) | |
| # Calculate composite score | |
| composite_score = ( | |
| result.correctness_exact * 0.3 + | |
| result.result_match_f1 * 0.3 + | |
| result.exec_success * 0.2 + | |
| result.sql_quality * 0.1 + | |
| result.semantic_similarity * 0.1 | |
| ) | |
| print(f"✅ {model_name}: Composite Score = {composite_score:.3f}") | |
| except Exception as e: | |
| print(f"❌ Error evaluating {model_name}: {e}") | |
| continue | |
| # Close database connection | |
| db_conn.close() | |
| return results | |
| def evaluate_batch( | |
| self, | |
| dataset_name: str, | |
| dialect: str, | |
| case_ids: List[str], | |
| model_names: List[str] | |
| ) -> List[EvaluationResult]: | |
| """Evaluate multiple models on multiple cases.""" | |
| all_results = [] | |
| for case_id in case_ids: | |
| print(f"\n🎯 Evaluating case: {case_id}") | |
| case_results = self.evaluate_models( | |
| dataset_name=dataset_name, | |
| dialect=dialect, | |
| case_id=case_id, | |
| model_names=model_names | |
| ) | |
| all_results.extend(case_results) | |
| return all_results | |
| def get_leaderboard_data(self) -> pd.DataFrame: | |
| """Get current leaderboard data.""" | |
| leaderboard_path = "leaderboard.parquet" | |
| if os.path.exists(leaderboard_path): | |
| return pd.read_parquet(leaderboard_path) | |
| else: | |
| return pd.DataFrame() | |
| def update_leaderboard(self, results: List[EvaluationResult]): | |
| """Update the leaderboard with new results.""" | |
| # Convert results to DataFrame | |
| new_data = [] | |
| for result in results: | |
| new_data.append({ | |
| 'model_name': result.model_name, | |
| 'dataset_name': result.dataset, | |
| 'dialect': result.dialect, | |
| 'case_id': result.case_id, | |
| 'question': result.question, | |
| 'reference_sql': result.reference_sql, | |
| 'generated_sql': result.generated_sql, | |
| 'correctness_exact': result.correctness_exact, | |
| 'result_match_f1': result.result_match_f1, | |
| 'exec_success': result.exec_success, | |
| 'latency_ms': result.latency_ms, | |
| 'readability': result.readability, | |
| 'dialect_ok': result.dialect_ok, | |
| 'sql_quality': result.sql_quality, | |
| 'semantic_similarity': result.semantic_similarity, | |
| 'structural_similarity': result.structural_similarity, | |
| 'composite_score': result.composite_score, | |
| 'timestamp': str(pd.Timestamp.now()) | |
| }) | |
| new_df = pd.DataFrame(new_data) | |
| # Load existing leaderboard | |
| existing_df = self.get_leaderboard_data() | |
| # Combine and save | |
| if not existing_df.empty: | |
| combined_df = pd.concat([existing_df, new_df], ignore_index=True) | |
| else: | |
| combined_df = new_df | |
| # Ensure timestamp column is treated as string to avoid conversion issues | |
| if 'timestamp' in combined_df.columns: | |
| combined_df['timestamp'] = combined_df['timestamp'].astype(str) | |
| combined_df.to_parquet("leaderboard.parquet", index=False) | |
| print(f"📊 Leaderboard updated with {len(new_data)} new results") | |
| def get_leaderboard_summary(self, top_n: int = 50) -> pd.DataFrame: | |
| """Get leaderboard summary with aggregated scores.""" | |
| df = self.get_leaderboard_data() | |
| if df.empty: | |
| return pd.DataFrame() | |
| # Aggregate by model - handle missing RAGAS columns | |
| agg_dict = { | |
| 'composite_score': ['mean', 'std', 'count'], | |
| 'correctness_exact': 'mean', | |
| 'result_match_f1': 'mean', | |
| 'exec_success': 'mean', | |
| 'latency_ms': 'mean' | |
| } | |
| # Add RAGAS columns if they exist | |
| if 'sql_quality' in df.columns: | |
| agg_dict['sql_quality'] = 'mean' | |
| if 'semantic_similarity' in df.columns: | |
| agg_dict['semantic_similarity'] = 'mean' | |
| if 'structural_similarity' in df.columns: | |
| agg_dict['structural_similarity'] = 'mean' | |
| summary = df.groupby('model_name').agg(agg_dict).round(3) | |
| # Flatten column names | |
| summary.columns = ['_'.join(col).strip() for col in summary.columns] | |
| # Sort by composite score | |
| summary = summary.sort_values('composite_score_mean', ascending=False) | |
| return summary.head(top_n) | |
| def run_comprehensive_evaluation( | |
| self, | |
| dataset_name: str, | |
| dialect: str, | |
| model_names: List[str], | |
| max_cases: Optional[int] = None | |
| ) -> List[EvaluationResult]: | |
| """Run comprehensive evaluation across all cases.""" | |
| # Load dataset | |
| dataset = self.load_dataset(dataset_name) | |
| # Get case IDs | |
| case_ids = [case['id'] for case in dataset['cases']] | |
| if max_cases: | |
| case_ids = case_ids[:max_cases] | |
| print(f"🚀 Starting comprehensive evaluation:") | |
| print(f" Dataset: {dataset_name}") | |
| print(f" Dialect: {dialect}") | |
| print(f" Models: {', '.join(model_names)}") | |
| print(f" Cases: {len(case_ids)}") | |
| # Run evaluation | |
| results = self.evaluate_batch( | |
| dataset_name=dataset_name, | |
| dialect=dialect, | |
| case_ids=case_ids, | |
| model_names=model_names | |
| ) | |
| # Update leaderboard | |
| self.update_leaderboard(results) | |
| # Print summary | |
| self._print_evaluation_summary(results) | |
| return results | |
| def _print_evaluation_summary(self, results: List[EvaluationResult]): | |
| """Print evaluation summary.""" | |
| if not results: | |
| print("❌ No results to summarize") | |
| return | |
| # Group by model | |
| model_results = {} | |
| for result in results: | |
| if result.model_name not in model_results: | |
| model_results[result.model_name] = [] | |
| model_results[result.model_name].append(result) | |
| print(f"\n📊 Evaluation Summary:") | |
| print("=" * 60) | |
| for model_name, model_result_list in model_results.items(): | |
| avg_composite = sum(r.composite_score for r in model_result_list) / len(model_result_list) | |
| avg_correctness = sum(r.correctness_exact for r in model_result_list) / len(model_result_list) | |
| avg_f1 = sum(r.result_match_f1 for r in model_result_list) / len(model_result_list) | |
| avg_exec = sum(r.exec_success for r in model_result_list) / len(model_result_list) | |
| avg_latency = sum(r.latency_ms for r in model_result_list) / len(model_result_list) | |
| print(f"\n🤖 {model_name}:") | |
| print(f" Composite Score: {avg_composite:.3f}") | |
| print(f" Correctness: {avg_correctness:.3f}") | |
| print(f" Result Match F1: {avg_f1:.3f}") | |
| print(f" Execution Success: {avg_exec:.3f}") | |
| print(f" Avg Latency: {avg_latency:.1f}ms") | |
| print(f" Cases Evaluated: {len(model_result_list)}") | |
| # Global instance | |
| langchain_evaluator = LangChainEvaluator() | |