import glob import json import math import os from dataclasses import dataclass import dateutil import numpy as np from src.display.formatting import make_clickable_model # changes to be made here from src.display.utils import AutoEvalColumn, ModelType, ModelArch, Precision, HarnessTasks, WeightType, OpenEndedColumns, MedSafetyColumns, MedicalSummarizationColumns, ACIColumns, SOAPColumns, HealthbenchColumns, HealthbenchHardColumns, ClosedEndedMultilingualColumns, OpenEndedArabicColumn, OpenEndedFrenchColumn, OpenEndedSpanishColumn, OpenEndedPortugueseColumn, OpenEndedRomanianColumn, OpenEndedGreekColumn, EHRSQLZeroShotColumns, EHRSQLFewShotColumns, MedCalcDirectAnswerColumns, MedCalcOneShotCotColumns, MedCalcZeroShotCotColumns, MedECZeroShotColumns, MedECOneShotColumns from src.submission.check_validity import is_model_on_hub from src.envs import PRIVATE_REPO @dataclass class EvalResult: """Represents one full evaluation. Built from a combination of the result and request file for a given run.""" eval_name: str # org_model_precision (uid) full_model: str # org/model (path on hub) org: str model: str revision: str # commit hash, "" if main dataset_results: dict # changes to be made here open_ended_results: dict med_safety_results: dict medical_summarization_results: dict aci_results: dict soap_results: dict healthbench_results: dict healthbench_hard_results: dict open_ended_arabic_results: dict open_ended_french_results: dict open_ended_spanish_results: dict open_ended_portuguese_results: dict open_ended_romanian_results: dict open_ended_greek_results: dict closed_ended_multilingual_results: dict ehrsql_zero_shot_results: dict ehrsql_few_shot_results: dict medcalc_direct_answer_results: dict medcalc_one_shot_cot_results: dict medcalc_zero_shot_cot_results: dict medec_zero_shot_results: dict medec_one_shot_results: dict is_domain_specific: bool use_chat_template: bool # clinical_type_results:dict precision: Precision = Precision.Unknown model_type: ModelType = ModelType.Unknown # Pretrained, fine tuned, ... weight_type: WeightType = WeightType.Original # Original or Adapter backbone:str = "Unknown" license: str = "?" likes: int = 0 num_params: int = 0 date: str = "" # submission date of request file still_on_hub: bool = False display_result:bool = True @classmethod def init_from_json_file(self, json_filepath, evaluation_metric): """Inits the result from the specific model result file""" with open(json_filepath) as fp: try: data = json.load(fp) except: breakpoint() # if "deepseek-ai/DeepSeek-R1-Distill-Llama-70B" in json_filepath: # breakpoint() config = data.get("config") # Precision precision = Precision.from_str(config.get("model_dtype")) model_type = ModelType.from_str(config.get("model_type", "")) license = config.get("license", "?") num_params = config.get("num_params", "?") num_params = -1 if num_params == "?" or num_params is None or isinstance(num_params, float) and math.isnan(num_params) else num_params display_result = config.get("display_result", True) display_result = False if display_result=="False" else True # Get model and org org_and_model = config.get("model_name", config.get("model_args", None)) org_and_model = org_and_model.split("/", 1) if len(org_and_model) == 1: org = None model = org_and_model[0] result_key = f"{model}_{precision.value.name}" else: org = org_and_model[0] model = org_and_model[1] result_key = f"{org}_{model}_{precision.value.name}" full_model = "/".join(org_and_model) still_on_hub, _, model_config = is_model_on_hub( full_model, config.get("revision", "main"), trust_remote_code=True, test_tokenizer=False ) backbone = "?" if model_config is not None: backbones = getattr(model_config, "architectures", None) if backbones: backbone = ";".join(backbones) # Extract results available in this file (some results are split in several files) harness_results = {} if "closed-ended" in data["results"]: for task in HarnessTasks: task = task.value # We average all scores of a given metric (not all metrics are present in all files) try: accs = np.array([v.get(task.metric, None) for k, v in data["results"]["closed-ended"].items() if task.benchmark == k]) except: # breakpoint() accs = np.array([]) if accs.size == 0 or any([acc is None for acc in accs]): continue mean_acc = np.mean(accs) # * 100.0 harness_results[task.benchmark] = mean_acc open_ended_results = {} if "open-ended" in data["results"]: for task in OpenEndedColumns: task = task.value # We average all scores of a given metric (not all metrics are present in all files) accs = data["results"]["open-ended"]["overall"][task.benchmark] if task.benchmark in data["results"]["open-ended"]["overall"] else None open_ended_results[task.benchmark] = accs if open_ended_results["ELO_intervals"] is not None and open_ended_results["Score_intervals"] is not None: open_ended_results["ELO_intervals"] = "+" + str(open_ended_results["ELO_intervals"][1]) + "/-" + str(abs(open_ended_results["ELO_intervals"][0])) open_ended_results["Score_intervals"] = "+" + str(open_ended_results["Score_intervals"][1]) + "/-" + str(abs(open_ended_results["Score_intervals"][0])) # if "deepseek-ai/DeepSeek-R1-Distill-Llama-70B" in json_filepath: # breakpoint() # changes to be made here med_safety_results = {} if "med-safety" in data["results"]: for task in MedSafetyColumns: task = task.value if task.benchmark == "Harmfulness Score": accs = data["results"]["med-safety"][task.benchmark] med_safety_results[task.benchmark] = accs elif task.benchmark == "95% CI": accs = data["results"]["med-safety"][task.benchmark] med_safety_results[task.benchmark] = "+" + str(round(accs[1], 3)) + "/-" + str(round(abs(accs[0]), 3)) else: accs = data["results"]["med-safety"][task.benchmark]["score"] med_safety_results[task.benchmark] = accs medical_summarization_results = {} if "medical-summarization" in data["results"]: for task in MedicalSummarizationColumns: task = task.value try: accs = np.array([v for k, v in data["results"]["medical-summarization"]["clinical_trial"].items() if task.benchmark == k]) except: accs = np.array([]) if accs.size == 0 or any([acc is None for acc in accs]): continue mean_acc = np.mean(accs) # * 100.0 medical_summarization_results[task.benchmark] = mean_acc aci_results = {} if "note-generation" in data["results"] and "aci" in data["results"]["note-generation"]: for task in ACIColumns: task = task.value try: accs = np.array([v for k, v in data["results"]["note-generation"]["aci"].items() if task.benchmark == k]) except: accs = np.array([]) if accs.size == 0 or any([acc is None for acc in accs]): continue mean_acc = np.mean(accs) # * 100.0 aci_results[task.benchmark] = mean_acc soap_results = {} if "note-generation" in data["results"] and "soap" in data["results"]["note-generation"]: for task in SOAPColumns: task = task.value try: accs = np.array([v for k, v in data["results"]["note-generation"]["soap"].items() if task.benchmark == k]) except: accs = np.array([]) if accs.size == 0 or any([acc is None for acc in accs]): continue mean_acc = np.mean(accs) # * 100.0 soap_results[task.benchmark] = mean_acc healthbench_results = {} if "healthbench" in data["results"]: for task in HealthbenchColumns: task = task.value if task.benchmark == "Overall Score": accs = data["results"]["healthbench"][task.benchmark] healthbench_results[task.benchmark] = accs elif task.benchmark.startswith("Axis"): accs = data["results"]["healthbench"]["Axis Scores"][task.benchmark.replace("Axis: ", "")] healthbench_results[task.benchmark] = accs else: accs = data["results"]["healthbench"]["Theme Scores"][task.benchmark] healthbench_results[task.benchmark] = accs healthbench_hard_results = {} if "healthbench-hard" in data["results"]: for task in HealthbenchHardColumns: task = task.value if task.benchmark == "Overall Score": accs = data["results"]["healthbench-hard"][task.benchmark] healthbench_hard_results[task.benchmark] = accs elif task.benchmark.startswith("Axis"): accs = data["results"]["healthbench-hard"]["Axis Scores"][task.benchmark.replace("Axis: ", "")] healthbench_hard_results[task.benchmark] = accs else: accs = data["results"]["healthbench-hard"]["Theme Scores"][task.benchmark] healthbench_hard_results[task.benchmark] = accs open_ended_arabic_results = {} if "open-ended-arabic" in data["results"]: for task in OpenEndedArabicColumn: task = task.value # We average all scores of a given metric (not all metrics are present in all files) accs = data["results"]["open-ended-arabic"]["overall"][task.benchmark] if task.benchmark in data["results"]["open-ended-arabic"]["overall"] else None open_ended_arabic_results[task.benchmark] = accs if open_ended_arabic_results["ELO_intervals"] is not None and open_ended_arabic_results["Score_intervals"] is not None: open_ended_arabic_results["ELO_intervals"] = "+" + str(open_ended_arabic_results["ELO_intervals"][1]) + "/-" + str(abs(float(open_ended_arabic_results["ELO_intervals"][0]))) open_ended_arabic_results["Score_intervals"] = "+" + str(open_ended_arabic_results["Score_intervals"][1]) + "/-" + str(abs(float(open_ended_arabic_results["Score_intervals"][0]))) open_ended_french_results = {} if "open-ended-french" in data["results"]: for task in OpenEndedFrenchColumn: task = task.value # We average all scores of a given metric (not all metrics are present in all files) accs = data["results"]["open-ended-french"]["overall"][task.benchmark] if task.benchmark in data["results"]["open-ended-french"]["overall"] else None open_ended_french_results[task.benchmark] = accs if open_ended_french_results["ELO_intervals"] is not None and open_ended_french_results["Score_intervals"] is not None: open_ended_french_results["ELO_intervals"] = "+" + str(open_ended_french_results["ELO_intervals"][1]) + "/-" + str(abs(open_ended_french_results["ELO_intervals"][0])) open_ended_french_results["Score_intervals"] = "+" + str(open_ended_french_results["Score_intervals"][1]) + "/-" + str(abs(open_ended_french_results["Score_intervals"][0])) open_ended_spanish_results = {} if "open-ended-spanish" in data["results"]: for task in OpenEndedSpanishColumn: task = task.value # We average all scores of a given metric (not all metrics are present in all files) accs = data["results"]["open-ended-spanish"]["overall"][task.benchmark] if task.benchmark in data["results"]["open-ended-spanish"]["overall"] else None open_ended_spanish_results[task.benchmark] = accs if open_ended_spanish_results["ELO_intervals"] is not None and open_ended_spanish_results["Score_intervals"] is not None: open_ended_spanish_results["ELO_intervals"] = "+" + str(open_ended_spanish_results["ELO_intervals"][1]) + "/-" + str(abs(open_ended_spanish_results["ELO_intervals"][0])) open_ended_spanish_results["Score_intervals"] = "+" + str(open_ended_spanish_results["Score_intervals"][1]) + "/-" + str(abs(open_ended_spanish_results["Score_intervals"][0])) open_ended_portuguese_results = {} if "open-ended-portuguese" in data["results"]: for task in OpenEndedPortugueseColumn: task = task.value # We average all scores of a given metric (not all metrics are present in all files) accs = data["results"]["open-ended-portuguese"]["overall"][task.benchmark] if task.benchmark in data["results"]["open-ended-portuguese"]["overall"] else None open_ended_portuguese_results[task.benchmark] = accs if open_ended_portuguese_results["ELO_intervals"] is not None and open_ended_portuguese_results["Score_intervals"] is not None: open_ended_portuguese_results["ELO_intervals"] = "+" + str(open_ended_portuguese_results["ELO_intervals"][1]) + "/-" + str(abs(open_ended_portuguese_results["ELO_intervals"][0])) open_ended_portuguese_results["Score_intervals"] = "+" + str(open_ended_portuguese_results["Score_intervals"][1]) + "/-" + str(abs(open_ended_portuguese_results["Score_intervals"][0])) open_ended_romanian_results = {} if "open-ended-romanian" in data["results"]: for task in OpenEndedRomanianColumn: task = task.value # We average all scores of a given metric (not all metrics are present in all files) accs = data["results"]["open-ended-romanian"]["overall"][task.benchmark] if task.benchmark in data["results"]["open-ended-romanian"]["overall"] else None open_ended_romanian_results[task.benchmark] = accs if open_ended_romanian_results["ELO_intervals"] is not None and open_ended_romanian_results["Score_intervals"] is not None: open_ended_romanian_results["ELO_intervals"] = "+" + str(open_ended_romanian_results["ELO_intervals"][1]) + "/-" + str(abs(open_ended_romanian_results["ELO_intervals"][0])) open_ended_romanian_results["Score_intervals"] = "+" + str(open_ended_romanian_results["Score_intervals"][1]) + "/-" + str(abs(open_ended_romanian_results["Score_intervals"][0])) open_ended_greek_results = {} if "open-ended-greek" in data["results"]: for task in OpenEndedGreekColumn: task = task.value # We average all scores of a given metric (not all metrics are present in all files) accs = data["results"]["open-ended-greek"]["overall"][task.benchmark] if task.benchmark in data["results"]["open-ended-greek"]["overall"] else None open_ended_greek_results[task.benchmark] = accs if open_ended_greek_results["ELO_intervals"] is not None and open_ended_greek_results["Score_intervals"] is not None: open_ended_greek_results["ELO_intervals"] = "+" + str(open_ended_greek_results["ELO_intervals"][1]) + "/-" + str(abs(float(open_ended_greek_results["ELO_intervals"][0]))) open_ended_greek_results["Score_intervals"] = "+" + str(open_ended_greek_results["Score_intervals"][1]) + "/-" + str(abs(float(open_ended_greek_results["Score_intervals"][0]))) closed_ended_multilingual_results = {} if "closed-ended-multilingual" in data["results"]: for task in ClosedEndedMultilingualColumns: task = task.value accs = data["results"]["closed-ended-multilingual"][task.benchmark]["accuracy"] if task.benchmark in data["results"]["closed-ended-multilingual"] else None closed_ended_multilingual_results[task.benchmark] = accs ehrsql_zero_shot_results = {} if "ehrsql" in data["results"] and "zero_shot" in data["results"]["ehrsql"]: for task in EHRSQLZeroShotColumns: task = task.value accs = data["results"]["ehrsql"]["zero_shot"][task.benchmark] if task.benchmark in data["results"]["ehrsql"]["zero_shot"] else None ehrsql_zero_shot_results[task.benchmark] = accs ehrsql_few_shot_results = {} if "ehrsql" in data["results"] and "few_shot" in data["results"]["ehrsql"]: for task in EHRSQLFewShotColumns: task = task.value accs = data["results"]["ehrsql"]["few_shot"][task.benchmark] if task.benchmark in data["results"]["ehrsql"]["few_shot"] else None ehrsql_few_shot_results[task.benchmark] = accs medcalc_direct_answer_results = {} if "medcalc" in data["results"] and "direct_answer" in data["results"]["medcalc"]: for task in MedCalcDirectAnswerColumns: task = task.value accs = data["results"]["medcalc"]["direct_answer"].get(task.benchmark, {}).get("average", None) medcalc_direct_answer_results[task.benchmark] = accs medcalc_one_shot_cot_results = {} if "medcalc" in data["results"] and "one_shot_cot" in data["results"]["medcalc"]: for task in MedCalcOneShotCotColumns: task = task.value accs = data["results"]["medcalc"]["one_shot_cot"].get(task.benchmark, {}).get("average", None) medcalc_one_shot_cot_results[task.benchmark] = accs medcalc_zero_shot_cot_results = {} if "medcalc" in data["results"] and "zero_shot_cot" in data["results"]["medcalc"]: for task in MedCalcZeroShotCotColumns: task = task.value accs = data["results"]["medcalc"]["zero_shot_cot"].get(task.benchmark, {}).get("average", None) medcalc_zero_shot_cot_results[task.benchmark] = accs medec_zero_shot_results = {} if "medec" in data["results"] and "zero_shot" in data["results"]["medec"]: for task in MedECZeroShotColumns: task = task.value accs = data["results"]["medec"]["zero_shot"].get(task.benchmark, None) medec_zero_shot_results[task.benchmark] = accs medec_one_shot_results = {} if "medec" in data["results"] and "one_shot" in data["results"]["medec"]: for task in MedECOneShotColumns: task = task.value accs = data["results"]["medec"]["one_shot"].get(task.benchmark, None) medec_one_shot_results[task.benchmark] = accs # #add the # closed_ended_arabic_results = {} # if PRIVATE_REPO and "closed-ended-arabic" in data["results"]: # for task in ClosedEndedArabicColumns: # task = task.value # # We average all scores of a given metric (not all metrics are present in all files) # try: # accs = np.array([v.get(task.metric, None) for k, v in data["results"]["closed-ended-arabic"].items() if task.benchmark == k]) # except: # # breakpoint() # accs = np.array([]) # if accs.size == 0 or any([acc is None for acc in accs]): # continue # mean_acc = np.mean(accs) # * 100.0 # closed_ended_arabic_results[task.benchmark] = mean_acc # if open_ended_results == {} or med_safety_results == {} or medical_summarization_results == {} or aci_results == {} or soap_results == {}: # open_ended_results = {} # med_safety_results = {} # medical_summarization_results = {} # aci_results = {} # soap_results = {} # types_results = {} # for clinical_type in ClinicalTypes: # clinical_type = clinical_type.value # # We average all scores of a given metric (not all metrics are present in all files) # accs = np.array([v.get(clinical_type.metric, None) for k, v in data[evaluation_metric]["clinical_type_results"].items() if clinical_type.benchmark == k]) # if accs.size == 0 or any([acc is None for acc in accs]): # continue # mean_acc = np.mean(accs) # * 100.0 # types_results[clinical_type.benchmark] = mean_acc # if "deepseek-ai/DeepSeek-R1-Distill-Llama-70B" in json_filepath: # breakpoint() return self( eval_name=result_key, full_model=full_model, org=org, model=model, revision=config.get("revision", ""), dataset_results=harness_results, open_ended_results=open_ended_results, med_safety_results=med_safety_results, medical_summarization_results=medical_summarization_results, aci_results=aci_results, soap_results=soap_results, healthbench_results=healthbench_results, healthbench_hard_results=healthbench_hard_results, open_ended_arabic_results=open_ended_arabic_results, open_ended_french_results=open_ended_french_results, open_ended_spanish_results=open_ended_spanish_results, open_ended_portuguese_results=open_ended_portuguese_results, open_ended_romanian_results=open_ended_romanian_results, open_ended_greek_results=open_ended_greek_results, closed_ended_multilingual_results=closed_ended_multilingual_results, ehrsql_zero_shot_results=ehrsql_zero_shot_results, ehrsql_few_shot_results=ehrsql_few_shot_results, medcalc_direct_answer_results=medcalc_direct_answer_results, medcalc_one_shot_cot_results=medcalc_one_shot_cot_results, medcalc_zero_shot_cot_results=medcalc_zero_shot_cot_results, medec_zero_shot_results=medec_zero_shot_results, medec_one_shot_results=medec_one_shot_results, is_domain_specific=config.get("is_domain_specific", False), # Assuming a default value use_chat_template=config.get("use_chat_template", False), # Assuming a default value precision=precision, model_type=model_type, weight_type=WeightType.from_str(config.get("weight_type", "")), # Assuming the default value backbone=backbone, license=license, likes=config.get("likes", 0), # Assuming a default value num_params=num_params, still_on_hub=still_on_hub, display_result=display_result, date=config.get("submitted_time","") ) def update_with_request_file(self, requests_path): """Finds the relevant request file for the current model and updates info with it""" request_file = get_request_file_for_model(requests_path, self.full_model, self.precision.value.name) try: with open(request_file, "r") as f: request = json.load(f) self.model_type = ModelType.from_str(request.get("model_type", "")) self.weight_type = WeightType[request.get("weight_type", "Original")] self.license = request.get("license", "?") self.likes = request.get("likes", 0) self.num_params = request.get("params", 0) self.date = request.get("submitted_time", "") # self.precision = request.get("precision", "float32") except Exception: pass # print( # f"Could not find request file for {self.org}/{self.model} with precision {self.precision.value.name}" # ) # print(f" Args used were - {request_file=}, {requests_path=}, {self.full_model=},") def to_dict(self, subset): """Converts the Eval Result to a dict compatible with our dataframe display""" data_dict = { "eval_name": self.eval_name, # not a column, just a save name, AutoEvalColumn.precision.name: self.precision.value.name, AutoEvalColumn.model_type.name: self.model_type.value.name, # AutoEvalColumn.model_type_symbol.name: self.model_type.value.symbol + (" 🏥" if self.is_domain_specific else ""), AutoEvalColumn.weight_type.name: self.weight_type.value.name, # AutoEvalColumn.architecture.name: self.architecture.value.name, # AutoEvalColumn.backbone.name: self.backbone, AutoEvalColumn.model.name: make_clickable_model(self.full_model), AutoEvalColumn.is_domain_specific.name: self.is_domain_specific, AutoEvalColumn.use_chat_template.name: self.use_chat_template, AutoEvalColumn.revision.name: self.revision, AutoEvalColumn.license.name: self.license, AutoEvalColumn.likes.name: self.likes, AutoEvalColumn.params.name: self.num_params, AutoEvalColumn.still_on_hub.name: self.still_on_hub, AutoEvalColumn.date.name: self.date, "display_result" : self.display_result, } if subset == "datasets": average = sum([v for v in self.dataset_results.values() if v is not None]) / len(HarnessTasks) data_dict[AutoEvalColumn.average.name] = average if len(self.dataset_results) > 0: for task in HarnessTasks: data_dict[task.value.col_name] = self.dataset_results[task.value.benchmark] return data_dict if subset == "open_ended": if len(self.open_ended_results) > 0: for task in OpenEndedColumns: data_dict[task.value.col_name] = self.open_ended_results[task.value.benchmark] return data_dict # changes to be made here if subset == "med_safety": if len(self.med_safety_results) > 0: for task in MedSafetyColumns: data_dict[task.value.col_name] = self.med_safety_results[task.value.benchmark] return data_dict if subset == "medical_summarization": if len(self.medical_summarization_results) > 0: adjusted_conciseness = max(0, self.medical_summarization_results["brief"]) coverage = self.medical_summarization_results["coverage"] hm = 2 / (1/coverage + 1/adjusted_conciseness) if not (adjusted_conciseness == 0 or coverage == 0) else 0 conformity = self.medical_summarization_results["conform"] consistency = self.medical_summarization_results["fact"] overall = sum([hm, conformity, consistency]) / 3 data_dict[AutoEvalColumn.overall.name] = overall for task in MedicalSummarizationColumns: data_dict[task.value.col_name] = self.medical_summarization_results[task.value.benchmark] return data_dict if subset == "aci": overall = sum([v for v in self.aci_results.values() if v is not None]) / len(ACIColumns) data_dict[AutoEvalColumn.overall.name] = overall if len(self.aci_results) > 0: for task in ACIColumns: data_dict[task.value.col_name] = self.aci_results[task.value.benchmark] return data_dict if subset == "soap": overall = sum([v for v in self.soap_results.values() if v is not None]) / len(SOAPColumns) data_dict[AutoEvalColumn.overall.name] = overall if len(self.soap_results) > 0: for task in SOAPColumns: data_dict[task.value.col_name] = self.soap_results[task.value.benchmark] return data_dict if subset == "healthbench": if len(self.healthbench_results) > 0: for task in HealthbenchColumns: data_dict[task.value.col_name] = self.healthbench_results[task.value.benchmark] return data_dict if subset == "healthbench_hard": if len(self.healthbench_hard_results) > 0: for task in HealthbenchHardColumns: data_dict[task.value.col_name] = self.healthbench_hard_results[task.value.benchmark] return data_dict if subset == "open_ended_arabic": if len(self.open_ended_arabic_results) > 0: for task in OpenEndedArabicColumn: data_dict[task.value.col_name] = self.open_ended_arabic_results[task.value.benchmark] return data_dict if subset == "open_ended_french": if len(self.open_ended_french_results) > 0: for task in OpenEndedFrenchColumn: data_dict[task.value.col_name] = self.open_ended_french_results[task.value.benchmark] return data_dict if subset == "open_ended_spanish": if len(self.open_ended_spanish_results) > 0: for task in OpenEndedSpanishColumn: data_dict[task.value.col_name] = self.open_ended_spanish_results[task.value.benchmark] return data_dict if subset == "open_ended_portuguese": if len(self.open_ended_portuguese_results) > 0: for task in OpenEndedPortugueseColumn: data_dict[task.value.col_name] = self.open_ended_portuguese_results[task.value.benchmark] return data_dict if subset == "open_ended_romanian": if len(self.open_ended_romanian_results) > 0: for task in OpenEndedRomanianColumn: data_dict[task.value.col_name] = self.open_ended_romanian_results[task.value.benchmark] return data_dict if subset == "open_ended_greek": if len(self.open_ended_greek_results) > 0: for task in OpenEndedGreekColumn: data_dict[task.value.col_name] = self.open_ended_greek_results[task.value.benchmark] return data_dict if subset == "closed_ended_multilingual": average = sum([v for v in self.closed_ended_multilingual_results.values() if v is not None]) / len(ClosedEndedMultilingualColumns) data_dict[AutoEvalColumn.average.name] = average if len(self.closed_ended_multilingual_results) > 0: for task in ClosedEndedMultilingualColumns: data_dict[task.value.col_name] = self.closed_ended_multilingual_results[task.value.benchmark] return data_dict if subset == "ehrsql_zero_shot": if len(self.ehrsql_zero_shot_results) > 0: for task in EHRSQLZeroShotColumns: data_dict[task.value.col_name] = self.ehrsql_zero_shot_results[task.value.benchmark] return data_dict if subset == "ehrsql_few_shot": if len(self.ehrsql_few_shot_results) > 0: for task in EHRSQLFewShotColumns: data_dict[task.value.col_name] = self.ehrsql_few_shot_results[task.value.benchmark] return data_dict if subset == "medcalc_direct_answer": if len(self.medcalc_direct_answer_results) > 0: for task in MedCalcDirectAnswerColumns: data_dict[task.value.col_name] = self.medcalc_direct_answer_results[task.value.benchmark] return data_dict if subset == "medcalc_one_shot_cot": if len(self.medcalc_one_shot_cot_results) > 0: for task in MedCalcOneShotCotColumns: data_dict[task.value.col_name] = self.medcalc_one_shot_cot_results[task.value.benchmark] return data_dict if subset == "medcalc_zero_shot_cot": if len(self.medcalc_zero_shot_cot_results) > 0: for task in MedCalcZeroShotCotColumns: data_dict[task.value.col_name] = self.medcalc_zero_shot_cot_results[task.value.benchmark] return data_dict if subset == "medec_zero_shot": if len(self.medec_zero_shot_results) > 0: for task in MedECZeroShotColumns: data_dict[task.value.col_name] = self.medec_zero_shot_results[task.value.benchmark] return data_dict if subset == "medec_one_shot": if len(self.medec_one_shot_results) > 0: for task in MedECOneShotColumns: data_dict[task.value.col_name] = self.medec_one_shot_results[task.value.benchmark] return data_dict def get_request_file_for_model(requests_path, model_name, precision): """Selects the correct request file for a given model. Only keeps runs tagged as FINISHED""" request_files = os.path.join( requests_path, f"{model_name}_eval_request_*.json", ) request_files = glob.glob(request_files) # Select correct request file (precision) request_file = "" request_files = sorted(request_files, reverse=True) for tmp_request_file in request_files: with open(tmp_request_file, "r") as f: req_content = json.load(f) if req_content["status"] in ["FINISHED"] and req_content["precision"] == precision.split(".")[-1]: request_file = tmp_request_file return request_file def update_results(result1, result2): # breakpoint() for key in dir(result1): if key.endswith("_results"): if getattr(result1, key) == {}: setattr(result1, key, getattr(result2, key)) # breakpoint() return result1 def get_raw_eval_results(results_path: str, requests_path: str, evaluation_metric: str) -> list[EvalResult]: """From the path of the results folder root, extract all needed info for results""" model_result_filepaths = [] for root, _, files in os.walk(results_path): # We should only have json files in model results if len(files) == 0 or any([not f.endswith(".json") for f in files]): continue # Sort the files by date try: files.sort(key=lambda x: x.removesuffix(".json").removeprefix("results_")[:-7]) except dateutil.parser._parser.ParserError: files = [files[-1]] for file in files: model_result_filepaths.append(os.path.join(root, file)) # breakpoint() eval_results = {} for model_result_filepath in model_result_filepaths: # Creation of result eval_result = EvalResult.init_from_json_file(model_result_filepath, evaluation_metric) # eval_result.update_with_request_file(requests_path) # Store results of same eval together eval_name = eval_result.eval_name if eval_name in eval_results.keys(): eval_results[eval_name] = update_results(eval_results[eval_name], eval_result) # eval_results[eval_name].results.update({k: v for k, v in eval_result.results.items() if v is not None}) else: eval_results[eval_name] = eval_result # breakpoint() results = [] # clinical_type_results = [] for v in eval_results.values(): if not v.display_result: continue results.append(v) # breakpoint() return results