Spaces:
Running
Running
| """ | |
| This function is adapted from [TimeEval-algorithms] by [CodeLionX&wenig] | |
| Original source: [https://github.com/TimeEval/TimeEval-algorithms] | |
| """ | |
| from sklearn.base import BaseEstimator, OutlierMixin | |
| from sklearn.cluster import KMeans | |
| import numpy as np | |
| from numpy.lib.stride_tricks import sliding_window_view | |
| from ..utils.utility import zscore | |
| class KMeansAD(BaseEstimator, OutlierMixin): | |
| def __init__(self, k, window_size, stride, n_jobs=1, normalize=True): | |
| self.k = k | |
| self.window_size = window_size | |
| self.stride = stride | |
| self.model = KMeans(n_clusters=k) | |
| self.padding_length = 0 | |
| self.normalize = normalize | |
| def _preprocess_data(self, X: np.ndarray) -> np.ndarray: | |
| flat_shape = (X.shape[0] - (self.window_size - 1), -1) # in case we have a multivariate TS | |
| slides = sliding_window_view(X, window_shape=self.window_size, axis=0).reshape(flat_shape)[::self.stride, :] | |
| self.padding_length = X.shape[0] - (slides.shape[0] * self.stride + self.window_size - self.stride) | |
| print(f"Required padding_length={self.padding_length}") | |
| if self.normalize: slides = zscore(slides, axis=1, ddof=1) | |
| return slides | |
| def _custom_reverse_windowing(self, scores: np.ndarray) -> np.ndarray: | |
| print("Reversing window-based scores to point-based scores:") | |
| print(f"Before reverse-windowing: scores.shape={scores.shape}") | |
| # compute begin and end indices of windows | |
| begins = np.array([i * self.stride for i in range(scores.shape[0])]) | |
| ends = begins + self.window_size | |
| # prepare target array | |
| unwindowed_length = self.stride * (scores.shape[0] - 1) + self.window_size + self.padding_length | |
| mapped = np.full(unwindowed_length, fill_value=np.nan) | |
| # only iterate over window intersections | |
| indices = np.unique(np.r_[begins, ends]) | |
| for i, j in zip(indices[:-1], indices[1:]): | |
| window_indices = np.flatnonzero((begins <= i) & (j-1 < ends)) | |
| # print(i, j, window_indices) | |
| mapped[i:j] = np.nanmean(scores[window_indices]) | |
| # replace untouched indices with 0 (especially for the padding at the end) | |
| np.nan_to_num(mapped, copy=False) | |
| print(f"After reverse-windowing: scores.shape={mapped.shape}") | |
| return mapped | |
| def fit(self, X: np.ndarray, y=None, preprocess=True) -> 'KMeansAD': | |
| if preprocess: | |
| X = self._preprocess_data(X) | |
| self.model.fit(X) | |
| return self | |
| def predict(self, X: np.ndarray, preprocess=True) -> np.ndarray: | |
| if preprocess: | |
| X = self._preprocess_data(X) | |
| clusters = self.model.predict(X) | |
| diffs = np.linalg.norm(X - self.model.cluster_centers_[clusters], axis=1) | |
| return self._custom_reverse_windowing(diffs) | |
| def fit_predict(self, X, y=None) -> np.ndarray: | |
| X = self._preprocess_data(X) | |
| self.fit(X, y, preprocess=False) | |
| return self.predict(X, preprocess=False) |