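"""Dataframe builders for the SAGE leaderboard Space.

Turns stored evaluation results and queue request files into pandas dataframes
for display: the standard leaderboard, the SAGE benchmark leaderboard, and the
pending/running/finished evaluation queues.
"""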
import json
import os

import pandas as pd

from src.display.formatting import has_no_nan_values, make_clickable_model
from src.display.utils import AutoEvalColumn, EvalQueueColumn
from src.leaderboard.read_evals import get_raw_eval_results
# Import SAGE-specific modules - avoid a hard dependency on transformers
process_sage_results_for_leaderboard = None

try:
    # Define the SAGE result handling locally so importing this module never pulls in transformers
    from dataclasses import dataclass
    from typing import Dict, List
    # Local copy of the SAGEResult dataclass, duplicated here to avoid import issues
    @dataclass
    class SAGEResult:
        submission_id: str
        organization: str
        email: str
        results: Dict[str, float]
        num_predictions: int
        submitted_time: str
        status: str = "EVALUATED"

        def to_dict(self):
            """Converts the SAGE Result to a dict compatible with our dataframe display"""
            # Use overall score if available, otherwise calculate average
            if "sage_overall" in self.results:
                average = self.results["sage_overall"]
            else:
                domain_scores = [v for v in self.results.values() if v is not None and isinstance(v, (int, float))]
                average = sum(domain_scores) / len(domain_scores) if domain_scores else 0.0

            # Extract model name from submission_id for initial results
            if self.submission_id.startswith("initial_"):
                model_name = self.submission_id.split("_", 2)[-1].replace("_", " ")
                display_name = f"**{model_name}**"
                model_symbol = "🤖"
            else:
                display_name = f"[{self.organization}]({self.email})"
                model_symbol = "🏢"

            from src.display.utils import AutoEvalColumn, Tasks

            data_dict = {
                "eval_name": self.submission_id,
                AutoEvalColumn.model.name: display_name,
                AutoEvalColumn.model_type_symbol.name: model_symbol,
                AutoEvalColumn.model_type.name: "SAGE Benchmark",
                AutoEvalColumn.precision.name: self.organization,
                AutoEvalColumn.weight_type.name: "Evaluated",
                AutoEvalColumn.architecture.name: "Multi-domain",
                AutoEvalColumn.average.name: round(average, 2),
                AutoEvalColumn.license.name: "N/A",
                AutoEvalColumn.likes.name: 0,
                AutoEvalColumn.params.name: 0,
                AutoEvalColumn.still_on_hub.name: True,
                AutoEvalColumn.revision.name: self.submitted_time,
            }

            # Add domain-specific scores
            for task in Tasks:
                domain_key = task.value.benchmark
                data_dict[task.value.col_name] = self.results.get(domain_key, 0.0)

            return data_dict
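
    # For reference, each entry in initial_sage_results.json is expected to provide
    # at least the fields read below; the values shown here are placeholders, not
    # real data:
    #   {"model_name": "...", "organization": "...", "tokens": "...",
    #    "results": {"sage_overall": ...}, "submitted_time": "..."}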
    def load_initial_sage_results_local() -> List[SAGEResult]:
        """Load initial SAGE results without external dependencies"""
        possible_paths = [
            "./initial_sage_results.json",
            "initial_sage_results.json",
            os.path.join(os.path.dirname(os.path.dirname(__file__)), "initial_sage_results.json"),
        ]

        initial_results_path = None
        for path in possible_paths:
            if os.path.exists(path):
                initial_results_path = path
                break

        sage_results = []
        if initial_results_path:
            try:
                with open(initial_results_path, "r") as f:
                    initial_data = json.load(f)

                for i, entry in enumerate(initial_data):
                    sage_result = SAGEResult(
                        submission_id=f"initial_{i:02d}_{entry['model_name'].replace(' ', '_').replace('-', '_')}",
                        organization=f"{entry['organization']} ({entry['tokens']})",
                        email=f"contact@{entry['organization'].lower().replace(' ', '')}.com",
                        results=entry["results"],
                        num_predictions=1000,
                        submitted_time=entry["submitted_time"],
                        status="EVALUATED",
                    )
                    sage_results.append(sage_result)
            except Exception as e:
                print(f"Error loading initial SAGE results from {initial_results_path}: {e}")
        else:
            print(f"Initial SAGE results file not found. Tried paths: {possible_paths}")

        return sage_results
    def process_sage_results_for_leaderboard_local(submissions_dir: str = "./sage_submissions") -> List[SAGEResult]:
        """Process all SAGE submissions without external dependencies"""
        sage_results = []

        # Load initial benchmark results (user submissions under `submissions_dir` are not processed here yet)
        sage_results.extend(load_initial_sage_results_local())

        return sage_results

    # Expose the local implementation under the public name
    process_sage_results_for_leaderboard = process_sage_results_for_leaderboard_local

except ImportError as e:
    print(f"Could not set up SAGE results processing: {e}")
    process_sage_results_for_leaderboard = None

def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame:
    """Creates a dataframe from all the individual experiment results"""
    raw_data = get_raw_eval_results(results_path, requests_path)
    all_data_json = [v.to_dict() for v in raw_data]

    df = pd.DataFrame.from_records(all_data_json)
    df = df.sort_values(by=[AutoEvalColumn.average.name], ascending=False)
    df = df[cols].round(decimals=2)

    # filter out if any of the benchmarks have not been produced
    df = df[has_no_nan_values(df, benchmark_cols)]
    return df

def get_sage_leaderboard_df(cols: list, benchmark_cols: list) -> pd.DataFrame:
    """Creates a dataframe from SAGE evaluation results"""
    if process_sage_results_for_leaderboard is None:
        return pd.DataFrame()

    # Get SAGE results
    sage_results = process_sage_results_for_leaderboard()
    all_data_json = [result.to_dict() for result in sage_results]

    if not all_data_json:
        return pd.DataFrame()

    df = pd.DataFrame.from_records(all_data_json)
    df = df.sort_values(by=[AutoEvalColumn.average.name], ascending=False)
    df = df[cols].round(decimals=2)

    # filter out if any of the benchmarks have not been produced
    df = df[has_no_nan_values(df, benchmark_cols)]
    return df

def get_evaluation_queue_df(save_path: str, cols: list) -> list[pd.DataFrame]:
    """Creates the different dataframes for the evaluation queue requests"""
    if not os.path.exists(save_path):
        # Return empty dataframes if the path doesn't exist
        empty_df = pd.DataFrame(columns=cols)
        return empty_df, empty_df, empty_df

    entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")]
    all_evals = []

    for entry in entries:
        if ".json" in entry:
            file_path = os.path.join(save_path, entry)
            with open(file_path) as fp:
                data = json.load(fp)

            data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
            data[EvalQueueColumn.revision.name] = data.get("revision", "main")

            all_evals.append(data)
        elif ".md" not in entry:
            # This is a folder of per-model request files
            sub_entries = [
                e
                for e in os.listdir(os.path.join(save_path, entry))
                if os.path.isfile(os.path.join(save_path, entry, e)) and not e.startswith(".")
            ]
            for sub_entry in sub_entries:
                file_path = os.path.join(save_path, entry, sub_entry)
                with open(file_path) as fp:
                    data = json.load(fp)

                data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
                data[EvalQueueColumn.revision.name] = data.get("revision", "main")
                all_evals.append(data)

    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"]

    df_pending = pd.DataFrame.from_records(pending_list, columns=cols)
    df_running = pd.DataFrame.from_records(running_list, columns=cols)
    df_finished = pd.DataFrame.from_records(finished_list, columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]
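
# A minimal usage sketch (not part of the Space's runtime path): build the SAGE
# leaderboard dataframe directly, e.g. for a local smoke test. COLS and
# BENCHMARK_COLS are assumed to be the column-name lists exported by
# src.display.utils, as in the standard leaderboard template; substitute whatever
# this repo actually exports.
if __name__ == "__main__":
    from src.display.utils import BENCHMARK_COLS, COLS  # assumed names

    print(get_sage_leaderboard_df(COLS, BENCHMARK_COLS).head())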