"""
This function is adapted from [pyod] by [yzhao062]
Original source: [https://github.com/yzhao062/pyod]
"""
from __future__ import division
from __future__ import print_function

import warnings

import numpy as np
from joblib import Parallel, delayed
from scipy.stats import skew as skew_sp
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from ..utils.stat_models import column_ecdf
from ..utils.utility import _partition_estimators
from ..utils.utility import zscore


def skew(X, axis=0):
    # Wrap scipy's skew and map NaNs (e.g. from constant columns) to 0,
    # so the downstream np.sign() calls stay well-defined.
    return np.nan_to_num(skew_sp(X, axis=axis))


def _parallel_ecdf(n_dims, X):
    """Private method to calculate the ECDF in parallel.

    Parameters
    ----------
    n_dims : int
        The number of dimensions of the current input matrix.

    X : numpy array
        The subarray for building the ECDF.

    Returns
    -------
    U_l_mat : numpy array
        Left-tail ECDF subarray.

    U_r_mat : numpy array
        Right-tail ECDF subarray.
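
    Examples
    --------
    A shape-only sketch (random data; the values depend on
    ``column_ecdf``)::

        >>> X = np.random.randn(50, 3)
        >>> U_l, U_r = _parallel_ecdf(3, X)
        >>> U_l.shape, U_r.shape
        ((50, 3), (50, 3))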
"""
U_l_mat = np.zeros([X.shape[0], n_dims])
U_r_mat = np.zeros([X.shape[0], n_dims])
for i in range(n_dims):
U_l_mat[:, i: i + 1] = column_ecdf(X[:, i: i + 1])
U_r_mat[:, i: i + 1] = column_ecdf(X[:, i: i + 1] * -1)
return U_l_mat, U_r_mat


class COPOD(BaseDetector):
    """COPOD class for Copula-Based Outlier Detection.

    COPOD is a parameter-free, highly interpretable outlier detection
    algorithm based on empirical copula models.
    See :cite:`li2020copod` for details.

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e.
        the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    n_jobs : int, optional (default=1)
        The number of jobs to run in parallel for both `fit` and
        `predict`. If -1, then the number of jobs is set to the
        number of cores.

    normalize : bool, optional (default=True)
        If True, z-score normalize each sample across its features
        (``axis=1``) before fitting.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
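
    Examples
    --------
    A minimal usage sketch (the import path assumes this module lives at
    ``pyod.models.copod``; the random data is illustrative only)::

        >>> import numpy as np
        >>> from pyod.models.copod import COPOD
        >>> X_train = np.random.randn(100, 5)
        >>> clf = COPOD().fit(X_train)
        >>> clf.decision_scores_.shape  # higher score = more abnormal
        (100,)
        >>> clf.labels_.shape  # binary labels from ``threshold_``
        (100,)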
"""

    def __init__(self, contamination=0.1, n_jobs=1, normalize=True):
        super(COPOD, self).__init__(contamination=contamination)
        # TODO: make the n_jobs handling fully parameterized
        self.n_jobs = n_jobs
        self.normalize = normalize

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        X = check_array(X)
        if self.normalize:
            X = zscore(X, axis=1, ddof=1)
        self._set_n_classes(y)
        # X_train is assigned only after scoring, so decision_function()
        # sees no stored training set here and scores X directly.
        self.decision_scores_ = self.decision_function(X)
        self.X_train = X
        self._process_decision_scores()
        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        For consistency, outliers are assigned with larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples. Sparse matrices are accepted only
            if they are supported by the base estimator.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
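
        Examples
        --------
        Scoring unseen samples after ``fit`` (random data, shapes only)::

            >>> clf = COPOD(n_jobs=1).fit(np.random.randn(100, 5))
            >>> clf.decision_function(np.random.randn(10, 5)).shape
            (10,)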
"""
# use multi-thread execution
if self.n_jobs != 1:
return self._decision_function_parallel(X)
if hasattr(self, 'X_train'):
original_size = X.shape[0]
X = np.concatenate((self.X_train, X), axis=0)
self.U_l = -1 * np.log(column_ecdf(X))
self.U_r = -1 * np.log(column_ecdf(-X))
skewness = np.sign(skew(X, axis=0))
self.U_skew = self.U_l * -1 * np.sign(
skewness - 1) + self.U_r * np.sign(skewness + 1)
self.O = np.maximum(self.U_skew, np.add(self.U_l, self.U_r) / 2)
if hasattr(self, 'X_train'):
decision_scores_ = self.O.sum(axis=1)[-original_size:]
else:
decision_scores_ = self.O.sum(axis=1)
return decision_scores_.ravel()

    def _decision_function_parallel(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        For consistency, outliers are assigned with larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples. Sparse matrices are accepted only
            if they are supported by the base estimator.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        if hasattr(self, 'X_train'):
            original_size = X.shape[0]
            X = np.concatenate((self.X_train, X), axis=0)

        n_samples, n_features = X.shape[0], X.shape[1]

        if n_features < 2:
            raise ValueError(
                'n_jobs should not be used on one dimensional dataset')
        if n_features <= self.n_jobs:
            self.n_jobs = n_features
            warnings.warn("n_features <= n_jobs; setting them equal instead.")

        # Split the feature columns across workers and build the ECDFs
        # of each chunk in parallel.
        n_jobs, n_dims_list, starts = _partition_estimators(n_features,
                                                            self.n_jobs)

        all_results = Parallel(n_jobs=n_jobs, max_nbytes=None,
                               verbose=True)(
            delayed(_parallel_ecdf)(
                n_dims_list[i],
                X[:, starts[i]:starts[i + 1]],
            )
            for i in range(n_jobs))

        # Recover the per-chunk results into full-width matrices.
        self.U_l = np.zeros([n_samples, n_features])
        self.U_r = np.zeros([n_samples, n_features])
        for i in range(n_jobs):
            self.U_l[:, starts[i]:starts[i + 1]] = all_results[i][0]
            self.U_r[:, starts[i]:starts[i + 1]] = all_results[i][1]

        self.U_l = -1 * np.log(self.U_l)
        self.U_r = -1 * np.log(self.U_r)

        # Same skewness correction as in the serial path.
        skewness = np.sign(skew(X, axis=0))
        self.U_skew = self.U_l * -1 * np.sign(
            skewness - 1) + self.U_r * np.sign(skewness + 1)
        self.O = np.maximum(self.U_skew, np.add(self.U_l, self.U_r) / 2)

        if hasattr(self, 'X_train'):
            decision_scores_ = self.O.sum(axis=1)[-original_size:]
        else:
            decision_scores_ = self.O.sum(axis=1)
        return decision_scores_.ravel()
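

if __name__ == "__main__":
    # Minimal smoke test, not part of the original module. Because of the
    # relative imports above, run it as a module, e.g.
    # ``python -m pyod.models.copod`` (module path assumed).
    rng = np.random.RandomState(42)
    X_demo = rng.randn(200, 4)

    serial = COPOD(n_jobs=1).fit(X_demo)
    parallel = COPOD(n_jobs=2).fit(X_demo)

    # The serial and parallel paths build the same column-wise ECDFs,
    # so their training scores should agree.
    assert np.allclose(serial.decision_scores_, parallel.decision_scores_)
    print("top-5 most abnormal indices:",
          np.argsort(serial.decision_scores_)[-5:])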