Spaces:
Running
Running
| """Classes of distance measure for model type A | |
| """ | |
| # Author: Yinchen Wu <yinchen@uchicago.edu> | |
| import numpy as np | |
| from arch import arch_model | |
| import math | |
class Euclidean:
    """Lp-norm dissimilarity between two subsequences.

    Parameters
    ----------
    power : int, optional (default=1)
        The power k of the Lp norm; the measure is based on ||x - y||_k.
    neighborhood : int, optional (default=100)
        Length of the neighborhood used to derive the normalizing constant
        D, the max-min range of the training data around (but excluding)
        the current window.  Only used when ``norm`` is True.
    window : int, optional (default=20)
        Length of the subsequences being compared.
    norm : bool, optional (default=False)
        If True, normalize the score by the local range D of the training
        data; this requires a fitted detector (see ``set_param``).

    Attributes
    ----------
    decision_scores_ : list of (index, score) tuples
        Scores recorded for each measured subsequence; the higher, the
        more abnormal.
    detector : object or None
        The anomaly detector this measure is attached to (set externally).
    """

    def __init__(self, power=1, neighborhood=100, window=20, norm=False):
        self.power = power
        self.window = window
        self.neighborhood = neighborhood
        self.detector = None
        self.decision_scores_ = []
        self.norm = norm
        # Sentinel: an int here means "no training data available yet".
        self.X_train = 2

    def measure(self, X, Y, index):
        """Score the dissimilarity between two subsequences.

        Parameters
        ----------
        X : numpy array of shape (n_samples, )
            The real input subsequence.
        Y : numpy array of shape (n_samples, )
            The estimated input subsequence.
        index : int
            Index of the starting point of the subsequence.

        Returns
        -------
        score : float
            Dissimilarity score (None on the error paths, as before).
        """
        # BUG FIX: the original unconditionally read self.detector.X_train_,
        # which raised AttributeError whenever no detector was attached --
        # even for the plain norm=False path that never uses training data.
        # Fall back to the stored value (int sentinel if set_param not run).
        if self.detector is not None:
            X_train = self.detector.X_train_
        else:
            X_train = self.X_train
        power = self.power
        window = self.window
        neighborhood = self.neighborhood
        data = X_train
        if not self.norm:
            # Plain length-normalized Lp distance; empty input scores 0.
            if X.shape[0] == 0:
                score = 0
            else:
                score = np.linalg.norm(X - Y, power) / (X.shape[0])
            self.decision_scores_.append((index, score))
            return score
        elif type(X_train) == int:
            print('Error! Detector is not fed to the object and X_train is not known')
        elif neighborhood != 'all':
            # Normalizing constant D = max-min range of the training data in
            # a neighborhood around the window, excluding the window itself.
            # Requires self.n_train_ (set by set_param).
            neighbor = int(self.neighborhood / 2)
            if index + neighbor < self.n_train_ and index - neighbor > 0:
                region = np.concatenate((data[index - neighbor: index],
                                         data[index + window: index + neighbor]))
            elif index + neighbor >= self.n_train_ and index + window < self.n_train_:
                region = np.concatenate((data[self.n_train_ - neighborhood: index],
                                         data[index + window: self.n_train_]))
            elif index + window >= self.n_train_:
                region = data[self.n_train_ - neighborhood: index]
            else:
                region = np.concatenate((data[0: index],
                                         data[index + window: index + neighborhood]))
            D = np.max(region) - np.min(region)
            # NOTE(review): this branch divides by shape**power while the
            # un-normalized branch divides by shape only -- possibly
            # intentional; behavior kept as-is.
            score = np.linalg.norm(X - Y, power) / D / (X.shape[0] ** power)
            self.decision_scores_.append((index, score))
            return score

    def set_param(self):
        """Copy window/neighborhood/training info from the attached detector."""
        if self.detector is not None:
            self.window = self.detector.window
            self.neighborhood = self.detector.neighborhood
            self.n_train_ = self.detector.n_train_
            self.X_train = self.detector.X_train_
        else:
            print('Error! Detector is not fed to the object and X_train is not known')
        return self
class Mahalanobis:
    """Mahalanobis-style dissimilarity between two subsequences.

    Parameters
    ----------
    probability : bool, optional (default=False)
        If True, the score is 1 minus the density of the residual under a
        normal model fitted on the training residuals; otherwise a
        quadratic-form distance is used.

    Attributes
    ----------
    decision_scores_ : list of (index, score) tuples
        Scores recorded for each measured subsequence; the higher, the
        more abnormal.
    detector : object or None
        The anomaly detector this measure is attached to (set externally).
    """

    def __init__(self, probability=False):
        self.probability = probability
        self.detector = None
        self.decision_scores_ = []
        # Placeholder mean; replaced by a window-length zero vector in
        # set_param().
        self.mu = 0

    def set_param(self):
        """Estimate the residual covariance from the attached detector's fit."""
        self.n_initial_ = self.detector.n_initial_
        self.estimation = self.detector.estimation
        self.X_train = self.detector.X_train_
        self.window = self.detector.window
        window = self.window
        resid = self.X_train - self.estimation
        number = max(100, self.window)
        # Stack `number` overlapping residual windows column-wise.
        self.residual = np.zeros((window, number))
        for i in range(number):
            self.residual[:, i] = resid[self.n_initial_ + i: self.n_initial_ + i + window]
        # BUG FIX: the mean vector must have the window length -- it is
        # compared against window-sized inputs in norm_pdf_multivariate and
        # the covariance below is window x window.  The original used
        # np.zeros(number), which made the multivariate density unusable
        # whenever window != number.
        self.mu = np.zeros(window)
        self.cov = np.cov(self.residual, rowvar=1)
        if self.window == 1:
            # Scalar case: store the residual standard deviation instead.
            self.cov = (np.sum(np.square(self.residual)) / (number - 1)) ** 0.5
        return self

    def norm_pdf_multivariate(self, x):
        """Multivariate normal density of x under N(self.mu, self.cov).

        Raises NameError if the covariance is singular or the dimensions
        do not match (original behavior kept).
        """
        try:
            mu = self.mu
        except Exception:
            mu = np.zeros(x.shape[0])
        sigma = self.cov
        size = x.shape[0]
        if size == len(mu) and (size, size) == sigma.shape:
            det = np.linalg.det(sigma)
            if det == 0:
                raise NameError("The covariance matrix can't be singular")
            norm_const = 1.0 / (math.pow((2 * math.pi), float(size) / 2) * math.pow(det, 1.0 / 2))
            # MODERNIZED: plain ndarray quadratic form instead of the
            # deprecated np.matrix; same value.
            x_mu = np.asarray(x - mu).ravel()
            inv = np.linalg.inv(sigma)
            result = math.exp(-0.5 * float(x_mu @ inv @ x_mu))
            return norm_const * result
        else:
            raise NameError("The dimensions of the input don't match")

    def normpdf(self, x):
        """Univariate normal density with mean 0 and sd = self.cov."""
        mean = 0
        # BUG FIX: np.asscalar was removed in NumPy 1.23; float() accepts
        # both Python scalars and 0-d / 1-element numpy values here.
        sd = float(self.cov)
        var = float(sd) ** 2
        denom = (2 * math.pi * var) ** .5
        num = math.exp(-(float(x) - float(mean)) ** 2 / (2 * var))
        return num / denom

    def measure(self, X, Y, index):
        """Score the dissimilarity between two subsequences.

        Parameters
        ----------
        X : numpy array of shape (n_samples, )
            The real input subsequence.
        Y : numpy array of shape (n_samples, )
            The estimated input subsequence.
        index : int
            Index of the starting point of the subsequence.

        Returns
        -------
        score : float
            Dissimilarity score; also appended to decision_scores_.
        """
        mu = np.zeros(self.detector.window)
        cov = self.cov
        if self.probability == False:
            if X.shape[0] == mu.shape[0]:
                # NOTE(review): quadratic form uses cov directly, not its
                # inverse as in the textbook Mahalanobis distance --
                # behavior kept as-is.
                score = np.matmul(np.matmul((X - Y - mu).T, cov), (X - Y - mu)) / (X.shape[0])
                self.decision_scores_.append((index, score))
                return score
            else:
                return (X - Y).T.dot(X - Y)
        else:
            if len(X) > 1:
                prob = self.norm_pdf_multivariate(X - Y)
            elif len(X) == 1:
                # BUG FIX: np.asscalar removed in NumPy 1.23; .item() is the
                # supported replacement.
                X = np.asarray(X).item()
                Y = np.asarray(Y).item()
                prob = self.normpdf(X - Y)
            else:
                prob = 1
            score = 1 - prob
            score = max(score, 0)
            self.decision_scores_.append((index, score))
            return score
class Garch:
    """GARCH-volatility-scaled dissimilarity measure.

    A GARCH(p, q) model is fitted on the detector's training residuals;
    the point-wise absolute deviation between the two subsequences is then
    scaled by the estimated conditional volatility at each position.

    Parameters
    ----------
    p, q : int, optional (default=1, 1)
        Orders of the GARCH model fitted on the residual.
    mean : string, optional (default='zero')
        The forecast conditional mean specification.
    vol : string, optional (default='garch')
        The forecast conditional variance specification.

    Attributes
    ----------
    decision_scores_ : list of (index, score) tuples
        Scores recorded for each measured subsequence; the higher, the
        more abnormal.
    detector : object
        The anomaly detector this measure is attached to (set externally).
    """

    def __init__(self, p=1, q=1, mean='zero', vol='garch'):
        self.p = p
        self.q = q
        self.vol = vol
        self.mean = mean
        self.decision_scores_ = []

    def set_param(self):
        """Fit the GARCH model on the detector's residuals and store the
        conditional volatility series."""
        if self.detector is None:
            print('Error! Detector not fed to the measure')
            return self
        self.n_initial_ = self.detector.n_initial_
        self.estimation = self.detector.estimation
        self.X_train = self.detector.X_train_
        self.window = self.detector.window
        # Residuals are scaled up by 10 for numerical stability of the fit;
        # the fitted volatility is scaled back down correspondingly.
        scaled_resid = 10 * (self.X_train - self.estimation)
        fitted = arch_model(scaled_resid, mean=self.mean, vol=self.vol,
                            p=self.p, q=self.q).fit(disp='off')
        self.votility = fitted.conditional_volatility / 10
        return self

    def measure(self, X, Y, index):
        """Volatility-normalized mean absolute deviation between X and Y.

        Parameters
        ----------
        X : numpy array of shape (n_samples, )
            The real input subsequence.
        Y : numpy array of shape (n_samples, )
            The estimated input subsequence.
        index : int
            Index of the starting point of the subsequence.

        Returns
        -------
        score : float
            Dissimilarity score (0 for empty input).
        """
        X = np.array(X)
        Y = np.array(Y)
        n = len(X)
        if n == 0:
            return 0
        total = 0
        for offset in range(n):
            sigma = self.votility[index + offset]
            # Positions with zero estimated volatility are skipped to avoid
            # division by zero.
            if sigma != 0:
                total += abs(X[offset] - Y[offset]) / sigma
        return total / n
class SSA_DISTANCE:
    """Similarity measure based on piecewise-linear (SSA-style) fits.

    Both subsequences are approximated by piecewise-linear segments and
    compared at the union of their breakpoints; good for contextual
    anomalies.
    ----------
    method : string, optional (default='linear')
        The method used to fit the lines and derive the SSA score.
    e : float, optional (default=1)
        Tolerance: a segment is extended while points deviate from the
        fitted line by less than ``e``.
    Attributes
    ----------
    decision_scores_ : list of (index, score) tuples
        The outlier scores of the training data.  The higher, the more
        abnormal; recorded each time ``measure`` is called.
    detector : Object classifier
        the anomaly detector that is used
    """
    def __init__(self, method='linear', e=1):
        self.method = method
        self.decision_scores_ = []
        self.e = e

    def Linearization(self, X2):
        """Piecewise-linear approximation of the curve X2.

        Parameters
        ----------
        X2 : numpy array of shape (n, )
            the time series curve to be fitted

        Returns
        -------
        fit : dict
            'index'  -> list of breakpoint indices;
            'Y<i>'   -> curve value at breakpoint i;
            'rep'    -> list of np.array([i, X2[i]]) breakpoint pairs;
            'reg<i>' -> np.array([slope, intercept]) of the segment
                        starting at breakpoint i.
        """
        e = self.e
        i = 0
        fit = {}
        fit['index'] = []
        fit['rep'] = []
        while i < len(X2):
            # Record the current breakpoint and its curve value.
            fit['index'].append(i)
            try:
                fit['Y'+str(i)] = X2[i]
            except:
                print(X2.shape, X2)
            fit['rep'].append(np.array([i, X2[i]]))
            if i+1 >= len(X2):
                break
            # Line through points i and i+1: slope k, intercept b
            # (b chosen so that k*i + b == X2[i]).
            k = X2[i+1]-X2[i]
            b = -i*(X2[i+1]-X2[i])+X2[i]
            fit['reg' +str(i)] = np.array([k, b])
            i += 2
            if i >= len(X2):
                break
            # Extend the current segment while points stay within
            # tolerance e of the fitted line.
            d = np.abs(X2[i] - (k * i + b))
            while d < e:
                i += 1
                if i >= len(X2):
                    break
                d = np.abs(X2[i] - (k * i + b))
        return fit

    def set_param(self):
        '''update the parameters with the detector that is used.
        Since the SSA measure doesn't need the attributes of detector
        or characteristics of X_train, the process is omitted.
        '''
        return self

    def measure(self, X2, X3, start_index):
        """Obtain the SSA similarity score.

        Parameters
        ----------
        X2 : numpy array of shape (n, )
            the reference timeseries
        X3 : numpy array of shape (n, )
            the tested timeseries
        start_index : int
            current index for the subsequence that is being measured

        Returns
        -------
        score : float
            the higher the more dissimilar are the two curves
            (None for singleton input, original behavior kept)
        """
        # linearization of data X2 and X3
        X2 = np.array(X2)
        X3 = np.array(X3)
        fit = self.Linearization(X2)
        fit2 = self.Linearization(X3)
        # line alignment: ordered union of both breakpoint sets
        Index = []
        test_list = fit['index'] + fit2['index']
        [Index.append(x) for x in test_list if x not in Index]
        Y = 0
        # Similarity computation: at each breakpoint, compare the recorded
        # curve value on one side against the fitted-line value on the
        # other (or the two recorded values when both sides have the
        # breakpoint).
        for i in Index:
            if i in fit['index'] and i in fit2['index']:
                Y += abs(fit['Y'+str(i)]-fit2['Y'+str(i)])
            elif i in fit['index']:
                # i is only a breakpoint of X2: evaluate X3's segment at i
                # using the closest preceding breakpoint's regression.
                J = np.max(np.where(np.array(fit2['index']) < i ))
                index = fit2['index'][J]
                k = fit2['reg'+str(index)][0]
                b = fit2['reg'+str(index)][1]
                value = abs(k * i + b - fit['Y'+str(i)])
                Y += value
            elif i in fit2['index']:
                # Symmetric case: i is only a breakpoint of X3.
                J = np.max(np.where(np.array(fit['index']) < i ))
                index = fit['index'][J]
                k = fit['reg'+str(index)][0]
                b = fit['reg'+str(index)][1]
                value = abs(k * i + b - fit2['Y'+str(i)])
                Y += value
        if len(Index) != 0:
            score = Y/len(Index)
        else:
            score = 0
        self.decision_scores_.append((start_index, score))
        if len(X2) == 1:
            print('Error! SSA measure doesn\'t apply to singleton' )
        else:
            return score
class Fourier:
    """Frequency-domain dissimilarity measure.

    Compares the discrete Fourier transforms of two subsequences with an
    Lp norm; good for contextual anomalies.
    ----------
    power : int, optional (default=2)
        Order of the Lp norm applied to the FFT difference.
    Attributes
    ----------
    decision_scores_ : list of (index, score) tuples
        The outlier scores of the training data.  The higher, the more
        abnormal; recorded each time ``measure`` is called.
    detector : Object classifier
        the anomaly detector that is used
    """

    def __init__(self, power=2):
        self.decision_scores_ = []
        self.power = power

    def set_param(self):
        '''The FFT measure needs nothing from the detector or X_train,
        so this is a no-op kept for interface compatibility.
        '''
        return self

    def measure(self, X2, X3, start_index):
        """Lp distance between the FFTs of X2 and X3, scaled by length.

        Parameters
        ----------
        X2 : numpy array of shape (n, )
            the reference timeseries
        X3 : numpy array of shape (n, )
            the tested timeseries
        start_index : int
            current index for the subsequence that is being measured

        Returns
        -------
        score : float
            the higher the more dissimilar are the two curves
            (0 for empty input)
        """
        reference = np.array(X2)
        tested = np.array(X3)
        if len(reference) == 0:
            score = 0
        else:
            spectrum_diff = np.fft.fft(reference) - np.fft.fft(tested)
            score = np.linalg.norm(spectrum_diff, ord=self.power) / len(tested)
        self.decision_scores_.append((start_index, score))
        return score
class DTW:
    """Dynamic-time-warping (DTW) dissimilarity measure.
    ----------
    method : string or callable, optional (default='L2')
        Point-wise distance used inside the DTW recursion:
        'L1' -> |x - y|, 'L2' -> (x - y)**2, or a custom callable
        f(x, y) passed directly.
    Attributes
    ----------
    decision_scores_ : list of (index, score) tuples
        The outlier scores of the training data.  The higher, the more
        abnormal; recorded each time ``measure`` is called.
    detector : Object classifier
        the anomaly detector that is used
    """
    def __init__(self, method='L2'):
        self.decision_scores_ = []
        if type(method) == str:
            if method == 'L1':
                distance = lambda x, y: abs(x-y)
            elif method == 'L2':
                distance = lambda x, y: (x-y)**2
            # NOTE(review): a string other than 'L1'/'L2' leaves `distance`
            # unbound and raises NameError below -- confirm intended.
        else:
            # Custom callable supplied by the caller.
            distance = method
        self.distance = distance

    def set_param(self):
        '''The DTW measure needs nothing from the detector or X_train,
        so this is a no-op kept for interface compatibility.
        '''
        return self

    def measure(self, X1, X2, start_index):
        """DTW dissimilarity between X1 and X2, normalized by length.

        Parameters
        ----------
        X1 : numpy array of shape (n, )
            the reference timeseries
        X2 : numpy array of shape (n, )
            the tested timeseries
        start_index : int
            current index for the subsequence that is being measured

        Returns
        -------
        score : float
            the higher the more dissimilar are the two curves
            (0 for empty input)
        """
        distance = self.distance
        X1 = np.array(X1)
        X2 = np.array(X2)
        value = 1
        if len(X1)==0:
            # Empty input: run the DP on a dummy series and force score 0
            # at the end (value flag).
            value =0
            X1= np.zeros(5)
            X2 = X1
        M = np.zeros((len(X1), len(X2)))
        # Fill the cost matrix diagonal-by-diagonal: each inner iteration
        # computes cell (i, j) and, when off-diagonal, its mirror (j, i).
        for index_i in range(len(X1)):
            for index_j in range(len(X1) - index_i):
                L = []
                i = index_i
                j = index_i + index_j
                D = distance(X1[i], X2[j])
                # NOTE(review): numpy negative indices wrap around instead
                # of raising, so these except branches never fire for
                # i-1 == -1; border cells instead read the opposite edge of
                # M (zero-initialized, possibly partially filled) --
                # confirm this boundary handling is intended.
                try:
                    L.append(M[i-1, j-1])
                except:
                    L.append(np.inf)
                try:
                    L.append(M[i, j-1])
                except:
                    L.append(np.inf)
                try:
                    L.append(M[i-1, j])
                except:
                    L.append(np.inf)
                D += min(L)
                M[i,j] = D
                if i !=j:
                    # Mirror cell: same computation with i and j swapped.
                    L = []
                    j = index_i
                    i = index_i + index_j
                    D = distance(X1[i], X2[j])
                    try:
                        L.append(M[i-1, j-1])
                    except:
                        L.append(np.inf)
                    try:
                        L.append(M[i, j-1])
                    except:
                        L.append(np.inf)
                    try:
                        L.append(M[i-1, j])
                    except:
                        L.append(np.inf)
                    D += min(L)
                    M[i,j] = D
        # Final DTW cost normalized by the subsequence length.
        score = M[len(X1)-1, len(X1)-1]/len(X1)
        if value == 0:
            score = 0
        self.decision_scores_.append((start_index, score))
        return score
class EDRS:
    """Edit distance on real sequences (EDR) dissimilarity measure.
    ----------
    method : string or callable, optional (default='L1')
        Point-wise distance: 'L1' -> |x - y|, or a custom callable f(x, y).
    ep : float or False, optional (default=False)
        Threshold deciding whether two points match; when False it is
        derived from the detector's residuals in ``set_param``.
    vot : boolean, optional (default=False)
        Whether to use time-varying volatilities estimated by GARCH for
        ``ep`` at different windows.
    Attributes
    ----------
    decision_scores_ : list of (index, score) tuples
        The outlier scores of the training data.  The higher, the more
        abnormal; recorded each time ``measure`` is called.
    detector : Object classifier
        the anomaly detector that is used
    """
    def __init__(self, method='L1', ep=False, vol=False):
        self.decision_scores_ = []
        if type(method) == str:
            if method == 'L1':
                distance = lambda x, y: abs(x-y)
            # NOTE(review): despite the docstring, no 'L2' branch exists; a
            # string other than 'L1' leaves `distance` unbound and raises
            # NameError below -- confirm intended.
        else:
            # Custom callable supplied by the caller.
            distance = method
        self.distance = distance
        self.ep = ep
        self.vot = vol

    def set_param(self):
        '''Derive the matching threshold ep from the detector's residuals.'''
        estimation = np.array(self.detector.estimation )
        initial = self.detector.n_initial_
        X = np.array(self.detector.X_train_)
        self.initial = initial
        residual = estimation[initial:] - X[initial:]
        vot = self.vot
        if vot == False:
            var = np.var(residual)
        else:
            # GARCH fit on residuals scaled by 10 for numerical stability;
            # volatility scaled back down.
            model = arch_model(10 * residual, mean='Constant', vol='garch', p=1, q=1)
            model_fit = model.fit(disp='off')
            var = model_fit.conditional_volatility/10
        # NOTE(review): `var` computed above is never used -- ep is always
        # derived from the residual RMS below (when not user-supplied).
        if self.ep == False:
            self.ep = 3 * (np.sum(np.square(residual))/(len(residual) - 1))**0.5
        else:
            self.ep = self.ep
        return self

    def measure(self, X1, X2, start_index):
        """EDR dissimilarity between X1 and X2, normalized by length.

        Parameters
        ----------
        X1 : numpy array of shape (n, )
            the reference timeseries
        X2 : numpy array of shape (n, )
            the tested timeseries
        start_index : int
            current index for the subsequence that is being measured

        Returns
        -------
        score : float
            the higher the more dissimilar are the two curves
            (0 for empty input)
        """
        distance = self.distance
        X1 = np.array(X1)
        X2 = np.array(X2)
        vot = self.vot
        if vot == False:
            ep = self.ep
        else:
            # Per-window threshold from the GARCH volatility series.
            try:
                ep = self.ep[start_index - self.initial]
            except:
                # sometimes start_index is the length of the number
                ep = 0
        value = 1
        if len(X1)==0:
            # Empty input: run the DP on a dummy series and force score 0
            # at the end (value flag).
            value =0
            X1= np.zeros(5)
            X2 = X1
        M = np.zeros((len(X1), len(X2)))
        # First row/column: edit cost of matching against an empty prefix.
        M[:, 0] = np.arange(len(X1))
        M[0, :] = np.arange(len(X1))
        # Fill the cost matrix diagonal-by-diagonal: each inner iteration
        # computes cell (i, j) and, when off-diagonal, its mirror (j, i).
        for index_i in range(1, len(X1)):
            for index_j in range(len(X1) - index_i):
                L = []
                i = index_i
                j = index_i + index_j
                D = distance(X1[i], X2[j])
                if D < ep:
                    # Points match within tolerance: no edit cost added.
                    M[i, j]= M[i-1, j-1]
                else:
                    # NOTE(review): numpy negative indices wrap instead of
                    # raising, so these except branches never fire for
                    # i-1 == -1 -- confirm boundary handling is intended.
                    try:
                        L.append(M[i-1, j-1])
                    except:
                        L.append(np.inf)
                    try:
                        L.append(M[i, j-1])
                    except:
                        L.append(np.inf)
                    try:
                        L.append(M[i-1, j])
                    except:
                        L.append(np.inf)
                    M[i,j] = 1 + min(L)
                if i !=j:
                    # Mirror cell: same computation with i and j swapped.
                    L = []
                    j = index_i
                    i = index_i + index_j
                    D = distance(X1[i], X2[j])
                    if D < ep:
                        M[i, j]= M[i-1, j-1]
                    else:
                        try:
                            L.append(M[i-1, j-1])
                        except:
                            L.append(np.inf)
                        try:
                            L.append(M[i, j-1])
                        except:
                            L.append(np.inf)
                        try:
                            L.append(M[i-1, j])
                        except:
                            L.append(np.inf)
                        M[i,j] = 1 + min(L)
        # Final edit cost normalized by the subsequence length.
        score = M[len(X1)-1, len(X1)-1]/len(X1)
        if value == 0:
            score = 0
        self.decision_scores_.append((start_index, score))
        return score
class TWED:
    """Time Warp Edit Distance (TWED) dissimilarity measure.

    Reference: Marteau, P.-F. (2009), "Time Warp Edit Distance with
    Stiffness Adjustment for Time Series Matching", IEEE TPAMI 31(2):
    306-318.  arXiv:cs/0703033.
    ----------
    gamma : float, optional (default=0.1)
        Mismatch (edit) penalty lambda.
    v : float, optional (default=0.1)
        Stiffness parameter nu; must be non-negative.
    Attributes
    ----------
    decision_scores_ : list of (index, score) tuples
        The outlier scores of the training data.  The higher, the more
        abnormal; recorded each time ``measure`` is called.
    M : numpy array
        Dynamic-programming cost matrix of the most recent call.
    """

    def __init__(self, gamma=0.1, v=0.1):
        self.decision_scores_ = []
        self.gamma = gamma
        self.v = v

    def set_param(self):
        '''No need'''
        return self

    def measure(self, A, B, start_index):
        """TWED dissimilarity between series A and B.

        Parameters
        ----------
        A : numpy array of shape (n, )
            the reference timeseries
        B : numpy array of shape (n, )
            the tested timeseries
        start_index : int
            current index for the subsequence that is being measured

        Returns
        -------
        distance : float
            the higher the more dissimilar are the two series
            ((None, None) on invalid input, original behavior kept)
        """
        nu = self.v
        penalty = self.gamma
        point_dist = lambda a, b: abs(a - b)
        # Implicit unit-spaced timestamps starting at 1.
        stamps_a = np.arange(1, len(A) + 1)
        stamps_b = np.arange(1, len(B) + 1)
        # Input validation (kept from the original implementation).
        if len(A) != len(stamps_a):
            print("The length of A is not equal length of timeSA")
            return None, None
        if len(B) != len(stamps_b):
            print("The length of B is not equal length of timeSB")
            return None, None
        if nu < 0:
            print("nu is negative")
            return None, None
        # Pad a leading zero sample/timestamp so DP indices start at 1.
        A = np.array([0] + list(A))
        stamps_a = np.array([0] + list(stamps_a))
        B = np.array([0] + list(B))
        stamps_b = np.array([0] + list(stamps_b))
        n = len(A)
        m = len(B)
        # DP matrix: first row/column infinite except the origin.
        DP = np.zeros((n, m))
        DP[0, :] = np.inf
        DP[:, 0] = np.inf
        DP[0, 0] = 0
        for i in range(1, n):
            for j in range(1, m):
                # Deletion in A.
                cost_del_a = (
                    DP[i - 1, j]
                    + point_dist(A[i - 1], A[i])
                    + nu * (stamps_a[i] - stamps_a[i - 1])
                    + penalty
                )
                # Deletion in B.
                cost_del_b = (
                    DP[i, j - 1]
                    + point_dist(B[j - 1], B[j])
                    + nu * (stamps_b[j] - stamps_b[j - 1])
                    + penalty
                )
                # Keep the data points in both series.
                cost_match = (
                    DP[i - 1, j - 1]
                    + point_dist(A[i], B[j])
                    + point_dist(A[i - 1], B[j - 1])
                    + nu * (abs(stamps_a[i] - stamps_b[j]) + abs(stamps_a[i - 1] - stamps_b[j - 1]))
                )
                # Cheapest of the three edit operations.
                DP[i, j] = min(cost_del_a, cost_del_b, cost_match)
        distance = DP[n - 1, m - 1]
        self.M = DP
        self.decision_scores_.append((start_index, distance))
        return distance