Time_RCD / models /FFT.py
Oliver Le
Initial commit
d03866e
"""
This function is adapted from [TimeEval-algorithms] by [CodeLionX&wenig]
Original source: [https://github.com/TimeEval/TimeEval-algorithms]
"""
import numpy as np
from dataclasses import dataclass
from TSB_AD.models.base import BaseDetector
from TSB_AD.utils.utility import zscore
class FFT(BaseDetector):
def __init__(self, ifft_parameters=5, local_neighbor_window=21, local_outlier_threshold=0.6, max_region_size=50, max_sign_change_distance=10, normalize=True):
super().__init__()
self.ifft_parameters = ifft_parameters
self.local_neighbor_window = local_neighbor_window
self.local_outlier_threshold = local_outlier_threshold
self.max_region_size = max_region_size
self.max_sign_change_distance = max_sign_change_distance
self.normalize = normalize
self.decision_scores_ = None
def fit(self, X, y=None):
"""Fit detector. y is ignored in unsupervised methods."""
n_samples, n_features = X.shape
if self.normalize:
if n_features == 1:
X = zscore(X, axis=0, ddof=0)
else:
X = zscore(X, axis=1, ddof=1)
self.data = X
self.decision_scores_ = self.detect_anomalies()
return self
def decision_function(self, X):
"""Predict raw anomaly score of X using the fitted detector."""
n_samples, n_features = X.shape
decision_scores_ = np.zeros(n_samples)
self.data = X
local_outliers = self.calculate_local_outliers()
if not local_outliers:
print("No local outliers detected.")
return np.zeros_like(self.data)
regions = self.calculate_region_outliers(local_outliers)
anomaly_scores = np.zeros_like(self.data)
for region in regions:
start_index = local_outliers[region.start_idx].index
end_index = local_outliers[region.end_idx].index
anomaly_scores[start_index:end_index + 1] = region.score
decision_scores_ = anomaly_scores
return decision_scores_
@staticmethod
def reduce_parameters(f: np.ndarray, k: int) -> np.ndarray:
transformed = f.copy()
transformed[k:] = 0
return transformed
def calculate_local_outliers(self):
n = len(self.data)
k = max(min(self.ifft_parameters, n), 1)
y = self.reduce_parameters(np.fft.fft(self.data), k)
f2 = np.real(np.fft.ifft(y))
so = np.abs(f2 - self.data)
mso = np.mean(so)
neighbor_c = self.local_neighbor_window // 2
scores = []
score_idxs = []
for i in range(n):
if so[i] > mso:
nav = np.mean(self.data[max(i - neighbor_c, 0):min(i + neighbor_c + 1, n)])
scores.append(self.data[i] - nav)
score_idxs.append(i)
if not scores:
return []
ms = np.mean(scores)
sds = np.std(scores) + 1e-6
z_scores = (np.array(scores) - ms) / sds
return [self.LocalOutlier(index=score_idxs[i], z_score=z_scores[i])
for i in range(len(scores)) if abs(z_scores[i]) > self.local_outlier_threshold]
def calculate_region_outliers(self, local_outliers):
def distance(a: int, b: int) -> int:
return abs(local_outliers[b].index - local_outliers[a].index)
regions = []
i = 0
n_l = len(local_outliers) - 1
while i < n_l:
start_idx = i
while i < n_l and distance(i, i + 1) <= self.max_sign_change_distance:
i += 1
end_idx = i
if end_idx > start_idx:
score = np.mean([abs(local_outliers[j].z_score) for j in range(start_idx, end_idx + 1)])
regions.append(self.RegionOutlier(start_idx=start_idx, end_idx=end_idx, score=score))
i += 1
return regions
@dataclass
class LocalOutlier:
index: int
z_score: float
@property
def sign(self) -> int:
return np.sign(self.z_score)
@dataclass
class RegionOutlier:
start_idx: int
end_idx: int
score: float
def detect_anomalies(self):
"""Detect anomalies by combining local and regional outliers."""
local_outliers = self.calculate_local_outliers()
if not local_outliers:
print("No local outliers detected.")
return np.zeros_like(self.data)
regions = self.calculate_region_outliers(local_outliers)
anomaly_scores = np.zeros_like(self.data)
for region in regions:
start_index = local_outliers[region.start_idx].index
end_index = local_outliers[region.end_idx].index
anomaly_scores[start_index:end_index + 1] = region.score
return anomaly_scores