"""Deduplication module for EEE validation pipeline. Two-level dedup: - Exact duplicates: SHA256 hash of entire file content - Near duplicates: SHA256 hash of content minus timestamps/UUIDs """ import hashlib import json import logging from dataclasses import dataclass, field from typing import Any from huggingface_hub import HfApi, hf_hub_download from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError logger = logging.getLogger(__name__) DATASET_REPO_ID = "evaleval/EEE_datastore" MANIFEST_PATH = "manifest.json" # Fields to strip for near-duplicate fingerprinting FINGERPRINT_STRIP_FIELDS = { "retrieved_timestamp", "evaluation_id", "evaluation_timestamp", } def compute_sha256(content: bytes) -> str: return hashlib.sha256(content).hexdigest() def _strip_fields(data: dict[str, Any], fields_to_strip: set[str]) -> dict[str, Any]: """Recursively strip specified fields from a dict for fingerprinting.""" result = {} for key, value in data.items(): if key in fields_to_strip: continue if isinstance(value, dict): result[key] = _strip_fields(value, fields_to_strip) elif isinstance(value, list): result[key] = [ _strip_fields(item, fields_to_strip) if isinstance(item, dict) else item for item in value ] else: result[key] = value return result def compute_fingerprint(content: bytes) -> str: """Compute a near-duplicate fingerprint by hashing content minus timestamps/UUIDs.""" try: data = json.loads(content) except (json.JSONDecodeError, UnicodeDecodeError): # If we can't parse as JSON, fall back to full content hash return compute_sha256(content) stripped = _strip_fields(data, FINGERPRINT_STRIP_FIELDS) # Serialize deterministically canonical = json.dumps(stripped, sort_keys=True, ensure_ascii=True).encode() return hashlib.sha256(canonical).hexdigest() @dataclass class DedupResult: """Results of deduplication check for a single file.""" file_path: str sha256: str fingerprint: str exact_duplicate_of: str | None = None near_duplicate_of: str | None = None 
@dataclass
class DedupReport:
    """Aggregated dedup report across all checked files."""

    results: list[DedupResult] = field(default_factory=list)

    @property
    def has_exact_duplicates(self) -> bool:
        """True if any checked file byte-matches a file already in the manifest."""
        return any(r.exact_duplicate_of is not None for r in self.results)

    @property
    def has_near_duplicates(self) -> bool:
        """True if any checked file shares a fingerprint with a manifest file."""
        return any(r.near_duplicate_of is not None for r in self.results)


def load_manifest(api: HfApi) -> dict[str, Any]:
    """Download and parse manifest.json from the dataset repo's main branch.

    Args:
        api: Unused; retained for backward compatibility with existing callers.

    Returns:
        The parsed manifest dict. On a missing manifest/repo, or on any
        other failure, degrades gracefully to ``{"files": {}}`` so the
        pipeline treats everything as new rather than crashing.
    """
    try:
        manifest_file = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=MANIFEST_PATH,
            repo_type="dataset",
            revision="main",
        )
        with open(manifest_file, "r") as f:
            return json.load(f)
    except (EntryNotFoundError, RepositoryNotFoundError):
        logger.warning("manifest.json not found in %s, using empty manifest", DATASET_REPO_ID)
        return {"files": {}}
    except Exception:
        logger.exception("Failed to load manifest.json")
        return {"files": {}}


def check_duplicates(
    file_paths: list[str],
    file_contents: dict[str, bytes],
    manifest: dict[str, Any],
) -> DedupReport:
    """Check files against the manifest for exact and near duplicates.

    Args:
        file_paths: Repo-relative paths of files to check (e.g.
            "data/gsm8k/.../abc.json")
        file_contents: Map of repo-relative path -> raw file bytes
        manifest: Parsed manifest.json with "files" key

    Returns:
        A DedupReport with one DedupResult per checked file that had
        content available in *file_contents*.
    """
    report = DedupReport()
    manifest_files = manifest.get("files", {})

    # Build reverse lookups from manifest.
    # ROBUSTNESS FIX: previously entry["sha256"] / entry["fingerprint"]
    # raised KeyError on a malformed manifest entry, defeating
    # load_manifest's graceful-degradation design. Skip bad entries instead.
    sha256_to_path: dict[str, str] = {}
    fingerprint_to_path: dict[str, str] = {}
    for path, entry in manifest_files.items():
        if not isinstance(entry, dict):
            logger.warning("Skipping malformed manifest entry for %s", path)
            continue
        sha = entry.get("sha256")
        if sha is not None:
            sha256_to_path[sha] = path
        fp = entry.get("fingerprint")
        if fp is not None:
            fingerprint_to_path[fp] = path

    for file_path in file_paths:
        content = file_contents.get(file_path)
        if content is None:
            # No bytes supplied for this path; nothing to check.
            continue

        sha256 = compute_sha256(content)

        # Only compute fingerprints for .json files (not .jsonl)
        if file_path.endswith(".json"):
            fingerprint = compute_fingerprint(content)
        else:
            fingerprint = sha256  # For JSONL, fingerprint == sha256

        result = DedupResult(
            file_path=file_path,
            sha256=sha256,
            fingerprint=fingerprint,
        )

        # Check exact duplicate (a file is never a duplicate of itself)
        if sha256 in sha256_to_path and sha256_to_path[sha256] != file_path:
            result.exact_duplicate_of = sha256_to_path[sha256]

        # Check near duplicate (only if not already an exact duplicate)
        if (
            result.exact_duplicate_of is None
            and fingerprint in fingerprint_to_path
            and fingerprint_to_path[fingerprint] != file_path
        ):
            result.near_duplicate_of = fingerprint_to_path[fingerprint]

        report.results.append(result)

    return report