File size: 2,589 Bytes
d03866e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import numpy as np
import logging
import math
from stumpy import stumpi
from TSB_AD.models.base import BaseDetector
from TSB_AD.utils.utility import zscore

class Left_STAMPi(BaseDetector):

    def __init__(self, n_init_train=100, window_size=50, normalize=True):
        super().__init__()
        self.n_init_train = n_init_train
        self.window_size = window_size
        self.normalize = normalize

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        n_samples, n_features = X.shape
        if self.normalize: 
            X = zscore(X, axis=0, ddof=0)

        warmup = self.n_init_train
        ws = self.window_size

        if ws > warmup:
            logging.warning(f"WARN: window_size is larger than n_init_train. Adjusting to n_init_train={warmup}.")
            ws = warmup
        if ws < 3:
            logging.warning("WARN: window_size must be at least 3. Adjusting to 3.")
            ws = 3

        self.stream = stumpi(X[:warmup, 0], m=ws, egress=False)
        for point in X[warmup:, 0]:
            self.stream.update(point)
  
        self.decision_scores_ = self.stream.left_P_
        self.decision_scores_[:warmup] = 0  

        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The training input samples.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        n_samples = X.shape[0]
        padded_scores = self.pad_anomaly_scores(self.decision_scores_, n_samples, self.window_size)
        return padded_scores

    @staticmethod
    def pad_anomaly_scores(scores, n_samples, window_size):
        """
        Pads the anomaly scores to match the length of the input time series.
        Padding is symmetric, using the first and last values.
        """
        left_padding = [scores[0]] * math.ceil((window_size - 1) / 2)
        right_padding = [scores[-1]] * ((window_size - 1) // 2)
        padded_scores = np.array(left_padding + list(scores) + right_padding)

        return padded_scores[:n_samples]