# https://github.com/maszhongming/UniEval/tree/main from dataclasses import dataclass, field from tqdm import tqdm from graphgen.bases.datatypes import QAPair def _add_questions(dimension: str, question: str, answer: str): if dimension == "naturalness": cur_input = ( "question: Is this a natural response in the dialogue? response: " + answer ) elif dimension == "coherence": cur_input = ( "question: Is this a coherent response given the dialogue history? response: " + answer + " dialogue history: " + question ) elif dimension == "understandability": cur_input = ( "question: Is this an understandable response in the dialogue? response: " + answer ) else: raise NotImplementedError( "The input format for this dimension is still undefined. Please customize it first." ) return cur_input @dataclass class UniEvaluator: model_name: str = "MingZhong/unieval-sum" dimensions: list = field( default_factory=lambda: ["naturalness", "coherence", "understandability"] ) max_length: int = 2560 results: dict = None def __post_init__(self): import torch self.num_gpus = torch.cuda.device_count() self.results = {} @staticmethod def process_chunk(rank, pairs, model_name, max_length, dimension, return_dict): import torch from transformers import AutoModelForSeq2SeqLM, AutoTokenizer device = f"cuda:{rank}" torch.cuda.set_device(rank) rank_model = AutoModelForSeq2SeqLM.from_pretrained(model_name) tokenizer = AutoTokenizer.from_pretrained(model_name) rank_model.to(device) rank_model.eval() softmax = torch.nn.Softmax(dim=1) pos_id = tokenizer("Yes")["input_ids"][0] neg_id = tokenizer("No")["input_ids"][0] results = [] with torch.no_grad(): for pair in tqdm(pairs): text = _add_questions(dimension, pair.question, pair.answer) tgt = "No" encoded_src = tokenizer( text, max_length=max_length, truncation=True, padding=True, return_tensors="pt", ) encoded_tgt = tokenizer( tgt, max_length=max_length, truncation=True, padding=True, return_tensors="pt", ) src_tokens = encoded_src["input_ids"].to(device) src_mask = encoded_src["attention_mask"].to(device) tgt_tokens = encoded_tgt["input_ids"].to(device)[:, 0].unsqueeze(-1) output = rank_model( input_ids=src_tokens, attention_mask=src_mask, labels=tgt_tokens, use_cache=False, ) logits = output.logits.view(-1, rank_model.config.vocab_size) pos_score = softmax(logits)[:, pos_id] # Yes neg_score = softmax(logits)[:, neg_id] score = pos_score / (pos_score + neg_score) results.append(score.item()) return_dict[rank] = results def evaluate(self, pairs: list[QAPair]) -> list[dict]: import torch.multiprocessing as mp final_results = [] for dimension in self.dimensions: chunk_size = len(pairs) // self.num_gpus chunks = [] for i in range(self.num_gpus): start = i * chunk_size end = start + chunk_size if i == self.num_gpus - 1: end = len(pairs) chunks.append(pairs[start:end]) # multi-process manager = mp.Manager() return_dict = manager.dict() processes = [] for rank, chunk in enumerate(chunks): p = mp.Process( target=self.process_chunk, args=( rank, chunk, self.model_name, self.max_length, dimension, return_dict, ), ) p.start() processes.append(p) for p in processes: p.join() # 合并结果 results = [] for rank in range(len(chunks)): results.extend(return_dict[rank]) for p in processes: if p.is_alive(): p.terminate() p.join() final_results.append({dimension: results}) return final_results def get_average_score(self, pairs: list[QAPair]) -> dict: """ Get the average score of a batch of texts. """ results = self.evaluate(pairs) final_results = {} for result in results: for key, value in result.items(): final_results[key] = sum(value) / len(value) self.results[key] = value return final_results def get_min_max_score(self, pairs: list[QAPair]) -> dict: """ Get the min and max score of a batch of texts. """ if self.results is None: self.get_average_score(pairs) final_results = {} for key, value in self.results.items(): final_results[key] = min(value), max(value) return final_results