| """Polynomial Anomoly Detector with GARCH method and raw error method | |
| """ | |
| # Author: Yinchen Wu <yinchen@uchicago.edu> | |

import math

import numpy as np
from scipy.special import erf
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from .distance import Fourier
from ..utils.utility import zscore

class POLY(BaseDetector):
    """An elementary method to detect pointwise anomalies using polynomial
    approximation. A polynomial of a given degree and window size is fitted
    to the time series. A GARCH method is run on the difference between the
    approximation and the true value of the dataset to estimate the
    volatility of each point. A detector score is derived for each point
    from the estimated volatility and the residual to measure how normal
    that point is. An alternative method that only considers the absolute
    difference is also available.

    Parameters
    ----------
    power : int, optional (default=1)
        The degree of the polynomial fitted to the data.
    window : int, optional (default=200)
        The length of the window within which anomalies are detected.
    neighborhood : int, optional (default=max(100, 10 * window))
        The number of samples used to fit one subsequence. Since the time
        series may vary, the score for a subsequence (a, a + k) of k samples
        is computed from a polynomial fitted only on its neighborhood.
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e. the proportion
        of outliers in the data set. Used when fitting to define the
        threshold on the decision function.
    normalize : bool, optional (default=True)
        If True, min-max scale the input series to [0, 1] before fitting.

    Attributes
    ----------
    estimator : dictionary of fitted polynomials
        The collection of fitted sub-estimators: one ``np.poly1d`` per
        window, keyed by the window's start index.
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.
    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.
    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, power=1, window=200, neighborhood=None,
                 contamination=0.1, normalize=True):
        self.power = power
        self.window = window
        self.avg_window = None
        if neighborhood is None:
            self.neighborhood = max(10 * window, 100)
        else:
            self.neighborhood = neighborhood
        self.contamination = contamination
        self.normalize = normalize
        self.model_name = 'POLY'

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, )
            The input samples.
        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        if self.normalize:
            X = (X - X.min()) / (X.max() - X.min())
        X = X.squeeze()

        # validate inputs X and y (optional)
        self._set_n_classes(y)
        self.X_train_ = X
        self.n_train_ = len(X)
        self.decision_scores_ = np.zeros([self.n_train_, 1])
        self.n_initial_ = min(500, int(0.1 * self.n_train_))
        self.X_initial_ = X[:self.n_initial_]

        window = self.window
        if self.avg_window is not None:
            self.neighborhood = max(self.neighborhood, 6 * self.avg_window)
        if self.neighborhood > len(X):
            self.neighborhood = len(X)
        neighborhood = self.neighborhood

        # one polynomial is fitted per window; the first M windows form a
        # warm-up period and are left unscored
        N = math.floor(self.n_train_ / window)
        M = math.ceil(self.n_initial_ / window)
        data = self.X_train_
        power = self.power
        fit = {}
        for i in range(M, N + 1):
            index = int(i * window)
            neighbor = int(neighborhood / 2)
            if index + neighbor < self.n_train_ and index - neighbor > 0:
                # interior window: fit on the surrounding neighborhood,
                # excluding the window being scored
                x = np.concatenate((np.arange(index - neighbor, index),
                                    np.arange(index + window, index + neighbor)))
                y = np.concatenate((data[index - neighbor: index],
                                    data[index + window: index + neighbor]))
                fit['model' + str(index)] = np.poly1d(np.polyfit(x, y, power))
            elif index + neighbor >= self.n_train_ and index + window < self.n_train_:
                # window near the right edge: anchor the neighborhood at the end
                x = np.concatenate((np.arange(self.n_train_ - neighborhood, index),
                                    np.arange(index + window, self.n_train_)))
                y = np.concatenate((data[self.n_train_ - neighborhood: index],
                                    data[index + window: self.n_train_]))
                fit['model' + str(index)] = np.poly1d(np.polyfit(x, y, power))
            elif index + window >= self.n_train_:
                # last (possibly partial) window: fit on the trailing samples
                x = np.arange(self.n_train_ - neighborhood, index)
                y = data[self.n_train_ - neighborhood: index]
                fit['model' + str(index)] = np.poly1d(np.polyfit(x, y, power))
            elif index + window < neighborhood:
                # window near the left edge: anchor the neighborhood at the start
                x = np.concatenate((np.arange(0, index),
                                    np.arange(index + window, neighborhood)))
                y = np.concatenate((data[0: index],
                                    data[index + window: neighborhood]))
                try:
                    mymodel = np.poly1d(np.polyfit(x, y, power))
                except Exception:
                    # fall back to a re-indexed x if the fit fails
                    x = np.concatenate((np.arange(0, index),
                                        np.arange(len(data[index + window: neighborhood]))))
                    mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel
            else:
                x = np.concatenate((np.arange(index - neighbor, index),
                                    np.arange(index + window, index + neighbor)))
                y = np.concatenate((data[index - neighbor: index],
                                    data[index + window: index + neighbor]))
                fit['model' + str(index)] = np.poly1d(np.polyfit(x, y, power))

        # reconstruct the fitted series window by window, evaluating each
        # polynomial on the same integer grid it was fitted on
        Y = np.zeros(self.n_train_)
        for i in range(M, N):
            xs = np.arange(window * i, window * (i + 1))
            Y[window * i: window * (i + 1)] = fit['model' + str(i * window)](xs)
        if self.n_train_ % window != 0:
            # evaluate the last model on the leftover tail
            x = np.arange(N * window, self.n_train_)
            Y[N * window:] = fit['model' + str(N * window)](x)
        self.estimation = Y
        self.estimator = fit

        # score each point from the residual between the series and its
        # polynomial estimate
        measure = Fourier()
        measure.detector = self
        measure.set_param()
        self.decision_scores_ = self.decision_function(X, measure=measure)
        return self
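
    # Worked example of the indexing above (illustrative numbers only): with
    # n_train_ = 5000, window = 200, and neighborhood = 2000, we get N = 25,
    # M = 3 (since n_initial_ = 500), and neighbor = 1000. The model stored
    # under 'model1200' is then fitted on x in [200, 1200) U [1400, 2200):
    # the neighborhood around the window, minus the window itself.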

    def decision_function(self, X=None, measure=None):
        """Derive the decision score based on the given distance measure.

        Parameters
        ----------
        X : numpy array of shape (n_samples, ), optional (default=None)
            The input samples. If given, it replaces the stored training
            series before scoring.
        measure : object
            Distance measure object with a ``measure(sub, est, index)``
            method used to derive the score of each window.

        Returns
        -------
        score : numpy array of shape (n_samples,)
            The anomaly score of each sample. The higher, the more abnormal.
        """
        if X is not None:
            self.X_train_ = X
        estimation = self.estimation
        window = self.window
        n_train_ = self.n_train_
        score = np.zeros(n_train_)
        N = math.floor(n_train_ / window)
        M = math.ceil(self.n_initial_ / window)
        for i in range(M, N + 1):
            index = i * window
            if index >= n_train_:
                break
            end = n_train_ if i == N else index + window
            score[index: end] = measure.measure(
                self.X_train_[index: end], estimation[index: end], index)
        # keep the moments of the scores for the 'unify' probability
        # conversion in predict_proba
        self._mu = np.mean(score)
        self._sigma = np.std(score)
        return score
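
    # Any object matching the duck-typed interface used above can replace
    # ``Fourier``: it needs an assignable ``detector`` attribute, a
    # ``set_param()`` method, and ``measure(sub, est, index)`` returning a
    # score for one window. A minimal sketch (hypothetical, not part of the
    # library):
    #
    #     class MAEMeasure:
    #         detector = None
    #         def set_param(self):
    #             pass
    #         def measure(self, sub, est, index):
    #             return float(np.mean(np.abs(np.asarray(sub) - np.asarray(est))))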

    def predict_proba(self, X, method='linear', measure=None):
        """Predict the probability of a sample being an outlier. Two
        approaches are possible:

        1. simply use Min-max conversion to linearly transform the outlier
           scores into the range of [0, 1]. The model must be
           fitted first.
        2. use unifying scores, see :cite:`kriegel2011interpreting`.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.
        method : str, optional (default='linear')
            probability conversion method. It must be one of
            'linear' or 'unify'.
        measure : object, optional (default=None)
            Distance measure used to re-score ``X``. If None, the scores
            computed while fitting are reused.

        Returns
        -------
        outlier_probability : numpy array of shape (n_samples, 2)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. Returns the outlier probability, ranging
            in [0, 1], in the second column.
        """
        check_is_fitted(self, ['decision_scores_'])
        train_scores = self.decision_scores_

        self.fit(X)
        if measure is not None:
            test_scores = self.decision_function(measure=measure)
        else:
            test_scores = self.decision_scores_

        probs = np.zeros([X.shape[0], int(self._classes)])
        if method == 'linear':
            scaler = MinMaxScaler().fit(train_scores.reshape(-1, 1))
            probs[:, 1] = scaler.transform(
                test_scores.reshape(-1, 1)).ravel().clip(0, 1)
            probs[:, 0] = 1 - probs[:, 1]
            return probs
        elif method == 'unify':
            # turn the standardized scores into probabilities via the
            # Gaussian error function
            pre_erf_score = (test_scores - self._mu) / (
                    self._sigma * np.sqrt(2))
            erf_score = erf(pre_erf_score)
            probs[:, 1] = erf_score.clip(0, 1).ravel()
            probs[:, 0] = 1 - probs[:, 1]
            return probs
        else:
            raise ValueError(method,
                             'is not a valid probability conversion method')
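

# Minimal smoke test (a sketch, not part of the original module). It assumes
# the file is run inside its package, e.g. ``python -m <package>.poly`` with
# <package> standing in for the real package name, since the relative imports
# above need a package context. Data and parameters are illustrative only.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    t = np.arange(2000)
    series = np.sin(2 * np.pi * t / 100) + 0.1 * rng.standard_normal(t.size)
    series[1500] += 5.0  # inject one obvious point anomaly

    detector = POLY(power=3, window=100).fit(series)
    top = np.argsort(detector.decision_scores_)[-5:]
    print("highest-scoring indices:", np.sort(top))

    # probability view of the same scores; 'unify' applies the erf transform
    probs = detector.predict_proba(series, method='unify')
    print("outlier probability at the spike:", probs[1500, 1])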