Spaces:
Sleeping
Sleeping
File size: 4,849 Bytes
d03866e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
"""
This implementation is adapted from [TimeEval-algorithms] by [CodeLionX & wenig]
Original source: [https://github.com/TimeEval/TimeEval-algorithms]
"""
import numpy as np
from dataclasses import dataclass
from TSB_AD.models.base import BaseDetector
from TSB_AD.utils.utility import zscore
class FFT(BaseDetector):
    """FFT-based anomaly detector for time series.

    The series is smoothed by keeping only the first ``ifft_parameters``
    Fourier coefficients and transforming back. Points whose reconstruction
    error exceeds the mean error become candidates; every candidate is scored
    by its difference from the mean of its local neighbourhood and those
    differences are z-scored. Candidates whose absolute z-score exceeds
    ``local_outlier_threshold`` are local outliers, and runs of neighbouring
    local outliers are merged into regions that share one score.

    Parameters
    ----------
    ifft_parameters : int
        Number of leading Fourier coefficients kept (clamped to
        ``[1, len(data)]``).
    local_neighbor_window : int
        Width of the window around a candidate whose mean is used as the
        local baseline; ``local_neighbor_window // 2`` points are taken on
        each side.
    local_outlier_threshold : float
        Minimum absolute z-score for a candidate to count as a local outlier.
    max_region_size : int
        Stored on the instance. NOTE(review): no method of this class reads
        it, so it currently has no effect on the scores.
    max_sign_change_distance : int
        Maximum index gap between two consecutive local outliers for them to
        be merged into the same region.
    normalize : bool
        Whether inputs are passed through ``zscore`` before scoring.

    Attributes
    ----------
    data : array
        The (optionally normalised) series most recently handed to ``fit``
        or ``decision_function``.
    decision_scores_ : array or None
        Scores of the data passed to ``fit``; ``None`` before fitting.
    """

    @dataclass
    class LocalOutlier:
        # Position of the point in the series.
        index: int
        # Standardised deviation of the point from its local neighbourhood.
        z_score: float

        @property
        def sign(self) -> int:
            """Return ``np.sign`` of the z-score (a NumPy value: -1, 0 or 1)."""
            return np.sign(self.z_score)

    @dataclass
    class RegionOutlier:
        # start_idx / end_idx are positions in the *local outlier list*,
        # not positions in the series.
        start_idx: int
        end_idx: int
        # Mean absolute z-score of the local outliers in the region.
        score: float

    def __init__(self, ifft_parameters=5, local_neighbor_window=21, local_outlier_threshold=0.6, max_region_size=50, max_sign_change_distance=10, normalize=True):
        super().__init__()
        self.ifft_parameters = ifft_parameters
        self.local_neighbor_window = local_neighbor_window
        self.local_outlier_threshold = local_outlier_threshold
        self.max_region_size = max_region_size
        self.max_sign_change_distance = max_sign_change_distance
        self.normalize = normalize
        self.decision_scores_ = None

    def _preprocess(self, X):
        """Check that X is 2-D and apply the optional z-score normalisation."""
        # Unpacking raises ValueError for anything that is not 2-D.
        _, n_features = X.shape
        if not self.normalize:
            return X
        if n_features == 1:
            return zscore(X, axis=0, ddof=0)
        return zscore(X, axis=1, ddof=1)

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Stores the preprocessed ``X`` in ``self.data``, scores it into
        ``self.decision_scores_`` and returns ``self``.
        """
        self.data = self._preprocess(X)
        self.decision_scores_ = self.detect_anomalies()
        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        ``X`` receives the same preprocessing as in ``fit``. Note that
        ``self.data`` is replaced by the preprocessed ``X``;
        ``self.decision_scores_`` is left untouched.

        Returns an array of scores with the same shape as ``X``.
        """
        self.data = self._preprocess(X)
        return self.detect_anomalies()

    @staticmethod
    def reduce_parameters(f: np.ndarray, k: int) -> np.ndarray:
        """Return a copy of ``f`` with every entry from position ``k`` on set to 0."""
        transformed = f.copy()
        transformed[k:] = 0
        return transformed

    def calculate_local_outliers(self):
        """Find the local outliers of ``self.data``.

        Returns a list of ``LocalOutlier`` in increasing index order; the
        list is empty when there is no data or no candidate point.
        """
        data = self.data
        n = len(data)
        if n == 0:
            # An FFT over zero samples is undefined; nothing can be an outlier.
            return []
        k = max(min(self.ifft_parameters, n), 1)

        # Transform along the time axis (axis 0). The default axis (-1) would
        # run a length-1 transform over each row of an (n_samples, 1) input,
        # which is the identity and turns the coefficient truncation into a
        # plain zeroing of rows.
        spectrum = self.reduce_parameters(np.fft.fft(data, axis=0), k)
        smoothed = np.real(np.fft.ifft(spectrum, axis=0))
        deviation = np.abs(smoothed - data)
        mean_deviation = np.mean(deviation)
        half_window = self.local_neighbor_window // 2

        # NOTE(review): the comparison below is only unambiguous when each
        # row holds a single value (1-D or single-column data).
        scores = []
        score_idxs = []
        for i in range(n):
            if deviation[i] > mean_deviation:
                lo = max(i - half_window, 0)
                hi = min(i + half_window + 1, n)
                scores.append(data[i] - np.mean(data[lo:hi]))
                score_idxs.append(i)
        if not scores:
            return []

        scores = np.array(scores)
        # The small constant keeps the division finite when all scores agree.
        z_scores = (scores - np.mean(scores)) / (np.std(scores) + 1e-6)
        return [
            self.LocalOutlier(index=idx, z_score=z)
            for idx, z in zip(score_idxs, z_scores)
            if abs(z) > self.local_outlier_threshold
        ]

    def calculate_region_outliers(self, local_outliers):
        """Merge runs of nearby local outliers into ``RegionOutlier`` objects.

        Consecutive local outliers belong to the same run while the gap
        between their series indices is at most
        ``self.max_sign_change_distance``. Runs consisting of a single local
        outlier produce no region.
        """
        regions = []
        last = len(local_outliers) - 1
        i = 0
        while i < last:
            start_idx = i
            # Extend the run for as long as the next outlier is close enough.
            while i < last and (
                abs(local_outliers[i + 1].index - local_outliers[i].index)
                <= self.max_sign_change_distance
            ):
                i += 1
            end_idx = i
            if end_idx > start_idx:
                members = local_outliers[start_idx:end_idx + 1]
                score = np.mean([abs(member.z_score) for member in members])
                regions.append(self.RegionOutlier(start_idx=start_idx, end_idx=end_idx, score=score))
            i += 1
        return regions

    def detect_anomalies(self):
        """Detect anomalies by combining local and regional outliers.

        Returns an array shaped like ``self.data`` that is zero everywhere
        except inside detected regions, where every point carries the
        region's score.
        """
        local_outliers = self.calculate_local_outliers()
        # Force a float buffer: zeros_like alone would inherit an integer
        # dtype from integer input and truncate the scores on assignment.
        anomaly_scores = np.zeros_like(self.data, dtype=float)
        if not local_outliers:
            print("No local outliers detected.")
            return anomaly_scores
        for region in self.calculate_region_outliers(local_outliers):
            # Translate positions in the outlier list back to series indices.
            start_index = local_outliers[region.start_idx].index
            end_index = local_outliers[region.end_idx].index
            anomaly_scores[start_index:end_index + 1] = region.score
        return anomaly_scores