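"""Gradio Space that builds a small evaluation leaderboard.

Pulls MMLU, BigCodeBench, and ARC MC scores from model-index metadata in
model cards (or their open pull requests) for trending text-generation
models on the Hugging Face Hub.
"""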
from __future__ import annotations

import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import gradio as gr
import requests
import yaml
from huggingface_hub import hf_hub_download
from huggingface_hub.utils import HfHubHTTPError

API_BASE = "https://huggingface.co/api"
PIPELINE_FILTER = "text-generation"
TRENDING_LIMIT = 10
TRENDING_FETCH_LIMIT = 50
PR_SCAN_LIMIT = 40
USER_AGENT = "skills-evals-leaderboard/0.2"

TABLE_HEADERS = [
    "Model",
    "Benchmark",
    "Score",
    "Source",
]
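# The "markdown" datatype lets the Source column render its contributor
# links as rich text instead of raw markdown.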
TABLE_DATATYPES = [
    "text",
    "text",
    "number",
    "markdown",
]

def _normalize(text: Optional[str]) -> str:
    if not text:
        return ""
    text = text.lower()
    text = re.sub(r"[^a-z0-9]+", " ", text)
    return text.strip()

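# Scores may arrive as numbers or as strings such as "85.2%"; strip a
# trailing percent sign before parsing and return None for anything
# non-numeric.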
def _coerce_score(value: Any) -> Optional[float]:
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        candidate = value.strip()
        if candidate.endswith("%"):
            candidate = candidate[:-1]
        try:
            return float(candidate)
        except ValueError:
            return None
    return None

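# Each tracked benchmark is matched by substring against the normalized
# dataset and metric fields of a model-index entry.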
@dataclass
class BenchmarkSpec:
    key: str
    label: str
    aliases: tuple[str, ...]

    def matches(self, fields: List[str]) -> bool:
        for alias in self.aliases:
            alias_norm = _normalize(alias)
            if not alias_norm:
                continue
            for field in fields:
                if alias_norm in field:
                    return True
        return False

BENCHMARKS: Dict[str, BenchmarkSpec] = {
    "mmlu": BenchmarkSpec(
        key="mmlu",
        label="MMLU",
        aliases=("mmlu", "massive multitask language understanding"),
    ),
    "bigcodebench": BenchmarkSpec(
        key="bigcodebench",
        label="BigCodeBench",
        aliases=("bigcodebench", "big code bench"),
    ),
    "arc_mc": BenchmarkSpec(
        key="arc_mc",
        label="ARC MC",
        aliases=(
            "arc mc",
            "arc-challenge",
            "arc challenge",
            "arc multiple choice",
            "arc c",
        ),
    ),
}

class LeaderboardFetcher:
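    """Collects benchmark scores for trending Hub models and builds table rows."""
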
    def __init__(self) -> None:
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": USER_AGENT})
        self.logs: List[str] = []

    def build(self) -> Dict[str, Any]:
        trending = self._fetch_trending_models()
        leaders: List[Dict[str, Any]] = []
        for entry in trending:
            repo_id = entry.get("modelId") or entry.get("id")
            if not repo_id:
                continue
            scores = self._collect_scores(repo_id)
            if scores["scores"]:
                leaders.append(scores)
        return self._compose_tables(leaders)

    def log_text(self) -> str:
        if not self.logs:
            return "No actions recorded."
        return "\n".join(self.logs)

    def _fetch_trending_models(self) -> List[Dict[str, Any]]:
        params = {"sort": "trendingScore", "limit": TRENDING_FETCH_LIMIT}
        response = self.session.get(
            f"{API_BASE}/models",
            params=params,
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        if not isinstance(data, list):
            raise ValueError("Unexpected trending response.")
        filtered = [
            model
            for model in data
            if (
                model.get("pipeline_tag") == PIPELINE_FILTER
                or PIPELINE_FILTER in (model.get("tags") or [])
            )
        ]
        if not filtered:
            self.logs.append("⚠️ No text-generation models in trending feed.")
            return []
        limited = filtered[:TRENDING_LIMIT]
        if len(limited) < TRENDING_LIMIT:
            self.logs.append(f"⚠️ Only {len(limited)} text-generation models available.")
        else:
            self.logs.append(f"🔍 Loaded {TRENDING_LIMIT} trending text-generation models.")
        return limited

    def _collect_scores(self, repo_id: str) -> Dict[str, Any]:
        owner = repo_id.split("/")[0]
        card_meta = self._read_model_card(repo_id)
        model_index = card_meta.get("model-index")
        if model_index:
            self.logs.append(f"✅ {repo_id}: model card metadata found.")
            scores = self._extract_scores(
                repo_id=repo_id,
                model_index=model_index,
                contributor=owner,
                source_type="model-card",
                source_url=f"https://huggingface.co/{repo_id}",
                revision="main",
            )
            if scores:
                return {"model_id": repo_id, "scores": scores}
        prs = self._fetch_pull_requests(repo_id)
        for pr in prs:
            revision = f"refs/pr/{pr['num']}"
            pr_meta = self._read_model_card(repo_id, revision=revision)
            pr_index = pr_meta.get("model-index")
            if not pr_index:
                continue
            author_info = pr.get("author", {}) or {}
            contributor = author_info.get("name") or author_info.get("fullname") or "unknown-author"
            discussion_path = f"{repo_id}/discussions/{pr['num']}"
            source_url = f"https://huggingface.co/{discussion_path}"
            scores = self._extract_scores(
                repo_id=repo_id,
                model_index=pr_index,
                contributor=contributor,
                source_type="pull-request",
                source_url=source_url,
                revision=revision,
            )
            if scores:
                note = f"📝 {repo_id}: PR #{pr['num']} by {contributor}."
                self.logs.append(note)
                return {"model_id": repo_id, "scores": scores}
        self.logs.append(f"⚠️ {repo_id}: no target benchmarks located.")
        return {"model_id": repo_id, "scores": {}}

    def _read_model_card(
        self,
        repo_id: str,
        revision: Optional[str] = None,
    ) -> Dict[str, Any]:
        try:
            path = hf_hub_download(
                repo_id=repo_id,
                filename="README.md",
                repo_type="model",
                revision=revision,
            )
        except HfHubHTTPError as err:
            ctx = f"{repo_id} ({revision or 'main'})"
            self.logs.append(f"🚫 {ctx}: README download failed ({err}).")
            return {}
        text = Path(path).read_text(encoding="utf-8", errors="ignore")
        return self._parse_front_matter(text)

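    # README front matter sits between the leading "---" and the next "---";
    # anything that does not parse to a YAML mapping counts as no metadata.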
    @staticmethod
    def _parse_front_matter(content: str) -> Dict[str, Any]:
        content = content.lstrip("\ufeff")
        if not content.startswith("---"):
            return {}
        lines = content.splitlines()
        end_idx = None
        for idx, line in enumerate(lines[1:], start=1):
            if line.strip() == "---":
                end_idx = idx
                break
        if end_idx is None:
            return {}
        front_matter = "\n".join(lines[1:end_idx])
        try:
            data = yaml.safe_load(front_matter) or {}
            return data if isinstance(data, dict) else {}
        except yaml.YAMLError:
            return {}

    def _fetch_pull_requests(self, repo_id: str) -> List[Dict[str, Any]]:
        url = f"{API_BASE}/models/{repo_id}/discussions"
        try:
            response = self.session.get(
                url,
                params={"limit": PR_SCAN_LIMIT},
                timeout=30,
            )
            response.raise_for_status()
        except requests.RequestException as err:
            self.logs.append(f"🚫 {repo_id}: PR list request failed ({err}).")
            return []
        payload = response.json()
        discussions = payload.get("discussions", [])
        prs = [disc for disc in discussions if disc.get("isPullRequest")]
        prs.sort(key=lambda item: item.get("createdAt", ""), reverse=True)
        if prs:
            self.logs.append(f"📬 {repo_id}: scanning {len(prs)} pull requests.")
        return prs

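    # Walk model-index entries -> results -> metrics, match each metric
    # against the tracked benchmarks, and keep the highest value per benchmark.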
    def _extract_scores(
        self,
        repo_id: str,
        model_index: Any,
        contributor: str,
        source_type: str,
        source_url: str,
        revision: str,
    ) -> Dict[str, Dict[str, Any]]:
        if not isinstance(model_index, list):
            return {}
        scores: Dict[str, Dict[str, Any]] = {}
        for entry in model_index:
            if not isinstance(entry, dict):
                continue
            model_name = entry.get("name") or repo_id.split("/")[-1]
            for result in entry.get("results", []):
                dataset_info = result.get("dataset") or {}
                dataset_name = dataset_info.get("name")
                dataset_type = dataset_info.get("type")
                task_info = result.get("task") or {}
                task_type = task_info.get("type")
                for metric in result.get("metrics", []):
                    benchmark_key = self._match_benchmark(
                        dataset_name,
                        dataset_type,
                        metric,
                    )
                    if not benchmark_key:
                        continue
                    raw_value = metric.get("value")
                    value = _coerce_score(raw_value)
                    if value is None:
                        continue
                    unit = metric.get("unit") or ""
                    is_pct = isinstance(raw_value, str) and raw_value.strip().endswith("%")
                    if not unit and is_pct:
                        unit = "%"
                    metric_name = metric.get("name") or metric.get("type") or ""
                    payload = {
                        "model": repo_id,
                        "model_name": model_name,
                        "benchmark_key": benchmark_key,
                        "benchmark_label": BENCHMARKS[benchmark_key].label,
                        "value": value,
                        "unit": unit,
                        "dataset": dataset_name or dataset_type or "",
                        "task_type": task_type or "",
                        "metric_name": metric_name,
                        "contributor": contributor,
                        "source_type": source_type,
                        "source_url": source_url,
                        "revision": revision,
                    }
                    existing = scores.get(benchmark_key)
                    if not existing or value > existing["value"]:
                        scores[benchmark_key] = payload
        return scores

    def _match_benchmark(
        self,
        dataset_name: Optional[str],
        dataset_type: Optional[str],
        metric: Dict[str, Any],
    ) -> Optional[str]:
        fields = [
            _normalize(dataset_name),
            _normalize(dataset_type),
            _normalize(metric.get("name")),
            _normalize(metric.get("type")),
        ]
        fields = [field for field in fields if field]
        for key, spec in BENCHMARKS.items():
            if spec.matches(fields):
                return key
        return None

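    # Rows are sorted by score, descending, both per benchmark and in the
    # combined table; Source cells are markdown links crediting the contributor.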
    def _compose_tables(self, entries: List[Dict[str, Any]]) -> Dict[str, Any]:
        all_rows: List[Dict[str, Any]] = []
        per_benchmark: Dict[str, List[Dict[str, Any]]] = {key: [] for key in BENCHMARKS}
        for entry in entries:
            for benchmark_key, payload in entry["scores"].items():
                row = {
                    "Model": entry["model_id"],
                    "Benchmark": BENCHMARKS[benchmark_key].label,
                    "Score": round(payload["value"], 2),
                    "Source": f"{payload['source_type']} by [{payload['contributor']}]({payload['source_url']})",
                }
                all_rows.append(row)
                per_benchmark[benchmark_key].append(row)
        for rows in per_benchmark.values():
            rows.sort(key=lambda r: r["Score"], reverse=True)
        all_rows.sort(key=lambda r: r["Score"], reverse=True)
        return {
            "all_rows": all_rows,
            "per_benchmark": per_benchmark,
            "stats": {
                "models_with_scores": len(entries),
                "row_count": len(all_rows),
                "generated_at": datetime.now(timezone.utc).isoformat(),
            },
        }

def _rows_to_matrix(rows: List[Dict[str, Any]]) -> List[List[Any]]:
    return [[row.get(header, "") for header in TABLE_HEADERS] for row in rows]

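# Shared handler for the Refresh button and the initial page load; returns the
# status markdown plus the table matrix, in the order the outputs are wired.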
def refresh_handler() -> List[Any]:
    fetcher = LeaderboardFetcher()
    try:
        result = fetcher.build()
        stats = result["stats"]
        status = "\n".join(
            [
                f"Last updated: {stats['generated_at']}",
                f"Models with scores: {stats['models_with_scores']}",
                f"Total entries: {stats['row_count']}",
                "",
                fetcher.log_text(),
            ]
        )
        return [
            status,
            _rows_to_matrix(result["all_rows"]),
        ]
    except Exception as exc:  # pylint: disable=broad-except
        error = f"❌ Failed to refresh leaderboard: {exc}"
        empty: List[List[Any]] = []
        return [error, empty]

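# Gradio UI: a refresh button, a status panel, and the combined score table.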
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # HF Evaluation Leaderboard

        Shows MMLU, BigCodeBench, and ARC MC scores for trending
        text-generation models, pulled from model-index metadata in model
        cards or their open pull requests.
        """
    )
    refresh_button = gr.Button("Refresh", variant="primary")
    status_box = gr.Markdown("")
    all_table = gr.Dataframe(headers=TABLE_HEADERS, interactive=False, datatype=TABLE_DATATYPES)
    refresh_button.click(  # pylint: disable=no-member
        refresh_handler,
        inputs=[],
        outputs=[
            status_box,
            all_table,
        ],
    )
    demo.load(  # pylint: disable=no-member
        refresh_handler,
        outputs=[
            status_box,
            all_table,
        ],
    )

if __name__ == "__main__":
    demo.launch()