"""
This function is adapted from [TimeEval-algorithms] by [CodeLionX&wenig]
Original source: [https://github.com/TimeEval/TimeEval-algorithms]
"""

import numpy as np
from sklearn.decomposition import PCA
from sklearn.utils.validation import check_is_fitted
from typing import Optional

from .base import BaseDetector

class Robust_PCA:
    """Principal component pursuit (PCP) for the decomposition D = L + S."""

    def __init__(self, D, mu=None, lmbda=None):
        self.D = D
        self.S = np.zeros(self.D.shape)
        self.Y = np.zeros(self.D.shape)  # Lagrange multiplier matrix, read by fit()

        if mu is not None:
            self.mu = mu
        else:
            # default penalty parameter suggested for PCP
            self.mu = np.prod(self.D.shape) / (4 * np.linalg.norm(self.D, ord=1))

        self.mu_inv = 1 / self.mu

        if lmbda is not None:
            self.lmbda = lmbda
        else:
            # default sparsity weight: 1 / sqrt(max(n, m))
            self.lmbda = 1 / np.sqrt(np.max(self.D.shape))

    @staticmethod
    def frobenius_norm(M):
        return np.linalg.norm(M, ord='fro')

    @staticmethod
    def shrink(M, tau):
        # element-wise soft-thresholding (proximal operator of the L1 norm)
        return np.sign(M) * np.maximum(np.abs(M) - tau, np.zeros(M.shape))

    def svd_threshold(self, M, tau):
        # singular value thresholding: soft-threshold the singular values of M
        U, S, V = np.linalg.svd(M, full_matrices=False)
        return np.dot(U, np.dot(np.diag(self.shrink(S, tau)), V))

    def fit(self, tol=None, max_iter=1000, iter_print=100):
        n_iter = 0
        err = np.inf
        Sk = self.S
        Yk = self.Y
        Lk = np.zeros(self.D.shape)

        if tol:
            _tol = tol
        else:
            _tol = 1E-7 * self.frobenius_norm(self.D)

        # This loop implements the principal component pursuit (PCP) algorithm
        # from the table on page 29 of https://arxiv.org/pdf/0912.3599.pdf
        while (err > _tol) and n_iter < max_iter:
            Lk = self.svd_threshold(
                self.D - Sk + self.mu_inv * Yk, self.mu_inv)                 # step 3: update the low-rank part
            Sk = self.shrink(
                self.D - Lk + (self.mu_inv * Yk), self.mu_inv * self.lmbda)  # step 4: update the sparse part
            Yk = Yk + self.mu * (self.D - Lk - Sk)                           # step 5: update the multiplier
            err = self.frobenius_norm(self.D - Lk - Sk)
            n_iter += 1
            if (n_iter % iter_print) == 0 or n_iter == 1 or n_iter >= max_iter or err <= _tol:
                print('iteration: {0}, error: {1}'.format(n_iter, err))

        self.L = Lk
        self.S = Sk
        return Lk, Sk
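
# A minimal sketch of the decomposition on synthetic data (illustrative only;
# the matrix and values below are made up for this example):
#
#     base = np.outer(np.arange(10.0), np.arange(1.0, 6.0))  # rank-1 matrix
#     corrupted = base.copy()
#     corrupted[3, 2] += 50.0                                 # one sparse spike
#     L, S = Robust_PCA(corrupted).fit(max_iter=500)
#     # L approximately recovers the rank-1 structure, while S concentrates
#     # the spike near entry (3, 2).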
    
class RobustPCA(BaseDetector):
    def __init__(self, max_iter: int = 1000, n_components: Optional[int] = None, zero_pruning: bool = True):
        self.pca: Optional[PCA] = None
        self.max_iter = max_iter
        self.n_components = n_components
        self.zero_pruning = zero_pruning

    def fit(self, X, y=None):
        if self.zero_pruning:
            # drop all-zero columns, which carry no information and can
            # destabilize the PCP iteration
            non_zero_columns = np.any(X != 0, axis=0)
            X = X[:, non_zero_columns]

        # split X into a low-rank part L and a sparse part S, then fit an
        # ordinary PCA on the low-rank part; a full-rank PCA is used so that
        # the transformed space in decision_function matches the input
        # dimension (self.n_components is currently not applied here)
        rpca = Robust_PCA(X)
        L, S = rpca.fit(max_iter=self.max_iter)
        self.detector_ = PCA(n_components=L.shape[1])
        self.detector_.fit(L)
        self.decision_scores_ = self.decision_function(L)
        return self

    def decision_function(self, X):
        check_is_fitted(self, ['detector_'])

        # project X onto the principal components fitted on the low-rank
        # matrix; the absolute residual between the input and its projection
        # plays the role of the sparse component, and its row-wise L1 norm is
        # the anomaly score
        L = self.detector_.transform(X)
        S = np.absolute(X - L)
        return S.sum(axis=1)
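
if __name__ == "__main__":
    # Minimal usage sketch on synthetic data (illustrative only; running this
    # file directly requires the relative import of BaseDetector above to
    # resolve, so the detector is normally exercised through the package).
    rng = np.random.default_rng(0)
    X = rng.normal(size=(200, 5))
    X[42] += 10.0  # inject one obvious outlier row

    detector = RobustPCA(max_iter=500)
    detector.fit(X)
    scores = detector.decision_function(X)
    print("highest-scoring sample:", int(np.argmax(scores)))  # row 42 is expected to stand out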