"""Polynomial Anomoly Detector with GARCH method and raw error method
"""
# Author: Yinchen Wu <yinchen@uchicago.edu>
import numpy as np
import math
from scipy.special import erf
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
from .distance import Fourier
from ..utils.utility import zscore
class POLY(BaseDetector):
    """An elementary method to detect pointwise anomalies using polynomial approximation.

    A polynomial of a given degree and window size is fitted to the time series.
    A GARCH method is run on the difference between the approximation and the
    true value of the dataset to estimate the volatility of each point. A
    detector score is derived for each point from the estimated volatility and
    the residual to measure how normal that point is. An alternative method
    that only considers the absolute difference is also available.

    Parameters
    ----------
    power : int, optional (default=1)
        The degree of the polynomial fitted to the data.

    window : int, optional (default=200)
        The length of the window used to detect anomalies.

    neighborhood : int, optional (default=max(100, 10 * window))
        The number of samples used to fit one subsequence. Since the time
        series may vary, the score for a subsequence (a, a + k) of k samples
        is computed from a polynomial fitted only on its neighborhood.

    contamination : float in (0., 0.55), optional (default=0.1)
        The amount of contamination of the data set, i.e. the proportion
        of outliers in the data set. Used when fitting to define the threshold
        on the decision function.

    normalize : bool, optional (default=True)
        If True, min-max normalize the input series before fitting.

    Attributes
    ----------
    estimators_ : dictionary of coefficients for each polynomial
        The collection of fitted sub-estimators.

    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """
    def __init__(self, power=1, window=200, neighborhood=None, contamination=0.1, normalize=True):
        self.power = power
        self.window = window
        self.avg_window = None
        if neighborhood is None:
            self.neighborhood = max(10 * window, 100)
        else:
            self.neighborhood = neighborhood
        self.contamination = contamination
        self.normalize = normalize
        self.model_name = 'POLY'
def fit(self, X, y=None):
"""Fit detector. y is ignored in unsupervised methods.
Parameters
----------
X : numpy array of shape (n_samples, )
The input samples.
y : Ignored
Not used, present for API consistency by convention.
Returns
-------
self : object
Fitted estimator.
"""
        if self.normalize:
            X = (X - X.min()) / (X.max() - X.min())
X = X.squeeze()
# validate inputs X and y (optional)
self._set_n_classes(y)
self.X_train_ = X
self.n_train_ = len(X)
self.decision_scores_ = np.zeros([self.n_train_, 1])
self.n_initial_ = min(500, int(0.1 * self.n_train_))
self.X_initial_ = X[:self.n_initial_]
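        # warm-up segment: the first min(500, 10% of n_train_) points are set
        # aside and keep a zero anomaly score; fitting starts after them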
window = self.window
        if self.avg_window is not None:
            self.neighborhood = max(self.neighborhood, 6 * self.avg_window)
        if self.neighborhood > len(X):
            self.neighborhood = len(X)
neighborhood = self.neighborhood
N = math.floor(self.n_train_ / window)
M = math.ceil(self.n_initial_ / window)
data = self.X_train_
power=self.power
fit = {}
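        # Fit one local polynomial per window. The window itself is excluded
        # from the fit: each model is trained only on the surrounding
        # neighborhood, so the estimate over the window is an extrapolation.
        # The branches below distinguish the interior case from windows near
        # the start or end of the series, where the neighborhood is truncated.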
for i in range(M, N+1):
index = int(i * window)
neighbor = int(neighborhood/2)
if index + neighbor < self.n_train_ and index - neighbor > 0:
x = np.concatenate((np.arange(index - neighbor, index), np.arange(index + window, index + neighbor)))
y = np.concatenate((data[index - neighbor: index], data[index + window: index + neighbor] ))
mymodel = np.poly1d(np.polyfit(x, y, power))
fit['model' + str(index)] = mymodel
elif index + neighbor >= self.n_train_ and index + window < self.n_train_:
x = np.concatenate((np.arange(self.n_train_ - neighborhood, index), np.arange(index + window, self.n_train_)))
y = np.concatenate((data[self.n_train_ - neighborhood: index], data[index + window: self.n_train_] ))
mymodel = np.poly1d(np.polyfit(x, y, power))
fit['model' + str(index)] = mymodel
elif index + window >= self.n_train_:
x = np.arange(self.n_train_ - neighborhood, index)
y = data[self.n_train_ - neighborhood: index]
mymodel = np.poly1d(np.polyfit(x, y, power))
fit['model' + str(index)] = mymodel
            elif index + window < neighborhood:
                x = np.concatenate((np.arange(0, index), np.arange(index + window, neighborhood)))
                y = np.concatenate((data[0: index], data[index + window: neighborhood]))
                try:
                    mymodel = np.poly1d(np.polyfit(x, y, power))
                except Exception:
                    # fallback kept from the original logic: re-index the
                    # right-hand segment from zero and retry the fit
                    x = np.concatenate((np.arange(0, index), np.arange(len(data[index + window: neighborhood]))))
                    mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel
            else:
                # clamp the left edge at 0 so a negative start index cannot
                # wrap the slice around and desynchronize x and y lengths
                lo = max(index - neighbor, 0)
                x = np.concatenate((np.arange(lo, index), np.arange(index + window, index + neighbor)))
                y = np.concatenate((data[lo: index], data[index + window: index + neighbor]))
                mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel
        # stitch the per-window polynomial estimates into one estimated series
        Y = np.zeros(self.n_train_)
        for i in range(M, N):
            myline = np.linspace(window * i, window * (i + 1), window)
            Y[window * i: window * (i + 1)] = fit['model' + str(i * window)](myline)
        if self.n_train_ % window != 0:
            # the leftover tail (fewer than `window` points) uses the last model
            x = np.arange(N * window, self.n_train_)
            Y[N * window:] = fit['model' + str(N * window)](x)
self.estimation = Y
self.estimator = fit
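        # score the residual between the series and its polynomial estimate
        # with the Fourier distance measure defined in .distance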
measure = Fourier()
measure.detector = self
measure.set_param()
self.decision_scores_ = self.decision_function(X, measure=measure)
return self
    def decision_function(self, X=False, measure=None):
        """Derive the decision score based on the given distance measure.

        Parameters
        ----------
        X : numpy array of shape (n_samples, ), optional
            The input samples. If omitted, the stored training series is used.

        measure : object
            Object for the given distance measure with methods to derive the
            score.

        Returns
        -------
        score : numpy array of shape (n_samples,)
            The anomaly score of each sample.
        """
        if not isinstance(X, bool):
            self.X_train_ = X
estimation = self.estimation
window = self.window
n_train_ = self.n_train_
score = np.zeros(n_train_)
N = math.floor((self.n_train_) / window)
M = math.ceil(self.n_initial_ / window)
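        # score window by window; the final chunk runs to the end of the series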
for i in range(M, N+1):
index = i * window
if i == N:
end = self.n_train_
else:
end = index + window
score[index: index+window] = measure.measure(self.X_train_[index: end], estimation[index: end], index)
        # summary statistics of the scores, used by predict_proba(method='unify');
        # computed from `score` rather than decision_scores_, which is still the
        # all-zeros placeholder while fit() is running
        self._mu = np.mean(score)
        self._sigma = np.std(score)
return score
    def predict_proba(self, X, method='linear', measure=None):
        """Predict the probability of a sample being an outlier. Two approaches
        are possible:

        1. simply use Min-max conversion to linearly transform the outlier
           scores into the range of [0, 1]. The model must be
           fitted first.
        2. use unifying scores, see :cite:`kriegel2011interpreting`.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        method : str, optional (default='linear')
            Probability conversion method. It must be one of
            'linear' or 'unify'.

        measure : object, optional
            Distance measure passed through to ``decision_function``.

        Returns
        -------
        outlier_probability : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. Returns the outlier probability, ranging
            in [0, 1].
        """
check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
train_scores = self.decision_scores_
self.fit(X)
self.decision_function(measure = measure)
test_scores = self.decision_scores_
probs = np.zeros([X.shape[0], int(self._classes)])
if method == 'linear':
scaler = MinMaxScaler().fit(train_scores.reshape(-1, 1))
probs[:, 1] = scaler.transform(
test_scores.reshape(-1, 1)).ravel().clip(0, 1)
probs[:, 0] = 1 - probs[:, 1]
return probs
elif method == 'unify':
# turn output into probability
pre_erf_score = (test_scores - self._mu) / (
self._sigma * np.sqrt(2))
erf_score = erf(pre_erf_score)
probs[:, 1] = erf_score.clip(0, 1).ravel()
probs[:, 0] = 1 - probs[:, 1]
return probs
else:
raise ValueError(method,
                             'is not a valid probability conversion method')
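
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the detector). It
# assumes the module is executed inside its package so the relative imports
# above resolve (e.g. `python -m <package>.poly`); the synthetic data and all
# parameter values below are arbitrary choices for demonstration.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    n = 2048
    # noisy sine wave with three injected point anomalies
    series = np.sin(np.linspace(0, 40 * np.pi, n)) + 0.1 * rng.standard_normal(n)
    series[[500, 1200, 1750]] += 3.0

    clf = POLY(power=3, window=100)
    clf.fit(series)

    # decision_scores_ holds one score per sample; higher means more abnormal
    top = np.argsort(clf.decision_scores_.ravel())[-3:]
    print('most anomalous indices:', sorted(top.tolist()))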