File size: 4,849 Bytes
d03866e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
This implementation is adapted from [TimeEval-algorithms] by [CodeLionX & wenig].
Original source: [https://github.com/TimeEval/TimeEval-algorithms]
"""

import numpy as np
from dataclasses import dataclass
from TSB_AD.models.base import BaseDetector
from TSB_AD.utils.utility import zscore

class FFT(BaseDetector):
    """Anomaly detector based on a truncated Fourier reconstruction.

    The series is approximated by keeping only the leading Fourier
    coefficients. Points that deviate from this approximation by more than
    the mean deviation become candidates; each candidate is scored by its
    difference to the mean of its neighbourhood, the scores are z-normalised
    and thresholded into *local outliers*, and nearby local outliers are
    merged into *regions* that receive a common anomaly score.

    Parameters
    ----------
    ifft_parameters : int
        Number of leading FFT coefficients kept for the reconstruction
        (clamped to ``[1, len(data)]``).
    local_neighbor_window : int
        Width of the neighbourhood; ``local_neighbor_window // 2`` points on
        each side of a candidate are averaged.
    local_outlier_threshold : float
        A candidate is a local outlier when the absolute value of its
        z-score exceeds this threshold.
    max_region_size : int
        Stored on the instance. NOTE(review): not read by any method of this
        class -- confirm whether region length limiting was intended.
    max_sign_change_distance : int
        Maximum index gap between two consecutive local outliers for them to
        be merged into the same region.
    normalize : bool
        Whether ``fit`` z-scores the input before detection.
    """

    @dataclass
    class LocalOutlier:
        # Position of the outlier in the analysed series.
        index: int
        # Z-score of the outlier's neighbourhood difference.
        z_score: float

        @property
        def sign(self) -> int:
            """Return ``np.sign`` of the z-score."""
            return np.sign(self.z_score)

    @dataclass
    class RegionOutlier:
        # Positions in the *local outlier list* (not in the series) of the
        # first and last local outlier belonging to the region.
        start_idx: int
        end_idx: int
        # Mean absolute z-score of the region's local outliers.
        score: float

    def __init__(self, ifft_parameters=5, local_neighbor_window=21, local_outlier_threshold=0.6, max_region_size=50, max_sign_change_distance=10, normalize=True):
        super().__init__()

        self.ifft_parameters = ifft_parameters
        self.local_neighbor_window = local_neighbor_window
        self.local_outlier_threshold = local_outlier_threshold
        self.max_region_size = max_region_size
        self.max_sign_change_distance = max_sign_change_distance
        self.normalize = normalize
        self.decision_scores_ = None

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        ``X`` must be two-dimensional ``(n_samples, n_features)``. When
        ``normalize`` is set, ``X`` is passed through ``zscore`` first
        (along axis 0 for a single feature, along axis 1 otherwise). The
        resulting scores are stored in ``decision_scores_``.

        Returns
        -------
        self
        """
        n_samples, n_features = X.shape
        if self.normalize:
            if n_features == 1:
                X = zscore(X, axis=0, ddof=0)
            else:
                X = zscore(X, axis=1, ddof=1)
        self.data = X
        self.decision_scores_ = self.detect_anomalies()
        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        ``X`` replaces ``self.data`` and is scored by ``detect_anomalies``;
        the result has the same shape as ``X``.

        NOTE(review): unlike ``fit``, no normalisation is applied here even
        when ``normalize`` is set -- confirm this asymmetry is intended.
        """
        self.data = X
        return self.detect_anomalies()

    @staticmethod
    def reduce_parameters(f: np.ndarray, k: int) -> np.ndarray:
        """Return a copy of ``f`` with every entry from index ``k`` on zeroed."""
        transformed = f.copy()
        transformed[k:] = 0
        return transformed

    def calculate_local_outliers(self):
        """Find local outliers in ``self.data``.

        Returns
        -------
        list of FFT.LocalOutlier
            One entry per candidate whose absolute z-score exceeds
            ``local_outlier_threshold``; empty when there are no candidates
            or the series is empty.

        NOTE(review): the per-row comparison below needs a single value per
        time step, so data with several features per row is presumably not
        supported -- confirm against callers.
        """
        n = len(self.data)
        if n == 0:
            # np.fft.fft rejects zero-length input; nothing to detect anyway.
            return []

        k = max(min(self.ifft_parameters, n), 1)
        # Transform along the time axis. For (n_samples, 1) input the NumPy
        # default (last axis) would run the FFT over a length-1 axis, which
        # is an identity and makes the coefficient truncation meaningless.
        spectrum = self.reduce_parameters(np.fft.fft(self.data, axis=0), k)
        # The truncated spectrum is generally not conjugate-symmetric, so the
        # inverse transform is complex; only its real part is used.
        reconstruction = np.real(np.fft.ifft(spectrum, axis=0))

        so = np.abs(reconstruction - self.data)
        mso = np.mean(so)
        neighbor_c = self.local_neighbor_window // 2

        scores = []
        score_idxs = []
        for i in range(n):
            # Only points deviating more than average are candidates.
            if so[i] > mso:
                lo = max(i - neighbor_c, 0)
                hi = min(i + neighbor_c + 1, n)
                nav = np.mean(self.data[lo:hi])
                scores.append(self.data[i] - nav)
                score_idxs.append(i)

        if not scores:
            return []

        ms = np.mean(scores)
        sds = np.std(scores) + 1e-6  # epsilon guards against zero deviation
        z_scores = (np.array(scores) - ms) / sds

        return [
            self.LocalOutlier(index=idx, z_score=z)
            for idx, z in zip(score_idxs, z_scores)
            if abs(z) > self.local_outlier_threshold
        ]

    def calculate_region_outliers(self, local_outliers):
        """Merge consecutive local outliers into regions.

        Consecutive local outliers whose series indices are at most
        ``max_sign_change_distance`` apart are chained into one region. A
        chain consisting of a single local outlier does not produce a region.

        Returns
        -------
        list of FFT.RegionOutlier
        """
        def distance(a: int, b: int) -> int:
            return abs(local_outliers[b].index - local_outliers[a].index)

        regions = []
        i = 0
        n_l = len(local_outliers) - 1
        while i < n_l:
            start_idx = i
            # Extend the chain while the next local outlier is close enough.
            while i < n_l and distance(i, i + 1) <= self.max_sign_change_distance:
                i += 1
            end_idx = i
            if end_idx > start_idx:
                score = np.mean([abs(local_outliers[j].z_score) for j in range(start_idx, end_idx + 1)])
                regions.append(self.RegionOutlier(start_idx=start_idx, end_idx=end_idx, score=score))
            i += 1

        return regions

    def _empty_scores(self):
        """Return a floating-point zero array shaped like ``self.data``."""
        data = np.asarray(self.data)
        # Keep the input's float dtype; promote anything else (e.g. integers)
        # so that fractional region scores are not truncated on assignment.
        dtype = data.dtype if np.issubdtype(data.dtype, np.floating) else float
        return np.zeros(data.shape, dtype=dtype)

    def detect_anomalies(self):
        """Detect anomalies by combining local and regional outliers.

        Returns an array shaped like ``self.data`` that is zero everywhere
        except inside detected regions, where every point carries the
        region's score. Prints a notice and returns all zeros when no local
        outliers are found.
        """
        anomaly_scores = self._empty_scores()

        local_outliers = self.calculate_local_outliers()
        if not local_outliers:
            print("No local outliers detected.")
            return anomaly_scores

        for region in self.calculate_region_outliers(local_outliers):
            # Region bounds are positions in the local-outlier list; map them
            # back to positions in the series.
            start_index = local_outliers[region.start_idx].index
            end_index = local_outliers[region.end_idx].index
            anomaly_scores[start_index:end_index + 1] = region.score

        return anomaly_scores