"""Polynomial Anomoly Detector with GARCH method and raw error method
"""
# Author: Yinchen Wu <yinchen@uchicago.edu>

import numpy as np
import math
from scipy.special import erf
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
from .distance import Fourier


class POLY(BaseDetector):
    """An elementary method to detect pointwise anomolies using polynomial approxiamtion. 
    A polynomial of certain degree and window size is fitted to the given timeseries dataset.
    A GARCH method is ran on the difference betweeen the approximation and the true value of 
    the dataset to estimate the volatitilies of each point. A detector score is derived on each 
    point based on the estimated volatitilies and residual to measure the normality of each point.
    An alternative method that only considers absoulte difference is also used. 
    Parameters
    ----------
    power : int, optional (default=1)
        The degree of the polynomial fitted to the data.
    neighborhood : int, optional (default=max(100, 10 * window))
        The number of samples used to fit one subsequence. Since the time
        series may vary, the score for a subsequence (a, a + k) of length k
        is computed from a polynomial fitted only on its neighborhood.
    window : int, optional (default=200)
        The length of the window in which anomalies are detected.
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e. the proportion
        of outliers in the data set. Used when fitting to define the threshold
        on the decision function.

    Attributes
    ----------
    estimator : dict
        The fitted ``np.poly1d`` model for each window, keyed by
        ``'model' + str(window_start_index)``.
    estimation : numpy array of shape (n_samples,)
        The pointwise polynomial estimate of the training series.
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.
    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.
    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
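
    Examples
    --------
    A minimal, illustrative run on a synthetic series (the sine wave and the
    injected spike below are made up for demonstration; any 1-d array works):

    >>> import numpy as np
    >>> rng = np.random.default_rng(0)
    >>> X = np.sin(np.linspace(0, 40 * np.pi, 4000)) + 0.1 * rng.standard_normal(4000)
    >>> X[3000] += 5.0                         # inject a point anomaly
    >>> clf = POLY(power=3, window=100).fit(X)
    >>> scores = clf.decision_scores_          # higher means more abnormal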
    """
    def __init__(self, power=1, window=200, neighborhood=None, contamination=0.1, normalize=True):
        self.power = power
        self.window = window
        self.avg_window = None
        if neighborhood is None:
            self.neighborhood = max(10 * window, 100)
        else:
            self.neighborhood = neighborhood
            
        self.contamination = contamination
        self.normalize = normalize
        self.model_name = 'POLY'

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.
        Parameters
        ----------
        X : numpy array of shape (n_samples, )
            The input samples.
        y : Ignored
            Not used, present for API consistency by convention.
        Returns
        -------
        self : object
            Fitted estimator.
        """
        if self.normalize:
            X = (X - X.min()) / (X.max() - X.min())

        X = X.squeeze()
        # validate inputs X and y (optional)
        self._set_n_classes(y)

        self.X_train_ = X
        self.n_train_ = len(X)
        self.decision_scores_ = np.zeros([self.n_train_, 1])
        
        self.n_initial_ = min(500, int(0.1 * self.n_train_))
        self.X_initial_ = X[:self.n_initial_]                         

        window = self.window
        
        if self.avg_window is not None:
            self.neighborhood = max(self.neighborhood, 6*self.avg_window) 
            
        if self.neighborhood > len(X):
            self.neighborhood = len(X)
        
        neighborhood = self.neighborhood
                                          
        N = math.floor(self.n_train_ / window)
        M = math.ceil(self.n_initial_ / window)
        data = self.X_train_
        power=self.power
        fit = {}

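        # For each window, fit a degree-``power`` polynomial to the samples in
        # the surrounding neighborhood while excluding the window itself, so
        # each window is scored against an out-of-window estimate. The branches
        # below handle windows near the start and end of the series.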
        for i in range(M, N+1):

            index = int(i * window)    
            neighbor = int(neighborhood/2)

            if index + neighbor < self.n_train_ and index - neighbor > 0: 

                x = np.concatenate((np.arange(index - neighbor, index), np.arange(index + window, index + neighbor)))
                y = np.concatenate((data[index - neighbor: index], data[index + window: index + neighbor] ))
                mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel
            elif index + neighbor >= self.n_train_ and index + window < self.n_train_:
                x = np.concatenate((np.arange(self.n_train_ - neighborhood, index), np.arange(index + window, self.n_train_)))
                y = np.concatenate((data[self.n_train_ - neighborhood: index], data[index + window: self.n_train_] ))
                mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel     
            elif index + window >= self.n_train_:
                x = np.arange(self.n_train_ - neighborhood, index)
                y = data[self.n_train_ - neighborhood: index]
                mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel     
            elif index + window < neighborhood:
                x = np.concatenate((np.arange(0, index), np.arange(index + window, neighborhood)))
                y = np.concatenate((data[0: index], data[index + window: neighborhood] ))
                try:
                    mymodel = np.poly1d(np.polyfit(x, y, power))
                except Exception:
                    # Fallback: re-index the trailing segment from zero so the
                    # fit can still proceed when the original design matrix
                    # makes the fit fail.
                    x = np.concatenate((np.arange(0, index), np.arange(len(data[index + window: neighborhood]))))
                    mymodel = np.poly1d(np.polyfit(x, y, power))

                fit['model' + str(index)] = mymodel
            else:
                x = np.concatenate((np.arange(index - neighbor, index), np.arange(index + window, index + neighbor)))
                y = np.concatenate((data[index - neighbor: index], data[index + window: index + neighbor] ))
                print(data.shape)
                print(x.shape)
                print(y.shape)
                mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel
    
        # Evaluate each window's polynomial at the window's own sample indices
        # to build the pointwise estimate of the series.
        Y = np.zeros(self.n_train_)
        for i in range(M, N):
            myline = np.arange(window * i, window * (i + 1))
            Y[window * i: window * (i + 1)] = fit['model' + str(i * window)](myline)
        if self.n_train_ % window != 0:
            x = np.arange(N * window, self.n_train_)
            Y[N * window:] = fit['model' + str(N * window)](x)
        self.estimation = Y
        self.estimator = fit

        measure = Fourier()
        measure.detector = self
        measure.set_param()
        self.decision_scores_ = self.decision_function(X, measure=measure)
        return self
    def decision_function(self, X=None, measure=None):
        """Derive the decision score based on the given distance measure
        Parameters
        ----------
        X : numpy array of shape (n_samples, )
            The input samples.
        measure : object
            object for given distance measure with methods to derive the score
        Returns
        -------
        score : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
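
        Notes
        -----
        The ``measure`` object must provide a ``measure(raw, estimation,
        index)`` method; ``fit`` uses the ``Fourier`` measure from
        ``.distance`` and calls ``set_param()`` on it before scoring.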
        """
        if X is not None:
            self.X_train_ = X
        estimation = self.estimation
        window = self.window
        n_train_ = self.n_train_
        score = np.zeros(n_train_)
        N = math.floor((self.n_train_) / window)
        M = math.ceil(self.n_initial_ / window)

        # Score each window by comparing the raw subsequence with its
        # polynomial estimate under the supplied distance measure.
        for i in range(M, N + 1):
            index = i * window
            if i == N:
                end = self.n_train_
            else:
                end = index + window
            score[index: end] = measure.measure(self.X_train_[index: end], estimation[index: end], index)
        # Record the score statistics used by ``predict_proba`` (method='unify').
        self._mu = np.mean(score)
        self._sigma = np.std(score)
        return score

    def predict_proba(self, X, method='linear', measure=None):
        """Predict the probability of a sample being an outlier. Two approaches
        are possible:
        1. simply use Min-max conversion to linearly transform the outlier
           scores into the range of [0,1]. The model must be
           fitted first.
        2. use unifying scores, see :cite:`kriegel2011interpreting`.
        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.
        method : str, optional (default='linear')
            probability conversion method. It must be one of
            'linear' or 'unify'.
        measure : object, optional (default=None)
            The distance measure passed on to ``decision_function``.
        Returns
        -------
        outlier_probability : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. Return the outlier probability, ranging
            in [0,1].
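
        Notes
        -----
        With ``method='unify'`` the scores are converted through the Gaussian
        error function, ``erf((score - mu) / (sigma * sqrt(2)))``, clipped to
        [0, 1], where ``mu`` and ``sigma`` are the mean and standard deviation
        of the decision scores.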
        """

        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        train_scores = self.decision_scores_

        self.fit(X)
        test_scores = self.decision_function(measure=measure)

        probs = np.zeros([X.shape[0], int(self._classes)])
        if method == 'linear':
            scaler = MinMaxScaler().fit(train_scores.reshape(-1, 1))
            probs[:, 1] = scaler.transform(
                test_scores.reshape(-1, 1)).ravel().clip(0, 1)
            probs[:, 0] = 1 - probs[:, 1]
            return probs

        elif method == 'unify':
            # turn output into probability
            pre_erf_score = (test_scores - self._mu) / (
                    self._sigma * np.sqrt(2))
            erf_score = erf(pre_erf_score)
            probs[:, 1] = erf_score.clip(0, 1).ravel()
            probs[:, 0] = 1 - probs[:, 1]
            return probs
        else:
            raise ValueError(f'{method} is not a valid probability conversion method')