Spaces:
Runtime error
Runtime error
| import math | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| class CausalSelfAttention(nn.Module): | |
| """ | |
| A vanilla multi-head masked self-attention layer with a projection at the end. | |
| It is possible to use torch.nn.MultiheadAttention here but I am including an | |
| explicit implementation here to show that there is nothing too scary here. | |
| """ | |
| def __init__(self, bert_n_emb, bert_n_head, attn_pdrop, resid_pdrop, | |
| latent_shape, sampler): | |
| super().__init__() | |
| assert bert_n_emb % bert_n_head == 0 | |
| # key, query, value projections for all heads | |
| self.key = nn.Linear(bert_n_emb, bert_n_emb) | |
| self.query = nn.Linear(bert_n_emb, bert_n_emb) | |
| self.value = nn.Linear(bert_n_emb, bert_n_emb) | |
| # regularization | |
| self.attn_drop = nn.Dropout(attn_pdrop) | |
| self.resid_drop = nn.Dropout(resid_pdrop) | |
| # output projection | |
| self.proj = nn.Linear(bert_n_emb, bert_n_emb) | |
| self.n_head = bert_n_head | |
| self.causal = True if sampler == 'autoregressive' else False | |
| if self.causal: | |
| block_size = np.prod(latent_shape) | |
| mask = torch.tril(torch.ones(block_size, block_size)) | |
| self.register_buffer("mask", mask.view(1, 1, block_size, | |
| block_size)) | |
| def forward(self, x, layer_past=None): | |
| B, T, C = x.size() | |
| # calculate query, key, values for all heads in batch and move head forward to be the batch dim | |
| k = self.key(x).view(B, T, self.n_head, | |
| C // self.n_head).transpose(1, | |
| 2) # (B, nh, T, hs) | |
| q = self.query(x).view(B, T, self.n_head, | |
| C // self.n_head).transpose(1, | |
| 2) # (B, nh, T, hs) | |
| v = self.value(x).view(B, T, self.n_head, | |
| C // self.n_head).transpose(1, | |
| 2) # (B, nh, T, hs) | |
| present = torch.stack((k, v)) | |
| if self.causal and layer_past is not None: | |
| past_key, past_value = layer_past | |
| k = torch.cat((past_key, k), dim=-2) | |
| v = torch.cat((past_value, v), dim=-2) | |
| # causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T) | |
| att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1))) | |
| if self.causal and layer_past is None: | |
| att = att.masked_fill(self.mask[:, :, :T, :T] == 0, float('-inf')) | |
| att = F.softmax(att, dim=-1) | |
| att = self.attn_drop(att) | |
| y = att @ v # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs) | |
| # re-assemble all head outputs side by side | |
| y = y.transpose(1, 2).contiguous().view(B, T, C) | |
| # output projection | |
| y = self.resid_drop(self.proj(y)) | |
| return y, present | |
| class Block(nn.Module): | |
| """ an unassuming Transformer block """ | |
| def __init__(self, bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop, | |
| latent_shape, sampler): | |
| super().__init__() | |
| self.ln1 = nn.LayerNorm(bert_n_emb) | |
| self.ln2 = nn.LayerNorm(bert_n_emb) | |
| self.attn = CausalSelfAttention(bert_n_emb, bert_n_head, attn_pdrop, | |
| resid_pdrop, latent_shape, sampler) | |
| self.mlp = nn.Sequential( | |
| nn.Linear(bert_n_emb, 4 * bert_n_emb), | |
| nn.GELU(), # nice | |
| nn.Linear(4 * bert_n_emb, bert_n_emb), | |
| nn.Dropout(resid_pdrop), | |
| ) | |
| def forward(self, x, layer_past=None, return_present=False): | |
| attn, present = self.attn(self.ln1(x), layer_past) | |
| x = x + attn | |
| x = x + self.mlp(self.ln2(x)) | |
| if layer_past is not None or return_present: | |
| return x, present | |
| return x | |
| class Transformer(nn.Module): | |
| """ the full GPT language model, with a context size of block_size """ | |
| def __init__(self, | |
| codebook_size, | |
| segm_codebook_size, | |
| bert_n_emb, | |
| bert_n_layers, | |
| bert_n_head, | |
| block_size, | |
| latent_shape, | |
| embd_pdrop, | |
| resid_pdrop, | |
| attn_pdrop, | |
| sampler='absorbing'): | |
| super().__init__() | |
| self.vocab_size = codebook_size + 1 | |
| self.n_embd = bert_n_emb | |
| self.block_size = block_size | |
| self.n_layers = bert_n_layers | |
| self.codebook_size = codebook_size | |
| self.segm_codebook_size = segm_codebook_size | |
| self.causal = sampler == 'autoregressive' | |
| if self.causal: | |
| self.vocab_size = codebook_size | |
| self.tok_emb = nn.Embedding(self.vocab_size, self.n_embd) | |
| self.pos_emb = nn.Parameter( | |
| torch.zeros(1, self.block_size, self.n_embd)) | |
| self.segm_emb = nn.Embedding(self.segm_codebook_size, self.n_embd) | |
| self.start_tok = nn.Parameter(torch.zeros(1, 1, self.n_embd)) | |
| self.drop = nn.Dropout(embd_pdrop) | |
| # transformer | |
| self.blocks = nn.Sequential(*[ | |
| Block(bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop, | |
| latent_shape, sampler) for _ in range(self.n_layers) | |
| ]) | |
| # decoder head | |
| self.ln_f = nn.LayerNorm(self.n_embd) | |
| self.head = nn.Linear(self.n_embd, self.codebook_size, bias=False) | |
| def get_block_size(self): | |
| return self.block_size | |
| def _init_weights(self, module): | |
| if isinstance(module, (nn.Linear, nn.Embedding)): | |
| module.weight.data.normal_(mean=0.0, std=0.02) | |
| if isinstance(module, nn.Linear) and module.bias is not None: | |
| module.bias.data.zero_() | |
| elif isinstance(module, nn.LayerNorm): | |
| module.bias.data.zero_() | |
| module.weight.data.fill_(1.0) | |
| def forward(self, idx, segm_tokens, t=None): | |
| # each index maps to a (learnable) vector | |
| token_embeddings = self.tok_emb(idx) | |
| segm_embeddings = self.segm_emb(segm_tokens) | |
| if self.causal: | |
| token_embeddings = torch.cat((self.start_tok.repeat( | |
| token_embeddings.size(0), 1, 1), token_embeddings), | |
| dim=1) | |
| t = token_embeddings.shape[1] | |
| assert t <= self.block_size, "Cannot forward, model block size is exhausted." | |
| # each position maps to a (learnable) vector | |
| position_embeddings = self.pos_emb[:, :t, :] | |
| x = token_embeddings + position_embeddings + segm_embeddings | |
| x = self.drop(x) | |
| for block in self.blocks: | |
| x = block(x) | |
| x = self.ln_f(x) | |
| logits = self.head(x) | |
| return logits | |
| class TransformerMultiHead(nn.Module): | |
| """ the full GPT language model, with a context size of block_size """ | |
| def __init__(self, | |
| codebook_size, | |
| segm_codebook_size, | |
| texture_codebook_size, | |
| bert_n_emb, | |
| bert_n_layers, | |
| bert_n_head, | |
| block_size, | |
| latent_shape, | |
| embd_pdrop, | |
| resid_pdrop, | |
| attn_pdrop, | |
| num_head, | |
| sampler='absorbing'): | |
| super().__init__() | |
| self.vocab_size = codebook_size + 1 | |
| self.n_embd = bert_n_emb | |
| self.block_size = block_size | |
| self.n_layers = bert_n_layers | |
| self.codebook_size = codebook_size | |
| self.segm_codebook_size = segm_codebook_size | |
| self.texture_codebook_size = texture_codebook_size | |
| self.causal = sampler == 'autoregressive' | |
| if self.causal: | |
| self.vocab_size = codebook_size | |
| self.tok_emb = nn.Embedding(self.vocab_size, self.n_embd) | |
| self.pos_emb = nn.Parameter( | |
| torch.zeros(1, self.block_size, self.n_embd)) | |
| self.segm_emb = nn.Embedding(self.segm_codebook_size, self.n_embd) | |
| self.texture_emb = nn.Embedding(self.texture_codebook_size, | |
| self.n_embd) | |
| self.start_tok = nn.Parameter(torch.zeros(1, 1, self.n_embd)) | |
| self.drop = nn.Dropout(embd_pdrop) | |
| # transformer | |
| self.blocks = nn.Sequential(*[ | |
| Block(bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop, | |
| latent_shape, sampler) for _ in range(self.n_layers) | |
| ]) | |
| # decoder head | |
| self.num_head = num_head | |
| self.head_class_num = codebook_size // self.num_head | |
| self.ln_f = nn.LayerNorm(self.n_embd) | |
| self.head_list = nn.ModuleList([ | |
| nn.Linear(self.n_embd, self.head_class_num, bias=False) | |
| for _ in range(self.num_head) | |
| ]) | |
| def get_block_size(self): | |
| return self.block_size | |
| def _init_weights(self, module): | |
| if isinstance(module, (nn.Linear, nn.Embedding)): | |
| module.weight.data.normal_(mean=0.0, std=0.02) | |
| if isinstance(module, nn.Linear) and module.bias is not None: | |
| module.bias.data.zero_() | |
| elif isinstance(module, nn.LayerNorm): | |
| module.bias.data.zero_() | |
| module.weight.data.fill_(1.0) | |
| def forward(self, idx, segm_tokens, texture_tokens, t=None): | |
| # each index maps to a (learnable) vector | |
| token_embeddings = self.tok_emb(idx) | |
| segm_embeddings = self.segm_emb(segm_tokens) | |
| texture_embeddings = self.texture_emb(texture_tokens) | |
| if self.causal: | |
| token_embeddings = torch.cat((self.start_tok.repeat( | |
| token_embeddings.size(0), 1, 1), token_embeddings), | |
| dim=1) | |
| t = token_embeddings.shape[1] | |
| assert t <= self.block_size, "Cannot forward, model block size is exhausted." | |
| # each position maps to a (learnable) vector | |
| position_embeddings = self.pos_emb[:, :t, :] | |
| x = token_embeddings + position_embeddings + segm_embeddings + texture_embeddings | |
| x = self.drop(x) | |
| for block in self.blocks: | |
| x = block(x) | |
| x = self.ln_f(x) | |
| logits_list = [self.head_list[i](x) for i in range(self.num_head)] | |
| return logits_list | |