# evaluator.py
#
# Dependencies (approximate pip package names): numpy, datasets, transformers,
# sacrebleu, rouge-score, Levenshtein, tqdm.
import os
from collections import defaultdict

import numpy as np
import Levenshtein
from datasets import load_dataset
from rouge_score import rouge_scorer
from sacrebleu.metrics import BLEU, CHRF
from tqdm import tqdm
from transformers import pipeline
from transformers.models.whisper.english_normalizer import BasicTextNormalizer


def calculate_metrics(reference: str, prediction: str) -> dict:
    """
    Compute a suite of translation / generation metrics:
      - BLEU
      - chrF
      - CER (character error rate)
      - WER (word error rate)
      - length ratio
      - ROUGE-1 & ROUGE-L
      - a combined quality_score
    """
    # BLEU
    bleu = BLEU(effective_order=True)
    bleu_score = bleu.sentence_score(prediction, [reference]).score

    # chrF
    chrf = CHRF()
    chrf_score = chrf.sentence_score(prediction, [reference]).score / 100.0

    # Character error rate
    cer = Levenshtein.distance(reference, prediction) / max(len(reference), 1)

    # Word error rate (the maintained `Levenshtein` package accepts arbitrary
    # sequences, so passing word lists yields an edit distance over words)
    ref_words = reference.split()
    pred_words = prediction.split()
    wer = Levenshtein.distance(ref_words, pred_words) / max(len(ref_words), 1)

    # Length ratio
    len_ratio = len(prediction) / max(len(reference), 1)

    # ROUGE (optional; fall back gracefully if scoring fails)
    try:
        scorer = rouge_scorer.RougeScorer(["rouge1", "rougeL"], use_stemmer=True)
        rouge_scores = scorer.score(reference, prediction)
        rouge_1 = rouge_scores["rouge1"].fmeasure
        rouge_L = rouge_scores["rougeL"].fmeasure
        rouge_ok = True
    except Exception:
        rouge_1 = rouge_L = 0.0
        rouge_ok = False

    # Combined quality: mean of the available components, all mapped to a
    # 0-1 scale (error rates are inverted so that higher is better). If ROUGE
    # could not be computed, average only the remaining four components.
    components = [bleu_score / 100, chrf_score, 1 - cer, 1 - wer]
    if rouge_ok:
        components += [rouge_1, rouge_L]
    quality_score = sum(components) / len(components)

    return {
        "bleu": bleu_score,
        "chrf": chrf_score,
        "cer": cer,
        "wer": wer,
        "len_ratio": len_ratio,
        "rouge1": rouge_1,
        "rougeL": rouge_L,
        "quality_score": quality_score,
    }
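

# Illustrative sanity check (not part of the evaluation pipeline): on an exact
# match every component metric is perfect, so quality_score comes out at 1.0.
#
#     calculate_metrics("hello world", "hello world")
#     # -> bleu=100.0, chrf=1.0, cer=0.0, wer=0.0, len_ratio=1.0,
#     #    rouge1=1.0, rougeL=1.0, quality_score=1.0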


def evaluate_model(
    model_name: str,
    dataset_name: str,
    split: str = "test",
    text_field: str = "source",
    target_field: str = "target",
    task: str = "translation",  # or "automatic-speech-recognition", etc.
    device: int = 0,
) -> dict:
    """
    Load your dataset, run inference via a 🤗 pipeline, and compute metrics
    grouped by language pair (if present) plus overall averages.

    Returns a dict of shape:
        {
            "<src>_to_<tgt>": {<metric1>: val, ...},
            ...,
            "averages": {<metric1>: val, ...}
        }
    """
    # Get the Hugging Face token from the environment
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        raise ValueError(
            "Hugging Face token (HF_TOKEN) is not set. "
            "Please set it as an environment variable."
        )

    # 1) load the test split (`token=` supersedes the deprecated `use_auth_token=`)
    ds = load_dataset(dataset_name, split=split, token=hf_token)

    # 2) build the pipeline
    nlp = pipeline(task, model=model_name, device=device)

    # 3) run inference
    normalizer = BasicTextNormalizer()
    translations = []
    for ex in tqdm(ds, desc=f"Eval {model_name}"):
        src = ex[text_field]
        tgt = ex[target_field]
        # Run the pipeline once per example; translation pipelines return a
        # list of dicts, while some tasks (e.g. ASR) return a single dict.
        output = nlp(src)
        if isinstance(output, list):
            output = output[0]
        pred = output.get("translation_text", output.get("text", ""))
        translations.append({
            "source": src,
            "target": tgt,
            "prediction": pred,
            # Optional language metadata:
            "source.language": ex.get("source.language", ""),
            "target.language": ex.get("target.language", ""),
        })

    # 4) group by language pair
    subsets = defaultdict(list)
    for ex in translations:
        key = (
            f"{ex['source.language']}_to_{ex['target.language']}"
            if ex["source.language"] and ex["target.language"]
            else "default"
        )
        subsets[key].append(ex)

    # 5) compute metrics per subset
    results = {}
    for subset, examples in subsets.items():
        # collect metric lists
        agg = defaultdict(list)
        for ex in examples:
            ref = normalizer(ex["target"])
            pred = normalizer(ex["prediction"])
            m = calculate_metrics(ref, pred)
            for k, v in m.items():
                agg[k].append(v)
        # take the mean
        results[subset] = {k: float(np.mean(vs)) for k, vs in agg.items()}

    # 6) overall averages across subsets
    all_metrics = list(results.values())
    avg = {}
    for k in all_metrics[0].keys():
        avg[k] = float(np.mean([m[k] for m in all_metrics]))
    results["averages"] = avg

    return results


if __name__ == "__main__":
    # Simple smoke test. NOTE: the dataset must expose flat `source`/`target`
    # columns; a Hub dataset such as "wmt19" also requires a language-pair
    # config and nests its text under a "translation" field, so adjust
    # dataset_name / text_field / target_field to match your data.
    import json

    out = evaluate_model(
        model_name="facebook/wmt19-en-de",
        dataset_name="wmt19",
        split="test",
    )
    print(json.dumps(out, indent=2))
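
# Example invocation (assuming HF_TOKEN is set in the shell; token value is a
# placeholder):
#
#     HF_TOKEN=hf_xxx python evaluator.py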