Spaces:
Running
Running
| """Deduplication module for EEE validation pipeline. | |
| Two-level dedup: | |
| - Exact duplicates: SHA256 hash of entire file content | |
| - Near duplicates: SHA256 hash of content minus timestamps/UUIDs | |
| """ | |
| import hashlib | |
| import json | |
| import logging | |
| from dataclasses import dataclass, field | |
| from typing import Any | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError | |
| logger = logging.getLogger(__name__) | |
| DATASET_REPO_ID = "evaleval/EEE_datastore" | |
| MANIFEST_PATH = "manifest.json" | |
| # Fields to strip for near-duplicate fingerprinting | |
| FINGERPRINT_STRIP_FIELDS = { | |
| "retrieved_timestamp", | |
| "evaluation_id", | |
| "evaluation_timestamp", | |
| } | |
def compute_sha256(content: bytes) -> str:
    """Return the hex-encoded SHA256 digest of *content*."""
    digest = hashlib.sha256()
    digest.update(content)
    return digest.hexdigest()
| def _strip_fields(data: dict[str, Any], fields_to_strip: set[str]) -> dict[str, Any]: | |
| """Recursively strip specified fields from a dict for fingerprinting.""" | |
| result = {} | |
| for key, value in data.items(): | |
| if key in fields_to_strip: | |
| continue | |
| if isinstance(value, dict): | |
| result[key] = _strip_fields(value, fields_to_strip) | |
| elif isinstance(value, list): | |
| result[key] = [ | |
| _strip_fields(item, fields_to_strip) if isinstance(item, dict) else item | |
| for item in value | |
| ] | |
| else: | |
| result[key] = value | |
| return result | |
def compute_fingerprint(content: bytes) -> str:
    """Compute a near-duplicate fingerprint for *content*.

    Parses the content as JSON, strips volatile fields (timestamps/UUIDs
    listed in FINGERPRINT_STRIP_FIELDS), then hashes a canonical
    serialization — so two records differing only in those fields share
    a fingerprint.

    Falls back to the full-content SHA256 when the content is not valid
    JSON, or when its top level is not a JSON object (a bare list/scalar
    has no fields to strip).
    """
    try:
        data = json.loads(content)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Not parseable as JSON: fall back to the exact content hash.
        return compute_sha256(content)
    # Bug fix: json.loads can return a list/str/number for valid JSON;
    # _strip_fields requires a dict, so treat non-objects like non-JSON.
    if not isinstance(data, dict):
        return compute_sha256(content)
    stripped = _strip_fields(data, FINGERPRINT_STRIP_FIELDS)
    # sort_keys + ensure_ascii make the serialization deterministic
    # regardless of input key order or non-ASCII content.
    canonical = json.dumps(stripped, sort_keys=True, ensure_ascii=True).encode()
    return hashlib.sha256(canonical).hexdigest()
| class DedupResult: | |
| """Results of deduplication check for a single file.""" | |
| file_path: str | |
| sha256: str | |
| fingerprint: str | |
| exact_duplicate_of: str | None = None | |
| near_duplicate_of: str | None = None | |
@dataclass
class DedupReport:
    """Aggregated dedup report across all checked files.

    Bug fix: the @dataclass decorator was missing, so
    `field(default_factory=list)` was left as a raw dataclasses.Field
    class attribute and `report.results.append(...)` would fail.
    """

    # One DedupResult per checked file; fresh list per instance
    results: list[DedupResult] = field(default_factory=list)

    def has_exact_duplicates(self) -> bool:
        """Return True if any checked file byte-matches a manifest entry."""
        return any(r.exact_duplicate_of is not None for r in self.results)

    def has_near_duplicates(self) -> bool:
        """Return True if any checked file fingerprint-matches a manifest entry."""
        return any(r.near_duplicate_of is not None for r in self.results)
def load_manifest(api: HfApi) -> dict[str, Any]:
    """Download and parse manifest.json from the dataset repo's main branch."""
    empty_manifest: dict[str, Any] = {"files": {}}
    try:
        local_path = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=MANIFEST_PATH,
            repo_type="dataset",
            revision="main",
        )
    except (EntryNotFoundError, RepositoryNotFoundError):
        # A missing manifest is an expected first-run state, not an error.
        logger.warning("manifest.json not found in %s, using empty manifest", DATASET_REPO_ID)
        return empty_manifest
    except Exception:
        logger.exception("Failed to load manifest.json")
        return empty_manifest
    try:
        with open(local_path, "r") as f:
            return json.load(f)
    except Exception:
        # Corrupt/unreadable manifest: degrade to empty rather than abort.
        logger.exception("Failed to load manifest.json")
        return empty_manifest
def check_duplicates(
    file_paths: list[str],
    file_contents: dict[str, bytes],
    manifest: dict[str, Any],
) -> DedupReport:
    """Check files against the manifest for exact and near duplicates.

    Args:
        file_paths: Repo-relative paths of files to check (e.g. "data/gsm8k/.../abc.json")
        file_contents: Map of repo-relative path -> raw file bytes
        manifest: Parsed manifest.json with "files" key

    Returns:
        DedupReport with one DedupResult per path present in file_contents;
        paths missing from file_contents are silently skipped.
    """
    report = DedupReport()
    manifest_files = manifest.get("files", {})
    # Build reverse lookups from manifest.
    # Bug fix: use .get() so a malformed manifest entry missing "sha256" or
    # "fingerprint" is skipped instead of raising KeyError for every check.
    sha256_to_path: dict[str, str] = {}
    fingerprint_to_path: dict[str, str] = {}
    for path, entry in manifest_files.items():
        entry_sha256 = entry.get("sha256")
        if entry_sha256 is not None:
            sha256_to_path[entry_sha256] = path
        entry_fingerprint = entry.get("fingerprint")
        if entry_fingerprint is not None:
            fingerprint_to_path[entry_fingerprint] = path
    for file_path in file_paths:
        content = file_contents.get(file_path)
        if content is None:
            continue
        sha256 = compute_sha256(content)
        # Only compute fingerprints for .json files (not .jsonl)
        if file_path.endswith(".json"):
            fingerprint = compute_fingerprint(content)
        else:
            fingerprint = sha256  # For JSONL, fingerprint == sha256
        result = DedupResult(
            file_path=file_path,
            sha256=sha256,
            fingerprint=fingerprint,
        )
        # Exact duplicate: same bytes already stored under a different path.
        if sha256 in sha256_to_path and sha256_to_path[sha256] != file_path:
            result.exact_duplicate_of = sha256_to_path[sha256]
        # Near duplicate (only if not already an exact duplicate): same
        # content modulo volatile fields, under a different path.
        if (
            result.exact_duplicate_of is None
            and fingerprint in fingerprint_to_path
            and fingerprint_to_path[fingerprint] != file_path
        ):
            result.near_duplicate_of = fingerprint_to_path[fingerprint]
        report.results.append(result)
    return report