#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from itertools import groupby
from operator import itemgetter
import math
import gzip
import glob
import os

def convert_vector_to_events(vector = [0, 1, 1, 0, 0, 1, 0]):
    """
    Convert a binary vector (indicating 1 for the anomalous instances)
    to a list of events. The events are considered as durations,
    i.e. setting 1 at index i corresponds to an anomalous interval [i, i+1).

    :param vector: a list of elements belonging to {0, 1}
    :return: a list of couples, each couple representing the start and stop of
    each event
    """
    positive_indexes = [idx for idx, val in enumerate(vector) if val > 0]
    events = []
    for k, g in groupby(enumerate(positive_indexes), lambda ix: ix[0] - ix[1]):
        cur_cut = list(map(itemgetter(1), g))
        events.append((cur_cut[0], cur_cut[-1]))

    # Consistent conversion in case of range anomalies (for indexes):
    # A positive index i is considered as the interval [i, i+1),
    # so the last index should be moved by 1
    events = [(x, y + 1) for (x, y) in events]

    return(events)
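
# Worked example (for illustration): with the default vector
# [0, 1, 1, 0, 0, 1, 0], the positive indexes are [1, 2, 5], which group
# into the runs [1, 2] and [5]; after shifting each stop index by 1,
# the returned events are [(1, 3), (5, 6)].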

def infer_Trange(events_pred, events_gt):
    """
    Given the lists of events events_pred and events_gt, get the
    smallest possible Trange corresponding to the start and stop indexes
    of the whole series.
    Trange will not influence the measure of distances, but will impact the
    measures of probabilities.

    :param events_pred: a list of couples corresponding to predicted events
    :param events_gt: a list of couples corresponding to ground truth events
    :return: a couple corresponding to the smallest range containing the events
    """
    if len(events_gt) == 0:
        raise ValueError('The gt events should contain at least one event')
    if len(events_pred) == 0:
        # empty prediction, base Trange only on events_gt (which is non-empty)
        return(infer_Trange(events_gt, events_gt))

    min_pred = min([x[0] for x in events_pred])
    min_gt = min([x[0] for x in events_gt])
    max_pred = max([x[1] for x in events_pred])
    max_gt = max([x[1] for x in events_gt])
    Trange = (min(min_pred, min_gt), max(max_pred, max_gt))
    return(Trange)
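
# Worked example (for illustration): with events_pred = [(1, 3)] and
# events_gt = [(2, 6)], the smallest enclosing range is Trange = (1, 6);
# with an empty prediction, Trange is inferred from events_gt alone.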

def has_point_anomalies(events):
    """
    Check whether the events contain point anomalies, i.e.
    events starting and stopping at the same time.

    :param events: a list of couples corresponding to predicted events
    :return: True if the events have any point anomalies, False otherwise
    """
    if len(events) == 0:
        return(False)
    return(min([x[1] - x[0] for x in events]) == 0)
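
# Worked example (for illustration): [(1, 3), (5, 5)] contains the
# zero-duration event (5, 5), so has_point_anomalies returns True,
# whereas [(1, 3)] alone gives False.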

def _sum_wo_nan(vec):
    """
    Sum of the elements, ignoring the NaN values (as detected by math.isnan)

    :param vec: vector of floating-point numbers
    :return: sum of the elements, ignoring the NaN values
    """
    vec_wo_nan = [e for e in vec if not math.isnan(e)]
    return(sum(vec_wo_nan))

def _len_wo_nan(vec):
    """
    Count of the elements, ignoring the NaN values (as detected by math.isnan)

    :param vec: vector of floating-point numbers
    :return: count of the elements, ignoring the NaN values
    """
    vec_wo_nan = [e for e in vec if not math.isnan(e)]
    return(len(vec_wo_nan))
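
# Worked example (for illustration): for vec = [1.0, float('nan'), 2.0],
# _sum_wo_nan(vec) returns 3.0 and _len_wo_nan(vec) returns 2.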

def read_gz_data(filename = 'data/machinetemp_groundtruth.gz'):
    """
    Load a gz-compressed file in which each line is either
    0 (representing a normal instance) or 1 (representing an
    anomalous instance).

    :param filename: file path to the gz compressed file
    :return: list of integers with either 0 or 1
    """
    with gzip.open(filename, 'rb') as f:
        content = f.read().splitlines()
    content = [int(x) for x in content]
    return(content)
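
# Illustrative usage (assuming the default data file is present on disk):
# read_gz_data('data/machinetemp_groundtruth.gz') returns a list of 0/1
# integers, one per line of the decompressed file.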

def read_all_as_events():
    """
    Load the files contained in the folder `data/` and convert
    them to events. The length of each series is kept.
    The convention for the file names is: `dataset_algorithm.gz`

    :return: two dictionaries:
        - the first containing the list of events for each dataset and algorithm,
        - the second containing the range of the series for each dataset
    """
    filepaths = glob.glob('data/*.gz')
    datasets = dict()
    Tranges = dict()
    for filepath in filepaths:
        vector = read_gz_data(filepath)
        events = convert_vector_to_events(vector)
        # ad hoc cut of the file name into its `dataset` and `algorithm` parts
        cut_filepath = (os.path.split(filepath)[1]).split('_')
        data_name = cut_filepath[0]
        algo_name = (cut_filepath[1]).split('.')[0]
        if data_name not in datasets:
            datasets[data_name] = dict()
            Tranges[data_name] = (0, len(vector))
        datasets[data_name][algo_name] = events
    return(datasets, Tranges)
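
# Illustrative usage: following the `dataset_algorithm.gz` convention,
# a file data/machinetemp_groundtruth.gz is loaded into
# datasets['machinetemp']['groundtruth'], with Tranges['machinetemp']
# set to (0, length of that series).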

def f1_func(p, r):
    """
    Compute the f1 function

    :param p: precision numeric value
    :param r: recall numeric value
    :return: f1 numeric value
    """
    return(2*p*r/(p+r))
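
# Minimal sanity check (illustrative): exercises the pure helpers above with
# hand-computed expected values; the file-reading helpers are left out since
# they depend on a local `data/` folder being available.
if __name__ == '__main__':
    events = convert_vector_to_events([0, 1, 1, 0, 0, 1, 0])
    assert events == [(1, 3), (5, 6)]
    assert infer_Trange([(1, 3)], [(2, 6)]) == (1, 6)
    assert has_point_anomalies([(1, 3), (5, 5)])
    assert not has_point_anomalies([(1, 3)])
    assert _sum_wo_nan([1.0, float('nan'), 2.0]) == 3.0
    assert _len_wo_nan([1.0, float('nan'), 2.0]) == 2
    assert f1_func(0.5, 0.5) == 0.5
    print('All sanity checks passed.')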