""" This function is adapted from [TranAD] by [imperial-qore] Original source: [https://github.com/imperial-qore/TranAD] """ from __future__ import division from __future__ import print_function import numpy as np import math import torch import torch.nn.functional as F from sklearn.utils import check_array from sklearn.utils.validation import check_is_fitted from torch import nn from torch.nn import TransformerEncoder from torch.nn import TransformerDecoder from torch.utils.data import DataLoader from sklearn.preprocessing import MinMaxScaler import tqdm from .base import BaseDetector from ..utils.dataset import ReconstructDataset from ..utils.torch_utility import EarlyStoppingTorch, get_gpu class PositionalEncoding(nn.Module): def __init__(self, d_model, dropout=0.1, max_len=5000): super(PositionalEncoding, self).__init__() self.dropout = nn.Dropout(p=dropout) pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) div_term = torch.exp( torch.arange(0, d_model).float() * (-math.log(10000.0) / d_model) ) pe += torch.sin(position * div_term) pe += torch.cos(position * div_term) pe = pe.unsqueeze(0).transpose(0, 1) self.register_buffer("pe", pe) def forward(self, x, pos=0): x = x + self.pe[pos : pos + x.size(0), :] return self.dropout(x) class TransformerEncoderLayer(nn.Module): def __init__(self, d_model, nhead, dim_feedforward=16, dropout=0): super(TransformerEncoderLayer, self).__init__() self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout) self.linear1 = nn.Linear(d_model, dim_feedforward) self.dropout = nn.Dropout(dropout) self.linear2 = nn.Linear(dim_feedforward, d_model) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) self.activation = nn.LeakyReLU(True) def forward(self, src, *args, **kwargs): src2 = self.self_attn(src, src, src)[0] src = src + self.dropout1(src2) src2 = self.linear2(self.dropout(self.activation(self.linear1(src)))) src = src + self.dropout2(src2) return src class TransformerDecoderLayer(nn.Module): def __init__(self, d_model, nhead, dim_feedforward=16, dropout=0): super(TransformerDecoderLayer, self).__init__() self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout) self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout) self.linear1 = nn.Linear(d_model, dim_feedforward) self.dropout = nn.Dropout(dropout) self.linear2 = nn.Linear(dim_feedforward, d_model) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) self.dropout3 = nn.Dropout(dropout) self.activation = nn.LeakyReLU(True) def forward(self, tgt, memory, *args, **kwargs): tgt2 = self.self_attn(tgt, tgt, tgt)[0] tgt = tgt + self.dropout1(tgt2) tgt2 = self.multihead_attn(tgt, memory, memory)[0] tgt = tgt + self.dropout2(tgt2) tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt)))) tgt = tgt + self.dropout3(tgt2) return tgt class TranADModel(nn.Module): def __init__(self, batch_size, feats, win_size): super(TranADModel, self).__init__() self.name = "TranAD" self.batch = batch_size self.n_feats = feats self.n_window = win_size self.n = self.n_feats * self.n_window self.pos_encoder = PositionalEncoding(2 * feats, 0.1, self.n_window) encoder_layers = TransformerEncoderLayer( d_model=2 * feats, nhead=feats, dim_feedforward=16, dropout=0.1 ) self.transformer_encoder = TransformerEncoder(encoder_layers, 1) decoder_layers1 = TransformerDecoderLayer( d_model=2 * feats, nhead=feats, dim_feedforward=16, dropout=0.1 ) self.transformer_decoder1 = TransformerDecoder(decoder_layers1, 1) decoder_layers2 = TransformerDecoderLayer( d_model=2 * feats, nhead=feats, dim_feedforward=16, dropout=0.1 ) self.transformer_decoder2 = TransformerDecoder(decoder_layers2, 1) self.fcn = nn.Sequential(nn.Linear(2 * feats, feats), nn.Sigmoid()) def encode(self, src, c, tgt): src = torch.cat((src, c), dim=2) src = src * math.sqrt(self.n_feats) src = self.pos_encoder(src) memory = self.transformer_encoder(src) tgt = tgt.repeat(1, 1, 2) return tgt, memory def forward(self, src, tgt): # Phase 1 - Without anomaly scores c = torch.zeros_like(src) x1 = self.fcn(self.transformer_decoder1(*self.encode(src, c, tgt))) # Phase 2 - With anomaly scores c = (x1 - src) ** 2 x2 = self.fcn(self.transformer_decoder2(*self.encode(src, c, tgt))) return x1, x2 class TranAD(BaseDetector): def __init__(self, win_size = 100, feats = 1, batch_size = 128, epochs = 50, patience = 3, lr = 1e-4, validation_size=0.2 ): super().__init__() self.__anomaly_score = None self.cuda = True self.device = get_gpu(self.cuda) self.win_size = win_size self.batch_size = batch_size self.epochs = epochs self.feats = feats self.validation_size = validation_size self.model = TranADModel(batch_size=self.batch_size, feats=self.feats, win_size=self.win_size).to(self.device) self.optimizer = torch.optim.AdamW( self.model.parameters(), lr=lr, weight_decay=1e-5 ) self.scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, 5, 0.9) self.criterion = nn.MSELoss() self.early_stopping = EarlyStoppingTorch(None, patience=patience) def fit(self, data): tsTrain = data[:int((1-self.validation_size)*len(data))] tsValid = data[int((1-self.validation_size)*len(data)):] train_loader = DataLoader( dataset=ReconstructDataset(tsTrain, window_size=self.win_size), batch_size=self.batch_size, shuffle=True ) valid_loader = DataLoader( dataset=ReconstructDataset(tsValid, window_size=self.win_size), batch_size=self.batch_size, shuffle=False ) for epoch in range(1, self.epochs + 1): self.model.train(mode=True) avg_loss = 0 loop = tqdm.tqdm( enumerate(train_loader), total=len(train_loader), leave=True ) for idx, (x, _) in loop: if torch.isnan(x).any() or torch.isinf(x).any(): print("Input data contains nan or inf") x = torch.nan_to_num(x) x = x.to(self.device) bs = x.shape[0] x = x.permute(1, 0, 2) elem = x[-1, :, :].view(1, bs, self.feats) self.optimizer.zero_grad() z = self.model(x, elem) loss = (1 / epoch) * self.criterion(z[0], elem) + (1 - 1 / epoch) * self.criterion(z[1], elem) loss.backward(retain_graph=True) self.optimizer.step() avg_loss += loss.cpu().item() loop.set_description(f"Training Epoch [{epoch}/{self.epochs}]") loop.set_postfix(loss=loss.item(), avg_loss=avg_loss / (idx + 1)) if torch.isnan(loss): print(f"Loss is nan at epoch {epoch}") break if len(valid_loader) > 0: self.model.eval() avg_loss_val = 0 loop = tqdm.tqdm( enumerate(valid_loader), total=len(valid_loader), leave=True ) with torch.no_grad(): for idx, (x, _) in loop: if torch.isnan(x).any() or torch.isinf(x).any(): print("Input data contains nan or inf") x = torch.nan_to_num(x) x = x.to(self.device) # x = x.unsqueeze(-1) bs = x.shape[0] x = x.permute(1, 0, 2) elem = x[-1, :, :].view(1, bs, self.feats) self.optimizer.zero_grad() z = self.model(x, elem) loss = (1 / epoch) * self.criterion(z[0], elem) + ( 1 - 1 / epoch ) * self.criterion(z[1], elem) avg_loss_val += loss.cpu().item() loop.set_description(f"Validation Epoch [{epoch}/{self.epochs}]") loop.set_postfix(loss=loss.item(), avg_loss_val=avg_loss_val / (idx + 1)) self.scheduler.step() if len(valid_loader) > 0: avg_loss = avg_loss_val / len(valid_loader) else: avg_loss = avg_loss / len(train_loader) self.early_stopping(avg_loss, self.model) if self.early_stopping.early_stop: print(" Early stopping<<<") break def decision_function(self, data): test_loader = DataLoader( dataset=ReconstructDataset(data, window_size=self.win_size), batch_size=self.batch_size, shuffle=False ) self.model.eval() scores = [] loop = tqdm.tqdm(enumerate(test_loader), total=len(test_loader), leave=True) with torch.no_grad(): for idx, (x, _) in loop: x = x.to(self.device) bs = x.shape[0] x = x.permute(1, 0, 2) elem = x[-1, :, :].view(1, bs, self.feats) # breakpoint() _, z = self.model(x, elem) loss = torch.mean(F.mse_loss(z, elem, reduction="none")[0], axis=-1) scores.append(loss.cpu()) scores = torch.cat(scores, dim=0) scores = scores.numpy() self.__anomaly_score = scores if self.__anomaly_score.shape[0] < len(data): self.__anomaly_score = np.array([self.__anomaly_score[0]]*math.ceil((self.win_size-1)/2) + list(self.__anomaly_score) + [self.__anomaly_score[-1]]*((self.win_size-1)//2)) return self.__anomaly_score def anomaly_score(self) -> np.ndarray: return self.__anomaly_score def param_statistic(self, save_file): pass