"""Polynomial Anomoly Detector with GARCH method and raw error method """ # Author: Yinchen Wu import numpy as np import math from scipy.special import erf from sklearn.preprocessing import MinMaxScaler from sklearn.utils.validation import check_is_fitted from .base import BaseDetector from .distance import Fourier from ..utils.utility import zscore class POLY(BaseDetector): """An elementary method to detect pointwise anomolies using polynomial approxiamtion. A polynomial of certain degree and window size is fitted to the given timeseries dataset. A GARCH method is ran on the difference betweeen the approximation and the true value of the dataset to estimate the volatitilies of each point. A detector score is derived on each point based on the estimated volatitilies and residual to measure the normality of each point. An alternative method that only considers absoulte difference is also used. Parameters ---------- Power : int, optional (default=1) The power of polynomial fitted to the data neighborhood : int, optional (default=max (100, 10*window size)) The number of samples to fit for one subsequence. Since the timeseries may vary, to caculate the score for the subsequnece (a, a+k) of samples k, we only fit the polynomal on its neighborhood. window: int, optional (default = 20) The length of the window to detect the given anomolies contamination : float in (0., 0.55), optional (default=0.1) The amount of contamination of the data set, i.e. the proportion of outliers in the data set. Used when fitting to define the threshold on the decision function. Attributes ---------- estimators_ : dictionary of coefficients at each polynomial The collection of fitted sub-estimators. decision_scores_ : numpy array of shape (n_samples,) The outlier scores of the training data. The higher, the more abnormal. Outliers tend to have higher scores. This value is available once the detector is fitted. threshold_ : float The threshold is based on ``contamination``. It is the ``n_samples * contamination`` most abnormal samples in ``decision_scores_``. The threshold is calculated for generating binary outlier labels. labels_ : int, either 0 or 1 The binary labels of the training data. 0 stands for inliers and 1 for outliers/anomalies. It is generated by applying ``threshold_`` on ``decision_scores_``. """ def __init__(self, power = 1, window = 200, neighborhood = None, contamination = 0.1, normalize=True): self.power = power self.window = window self.avg_window = None if neighborhood == None: self.neighborhood = max(10*window, 100) else: self.neighborhood = neighborhood self.contamination = contamination self.normalize = normalize self.model_name = 'POLY' def fit(self, X, y=None): """Fit detector. y is ignored in unsupervised methods. Parameters ---------- X : numpy array of shape (n_samples, ) The input samples. y : Ignored Not used, present for API consistency by convention. Returns ------- self : object Fitted estimator. """ if self.normalize: X = (X - X.min()) / (X.max() - X.min()) X = X.squeeze() # validate inputs X and y (optional) self._set_n_classes(y) self.X_train_ = X self.n_train_ = len(X) self.decision_scores_ = np.zeros([self.n_train_, 1]) self.n_initial_ = min(500, int(0.1 * self.n_train_)) self.X_initial_ = X[:self.n_initial_] window = self.window if self.avg_window != None: self.neighborhood = max(self.neighborhood, 6*self.avg_window) if self.neighborhood > len(X): self.neighborhood = len(X) neighborhood = self.neighborhood N = math.floor(self.n_train_ / window) M = math.ceil(self.n_initial_ / window) data = self.X_train_ power=self.power fit = {} for i in range(M, N+1): index = int(i * window) neighbor = int(neighborhood/2) if index + neighbor < self.n_train_ and index - neighbor > 0: x = np.concatenate((np.arange(index - neighbor, index), np.arange(index + window, index + neighbor))) y = np.concatenate((data[index - neighbor: index], data[index + window: index + neighbor] )) mymodel = np.poly1d(np.polyfit(x, y, power)) fit['model' + str(index)] = mymodel elif index + neighbor >= self.n_train_ and index + window < self.n_train_: x = np.concatenate((np.arange(self.n_train_ - neighborhood, index), np.arange(index + window, self.n_train_))) y = np.concatenate((data[self.n_train_ - neighborhood: index], data[index + window: self.n_train_] )) mymodel = np.poly1d(np.polyfit(x, y, power)) fit['model' + str(index)] = mymodel elif index + window >= self.n_train_: x = np.arange(self.n_train_ - neighborhood, index) y = data[self.n_train_ - neighborhood: index] mymodel = np.poly1d(np.polyfit(x, y, power)) fit['model' + str(index)] = mymodel elif index + window < neighborhood: x = np.concatenate((np.arange(0, index), np.arange(index + window, neighborhood))) y = np.concatenate((data[0: index], data[index + window: neighborhood] )) try: mymodel = np.poly1d(np.polyfit(x, y, power)) except: x = np.concatenate((np.arange(0, index), np.arange(len(data[index + window: neighborhood])))) mymodel = np.poly1d(np.polyfit(x, y, power)) fit['model' + str(index)] = mymodel else: x = np.concatenate((np.arange(index - neighbor, index), np.arange(index + window, index + neighbor))) y = np.concatenate((data[index - neighbor: index], data[index + window: index + neighbor] )) print(data.shape) print(x.shape) print(y.shape) mymodel = np.poly1d(np.polyfit(x, y, power)) fit['model' + str(index)] = mymodel Y = np.zeros(self.n_train_ ) for i in range(M, N): myline = np.linspace(window *i, window * (i+1), window) Y[window *i: window * (i+1)] = fit['model' + str(i * window)](myline) if self.n_train_ % N != 0: x = np.arange(N*window, self.n_train_ ) Y[N*window:] = fit['model'+str(N * window)](x) self.estimation = Y self.estimator = fit measure = Fourier() measure.detector = self measure.set_param() self.decision_scores_ = self.decision_function(X, measure=measure) return self def decision_function(self, X= False, measure = None): """Derive the decision score based on the given distance measure Parameters ---------- X : numpy array of shape (n_samples, ) The input samples. measure : object object for given distance measure with methods to derive the score Returns ------- self : object Fitted estimator. """ if type(X) != bool: self.X_train_ = X estimation = self.estimation window = self.window n_train_ = self.n_train_ score = np.zeros(n_train_) N = math.floor((self.n_train_) / window) M = math.ceil(self.n_initial_ / window) for i in range(M, N+1): index = i * window if i == N: end = self.n_train_ else: end = index + window score[index: index+window] = measure.measure(self.X_train_[index: end], estimation[index: end], index) self._mu = np.mean(self.decision_scores_) self._sigma = np.std(self.decision_scores_) return score def predict_proba(self, X, method='linear', measure = None): """Predict the probability of a sample being outlier. Two approaches are possible: 1. simply use Min-max conversion to linearly transform the outlier scores into the range of [0,1]. The model must be fitted first. 2. use unifying scores, see :cite:`kriegel2011interpreting`. Parameters ---------- X : numpy array of shape (n_samples, n_features) The input samples. method : str, optional (default='linear') probability conversion method. It must be one of 'linear' or 'unify'. Returns ------- outlier_probability : numpy array of shape (n_samples,) For each observation, tells whether or not it should be considered as an outlier according to the fitted model. Return the outlier probability, ranging in [0,1]. """ check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_']) train_scores = self.decision_scores_ self.fit(X) self.decision_function(measure = measure) test_scores = self.decision_scores_ probs = np.zeros([X.shape[0], int(self._classes)]) if method == 'linear': scaler = MinMaxScaler().fit(train_scores.reshape(-1, 1)) probs[:, 1] = scaler.transform( test_scores.reshape(-1, 1)).ravel().clip(0, 1) probs[:, 0] = 1 - probs[:, 1] return probs elif method == 'unify': # turn output into probability pre_erf_score = (test_scores - self._mu) / ( self._sigma * np.sqrt(2)) erf_score = erf(pre_erf_score) probs[:, 1] = erf_score.clip(0, 1).ravel() probs[:, 0] = 1 - probs[:, 1] return probs else: raise ValueError(method, 'is not a valid probability conversion method')