# eee_validator/dedup.py
# initial commit, space validation stuff (commit 92ea780)
"""Deduplication module for EEE validation pipeline.
Two-level dedup:
- Exact duplicates: SHA256 hash of entire file content
- Near duplicates: SHA256 hash of content minus timestamps/UUIDs
"""
import hashlib
import json
import logging
from dataclasses import dataclass, field
from typing import Any
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError
logger = logging.getLogger(__name__)
DATASET_REPO_ID = "evaleval/EEE_datastore"
MANIFEST_PATH = "manifest.json"
# Fields to strip for near-duplicate fingerprinting
FINGERPRINT_STRIP_FIELDS = {
"retrieved_timestamp",
"evaluation_id",
"evaluation_timestamp",
}
def compute_sha256(content: bytes) -> str:
return hashlib.sha256(content).hexdigest()
def _strip_fields(data: dict[str, Any], fields_to_strip: set[str]) -> dict[str, Any]:
"""Recursively strip specified fields from a dict for fingerprinting."""
result = {}
for key, value in data.items():
if key in fields_to_strip:
continue
if isinstance(value, dict):
result[key] = _strip_fields(value, fields_to_strip)
elif isinstance(value, list):
result[key] = [
_strip_fields(item, fields_to_strip) if isinstance(item, dict) else item
for item in value
]
else:
result[key] = value
return result
def compute_fingerprint(content: bytes) -> str:
"""Compute a near-duplicate fingerprint by hashing content minus timestamps/UUIDs."""
try:
data = json.loads(content)
except (json.JSONDecodeError, UnicodeDecodeError):
# If we can't parse as JSON, fall back to full content hash
return compute_sha256(content)
stripped = _strip_fields(data, FINGERPRINT_STRIP_FIELDS)
# Serialize deterministically
canonical = json.dumps(stripped, sort_keys=True, ensure_ascii=True).encode()
return hashlib.sha256(canonical).hexdigest()
@dataclass
class DedupResult:
    """Results of deduplication check for a single file."""
    # Repo-relative path of the checked file (e.g. "data/gsm8k/.../abc.json")
    file_path: str
    # SHA256 hex digest of the raw file bytes (exact-duplicate key)
    sha256: str
    # Hash of content with volatile fields stripped (near-duplicate key);
    # equals sha256 for non-.json files
    fingerprint: str
    # Manifest path of the file this exactly duplicates, or None
    exact_duplicate_of: str | None = None
    # Manifest path of the file this nearly duplicates, or None
    near_duplicate_of: str | None = None
@dataclass
class DedupReport:
    """Aggregated dedup report across all checked files."""

    # One DedupResult per checked file, in processing order.
    results: list[DedupResult] = field(default_factory=list)

    @property
    def has_exact_duplicates(self) -> bool:
        """True when at least one checked file is an exact duplicate."""
        for entry in self.results:
            if entry.exact_duplicate_of is not None:
                return True
        return False

    @property
    def has_near_duplicates(self) -> bool:
        """True when at least one checked file is a near duplicate."""
        for entry in self.results:
            if entry.near_duplicate_of is not None:
                return True
        return False
def load_manifest(api: HfApi) -> dict[str, Any]:
    """Download and parse manifest.json from the dataset repo's main branch.

    Args:
        api: HfApi client. NOTE(review): currently unused — the download goes
            through the module-level ``hf_hub_download``; kept so the caller
            interface is unchanged.

    Returns:
        Parsed manifest dict. Falls back to ``{"files": {}}`` when the
        manifest is missing or unreadable, so validation degrades to
        "no known files" instead of failing outright.
    """
    try:
        manifest_file = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=MANIFEST_PATH,
            repo_type="dataset",
            revision="main",
        )
        # Manifest is JSON: decode explicitly as UTF-8 rather than relying
        # on the platform's locale-dependent default encoding.
        with open(manifest_file, "r", encoding="utf-8") as f:
            return json.load(f)
    except (EntryNotFoundError, RepositoryNotFoundError):
        logger.warning("manifest.json not found in %s, using empty manifest", DATASET_REPO_ID)
        return {"files": {}}
    except Exception:
        # Broad catch is deliberate best-effort: a corrupt manifest should
        # not block the whole validation pipeline.
        logger.exception("Failed to load manifest.json")
        return {"files": {}}
def check_duplicates(
    file_paths: list[str],
    file_contents: dict[str, bytes],
    manifest: dict[str, Any],
) -> DedupReport:
    """Check files against the manifest for exact and near duplicates.

    Files are also checked against earlier files in the same batch, so two
    identical new files in one submission are flagged as duplicates of each
    other, not just against previously-published manifest entries.

    Args:
        file_paths: Repo-relative paths of files to check (e.g. "data/gsm8k/.../abc.json")
        file_contents: Map of repo-relative path -> raw file bytes
        manifest: Parsed manifest.json with "files" key

    Returns:
        DedupReport with one DedupResult per file that has content available.
    """
    report = DedupReport()
    manifest_files = manifest.get("files", {})
    # Build reverse lookups from manifest; tolerate malformed entries that
    # lack a hash field instead of raising KeyError.
    sha256_to_path: dict[str, str] = {}
    fingerprint_to_path: dict[str, str] = {}
    for path, entry in manifest_files.items():
        sha = entry.get("sha256")
        if sha:
            sha256_to_path[sha] = path
        fp = entry.get("fingerprint")
        if fp:
            fingerprint_to_path[fp] = path
    for file_path in file_paths:
        content = file_contents.get(file_path)
        if content is None:
            # No bytes supplied for this path; skip rather than guess.
            continue
        sha256 = compute_sha256(content)
        # Only compute fingerprints for .json files (not .jsonl)
        if file_path.endswith(".json"):
            fingerprint = compute_fingerprint(content)
        else:
            fingerprint = sha256  # For JSONL, fingerprint == sha256
        result = DedupResult(
            file_path=file_path,
            sha256=sha256,
            fingerprint=fingerprint,
        )
        # Exact duplicate: same bytes under a different path. A re-upload of
        # the same path is not a duplicate.
        if sha256 in sha256_to_path and sha256_to_path[sha256] != file_path:
            result.exact_duplicate_of = sha256_to_path[sha256]
        # Near duplicate (only if not already an exact duplicate)
        if (
            result.exact_duplicate_of is None
            and fingerprint in fingerprint_to_path
            and fingerprint_to_path[fingerprint] != file_path
        ):
            result.near_duplicate_of = fingerprint_to_path[fingerprint]
        # Register this file's hashes so later files in the same batch are
        # compared against it too (setdefault keeps the earliest claimant).
        sha256_to_path.setdefault(sha256, file_path)
        fingerprint_to_path.setdefault(fingerprint, file_path)
        report.results.append(result)
    return report