""" This function is adapted from [TimeEval-algorithms] by [CodeLionX&wenig] Original source: [https://github.com/TimeEval/TimeEval-algorithms] """ import numpy as np from dataclasses import dataclass from TSB_AD.models.base import BaseDetector from TSB_AD.utils.utility import zscore class FFT(BaseDetector): def __init__(self, ifft_parameters=5, local_neighbor_window=21, local_outlier_threshold=0.6, max_region_size=50, max_sign_change_distance=10, normalize=True): super().__init__() self.ifft_parameters = ifft_parameters self.local_neighbor_window = local_neighbor_window self.local_outlier_threshold = local_outlier_threshold self.max_region_size = max_region_size self.max_sign_change_distance = max_sign_change_distance self.normalize = normalize self.decision_scores_ = None def fit(self, X, y=None): """Fit detector. y is ignored in unsupervised methods.""" n_samples, n_features = X.shape if self.normalize: if n_features == 1: X = zscore(X, axis=0, ddof=0) else: X = zscore(X, axis=1, ddof=1) self.data = X self.decision_scores_ = self.detect_anomalies() return self def decision_function(self, X): """Predict raw anomaly score of X using the fitted detector.""" n_samples, n_features = X.shape decision_scores_ = np.zeros(n_samples) self.data = X local_outliers = self.calculate_local_outliers() if not local_outliers: print("No local outliers detected.") return np.zeros_like(self.data) regions = self.calculate_region_outliers(local_outliers) anomaly_scores = np.zeros_like(self.data) for region in regions: start_index = local_outliers[region.start_idx].index end_index = local_outliers[region.end_idx].index anomaly_scores[start_index:end_index + 1] = region.score decision_scores_ = anomaly_scores return decision_scores_ @staticmethod def reduce_parameters(f: np.ndarray, k: int) -> np.ndarray: transformed = f.copy() transformed[k:] = 0 return transformed def calculate_local_outliers(self): n = len(self.data) k = max(min(self.ifft_parameters, n), 1) y = self.reduce_parameters(np.fft.fft(self.data), k) f2 = np.real(np.fft.ifft(y)) so = np.abs(f2 - self.data) mso = np.mean(so) neighbor_c = self.local_neighbor_window // 2 scores = [] score_idxs = [] for i in range(n): if so[i] > mso: nav = np.mean(self.data[max(i - neighbor_c, 0):min(i + neighbor_c + 1, n)]) scores.append(self.data[i] - nav) score_idxs.append(i) if not scores: return [] ms = np.mean(scores) sds = np.std(scores) + 1e-6 z_scores = (np.array(scores) - ms) / sds return [self.LocalOutlier(index=score_idxs[i], z_score=z_scores[i]) for i in range(len(scores)) if abs(z_scores[i]) > self.local_outlier_threshold] def calculate_region_outliers(self, local_outliers): def distance(a: int, b: int) -> int: return abs(local_outliers[b].index - local_outliers[a].index) regions = [] i = 0 n_l = len(local_outliers) - 1 while i < n_l: start_idx = i while i < n_l and distance(i, i + 1) <= self.max_sign_change_distance: i += 1 end_idx = i if end_idx > start_idx: score = np.mean([abs(local_outliers[j].z_score) for j in range(start_idx, end_idx + 1)]) regions.append(self.RegionOutlier(start_idx=start_idx, end_idx=end_idx, score=score)) i += 1 return regions @dataclass class LocalOutlier: index: int z_score: float @property def sign(self) -> int: return np.sign(self.z_score) @dataclass class RegionOutlier: start_idx: int end_idx: int score: float def detect_anomalies(self): """Detect anomalies by combining local and regional outliers.""" local_outliers = self.calculate_local_outliers() if not local_outliers: print("No local outliers detected.") return np.zeros_like(self.data) regions = self.calculate_region_outliers(local_outliers) anomaly_scores = np.zeros_like(self.data) for region in regions: start_index = local_outliers[region.start_idx].index end_index = local_outliers[region.end_idx].index anomaly_scores[start_index:end_index + 1] = region.score return anomaly_scores