| | import torch
|
| | import argparse
|
| | import random
|
| | import re
|
| | from typing import List, Optional, Union
|
| | from .utils import setup_logging
|
| |
|
| | setup_logging()
|
| | import logging
|
| |
|
| | logger = logging.getLogger(__name__)
|
| |
|
| |
|
def prepare_scheduler_for_custom_training(noise_scheduler, device):
    """Cache a per-timestep signal-to-noise ratio table on the scheduler.

    Reads ``noise_scheduler.alphas_cumprod`` and stores
    ``(sqrt(a) / sqrt(1 - a)) ** 2`` as ``noise_scheduler.all_snr``, moved to
    ``device``. Does nothing if the scheduler already has an ``all_snr``
    attribute.
    """
    if hasattr(noise_scheduler, "all_snr"):
        return

    cumulative_alphas = noise_scheduler.alphas_cumprod
    signal = torch.sqrt(cumulative_alphas)
    noise_level = torch.sqrt(1.0 - cumulative_alphas)

    noise_scheduler.all_snr = ((signal / noise_level) ** 2).to(device)
|
| |
|
| |
|
def fix_noise_scheduler_betas_for_zero_terminal_snr(noise_scheduler):
    """Rescale the scheduler's betas so the final timestep has zero SNR.

    Overwrites ``noise_scheduler.betas``, ``noise_scheduler.alphas`` and
    ``noise_scheduler.alphas_cumprod`` with values derived from the rescaled
    schedule. The method is the one referenced by the URL in the log message.
    """
    logger.info(f"fix noise scheduler betas: https://arxiv.org/abs/2305.08891")

    def rescale_to_zero_terminal_snr(original_betas):
        # sqrt of the cumulative alpha product for every timestep
        sqrt_cumprod = (1 - original_betas).cumprod(0).sqrt()

        # remember the end points before shifting
        first = sqrt_cumprod[0].clone()
        last = sqrt_cumprod[-1].clone()

        # shift so the last entry becomes zero, then scale so the first entry
        # returns to its original value (up to rounding)
        sqrt_cumprod = (sqrt_cumprod - last) * (first / (first - last))

        # convert the cumulative product back into per-step betas
        cumprod = sqrt_cumprod**2
        step_alphas = torch.cat([cumprod[0:1], cumprod[1:] / cumprod[:-1]])
        return 1 - step_alphas

    new_betas = rescale_to_zero_terminal_snr(noise_scheduler.betas)
    new_alphas = 1.0 - new_betas

    noise_scheduler.betas = new_betas
    noise_scheduler.alphas = new_alphas
    noise_scheduler.alphas_cumprod = torch.cumprod(new_alphas, dim=0)
|
| |
|
| |
|
def apply_snr_weight(loss, timesteps, noise_scheduler, gamma, v_prediction=False):
    """Scale ``loss`` by a gamma-capped SNR weight looked up per timestep.

    The weight is ``min(snr, gamma) / snr``, or ``min(snr, gamma) / (snr + 1)``
    when ``v_prediction`` is true, where ``snr`` comes from
    ``noise_scheduler.all_snr`` indexed by each entry of ``timesteps``.
    The weight is cast to float32 and moved to ``loss.device`` before the
    multiplication.

    NOTE(review): assumes ``loss`` broadcasts against a 1-D tensor with one
    entry per timestep - confirm against callers.
    """
    snr = torch.stack([noise_scheduler.all_snr[step] for step in timesteps])
    capped_snr = torch.minimum(snr, torch.full_like(snr, gamma))
    denominator = snr + 1 if v_prediction else snr
    weight = torch.div(capped_snr, denominator).float().to(loss.device)
    return loss * weight
|
| |
|
| |
|
def scale_v_prediction_loss_like_noise_prediction(loss, timesteps, noise_scheduler):
    """Return ``loss`` multiplied by the factor from ``get_snr_scale``."""
    return loss * get_snr_scale(timesteps, noise_scheduler)
|
| |
|
| |
|
def get_snr_scale(timesteps, noise_scheduler):
    """Return ``snr / (snr + 1)`` for each timestep, with SNR capped at 1000.

    SNR values are looked up in ``noise_scheduler.all_snr`` by each entry of
    ``timesteps``. The cap keeps the ratio finite when a looked-up SNR is
    very large or infinite.
    """
    per_step_snr = torch.stack([noise_scheduler.all_snr[step] for step in timesteps])
    ceiling = torch.ones_like(per_step_snr) * 1000
    per_step_snr = torch.minimum(per_step_snr, ceiling)
    return per_step_snr / (per_step_snr + 1)
|
| |
|
| |
|
def add_v_prediction_like_loss(loss, timesteps, noise_scheduler, v_pred_like_loss):
    """Add an extra term ``loss / scale * v_pred_like_loss`` to ``loss``.

    ``scale`` is the per-timestep factor returned by ``get_snr_scale``.
    """
    scale = get_snr_scale(timesteps, noise_scheduler)
    extra_term = loss / scale * v_pred_like_loss
    return loss + extra_term
|
| |
|
| |
|
def apply_debiased_estimation(loss, timesteps, noise_scheduler, v_prediction=False):
    """Weight ``loss`` by an inverse function of the per-timestep SNR.

    SNR values are read from ``noise_scheduler.all_snr`` and capped at 1000.
    The weight is ``1 / (snr + 1)`` when ``v_prediction`` is true and
    ``1 / sqrt(snr)`` otherwise.
    """
    snr = torch.stack([noise_scheduler.all_snr[step] for step in timesteps])
    snr = torch.minimum(snr, torch.ones_like(snr) * 1000)
    weight = 1 / (snr + 1) if v_prediction else 1 / torch.sqrt(snr)
    return weight * loss
|
| |
|
| |
|
| |
|
| |
|
| |
|
def add_custom_train_arguments(parser: argparse.ArgumentParser, support_weighted_captions: bool = True):
    """Register the custom-loss command-line options on ``parser``.

    Always adds ``--min_snr_gamma``, ``--scale_v_pred_loss_like_noise_pred``,
    ``--v_pred_like_loss`` and ``--debiased_estimation_loss``.
    ``--weighted_captions`` is added only when ``support_weighted_captions``
    is true.
    """
    register = parser.add_argument

    register(
        "--min_snr_gamma",
        type=float,
        default=None,
        help="gamma for reducing the weight of high loss timesteps. Lower numbers have stronger effect. 5 is recommended by paper. / 低いタイムステップでの高いlossに対して重みを減らすためのgamma値、低いほど効果が強く、論文では5が推奨",
    )
    register(
        "--scale_v_pred_loss_like_noise_pred",
        action="store_true",
        help="scale v-prediction loss like noise prediction loss / v-prediction lossをnoise prediction lossと同じようにスケーリングする",
    )
    register(
        "--v_pred_like_loss",
        type=float,
        default=None,
        help="add v-prediction like loss multiplied by this value / v-prediction lossをこの値をかけたものをlossに加算する",
    )
    register(
        "--debiased_estimation_loss",
        action="store_true",
        help="debiased estimation loss / debiased estimation loss",
    )

    if not support_weighted_captions:
        return

    register(
        "--weighted_captions",
        action="store_true",
        default=False,
        help="Enable weighted captions in the standard style (token:1.3). No commas inside parens, or shuffle/dropout may break the decoder. / 「[token]」、「(token)」「(token:1.3)」のような重み付きキャプションを有効にする。カンマを括弧内に入れるとシャッフルやdropoutで重みづけがおかしくなるので注意",
    )
|
| |
|
| |
|
# Tokenizer for weighted prompts (verbose mode, alternatives tried in order):
# escaped brackets/backslash, a lone backslash, opening brackets, a ":<number>)"
# weight suffix (the number is capture group 1), closing brackets, a run of
# ordinary characters, and finally a bare ":".
re_attention = re.compile(
    r"""
\\\(|
\\\)|
\\\[|
\\]|
\\\\|
\\|
\(|
\[|
:([+-]?[.\d]+)\)|
\)|
]|
[^\\()\[\]:]+|
:
""",
    re.X,
)


def parse_prompt_attention(text):
    r"""
    Parses a string with attention tokens and returns a list of pairs: text and its associated weight.
    Accepted tokens are:
      (abc) - increases attention to abc by a multiplier of 1.1
      (abc:3.12) - increases attention to abc by a multiplier of 3.12
      [abc] - decreases attention to abc (its weight is divided by 1.1)
      \( - literal character '('
      \[ - literal character '['
      \) - literal character ')'
      \] - literal character ']'
      \\ - literal character '\'
      anything else - just text

    Brackets left unclosed at the end of the string are treated as if they
    were closed there. Adjacent fragments that end up with equal weights are
    merged. An input that yields no fragments returns ``[["", 1.0]]``.

    Raises:
        ValueError: if a ``:<number>)`` suffix inside round brackets is not a
            valid float (e.g. ``(abc:..)``), because it is passed to ``float``.

    >>> parse_prompt_attention('normal text')
    [['normal text', 1.0]]
    >>> parse_prompt_attention('an (important) word')
    [['an ', 1.0], ['important', 1.1], [' word', 1.0]]
    >>> parse_prompt_attention('(unbalanced')
    [['unbalanced', 1.1]]
    >>> parse_prompt_attention(r'\(literal\]')
    [['(literal]', 1.0]]
    >>> parse_prompt_attention('(unnecessary)(parens)')
    [['unnecessaryparens', 1.1]]
    >>> parse_prompt_attention('a (((house:1.3)) [on] a (hill:0.5), sun, (((sky))).')
    [['a ', 1.0],
     ['house', 1.5730000000000004],
     [' ', 1.1],
     ['on', 1.0],
     [' a ', 1.1],
     ['hill', 0.55],
     [', sun, ', 1.1],
     ['sky', 1.4641000000000006],
     ['.', 1.1]]
    """

    res = []
    # Stacks of indices into ``res`` marking where each open bracket started.
    round_brackets = []
    square_brackets = []

    round_bracket_multiplier = 1.1
    square_bracket_multiplier = 1 / 1.1

    def multiply_range(start_position, multiplier):
        # Scale the weight of every fragment emitted since the bracket opened.
        for entry in res[start_position:]:
            entry[1] *= multiplier

    for m in re_attention.finditer(text):
        # Use a separate name so the ``text`` parameter is not shadowed.
        token = m.group(0)
        weight = m.group(1)

        if token.startswith("\\"):
            # Escaped character: keep what follows the backslash as plain text.
            res.append([token[1:], 1.0])
        elif token == "(":
            round_brackets.append(len(res))
        elif token == "[":
            square_brackets.append(len(res))
        elif weight is not None and len(round_brackets) > 0:
            # Explicit ":<number>)" closes the innermost round bracket.
            multiply_range(round_brackets.pop(), float(weight))
        elif token == ")" and len(round_brackets) > 0:
            multiply_range(round_brackets.pop(), round_bracket_multiplier)
        elif token == "]" and len(square_brackets) > 0:
            multiply_range(square_brackets.pop(), square_bracket_multiplier)
        else:
            # Ordinary text, or a closer with no matching opener (kept literally).
            res.append([token, 1.0])

    # Brackets never closed still apply from where they were opened.
    for pos in round_brackets:
        multiply_range(pos, round_bracket_multiplier)

    for pos in square_brackets:
        multiply_range(pos, square_bracket_multiplier)

    if len(res) == 0:
        return [["", 1.0]]

    # Merge runs of identical weights in a single pass.
    merged = [res[0]]
    for fragment, fragment_weight in res[1:]:
        if merged[-1][1] == fragment_weight:
            merged[-1][0] += fragment
        else:
            merged.append([fragment, fragment_weight])

    return merged
|
| |
|
| |
|
def get_prompts_with_weights(tokenizer, prompt: List[str], max_length: int):
    r"""
    Tokenize each prompt and pair every token id with its attention weight.

    Each prompt is split by ``parse_prompt_attention``. Every fragment is
    tokenized with ``tokenizer(fragment).input_ids`` and the first and last
    ids are dropped (presumably the start/end tokens - confirm for the
    tokenizer in use). No padding is added here.

    Token and weight lists longer than ``max_length`` are cut to
    ``max_length``, and a single warning is logged if any prompt was cut.

    Returns:
        ``(tokens, weights)``: two lists with one inner list per prompt.
    """
    all_tokens = []
    all_weights = []
    any_truncated = False

    for caption in prompt:
        caption_tokens = []
        caption_weights = []
        for fragment, fragment_weight in parse_prompt_attention(caption):
            # tokenize the fragment and drop its first and last ids
            ids = tokenizer(fragment).input_ids[1:-1]
            caption_tokens += ids
            # every token of the fragment shares the fragment's weight
            caption_weights += [fragment_weight] * len(ids)

            # no point tokenizing further once the limit is exceeded
            if len(caption_tokens) > max_length:
                break

        if len(caption_tokens) > max_length:
            any_truncated = True
            caption_tokens = caption_tokens[:max_length]
            caption_weights = caption_weights[:max_length]

        all_tokens.append(caption_tokens)
        all_weights.append(caption_weights)

    if any_truncated:
        logger.warning("Prompt was truncated. Try to shorten the prompt or increase max_embeddings_multiples")
    return all_tokens, all_weights
|
| |
|
| |
|
def pad_tokens_and_weights(tokens, weights, max_length, bos, eos, no_boseos_middle=True, chunk_length=77):
    r"""
    Pad token lists to ``max_length`` and build matching weight lists.

    ``tokens`` and ``weights`` are modified in place and also returned.
    Each token list becomes ``[bos] + tokens + [eos, eos, ...]`` with total
    length ``max_length``.

    With ``no_boseos_middle`` the weights get the same layout, padded with
    1.0 to ``max_length``. Otherwise the weights are laid out per chunk:
    every chunk of ``chunk_length - 2`` weights is wrapped in a leading and a
    trailing 1.0, and the result is padded with 1.0 to
    ``chunks * chunk_length`` entries.
    """
    body_length = chunk_length - 2
    chunk_count = (max_length - 2) // body_length
    weights_length = max_length if no_boseos_middle else chunk_count * chunk_length

    for index, token_row in enumerate(tokens):
        tokens[index] = [bos] + token_row + [eos] * (max_length - 1 - len(token_row))

        weight_row = weights[index]
        if no_boseos_middle:
            weights[index] = [1.0] + weight_row + [1.0] * (max_length - 1 - len(weight_row))
            continue

        if len(weight_row) == 0:
            padded = [1.0] * weights_length
        else:
            padded = []
            for chunk_index in range(chunk_count):
                start = chunk_index * body_length
                padded.append(1.0)  # slot for the chunk's leading token
                padded += weight_row[start : start + body_length]
                padded.append(1.0)  # slot for the chunk's trailing token
            padded += [1.0] * (weights_length - len(padded))
        weights[index] = padded

    return tokens, weights
|
| |
|
| |
|
def get_unweighted_text_embeddings(
    tokenizer,
    text_encoder,
    text_input: torch.Tensor,
    chunk_length: int,
    clip_skip: int,
    eos: int,
    pad: int,
    no_boseos_middle: Optional[bool] = True,
):
    """
    Encode token ids, splitting inputs longer than one encoder window into chunks.

    The number of chunks is ``(text_input.shape[1] - 2) // (chunk_length - 2)``.
    With one chunk or fewer the input is encoded directly. Otherwise each
    chunk of ``chunk_length`` ids (consecutive chunks overlap by two ids) is
    copied, given the first id of row 0 as its leading id, given an ending
    id, encoded, and the per-chunk embeddings are concatenated along dim 1.

    When ``no_boseos_middle`` is true, the embeddings at chunk joins are
    dropped so only the very first and very last positions keep their
    leading/trailing embedding.

    ``clip_skip`` of ``None`` or 1 uses element 0 of the encoder output;
    other values take ``hidden_states[-clip_skip]`` and pass it through
    ``text_encoder.text_model.final_layer_norm``. ``tokenizer`` is not used
    by this function.
    """

    def encode(token_ids):
        # Shared by the single-chunk and multi-chunk paths.
        if clip_skip is None or clip_skip == 1:
            return text_encoder(token_ids)[0]
        outputs = text_encoder(token_ids, output_hidden_states=True, return_dict=True)
        hidden = outputs["hidden_states"][-clip_skip]
        return text_encoder.text_model.final_layer_norm(hidden)

    body_length = chunk_length - 2
    chunk_count = (text_input.shape[1] - 2) // body_length
    if chunk_count <= 1:
        return encode(text_input)

    pieces = []
    last_chunk = chunk_count - 1
    for chunk_index in range(chunk_count):
        # copy the window so the caller's tensor is left untouched
        start = chunk_index * body_length
        chunk = text_input[:, start : start + chunk_length].clone()

        # every chunk starts with the same id as the full input's first row
        chunk[:, 0] = text_input[0, 0]
        if pad == eos:
            # pad and eos coincide: reuse the last id of the first row
            chunk[:, -1] = text_input[0, -1]
        else:
            for row in range(len(chunk)):
                # a chunk ending in an ordinary token gets an explicit eos
                if chunk[row, -1] != eos and chunk[row, -1] != pad:
                    chunk[row, -1] = eos
                # a chunk whose second id is pad gets eos there instead
                if chunk[row, 1] == pad:
                    chunk[row, 1] = eos

        embedding = encode(chunk)

        if no_boseos_middle:
            if chunk_index == 0:
                embedding = embedding[:, :-1]  # keep the start, drop the end
            elif chunk_index == last_chunk:
                embedding = embedding[:, 1:]  # drop the start, keep the end
            else:
                embedding = embedding[:, 1:-1]  # drop both ends

        pieces.append(embedding)

    return torch.cat(pieces, dim=1)
|
| |
|
| |
|
def get_weighted_text_embeddings(
    tokenizer,
    text_encoder,
    prompt: Union[str, List[str]],
    device,
    max_embeddings_multiples: Optional[int] = 3,
    no_boseos_middle: Optional[bool] = False,
    clip_skip=None,
):
    r"""
    Encode prompts whose fragments carry bracket-style attention weights.

    For example, in 'A (very beautiful) masterpiece' the tokens of
    'very beautiful' have their embeddings multiplied by 1.1. After
    weighting, each prompt's embedding is rescaled so that its mean over the
    last two dimensions equals the mean before weighting.

    Args:
        tokenizer: provides ``model_max_length``, ``bos_token_id``,
            ``eos_token_id`` and ``pad_token_id`` and tokenizes the prompt
            fragments.
        text_encoder: encoder passed to ``get_unweighted_text_embeddings``.
        prompt (`str` or `List[str]`):
            A single prompt or a list of prompts.
        device: device on which the token and weight tensors are created.
        max_embeddings_multiples (`int`, *optional*, defaults to `3`):
            Upper bound on how many encoder windows a prompt may span. The
            number actually used is reduced to fit the longest prompt.
        no_boseos_middle (`bool`, *optional*, defaults to `False`):
            Forwarded to the padding and encoding helpers; controls whether
            start/end positions between chunks are dropped.
        clip_skip: forwarded to ``get_unweighted_text_embeddings``.

    Returns:
        The weighted, mean-preserving text embeddings tensor.
    """
    encoder_window = tokenizer.model_max_length
    body_length = encoder_window - 2
    token_budget = body_length * max_embeddings_multiples + 2

    prompts = [prompt] if isinstance(prompt, str) else prompt
    prompt_tokens, prompt_weights = get_prompts_with_weights(tokenizer, prompts, token_budget - 2)

    # shrink the number of encoder windows to what the longest prompt needs
    longest = max([len(token) for token in prompt_tokens])
    chunk_count = min(max_embeddings_multiples, (longest - 1) // body_length + 1)
    chunk_count = max(1, chunk_count)
    padded_length = body_length * chunk_count + 2

    bos = tokenizer.bos_token_id
    eos = tokenizer.eos_token_id
    pad = tokenizer.pad_token_id
    prompt_tokens, prompt_weights = pad_tokens_and_weights(
        prompt_tokens,
        prompt_weights,
        padded_length,
        bos,
        eos,
        no_boseos_middle=no_boseos_middle,
        chunk_length=encoder_window,
    )
    token_tensor = torch.tensor(prompt_tokens, dtype=torch.long, device=device)

    embeddings = get_unweighted_text_embeddings(
        tokenizer,
        text_encoder,
        token_tensor,
        encoder_window,
        clip_skip,
        eos,
        pad,
        no_boseos_middle=no_boseos_middle,
    )
    weight_tensor = torch.tensor(prompt_weights, dtype=embeddings.dtype, device=device)

    # apply the weights, then restore each prompt's original mean
    # (means are computed in float32 and cast back to the embedding dtype)
    mean_before = embeddings.float().mean(axis=[-2, -1]).to(embeddings.dtype)
    embeddings = embeddings * weight_tensor.unsqueeze(-1)
    mean_after = embeddings.float().mean(axis=[-2, -1]).to(embeddings.dtype)
    correction = (mean_before / mean_after).unsqueeze(-1).unsqueeze(-1)
    embeddings = embeddings * correction

    return embeddings
|
| |
|
| |
|
| |
|
def pyramid_noise_like(noise, device, iterations=6, discount=0.4):
    """Add progressively coarser random noise to ``noise`` and renormalize.

    At level ``i`` a random tensor whose last two dims are shrunk by
    ``r ** i`` (``r`` drawn uniformly from [2, 4) each level) is bilinearly
    upsampled back to the input's last two dims and added with weight
    ``discount ** i``. The loop stops early once either shrunk dim reaches 1.

    ``noise`` is modified in place by the accumulation. The returned tensor
    is a new one, divided by its standard deviation.
    """
    batch, channels, dim_a, dim_b = noise.shape
    upsample = torch.nn.Upsample(size=(dim_a, dim_b), mode="bilinear").to(device)

    for level in range(iterations):
        ratio = random.random() * 2 + 2
        shrink = ratio**level
        small_a = max(1, int(dim_a / shrink))
        small_b = max(1, int(dim_b / shrink))

        coarse = torch.randn(batch, channels, small_a, small_b).to(device)
        noise += upsample(coarse) * discount**level

        if small_a == 1 or small_b == 1:
            break  # cannot go any coarser

    return noise / noise.std()
|
| |
|
| |
|
| |
|
def apply_noise_offset(latents, noise, noise_offset, adaptive_noise_scale):
    """Add a random per-(sample, channel) offset to ``noise``.

    Returns ``noise`` unchanged when ``noise_offset`` is ``None``. Otherwise
    returns ``noise + noise_offset * randn(B, C, 1, 1)``, with ``B`` and ``C``
    taken from the first two dims of ``latents`` and the random tensor
    created on ``latents.device``.

    When ``adaptive_noise_scale`` is given, the offset is first increased by
    ``adaptive_noise_scale * |mean of latents over dims 2 and 3|`` and then
    clamped to be non-negative.
    """
    if noise_offset is None:
        return noise

    if adaptive_noise_scale is not None:
        # absolute mean over dims 2 and 3, kept as size-1 dims for broadcasting
        channel_mean = torch.abs(latents.mean(dim=(2, 3), keepdim=True))
        noise_offset = noise_offset + adaptive_noise_scale * channel_mean
        # a negative adaptive scale could push the offset below zero
        noise_offset = torch.clamp(noise_offset, 0.0, None)

    random_shift = torch.randn((latents.shape[0], latents.shape[1], 1, 1), device=latents.device)
    return noise + noise_offset * random_shift
|
| |
|
| |
|
def apply_masked_loss(loss, batch):
    """Multiply ``loss`` by a mask taken from ``batch``, if one is present.

    Mask source, in priority order:
      * ``batch["conditioning_images"]``: channel 0 is used and mapped with
        ``x / 2 + 0.5`` (presumably converting [-1, 1] images to [0, 1] -
        confirm against the data loader).
      * ``batch["alpha_masks"]`` when present and not ``None``: used as is,
        with a channel dim inserted at position 1.

    The mask is cast to ``loss.dtype`` and resized to ``loss.shape[2:]`` with
    area interpolation. If neither key applies, ``loss`` is returned
    unchanged.
    """
    if "conditioning_images" in batch:
        first_channel = batch["conditioning_images"].to(dtype=loss.dtype)[:, 0]
        mask = first_channel.unsqueeze(1) / 2 + 0.5
    elif "alpha_masks" in batch and batch["alpha_masks"] is not None:
        mask = batch["alpha_masks"].to(dtype=loss.dtype).unsqueeze(1)
    else:
        return loss

    # bring the mask to the loss's trailing size before applying it
    mask = torch.nn.functional.interpolate(mask, size=loss.shape[2:], mode="area")
    return loss * mask
|
| |
|
| |
|
| | """
|
| | ##########################################
|
| | # Perlin Noise
|
| | def rand_perlin_2d(device, shape, res, fade=lambda t: 6 * t**5 - 15 * t**4 + 10 * t**3):
|
| | delta = (res[0] / shape[0], res[1] / shape[1])
|
| | d = (shape[0] // res[0], shape[1] // res[1])
|
| |
|
| | grid = (
|
| | torch.stack(
|
| | torch.meshgrid(torch.arange(0, res[0], delta[0], device=device), torch.arange(0, res[1], delta[1], device=device)),
|
| | dim=-1,
|
| | )
|
| | % 1
|
| | )
|
| | angles = 2 * torch.pi * torch.rand(res[0] + 1, res[1] + 1, device=device)
|
| | gradients = torch.stack((torch.cos(angles), torch.sin(angles)), dim=-1)
|
| |
|
| | tile_grads = (
|
| | lambda slice1, slice2: gradients[slice1[0] : slice1[1], slice2[0] : slice2[1]]
|
| | .repeat_interleave(d[0], 0)
|
| | .repeat_interleave(d[1], 1)
|
| | )
|
| | dot = lambda grad, shift: (
|
| | torch.stack((grid[: shape[0], : shape[1], 0] + shift[0], grid[: shape[0], : shape[1], 1] + shift[1]), dim=-1)
|
| | * grad[: shape[0], : shape[1]]
|
| | ).sum(dim=-1)
|
| |
|
| | n00 = dot(tile_grads([0, -1], [0, -1]), [0, 0])
|
| | n10 = dot(tile_grads([1, None], [0, -1]), [-1, 0])
|
| | n01 = dot(tile_grads([0, -1], [1, None]), [0, -1])
|
| | n11 = dot(tile_grads([1, None], [1, None]), [-1, -1])
|
| | t = fade(grid[: shape[0], : shape[1]])
|
| | return 1.414 * torch.lerp(torch.lerp(n00, n10, t[..., 0]), torch.lerp(n01, n11, t[..., 0]), t[..., 1])
|
| |
|
| |
|
| | def rand_perlin_2d_octaves(device, shape, res, octaves=1, persistence=0.5):
|
| | noise = torch.zeros(shape, device=device)
|
| | frequency = 1
|
| | amplitude = 1
|
| | for _ in range(octaves):
|
| | noise += amplitude * rand_perlin_2d(device, shape, (frequency * res[0], frequency * res[1]))
|
| | frequency *= 2
|
| | amplitude *= persistence
|
| | return noise
|
| |
|
| |
|
| | def perlin_noise(noise, device, octaves):
|
| | _, c, w, h = noise.shape
|
| | perlin = lambda: rand_perlin_2d_octaves(device, (w, h), (4, 4), octaves)
|
| | noise_perlin = []
|
| | for _ in range(c):
|
| | noise_perlin.append(perlin())
|
| | noise_perlin = torch.stack(noise_perlin).unsqueeze(0) # (1, c, w, h)
|
| | noise += noise_perlin # broadcast for each batch
|
| | return noise / noise.std() # Scaled back to roughly unit variance
|
| | """
|
| |
|