import json
import logging
import time
from dataclasses import InitVar, asdict, dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import git
import yaml
from simple_parsing import ArgumentParser, Serializable
from simple_parsing.helpers import dict_field, list_field

from m4.training.types import DatasetNames, DatasetTypes
from m4.training.utils import FAKE_TOKEN_AROUND_IMAGE_V2, IMAGE_TOKEN, LoggingTypes

logger = logging.getLogger(__name__)


@dataclass
class CfgFileConfig:
    """Config file args"""

    # path to config file
    config: Optional[Path] = None
    # set to false if you don't want to save config automatically
    save_config: bool = True


@dataclass
class GlobalBatchSizeRampUp:
    """These are init variables that are used to set up the GBS ramp up protocol"""

    # global batch size ramp up protocol:
    #
    # 1. start with global batch size `start`
    # 2. every time `samples` additional samples have been consumed, increment the global batch size by `increment`
    # 3. repeat step 2 until the global batch size reaches `finish`
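    #
    # Illustrative example (hypothetical values): with start=512, increment=512,
    # samples=1_000_000 and finish=2048, the global batch size grows
    # 512 -> 1024 -> 1536 -> 2048, stepping up each time another 1M samples have
    # been consumed, and then stays at 2048.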
    start: Optional[int] = None
    finish: Optional[int] = None
    increment: Optional[int] = None
    samples: Optional[int] = None


@dataclass
class GlobalBatchSizeRampUpRunningParams:
    """These are running variables used to decide when to increment the GBS and when to stop
    doing so. They are never set directly in the config file, but are calculated when the
    training starts.
    """

    global_seen_samples: int = 0
    global_batch_size_current: int = 0
    next_goal_samples: int = 0
    grad_acc_size_current: int = 1


@dataclass
class Hparams:
    """General Hyperparameters"""

    # --------------------
    # General parameters
    # --------------------
    seed: int = 13
    # If set to True, the sole purpose of the job is to pre-process the dataset (i.e. the map
    # operations). The job will exit as soon as the dataset is pre-processed.
    just_preprocess: bool = False
    jz_job_time_sec: Optional[float] = None
    jz_start_time: float = time.time()
    job_id: Optional[int] = None
    timeout: int = 1800  # 30 min
    # set to False to ignore the optimizer states when loading from a checkpoint
    load_optimizer_states: Optional[bool] = True
    # set to False to disable this GPU memory saving method
    gradient_checkpointing: Optional[bool] = True
    # --------------------
    # Model-related hparams
    # --------------------
    tokenizer_name: str = "gpt2"
    # The value of the string will be evaluated (i.e. interpreted) and must be a dict
    tokenizer_params: str = '{"use_fast":True}'
    tokenizer_add_tokens: str = (
        f'[AddedToken("{FAKE_TOKEN_AROUND_IMAGE_V2}", rstrip=False, lstrip=False), AddedToken("{IMAGE_TOKEN}",'
        " rstrip=False, lstrip=False)]"
    )
    # The value of the string will be evaluated (i.e. interpreted). Unnecessary if the tokenizer has a `pad_token`.
    tokenizer_add_special_tokens: str = '{"pad_token": tokenizer.eos_token}'
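    # (Both strings above are presumably eval'ed with the instantiated `tokenizer` in scope,
    # which is why the default for `tokenizer_add_special_tokens` can refer to
    # `tokenizer.eos_token`.)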
    model_name: str = "gpt2"
    revision: str = "main"
    model_params: Dict[str, Any] = dict_field(
        dict(
            vision_embed_dim=768,
            vision_image_size=224,
            vision_model_name="google/vit-base-patch16-224",
            # The value of the string will be evaluated (i.e. interpreted) and must be a dict
            vision_model_params="{}",
            # Ties the word embedding with the LM head's weights
            # Since the word embedding is frozen, use in conjunction with freeze_lm_head=True
            tie_word_embeddings=False,
            # Freeze different parts of the model
            freeze_lm_head=False,
            freeze_text_layers=True,
            freeze_text_module_exceptions=[],
            freeze_vision_layers=True,
            freeze_vision_module_exceptions=[],
            # Perceiver Resampler parameters
            use_resampler=False,
            resampler_n_latents=64,
            resampler_depth=6,
            resampler_n_heads=16,
            resampler_head_dim=96,
        )
    )
    # --------------------
    # Training parameters
    # --------------------
    resume_run: Optional[bool] = None
    do_validation: bool = True
    # deprecated in favor of batch_size_per_gpu
    batch_size: Optional[int] = None
    batch_size_per_gpu: int = 1
    global_batch_size: Optional[int] = None
    global_batch_size_ramp_up: GlobalBatchSizeRampUp = GlobalBatchSizeRampUp()
    grad_acc_size: Optional[int] = 1
    grad_clip: float = 1.0
    # weights by which to multiply the loss of each dataset when accumulating gradients over datasets
    loss_weights_per_dataset: Optional[List[float]] = None
    # int(max_num_tokens / (batch_size * max_seq_len * grad_acc_size * num_processes))
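    # e.g. (hypothetical numbers) training on 1e11 tokens with batch_size_per_gpu=1,
    # max_seq_len=256, grad_acc_size=8 and num_processes=64 gives
    # int(1e11 / (1 * 256 * 8 * 64)) ~= 763_000 optimizer steps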
    max_num_opt_steps: Optional[int] = 500_000
    max_num_opt_steps_this_run: Optional[int] = None
    max_num_epochs: Optional[int] = None
    # If this path appears, the program will stop after finishing the current training step
    kill_switch_path: Optional[Path] = None
    # If this path appears, the program will save a checkpoint and immediately delete this flag
    save_switch_path: Optional[Path] = None
    # --------------------
    # Logging parameters
    # --------------------
    train_logging_opt_steps: int = 50
    train_logging_per_dataset_suffix: str = ""
    # If a specific logging type is specified, per-dataset information will be inserted inside
    # those logs.
    train_logging_per_dataset_info: List[LoggingTypes] = list_field(LoggingTypes.JSONL, LoggingTypes.WANDB)
    # If `train_logging_activations` is not empty, hooks will be inserted into the model to track
    # the min/max/std/norm of the activations and weights. This will slow down training.
    # See https://huggingface.co/docs/transformers/main/en/debugging#underflow-and-overflow-detection
    train_logging_activations: List[LoggingTypes] = list_field()
    train_logging_activations_opt_steps: Optional[int] = 25
    train_logging_grad_param_deepspeed: List[LoggingTypes] = list_field()
    train_logging_grad_param_deepspeed_opt_steps: int = 50
    val_logging_opt_steps: int = train_logging_opt_steps * 5
    val_inline_logging_opt_steps: int = train_logging_opt_steps
    train_saving_opt_steps: int = train_logging_opt_steps * 5
    save_dir: Optional[Path] = None
    upload_to_s3: bool = False
    train_log_mem_usage: bool = False
    timing_break_down: bool = False
    save_batch_max_idx: Optional[int] = None
    save_batch_min_idx: Optional[int] = None
    # ----------------------
    # Wandb Parameters
    # ----------------------
    wandb_enable: bool = False
    # name of the wandb project
    wandb_project: str = "VLOOM"
    # name of the wandb entity
    wandb_entity: str = "huggingfacem4"
    wandb_log_freq: int = 50
    wandb_run_id: str = ""
    wandb_tags: Optional[List[str]] = None
    repo_commit_id: Optional[str] = None
    # ----------------------
    # Debug Parameters
    # ----------------------
    use_torch_profiler: bool = False


@dataclass
class ResumeParams:
    # ----------------------
    # Resume run Parameters
    # ----------------------
    # Need to make sure that resume_run is True to give an input here
    opt_step_dir: Optional[Path] = None
    accelerator_state_dir: Optional[Path] = None
    model_file: Optional[Path] = None
    model_config_file: Optional[Path] = None
    # Automatically resumes the last run of `save_dir`. Set to False to choose a specific run
    resume_last: bool = True
    train_logs: Dict = dict_field()
    resume_opt_step: int = 0
    resume_epoch: int = 0
    resume_dataset_state: List = list_field()
    gbs_running: GlobalBatchSizeRampUpRunningParams = GlobalBatchSizeRampUpRunningParams()


@dataclass
class DatasetParams:
    # This always needs to be specified as it is needed by dataset utils down the line
    dataset_name: DatasetNames
    # max number of images per sample
    max_num_images: int = 5
    # maximum sequence length
    max_seq_len: int = 256
    training_datasets_paths: List[Path] = list_field()
    validation_datasets_paths: List[Path] = list_field()
    # if True, instead of split and pack, each instance in the sample will be
    # either truncated or padded to the same length.
    pad_dataset: bool = False
    map_batch_size: int = 64
    # Preprocessing number of processes in map (not useful for processing on the fly)
    map_num_proc: Optional[int] = None
    # Decides how many samples/subsequences should be extracted from the CM4 corpus
    # when the dataset is to be padded; irrelevant otherwise, as full packing is used
    max_num_samples_per_document: int = 10
    # Strategy for detecting blur: laplacian or fft
    blur_strategy: str = "fft"
    # Threshold for blur detection, 0.0 means disabled. Set 32 for "laplacian" and
    # 10 for "fft" for starters
    blur_threshold: float = 0.0
    add_begin_of_doc_token: bool = False
    add_end_of_doc_token: bool = True
    shuffle_after_packing: bool = False
    # Parameters for T5 MLM
    t5_mlm_noise_density: float = 0.15
    t5_mlm_mean_noise_span_length: int = 3
    dataset_type: Optional[DatasetTypes] = None
    # Parameters for the webdataset pipeline
    shuffle_initial_urls_list: bool = False
    shuffle_before_split_by_node_buffer_size: Optional[int] = None
    shuffle_before_split_by_worker_buffer_size: Optional[int] = None
    shuffle_after_tarfile_to_samples_buffer_size: Optional[int] = None
    shuffle_after_batching_buffer_size: Optional[int] = None


@dataclass
class ImageCaptionPairedDatasetParams(DatasetParams):
    # PMD only: This value decides the probability of the image token being at the start
    # of the text or at the end of the text. Set to 0.5 for equal probability.
    # Set to 0 for the image always at the start.
    prob_image_at_end: float = 0.5
    # PMD only: Specifies the tolerance for the amount of padding in a sequence. If set
    # to -1, then all padding will be tolerated. If set to 0, then no padding will be tolerated.
    # Continuously increase this value to allow more padding in the sequence.
    padding_tolerance: int = -1
    dataset_type: DatasetTypes = DatasetTypes.IMAGE_CAPTION_PAIRS


@dataclass
class WebDocumentsDatasetParams(DatasetParams):
    # Decides how often the image attention mask is built so that the text attends to the
    # next image. Set to 0 to attend only to preceding images.
    # NOTE: For PMD, this option doesn't apply anymore. Use `prob_image_at_end`
    # to control the position of the image relative to its corresponding text.
    p_next: float = 0.5
    dataset_type: DatasetTypes = DatasetTypes.WEB_DOCUMENTS


@dataclass
class DataParams(Serializable):
    """Data Parameters"""

    # what software to use for the dataset
    use_webdataset: bool = False
    # number of workers for the dataloaders
    num_workers: int = 1
    # allow async, faster data transfer to GPUs (only makes sense when CUDA GPUs are available)
    # known to cause memory issues
    pin_memory: bool = False
    # Whether to use persistent workers for the dataloaders
    persistent_workers: bool = True
    realtime_processing: bool = False
    train_seed: int = 1
    val_seed: int = 2
    # can use one config for both train + validation, or specific ones if they need to be different
    select_n_examples: Optional[int] = None
    select_n_examples_train: Optional[int] = None
    select_n_examples_validation: Optional[int] = None
    # TODO: Move to per-dataset params as it makes more sense there
    proba_interleaving_dataset: Optional[List[float]] = None
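    # Illustrative (hypothetical) value: [0.6, 0.2, 0.1, 0.1], presumably one probability
    # per interleaved training dataset; the values must sum to 1 (enforced in
    # Parameters.__post_init__)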
    pmd: ImageCaptionPairedDatasetParams = ImageCaptionPairedDatasetParams(dataset_name=DatasetNames.PMD)
    laion: ImageCaptionPairedDatasetParams = ImageCaptionPairedDatasetParams(dataset_name=DatasetNames.LAION)
    cm4: WebDocumentsDatasetParams = WebDocumentsDatasetParams(dataset_name=DatasetNames.CM4)
    wiki: WebDocumentsDatasetParams = WebDocumentsDatasetParams(dataset_name=DatasetNames.WIKI)


@dataclass
class OptimizerParams:
    """Optimization parameters"""

    # --------------------
    # vl optim parameters
    # --------------------
    vl_optim: str = "AdamW"
    vl_optim_params: Dict[str, Any] = dict_field(
        dict(
            # learning rate
            lr=1e-4,
            # betas for adam
            betas=(0.9, 0.999),
            weight_decay=0.1,
            no_decay=["bias", "alpha", "layernorm", "ln", "layer_norm", "perceiver_resampler"],
        )
    )
    vl_lr_scheduler: str = "get_constant_schedule_with_warmup"
    # number of warmup steps for the learning rate
    vl_lr_scheduler_params: Dict[str, Any] = dict_field(dict(num_warmup_steps=5_000, last_epoch=-1))
    z_loss: float = 0.0


@dataclass
class Parameters(Serializable):
    """base options."""

    hparams: Hparams = Hparams()
    optim_param: OptimizerParams = OptimizerParams()
    data_param: DataParams = DataParams()
    resume_param: ResumeParams = ResumeParams()
    should_verify: InitVar[bool] = True

    def verify(self, should_verify: bool):
        if not should_verify:
            return
        dict_rep = vars(self)
        expected = vars(self.__class__(should_verify=False))
        for key, value in dict_rep.items():
            if isinstance(value, dict):
                diff = set(value.keys()) - set(asdict(expected[key]).keys())
                if len(diff) > 0:
                    raise TypeError(
                        f"{key} in {self.__class__.__name__} has extra keys: {diff}. Please fix your config if you are"
                        " using one."
                    )
            if key not in expected:
                raise ValueError(f"{key} is not a valid parameter for {self.__class__.__name__}")

    def __post_init__(self, should_verify: bool = True):
        """Post-initialization code"""
        self.verify(should_verify=should_verify)

        # copy select_n_examples to the more specific ones if the latter haven't been preset
        if self.data_param.select_n_examples is not None:
            if self.data_param.select_n_examples_train is None:
                self.data_param.select_n_examples_train = self.data_param.select_n_examples
            if self.data_param.select_n_examples_validation is None:
                self.data_param.select_n_examples_validation = self.data_param.select_n_examples

        # Get commit id
        if self.hparams.repo_commit_id is None:
            self.hparams.repo_commit_id = git.Repo(search_parent_directories=True).head.object.hexsha

        # If processing on the fly, with the current implementation, we can't have `num_workers=0`
        if self.data_param.realtime_processing and self.data_param.num_workers == 0:
            raise ValueError(
                "If doing processing on the fly (and thus using the `IterableDataset`), you can't have `num_workers`"
                " equal to 0."
            )

        # batch_size deprecation
        if self.hparams.batch_size is not None:
            if self.hparams.batch_size_per_gpu > 1:
                raise ValueError(
                    "as hparams.batch_size is deprecated - don't know how to proceed with both hparams.batch_size>1"
                    " and hparams.batch_size_per_gpu > 1"
                )
            else:
                logger.warning(
                    "will use the deprecated hparams.batch_size, but transition to hparams.batch_size_per_gpu instead"
                )
                self.hparams.batch_size_per_gpu = self.hparams.batch_size
            self.hparams.batch_size = None

        # Assign batch size to data_param as well for dataloaders
        self.data_param.batch_size = self.hparams.batch_size_per_gpu

        # note: all global batch_size-related configs including hparams.grad_acc_size will be
        # checked/set in trainer's setup_batch_size_related_configs since we need to know the value
        # of num_processes

        # Assign LoggingTypes given values
        self.hparams.train_logging_activations = [LoggingTypes(val) for val in self.hparams.train_logging_activations]

        # Check that proba_interleaving_dataset is mutually exclusive with loss_weights_per_dataset
        if self.data_param.proba_interleaving_dataset and self.hparams.loss_weights_per_dataset:
            raise ValueError(
                "Can't have hparams.loss_weights_per_dataset and proba_interleaving_dataset. If we have"
                " loss_weights_per_dataset, it means the gradients are accumulated over datasets. Therefore a batch"
                " of each is given at each update and there is no use for proba_interleaving_dataset"
            )

        if (
            self.data_param.proba_interleaving_dataset is not None
            and sum(self.data_param.proba_interleaving_dataset) != 1
        ):
            raise ValueError("proba_interleaving_dataset must sum to 1")

        self.hparams.train_logging_grad_param_deepspeed = [
            LoggingTypes(val) for val in self.hparams.train_logging_grad_param_deepspeed
        ]

        # Resume the run if there is already an existing folder for this run
        if self.hparams.save_dir is not None and self.hparams.save_dir.exists():
            save_dir_has_checkpoints = (
                len([dir for dir in self.hparams.save_dir.iterdir() if (dir.is_dir() and "opt_step" in str(dir))]) > 0
            )
            if self.hparams.resume_run is not None and not self.hparams.resume_run and save_dir_has_checkpoints:
                logger.warning(
                    "`resume_run` was explicitly set to False (i.e. starting from scratch), but the experiment"
                    " folder already has been populated with previous runs.\nAlready saved checkpoints will be"
                    " overwritten (at best, when `train_saving_opt_steps` is the same) or will be mixed with the new"
                    " checkpoints of a potentially brand new experiment. Would it make sense to create a new"
                    " `save_dir`?"
                )
            self.hparams.resume_run = save_dir_has_checkpoints

        # Set up all args needed to resume a run
        if self.hparams.resume_run:
            # Get the last step directory
            if self.resume_param.opt_step_dir is None and not self.resume_param.resume_last:
                raise ValueError(
                    "`opt_step_dir` cannot be None while `resume_last` is False. Choose which dir you want to resume"
                    " from..."
                )
            if self.resume_param.resume_last:
                if self.resume_param.opt_step_dir is not None:
                    raise ValueError(
                        "`resume_last` cannot be True while `opt_step_dir` is not None. Choose which dir you want to"
                        " resume from..."
                    )
                latest_path = self.hparams.save_dir / "latest_opt_step_dir"
                with open(latest_path, "r") as fd:
                    self.resume_param.opt_step_dir = Path(fd.read().strip())
                if not (self.resume_param.opt_step_dir.exists() and self.resume_param.opt_step_dir.is_dir()):
                    raise ValueError(
                        f"It appears that the path in the `latest_opt_step_dir` file {latest_path} is invalid. It"
                        " either does not exist or is not a directory. Please fix that."
                    )

            with open(self.resume_param.opt_step_dir / "resume_run_infos.json", "r") as f:
                resume_infos = json.load(f)

            logger.info(f"Resuming from {self.resume_param.opt_step_dir}")

            self.resume_param.accelerator_state_dir = self.resume_param.opt_step_dir / "accelerator_state"
            self.resume_param.model_file = self.resume_param.opt_step_dir / "unwrapped_model"
            self.resume_param.model_config_file = self.resume_param.opt_step_dir / "unwrapped_model/config.json"
            self.resume_param.tokenizer = self.resume_param.opt_step_dir / "tokenizer"

            self.resume_param.train_logs = resume_infos["train_logs"]
            self.resume_param.resume_opt_step = resume_infos["resume_opt_step"]
            self.resume_param.resume_epoch = resume_infos["resume_epoch"]
            self.resume_param.resume_dataset_state = resume_infos.get("resume_dataset_state", list())

            gbs_running = resume_infos["gbs_running"]
            self.resume_param.gbs_running.global_batch_size_current = gbs_running["global_batch_size_current"]
            self.resume_param.gbs_running.global_seen_samples = gbs_running["global_seen_samples"]
            self.resume_param.gbs_running.next_goal_samples = gbs_running["next_goal_samples"]
            self.resume_param.gbs_running.grad_acc_size_current = gbs_running["grad_acc_size_current"]

            self.hparams.wandb_run_id = resume_infos["wandb_run_id"]
            self.hparams.seed = resume_infos["seed"]

        # Should not happen, but this is in case there is a run mixing
        # wandb_enable = True and wandb_enable = False between jobs
        if not self.hparams.wandb_enable:
            self.hparams.wandb_run_id = ""

    @classmethod
    def parse(cls):
        cfgfile_parser = ArgumentParser(add_help=False)
        cfgfile_parser.add_arguments(CfgFileConfig, dest="cfgfile")
        cfgfile_args, rest = cfgfile_parser.parse_known_args()
        cfgfile: CfgFileConfig = cfgfile_args.cfgfile

        file_config: Optional[Parameters] = None
        if cfgfile.config is not None:
            file_config = Parameters.load(cfgfile.config, load_fn=yaml.safe_load)

        parser = ArgumentParser()
        # add cfgfile args so they appear in the help message
        parser.add_arguments(CfgFileConfig, dest="cfgfile")
        parser.add_arguments(Parameters, dest="parameters", default=file_config)

        # XXX: currently when called from tests we don't want to parse pytest arguments, so either
        # this whole logic needs to be rewritten to not always call `parser.parse_args` but only
        # when needed; for now, as a workaround, using `parse_known_args` and ignoring the args which
        # don't belong to this program
        args, unknown = parser.parse_known_args()

        parameters: Parameters = args.parameters
        parameters.save_config = cfgfile.save_config

        return parameters

    def save_config_state(self):
        if self.save_config:
            self.hparams.save_dir.mkdir(parents=True, exist_ok=True)
            if self.hparams.job_id is not None:
                config_file_name = f"{self.hparams.job_id}_config.yaml"
            else:
                config_file_name = "config.yaml"
            self.save(self.hparams.save_dir / config_file_name, indent=4)


def get_config(print_config: bool = True):
    parameters: Parameters = Parameters.parse()
    if print_config:
        print(parameters)
    return parameters
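

# Example (illustrative only, hypothetical config path): parameters can also be built
# programmatically from a YAML file, mirroring what `Parameters.parse` does internally:
#
#   params = Parameters.load(Path("configs/example.yaml"), load_fn=yaml.safe_load)
#   print(params.hparams.batch_size_per_gpu)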


if __name__ == "__main__":
    config = get_config()
    config.save_config_state()