| import json | |
| import logging | |
| import os | |
| import zipfile | |
| from typing import Optional | |
| from llm_studio.src.utils.exceptions import LLMResourceException | |
| from llm_studio.src.utils.utils import add_file_to_zip | |
def get_artifact_path_path(
    experiment_name: str, experiment_path: str, artifact_type: str
):
    """Build the location of the zip archive holding an experiment artifact

    Args:
        experiment_name: name of the experiment
        experiment_path: path containing experiment related files
        artifact_type: type of the artifact, used as the file name prefix

    Returns:
        Path of the form ``<experiment_path>/<artifact_type>_<experiment_name>.zip``
    """
    archive_name = "{}_{}.zip".format(artifact_type, experiment_name)
    return os.path.join(experiment_path, archive_name)
def get_predictions_path(experiment_name: str, experiment_path: str):
    """Return the path of the zip archive with the experiment predictions

    Args:
        experiment_name: name of the experiment
        experiment_path: path containing experiment related files

    Returns:
        Path to the "preds" artifact zip file of the experiment
    """
    return get_artifact_path_path(
        experiment_name=experiment_name,
        experiment_path=experiment_path,
        artifact_type="preds",
    )
def get_logs_path(experiment_name: str, experiment_path: str):
    """Return the path of the zip archive with the experiment logs

    Args:
        experiment_name: name of the experiment
        experiment_path: path containing experiment related files

    Returns:
        Path to the "logs" artifact zip file of the experiment
    """
    return get_artifact_path_path(
        experiment_name=experiment_name,
        experiment_path=experiment_path,
        artifact_type="logs",
    )
def get_model_path(experiment_name: str, experiment_path: str):
    """Return the path of the zip archive with the experiment model

    Args:
        experiment_name: name of the experiment
        experiment_path: path containing experiment related files

    Returns:
        Path to the "model" artifact zip file of the experiment
    """
    return get_artifact_path_path(
        experiment_name=experiment_name,
        experiment_path=experiment_path,
        artifact_type="model",
    )
def check_available_space(
    output_folder: str, min_disk_space: Optional[float]
) -> bool:
    """Check that the filesystem holding ``output_folder`` has enough free space

    Args:
        output_folder: path on the filesystem to inspect
        min_disk_space: required free space in bytes; a falsy value
            (``None`` or ``0``) disables the check

    Returns:
        True when the check is disabled or enough space is available

    Raises:
        LLMResourceException: if less than ``min_disk_space`` is available
    """
    if not min_disk_space:
        return True

    stats = os.statvfs(output_folder)
    # Fragment size times the number of blocks available to unprivileged users
    available_size = stats.f_frsize * stats.f_bavail
    if available_size < min_disk_space:
        error = (
            f"Not enough disk space. Available space is {get_size_str(available_size)}."
            f" Required space is {get_size_str(min_disk_space)}."
        )
        raise LLMResourceException(error)

    # Report success explicitly so both passing paths return the same value
    return True
def save_prediction_outputs(
    experiment_name: str,
    experiment_path: str,
):
    """Save experiment prediction

    Creates (or overwrites) the predictions zip archive of the experiment and
    adds ``validation_raw_predictions.pkl`` and ``validation_predictions.csv``
    from the experiment folder to it via ``add_file_to_zip``.

    Args:
        experiment_name: name of the experiment
        experiment_path: path containing experiment related files

    Returns:
        Path to the zip file with experiment predictions
    """
    zip_path = get_predictions_path(experiment_name, experiment_path)
    prediction_files = (
        "validation_raw_predictions.pkl",
        "validation_predictions.csv",
    )

    # The context manager closes the archive even if adding a file fails
    with zipfile.ZipFile(zip_path, "w") as zf:
        for file_name in prediction_files:
            add_file_to_zip(zf=zf, path=f"{experiment_path}/{file_name}")

    return zip_path
def save_logs(experiment_name: str, experiment_path: str, logs: dict):
    """Save experiment logs

    Dumps the "meta", "train" and "validation" entries of ``logs`` to
    ``charts_<experiment_name>.json`` in the experiment folder, then creates
    (or overwrites) the logs zip archive containing that JSON file, the
    experiment ``cfg.yaml`` and, if it already exists, ``logs.log``.

    Args:
        experiment_name: name of the experiment
        experiment_path: path containing experiment related files
        logs: dictionary with experiment charts

    Returns:
        Path to the zip file with experiment logs
    """
    cfg_path = os.path.join(experiment_path, "cfg.yaml")
    charts_path = f"{experiment_path}/charts_{experiment_name}.json"

    chart_keys = ("meta", "train", "validation")
    with open(charts_path, "w") as fp:
        json.dump({k: v for k, v in logs.items() if k in chart_keys}, fp)

    zip_path = get_logs_path(experiment_name, experiment_path)

    # The context manager closes the archive even if one of the writes fails,
    # e.g. when cfg.yaml is missing
    with zipfile.ZipFile(zip_path, "w") as zf:
        zf.write(charts_path, os.path.basename(charts_path))
        zf.write(cfg_path, f"cfg_{experiment_name}.yaml")

        # The log file is optional: it may not have been created yet
        try:
            zf.write(
                f"{experiment_path}/logs.log",
                f"logs_{experiment_name}.log",
            )
        except FileNotFoundError:
            logging.warning("Log file is not available yet.")

    return zip_path
def get_size_str(
    x, sig_figs=2, input_unit="B", output_unit="dynamic", show_unit=True
) -> str:
    """
    Convert a size given in a unit such as bytes to human readable format.

    Units are binary multiples: each step between neighbouring units is a
    factor of 1024.

    Args:
        x: input value
        sig_figs: number of decimal places the result is rounded to
            (passed to ``round``)
        input_unit: input unit ("B", "KB", "MB", "GB", "TB"), default "B"
        output_unit: output unit ("B", "KB", "MB", "GB", "TB", "dynamic")
            default "dynamic", which picks the largest unit (not smaller than
            ``input_unit``) for which the value stays at or above 1
        show_unit: whether to show the unit in the output string

    Returns:
        str: Human readable string

    Raises:
        ValueError: if ``input_unit`` or ``output_unit`` is not a known unit
    """
    names = ["B", "KB", "MB", "GB", "TB"]
    act_i = names.index(input_unit)

    if output_unit == "dynamic":
        # Scale up while the value is large enough and a bigger unit exists
        while x >= 1024 and act_i < len(names) - 1:
            x /= 1024
            act_i += 1
    else:
        target = names.index(output_unit)
        # Convert towards a larger unit ...
        while act_i < target:
            x /= 1024
            act_i += 1
        # ... or towards a smaller one (e.g. "MB" -> "KB")
        while act_i > target:
            x *= 1024
            act_i -= 1

    ret_str = str(round(x, sig_figs))
    if show_unit:
        ret_str += f" {names[act_i]}"
    return ret_str