diff --git "a/evaluation/basic_metrics.py" "b/evaluation/basic_metrics.py" new file mode 100644--- /dev/null +++ "b/evaluation/basic_metrics.py" @@ -0,0 +1,2801 @@ +import torch +from tqdm import tqdm +from sklearn.metrics import precision_recall_fscore_support +from sklearn import metrics +import numpy as np +import math +import copy +import sklearn +from typing import Callable, Dict, Any, Tuple, Optional, List +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed +from functools import partial +import time +import multiprocessing as mp +def generate_curve(label, score, slidingWindow, version='opt', thre=250): + if version =='opt_mem': + tpr_3d, fpr_3d, prec_3d, window_3d, avg_auc_3d, avg_ap_3d = basic_metricor().RangeAUC_volume_opt_mem(labels_original=label, score=score, windowSize=slidingWindow, thre=thre) + else: + tpr_3d, fpr_3d, prec_3d, window_3d, avg_auc_3d, avg_ap_3d = basic_metricor().RangeAUC_volume_opt(labels_original=label, score=score, windowSize=slidingWindow, thre=thre) + + + X = np.array(tpr_3d).reshape(1,-1).ravel() + X_ap = np.array(tpr_3d)[:,:-1].reshape(1,-1).ravel() + Y = np.array(fpr_3d).reshape(1,-1).ravel() + W = np.array(prec_3d).reshape(1,-1).ravel() + Z = np.repeat(window_3d, len(tpr_3d[0])) + Z_ap = np.repeat(window_3d, len(tpr_3d[0])-1) + + return Y, Z, X, X_ap, W, Z_ap,avg_auc_3d, avg_ap_3d + +def inverse_proportional_cardinality_fn(cardinality: int, gt_length: int) -> float: + r""" + Cardinality function that assigns an inversely proportional weight to predictions within a single ground-truth + window. + + This is the default cardinality function recommended in [Tatbul2018]_. + + .. note:: + This function leads to a metric that is not recall-consistent! Please see [Wagner2023]_ for more details. + + :param cardinality: Number of predicted windows that overlap the ground-truth window in question. + :param gt_length: Length of the ground-truth window (unused). + :return: The cardinality factor :math:`\frac{1}{\text{cardinality}}`. + + .. [Tatbul2018] N. Tatbul, T.J. Lee, S. Zdonik, M. Alam, J. Gottschlich. + Precision and recall for time series. Advances in neural information processing systems. 2018;31. + .. [Wagner2023] D. Wagner, T. Michels, F.C.F. Schulz, A. Nair, M. Rudolph, and M. Kloft. + TimeSeAD: Benchmarking Deep Multivariate Time-Series Anomaly Detection. + Transactions on Machine Learning Research (TMLR), (to appear) 2023. + """ + return 1 / max(1, cardinality) + +def constant_bias_fn(inputs: torch.Tensor) -> float: + r""" + Compute the overlap size for a constant bias function that assigns the same weight to all positions. + + This functions computes + + .. math:: + \omega(\text{inputs}) = \frac{1}{n} \sum_{i = 1}^{n} \text{inputs}_i, + + where :math:`n = \lvert \text{inputs} \rvert`. + + .. note:: + To improve the runtime of our algorithm, we calculate the overlap :math:`\omega` directly as part of the bias + function. + + :param inputs: A 1-D :class:`~torch.Tensor` containing the predictions inside a ground-truth window. + :return: The overlap :math:`\omega`. + """ + return torch.sum(inputs).item() / inputs.shape[0] + +def improved_cardinality_fn(cardinality: int, gt_length: int): + r""" + Recall-consistent cardinality function introduced by [Wagner2023]_ that assigns lower weight to ground-truth windows + that overlap with many predicted windows. + + This function computes + + .. math:: + \left(\frac{\text{gt_length} - 1}{\text{gt_length}}\right)^{\text{cardinality} - 1}. 
+ + :param cardinality: Number of predicted windows that overlap the ground-truth window in question. + :param gt_length: Length of the ground-truth window. + :return: The cardinality factor. + """ + return ((gt_length - 1) / gt_length) ** (cardinality - 1) + +class basic_metricor(): + def __init__(self, a = 1, probability = True, bias = 'flat', ): + self.a = a + self.probability = probability + self.bias = bias + self.eps = 1e-15 + + def detect_model(self, model, label, contamination = 0.1, window = 100, is_A = False, is_threshold = True): + if is_threshold: + score = self.scale_threshold(model.decision_scores_, model._mu, model._sigma) + else: + score = self.scale_contamination(model.decision_scores_, contamination = contamination) + if is_A is False: + scoreX = np.zeros(len(score)+window) + scoreX[math.ceil(window/2): len(score)+window - math.floor(window/2)] = score + else: + scoreX = score + + self.score_=scoreX + L = self.metric(label, scoreX) + return L + + def w(self, AnomalyRange, p): + MyValue = 0 + MaxValue = 0 + start = AnomalyRange[0] + AnomalyLength = AnomalyRange[1] - AnomalyRange[0] + 1 + for i in range(start, start +AnomalyLength): + bi = self.b(i, AnomalyLength) + MaxValue += bi + if i in p: + MyValue += bi + return MyValue/MaxValue + + def Cardinality_factor(self, Anomolyrange, Prange): + score = 0 + start = Anomolyrange[0] + end = Anomolyrange[1] + for i in Prange: + if i[0] >= start and i[0] <= end: + score +=1 + elif start >= i[0] and start <= i[1]: + score += 1 + elif end >= i[0] and end <= i[1]: + score += 1 + elif start >= i[0] and end <= i[1]: + score += 1 + if score == 0: + return 0 + else: + return 1/score + + def b(self, i, length): + bias = self.bias + if bias == 'flat': + return 1 + elif bias == 'front-end bias': + return length - i + 1 + elif bias == 'back-end bias': + return i + else: + if i <= length/2: + return i + else: + return length - i + 1 + + def scale_threshold(self, score, score_mu, score_sigma): + return (score >= (score_mu + 3*score_sigma)).astype(int) + + def _adjust_predicts(self, score, label, threshold=None, pred=None, calc_latency=False): + """ + Calculate adjusted predict labels using given `score`, `threshold` (or given `pred`) and `label`. + + Args: + score (np.ndarray): The anomaly score + label (np.ndarray): The ground-truth label + threshold (float): The threshold of anomaly score. + A point is labeled as "anomaly" if its score is higher than the threshold. 
+ pred (np.ndarray or None): if not None, adjust `pred` and ignore `score` and `threshold`, + calc_latency (bool): + + Returns: + np.ndarray: predict labels + """ + if len(score) != len(label): + raise ValueError("score and label must have the same length") + score = np.asarray(score) + label = np.asarray(label) + latency = 0 + if pred is None: + predict = score > threshold + else: + predict = copy.deepcopy(pred) + actual = label > 0.1 + anomaly_state = False + anomaly_count = 0 + for i in range(len(score)): + if actual[i] and predict[i] and not anomaly_state: + anomaly_state = True + anomaly_count += 1 + for j in range(i, 0, -1): + if not actual[j]: + break + else: + if not predict[j]: + predict[j] = True + latency += 1 + elif not actual[i]: + anomaly_state = False + if anomaly_state: + predict[i] = True + if calc_latency: + return predict, latency / (anomaly_count + 1e-4) + else: + return predict + + def adjustment(self, gt, pred): + adjusted_pred = np.array(pred) + anomaly_state = False + for i in range(len(gt)): + if gt[i] == 1 and adjusted_pred[i] == 1 and not anomaly_state: + anomaly_state = True + for j in range(i, 0, -1): + if gt[j] == 0: + break + else: + if adjusted_pred[j] == 0: + adjusted_pred[j] = 1 + for j in range(i, len(gt)): + if gt[j] == 0: + break + else: + if adjusted_pred[j] == 0: + adjusted_pred[j] = 1 + elif gt[i] == 0: + anomaly_state = False + if anomaly_state: + adjusted_pred[i] = 1 + return adjusted_pred + + def metric_new(self, label, score, preds, plot_ROC=False, alpha=0.2): + '''input: + Real labels and anomaly score in prediction + + output: + AUC, + Precision, + Recall, + F-score, + Range-precision, + Range-recall, + Range-Fscore, + Precison@k, + + k is chosen to be # of outliers in real labels + ''' + if np.sum(label) == 0: + print('All labels are 0. 
Label must have ground truth value for calculating AUC score.')
+            return None
+
+        if score is None or np.isnan(score).any():
+            print('Score must not be none.')
+            return None
+
+        # area under curve
+        auc = metrics.roc_auc_score(label, score)
+        # plot ROC curve
+        if plot_ROC:
+            fpr, tpr, thresholds = metrics.roc_curve(label, score)
+            # display = metrics.RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=auc)
+            # display.plot()
+
+        # precision, recall, F
+        if preds is None:
+            preds = score > (np.mean(score) + 3*np.std(score))
+        Precision, Recall, F, Support = metrics.precision_recall_fscore_support(label, preds, zero_division=0)
+        precision = Precision[1]
+        recall = Recall[1]
+        f = F[1]
+
+        # point-adjust
+        adjust_preds = self._adjust_predicts(score, label, pred=preds)
+        PointF1PA = metrics.f1_score(label, adjust_preds)
+
+        # range anomaly
+        Rrecall, ExistenceReward, OverlapReward = self.range_recall_new(label, preds, alpha)
+        Rprecision = self.range_recall_new(preds, label, 0)[0]
+
+        if Rprecision + Rrecall == 0:
+            Rf = 0
+        else:
+            Rf = 2 * Rrecall * Rprecision / (Rprecision + Rrecall)
+
+        # top-k (k = number of true anomalies; keep the k highest-scoring points)
+        k = int(np.sum(label))
+        threshold = np.percentile(score, 100 * (1 - k/len(label)))
+
+        # precision_at_k = metrics.top_k_accuracy_score(label, score, k)
+        p_at_k = np.where(score > threshold)[0]
+        TP_at_k = sum(label[p_at_k])
+        precision_at_k = TP_at_k / k
+
+        L = [auc, precision, recall, f, PointF1PA, Rrecall, ExistenceReward, OverlapReward, Rprecision, Rf, precision_at_k]
+        if plot_ROC:
+            return L, fpr, tpr
+        return L
+
+    def metric_ROC(self, label, score):
+        return metrics.roc_auc_score(label, score)
+
+    def metric_PR(self, label, score):
+        return metrics.average_precision_score(label, score)
+
+    def metric_PointF1(self, label, score, preds=None):
+        if preds is None:
+            precision, recall, thresholds = metrics.precision_recall_curve(label, score)
+            f1_scores = 2 * (precision * recall) / (precision + recall + 0.00001)
+            F1 = np.max(f1_scores)
+            threshold = thresholds[np.argmax(f1_scores)]
+        else:
+            Precision, Recall, F, Support = metrics.precision_recall_fscore_support(label, preds, zero_division=0)
+            F1 = F[1]
+        return F1
+
+    def metric_standard_F1(self, true_labels, anomaly_scores, threshold=None):
+        """
+        Calculate F1, Precision, Recall, and other metrics for anomaly detection.
+
+        Args:
+            anomaly_scores: np.ndarray, anomaly scores (continuous values)
+            true_labels: np.ndarray, ground truth binary labels (0=normal, 1=anomaly)
+            threshold: float, optional.
If None, will use optimal threshold based on F1 score + + Returns: + dict: Dictionary containing various metrics + """ + # If no threshold provided, find optimal threshold + if threshold is None: + thresholds = np.linspace(0, 1, 1500) + best_f1 = 0 + best_threshold = 0 + + for t in tqdm(thresholds, total=len(thresholds), desc="Finding optimal threshold"): + threshold = np.quantile(anomaly_scores, t) + predictions = (anomaly_scores >= threshold).astype(int) + if len(np.unique(predictions)) > 1: # Avoid division by zero + precision, recall, f1, _ = precision_recall_fscore_support( + true_labels, predictions, average='binary', zero_division=0 + ) + # print(f1, t) + if f1 > best_f1: + best_f1 = f1 + best_threshold = threshold + threshold = best_threshold + # print("aaa", threshold, best_threshold, best_f1) + # Calculate predictions based on threshold + predictions = (anomaly_scores >= threshold).astype(int) + + # Calculate basic metrics + precision, recall, f1, _ = precision_recall_fscore_support( + true_labels, predictions, average='binary', zero_division=0 + ) + # print(threshold, f1) + return { + 'F1': f1, + 'Recall': recall, + 'Precision': precision, } + + + def metric_Affiliation(self, label, score, preds=None): + from .affiliation.generics import convert_vector_to_events + from .affiliation.metrics import pr_from_events + + # Ensure proper data types to avoid float/integer issues + label = np.asarray(label, dtype=int) + score = np.asarray(score, dtype=float) + + # Convert ground truth to events once, outside the loop + events_gt = convert_vector_to_events(label) + + if preds is None: + # print("Calculating afiliation metrics using score thresholds.") + p_values = np.linspace(0, 1, 1500) + # print(f"Using {thresholds} thresholds for affiliation metrics.") + Affiliation_scores = [] + Affiliation_Precision_scores = [] + Affiliation_Recall_scores = [] + # print("Score values", score) + + for p in tqdm(p_values, total=(len(p_values)), desc="Calculating Affiliation Metrics"): + threshold = np.quantile(score, p) + preds_loop = (score > threshold).astype(int) + + events_pred = convert_vector_to_events(preds_loop) + # events_gt is already calculated + Trange = (0, len(preds_loop)) + + affiliation_metrics = pr_from_events(events_pred, events_gt, Trange) + + Affiliation_Precision = affiliation_metrics['Affiliation_Precision'] + Affiliation_Recall = affiliation_metrics['Affiliation_Recall'] + # --- FIX 1: Prevent division by zero --- + denominator = Affiliation_Precision + Affiliation_Recall + if denominator > 0: + Affiliation_F = 2 * Affiliation_Precision * Affiliation_Recall / (denominator + self.eps) + else: + Affiliation_F = 0.0 + # # Use a local variable for the F1 score in the loop + # Affiliation_F = 2 * Affiliation_Precision * Affiliation_Recall / ( + # Affiliation_Precision + Affiliation_Recall + self.eps) + + Affiliation_scores.append(Affiliation_F) + Affiliation_Precision_scores.append(Affiliation_Precision) + Affiliation_Recall_scores.append(Affiliation_Recall) + + # Find the best scores after the loop + # print("Here are the Affiliation scores:", Affiliation_scores) + best_index = np.argmax(Affiliation_scores) + # print(f"Best Affiliation F1 score found at index {best_index} with value {Affiliation_scores[best_index]}") + Best_Affiliation_F1 = Affiliation_scores[best_index] + Best_Affiliation_Precision = Affiliation_Precision_scores[best_index] + Best_Affiliation_Recall = Affiliation_Recall_scores[best_index] + + else: + print("Using provided predictions for affiliation metrics.") + # 
This block runs when 'preds' is provided + events_pred = convert_vector_to_events(preds) + Trange = (0, len(preds)) + + affiliation_metrics = pr_from_events(events_pred, events_gt, Trange) + + # FIX: Assign the calculated values to the 'Best_' variables + # so they exist for the return statement. + Best_Affiliation_Precision = affiliation_metrics['Affiliation_Precision'] + Best_Affiliation_Recall = affiliation_metrics['Affiliation_Recall'] + Best_Affiliation_F1 = 2 * Best_Affiliation_Precision * Best_Affiliation_Recall / ( + Best_Affiliation_Precision + Best_Affiliation_Recall + self.eps) + + # FIX: Corrected the typo from Best_Affiliation_Rec to Best_Affiliation_Recall + return Best_Affiliation_F1, Best_Affiliation_Precision, Best_Affiliation_Recall + + def metric_RF1(self, label, score, preds=None): + + if preds is None: + q_values = np.linspace(0, 1, 1000) + Rf1_scores = [] + thresholds = [] + for q in tqdm(q_values, total=(len(q_values)), desc="Calculating RF1 Metrics"): + # Calculate prediction + threshold = np.quantile(score, q) + preds = (score > threshold).astype(int) + + Rrecall, ExistenceReward, OverlapReward = self.range_recall_new(label, preds, alpha=0.2) + Rprecision = self.range_recall_new(preds, label, 0)[0] + if Rprecision + Rrecall==0: + Rf=0 + else: + Rf = 2 * Rrecall * Rprecision / (Rprecision + Rrecall) + + Rf1_scores.append(Rf) + thresholds.append(threshold) + + RF1_Threshold = thresholds[np.argmax(Rf1_scores)] + RF1 = max(Rf1_scores) + else: + Rrecall, ExistenceReward, OverlapReward = self.range_recall_new(label, preds, alpha=0.2) + Rprecision = self.range_recall_new(preds, label, 0)[0] + if Rprecision + Rrecall==0: + RF1=0 + else: + RF1 = 2 * Rrecall * Rprecision / (Rprecision + Rrecall) + return RF1 + + # def metric_F1_T(self, labels: torch.Tensor, scores: torch.Tensor): + # """ + # Computes the F1 score for time series anomaly detection by finding the best threshold. + # + # Args: + # labels (torch.Tensor): Ground truth labels for the time series data. + # scores (torch.Tensor): Anomaly scores predicted by the model. + # + # Returns: + # Tuple[float, Dict[str, Any]]: The best F1 score and a dictionary with additional metrics. + # """ + # result = {} + # labels = torch.tensor(labels, dtype=torch.int) + # score = torch.tensor(scores, dtype=torch.float) + # f1, details = self.__best_ts_fbeta_score(labels, score, beta=1,) + # result['thre_T'] = details['threshold'] + # result['ACC_T'] = sklearn.metrics.accuracy_score(labels, score > details['threshold']) + # result['P_T'] = details['precision'] + # result['R_T'] = details['recall'] + # result['F1_T'] = f1 + # + # return result + + def metric_F1_T(self, labels: torch.Tensor, scores: torch.Tensor, use_parallel=True, + parallel_method='chunked', chunk_size=10, max_workers=8): + """ + Computes the F1 score with optional parallel processing. 
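+
+        A minimal usage sketch (illustrative; ``grader``, ``labels`` and ``scores`` are placeholder names):
+
+            grader = basic_metricor()
+            res = grader.metric_F1_T(labels, scores, use_parallel=True, parallel_method='chunked')
+            best_f1, best_threshold = res['F1_T'], res['thre_T']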
+ + Args: + labels: Ground truth labels + scores: Anomaly scores + use_parallel: Whether to use parallel processing (default: True) + parallel_method: Type of parallel processing ('standard' or 'chunked') + chunk_size: Size of chunks for chunked parallel processing + max_workers: Maximum number of worker threads + """ + result = {} + labels = torch.tensor(labels, dtype=torch.int) + score = torch.tensor(scores, dtype=torch.float) + + # Choose which method to use + if use_parallel: + if parallel_method == 'chunked': + f1, details = self.__best_ts_fbeta_score_parallel_chunked( + labels, score, beta=1, chunk_size=chunk_size, max_workers=max_workers + ) + else: # standard parallel + f1, details = self.__best_ts_fbeta_score_parallel(labels, score, beta=1) + else: + f1, details = self.__best_ts_fbeta_score(labels, score, beta=1) + + result['thre_T'] = details['threshold'] + result['ACC_T'] = sklearn.metrics.accuracy_score(labels, score > details['threshold']) + result['P_T'] = details['precision'] + result['R_T'] = details['recall'] + result['F1_T'] = f1 + + return result + + def __best_ts_fbeta_score_parallel(self, labels: torch.Tensor, scores: torch.Tensor, beta: float, + recall_cardinality_fn: Callable = improved_cardinality_fn, + weighted_precision: bool = True, n_splits: int = 1500) -> Tuple[ + float, Dict[str, Any]]: + """ + Parallel version of best_ts_fbeta_score using ThreadPoolExecutor. + + Uses threading instead of multiprocessing to avoid serialization issues + with PyTorch tensors and instance methods. + """ + + # Use same parameter range as sequential version for consistency + device = scores.device + p_values = torch.linspace(0, 1.0, steps=n_splits, device=device) + thresholds = torch.quantile(scores, p_values) + + label_ranges = self.compute_window_indices(labels) + precision = torch.empty_like(thresholds, dtype=torch.float) + recall = torch.empty_like(thresholds, dtype=torch.float) + + def process_single_threshold(idx_threshold_pair): + """Process a single threshold computation""" + idx, threshold = idx_threshold_pair + + # Create predictions for this threshold + predictions = (scores > threshold).long() + + # Calculate precision and recall using instance method + prec, rec = self.ts_precision_and_recall( + labels, + predictions, + alpha=0, + recall_cardinality_fn=recall_cardinality_fn, + anomaly_ranges=label_ranges, + weighted_precision=weighted_precision, + ) + + # Handle edge case to avoid 0/0 in F-score computation + if prec == 0 and rec == 0: + rec = 1 + + return idx, prec, rec + + # Use ThreadPoolExecutor instead of ProcessPoolExecutor + # This allows us to use instance methods and share PyTorch tensors safely + max_workers = min(16, len(thresholds)) # Don't create more threads than thresholds + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all threshold computations + futures = { + executor.submit(process_single_threshold, (i, t)): i + for i, t in enumerate(thresholds) + } + + # Collect results as they complete + for future in tqdm(as_completed(futures), total=len(futures), + desc="Calculating F-beta score (parallel)"): + idx, prec, rec = future.result() + precision[idx] = prec + recall[idx] = rec + + # Compute F-scores and find the best one + f_score = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall) + max_score_index = torch.argmax(f_score) + + return ( + f_score[max_score_index].item(), + dict( + threshold=thresholds[max_score_index].item(), + precision=precision[max_score_index].item(), + 
recall=recall[max_score_index].item(), + ), + ) + + def __best_ts_fbeta_score_parallel_chunked(self, labels: torch.Tensor, scores: torch.Tensor, beta: float, + recall_cardinality_fn: Callable = improved_cardinality_fn, + weighted_precision: bool = True, n_splits: int = 1500, + chunk_size: int = 10, max_workers: int = 8) -> Tuple[float, Dict[str, Any]]: + """ + Chunked parallel version of best_ts_fbeta_score using ThreadPoolExecutor. + + This version processes thresholds in chunks to reduce overhead and improve efficiency. + + Args: + labels: Ground truth labels + scores: Anomaly scores + beta: Beta parameter for F-beta score + recall_cardinality_fn: Cardinality function for recall calculation + weighted_precision: Whether to use weighted precision + n_splits: Number of threshold splits + chunk_size: Number of thresholds to process in each chunk + max_workers: Maximum number of worker threads + """ + + # Use same parameter range as sequential version for consistency + device = scores.device + p_values = torch.linspace(0, 1.0, steps=n_splits, device=device) + thresholds = torch.quantile(scores, p_values) + + label_ranges = self.compute_window_indices(labels) + precision = torch.empty_like(thresholds, dtype=torch.float) + recall = torch.empty_like(thresholds, dtype=torch.float) + + def process_threshold_chunk(chunk_data): + """Process a chunk of thresholds""" + chunk_indices, chunk_thresholds = chunk_data + chunk_results = [] + + # Process each threshold in the chunk + for i, (idx, threshold) in enumerate(zip(chunk_indices, chunk_thresholds)): + # Create predictions for this threshold + predictions = (scores > threshold).long() + + # Calculate precision and recall using instance method + prec, rec = self.ts_precision_and_recall( + labels, + predictions, + alpha=0, + recall_cardinality_fn=recall_cardinality_fn, + anomaly_ranges=label_ranges, + weighted_precision=weighted_precision, + ) + + # Handle edge case to avoid 0/0 in F-score computation + if prec == 0 and rec == 0: + rec = 1 + + chunk_results.append((idx, prec, rec)) + + return chunk_results + + # Create chunks of threshold indices and values + chunks = [] + for i in range(0, len(thresholds), chunk_size): + end_idx = min(i + chunk_size, len(thresholds)) + chunk_indices = list(range(i, end_idx)) + chunk_thresholds = thresholds[i:end_idx] + chunks.append((chunk_indices, chunk_thresholds)) + + print(f"Processing {len(thresholds)} thresholds in {len(chunks)} chunks of size ~{chunk_size}") + + # Use ThreadPoolExecutor to process chunks in parallel + actual_workers = min(max_workers, len(chunks)) + + with ThreadPoolExecutor(max_workers=actual_workers) as executor: + # Submit all chunk computations + futures = { + executor.submit(process_threshold_chunk, chunk): i + for i, chunk in enumerate(chunks) + } + + # Collect results as they complete + for future in tqdm(as_completed(futures), total=len(futures), + desc=f"Processing {len(chunks)} chunks (chunked parallel)"): + chunk_results = future.result() + + # Store results in the appropriate positions + for idx, prec, rec in chunk_results: + precision[idx] = prec + recall[idx] = rec + + # Compute F-scores and find the best one + f_score = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall) + max_score_index = torch.argmax(f_score) + + return ( + f_score[max_score_index].item(), + dict( + threshold=thresholds[max_score_index].item(), + precision=precision[max_score_index].item(), + recall=recall[max_score_index].item(), + ), + ) + + def compute_window_indices(self, 
binary_labels: torch.Tensor) -> List[Tuple[int, int]]: + """ + Compute a list of indices where anomaly windows begin and end. + + :param binary_labels: A 1-D :class:`~torch.Tensor` containing ``1`` for an anomalous time step or ``0`` otherwise. + :return: A list of tuples ``(start, end)`` for each anomaly window in ``binary_labels``, where ``start`` is the + index at which the window starts and ``end`` is the first index after the end of the window. + """ + boundaries = torch.empty_like(binary_labels) + boundaries[0] = 0 + boundaries[1:] = binary_labels[:-1] + boundaries *= -1 + boundaries += binary_labels + # boundaries will be 1 where a window starts and -1 at the end of a window + + indices = torch.nonzero(boundaries, as_tuple=True)[0].tolist() + if len(indices) % 2 != 0: + # Add the last index as the end of a window if appropriate + indices.append(binary_labels.shape[0]) + indices = [(indices[i], indices[i + 1]) for i in range(0, len(indices), 2)] + + return indices + + def _compute_overlap(self, preds: torch.Tensor, pred_indices: List[Tuple[int, int]], + gt_indices: List[Tuple[int, int]], alpha: float, + bias_fn: Callable, cardinality_fn: Callable, + use_window_weight: bool = False) -> float: + n_gt_windows = len(gt_indices) + n_pred_windows = len(pred_indices) + total_score = 0.0 + total_gt_points = 0 + + i = j = 0 + while i < n_gt_windows and j < n_pred_windows: + gt_start, gt_end = gt_indices[i] + window_length = gt_end - gt_start + total_gt_points += window_length + i += 1 + + cardinality = 0 + while j < n_pred_windows and pred_indices[j][1] <= gt_start: + j += 1 + while j < n_pred_windows and pred_indices[j][0] < gt_end: + j += 1 + cardinality += 1 + + if cardinality == 0: + # cardinality == 0 means no overlap at all, hence no contribution + continue + + # The last predicted window that overlaps our current window could also overlap the next window. + # Therefore, we must consider it again in the next loop iteration. + j -= 1 + + cardinality_multiplier = cardinality_fn(cardinality, window_length) + + prediction_inside_ground_truth = preds[gt_start:gt_end] + # We calculate omega directly in the bias function, because this can greatly improve running time + # for the constant bias, for example. + omega = bias_fn(prediction_inside_ground_truth) + + # Either weight evenly across all windows or based on window length + weight = window_length if use_window_weight else 1 + + # Existence reward (if cardinality > 0 then this is certainly 1) + total_score += alpha * weight + # Overlap reward + total_score += (1 - alpha) * cardinality_multiplier * omega * weight + + denom = total_gt_points if use_window_weight else n_gt_windows + + return total_score / denom + + def ts_precision_and_recall(self, anomalies: torch.Tensor, predictions: torch.Tensor, alpha: float = 0, + recall_bias_fn: Callable[[torch.Tensor], float] = constant_bias_fn, + recall_cardinality_fn: Callable[[int], float] = inverse_proportional_cardinality_fn, + precision_bias_fn: Optional[Callable] = None, + precision_cardinality_fn: Optional[Callable] = None, + anomaly_ranges: Optional[List[Tuple[int, int]]] = None, + prediction_ranges: Optional[List[Tuple[int, int]]] = None, + weighted_precision: bool = False) -> Tuple[float, float]: + """ + Computes precision and recall for time series as defined in [Tatbul2018]_. + + .. note:: + The default parameters for this function correspond to the defaults recommended in [Tatbul2018]_. 
However, + those might not be desirable in most cases, please see [Wagner2023]_ for a detailed discussion. + + :param anomalies: Binary 1-D :class:`~torch.Tensor` of shape ``(length,)`` containing the true labels. + :param predictions: Binary 1-D :class:`~torch.Tensor` of shape ``(length,)`` containing the predicted labels. + :param alpha: Weight for existence term in recall. + :param recall_bias_fn: Function that computes the bias term for a given ground-truth window. + :param recall_cardinality_fn: Function that compute the cardinality factor for a given ground-truth window. + :param precision_bias_fn: Function that computes the bias term for a given predicted window. + If ``None``, this will be the same as ``recall_bias_function``. + :param precision_cardinality_fn: Function that computes the cardinality factor for a given predicted window. + If ``None``, this will be the same as ``recall_cardinality_function``. + :param weighted_precision: If True, the precision score of a predicted window will be weighted with the + length of the window in the final score. Otherwise, each window will have the same weight. + :param anomaly_ranges: A list of tuples ``(start, end)`` for each anomaly window in ``anomalies``, where ``start`` + is the index at which the window starts and ``end`` is the first index after the end of the window. This can + be ``None``, in which case the list is computed automatically from ``anomalies``. + :param prediction_ranges: A list of tuples ``(start, end)`` for each anomaly window in ``predictions``, where + ``start`` is the index at which the window starts and ``end`` is the first index after the end of the window. + This can be ``None``, in which case the list is computed automatically from ``predictions``. + :return: A tuple consisting of the time-series precision and recall for the given labels. 
+ """ + has_anomalies = torch.any(anomalies > 0).item() + has_predictions = torch.any(predictions > 0).item() + + # Catch special cases which would cause a division by zero + if not has_predictions and not has_anomalies: + # In this case, the classifier is perfect, so it makes sense to set precision and recall to 1 + return 1, 1 + elif not has_predictions or not has_anomalies: + return 0, 0 + + # Set precision functions to the same as recall functions if they are not given + if precision_bias_fn is None: + precision_bias_fn = recall_bias_fn + if precision_cardinality_fn is None: + precision_cardinality_fn = recall_cardinality_fn + + if anomaly_ranges is None: + anomaly_ranges = self.compute_window_indices(anomalies) + if prediction_ranges is None: + prediction_ranges = self.compute_window_indices(predictions) + + recall = self._compute_overlap(predictions, prediction_ranges, anomaly_ranges, alpha, recall_bias_fn, + recall_cardinality_fn) + precision = self._compute_overlap(anomalies, anomaly_ranges, prediction_ranges, 0, precision_bias_fn, + precision_cardinality_fn, use_window_weight=weighted_precision) + + return precision, recall + + def __best_ts_fbeta_score(self, labels: torch.Tensor, scores: torch.Tensor, beta: float, + recall_cardinality_fn: Callable = improved_cardinality_fn, + weighted_precision: bool = True, n_splits: int = 1500) -> Tuple[float, Dict[str, Any]]: + # Build thresholds from p-values (quantiles/percentiles) of the score distribution + # p_values in [0, 1]; thresholds = percentile(scores, p_values) + device = scores.device + p_values = torch.linspace(0, 1.0, steps=n_splits, device=device) + thresholds = torch.quantile(scores, p_values) + print("Here is the shape of thresholds",thresholds.shape) + precision = torch.empty_like(thresholds, dtype=torch.float) + recall = torch.empty_like(thresholds, dtype=torch.float) + predictions = torch.empty_like(scores, dtype=torch.long) + + print("Here is the shape of labels",labels.shape) + print("Here is the shape of scores",scores.shape) + print("Here is the shape of predictions",predictions.shape) + print("Here is the shape of precision",precision.shape) + print("Here is the shape of recall",recall.shape) + + label_ranges = self.compute_window_indices(labels) + + for i, t in tqdm(enumerate(thresholds), total=len(thresholds), + desc="Calculating F-beta score for thresholds"): + # predictions are 0/1 longs to be compatible with downstream computations + torch.greater(scores, t, out=predictions) + prec, rec = self.ts_precision_and_recall( + labels, + predictions, + alpha=0, + recall_cardinality_fn=recall_cardinality_fn, + anomaly_ranges=label_ranges, + weighted_precision=weighted_precision, + ) + + # Avoid 0/0 in F-score computation when both prec and rec are 0 + if prec == 0 and rec == 0: + rec = 1 + + precision[i] = prec + recall[i] = rec + + f_score = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall) + max_score_index = torch.argmax(f_score) + + return ( + f_score[max_score_index].item(), + dict( + threshold=thresholds[max_score_index].item(), + precision=precision[max_score_index].item(), + recall=recall[max_score_index].item(), + ), + ) + + + + def metric_PointF1PA(self, label, score, preds=None): + import sklearn.metrics + + best_f1_adjusted = 0 + best_result = None + q_values = np.arange(0.7, 0.99, 0.001) + for q in tqdm(q_values, total= len(q_values), desc="Calculating PointF1PA"): + thre = np.quantile(score, q) + result = {} + pred = (score > thre).astype(int) + adjusted_pred = self.adjustment(label, 
pred) + accuracy = sklearn.metrics.accuracy_score(label, adjusted_pred) + P, R, F1, _ = sklearn.metrics.precision_recall_fscore_support(label, adjusted_pred, average="binary") + result['thre_PA'] = thre + result['ACC_PA'] = accuracy + result['P_PA'] = P + result['R_PA'] = R + result['F1_PA'] = F1 + # results.append(pd.DataFrame([result])) + if F1 >= best_f1_adjusted: + best_f1_adjusted = F1 + best_result = result + if best_result is not None: + return best_result + else: + assert False, "No best result found, check the input data." + # results_storage['f1_pa'] = pd.concat(results, axis=0).reset_index(drop=True) + + def _get_events(self, y_test, outlier=1, normal=0): + events = dict() + label_prev = normal + event = 0 # corresponds to no event + event_start = 0 + for tim, label in enumerate(y_test): + if label == outlier: + if label_prev == normal: + event += 1 + event_start = tim + else: + if label_prev == outlier: + event_end = tim - 1 + events[event] = (event_start, event_end) + label_prev = label + + if label_prev == outlier: + event_end = tim - 1 + events[event] = (event_start, event_end) + return events + + def metric_EventF1PA(self, label, score, preds=None): + from sklearn.metrics import precision_score + true_events = self._get_events(label) + + if preds is None: + thresholds = np.linspace(score.min(), score.max(), 100) + EventF1PA_scores = [] + + for threshold in tqdm(thresholds, total=len(thresholds), desc="Calculating EventF1PA"): + preds = (score > threshold).astype(int) + + tp = np.sum([preds[start:end + 1].any() for start, end in true_events.values()]) + fn = len(true_events) - tp + rec_e = tp/(tp + fn) + prec_t = precision_score(label, preds) + EventF1PA = 2 * rec_e * prec_t / (rec_e + prec_t + self.eps) + + EventF1PA_scores.append(EventF1PA) + + EventF1PA_Threshold = thresholds[np.argmax(EventF1PA_scores)] + EventF1PA1 = max(EventF1PA_scores) + + else: + + tp = np.sum([preds[start:end + 1].any() for start, end in true_events.values()]) + fn = len(true_events) - tp + rec_e = tp/(tp + fn) + prec_t = precision_score(label, preds) + EventF1PA1 = 2 * rec_e * prec_t / (rec_e + prec_t + self.eps) + + return EventF1PA1 + + def range_recall_new(self, labels, preds, alpha): + p = np.where(preds == 1)[0] # positions of predicted label==1 + range_pred = self.range_convers_new(preds) + range_label = self.range_convers_new(labels) + + Nr = len(range_label) # total # of real anomaly segments + + ExistenceReward = self.existence_reward(range_label, preds) + + + OverlapReward = 0 + for i in range_label: + OverlapReward += self.w(i, p) * self.Cardinality_factor(i, range_pred) + + + score = alpha * ExistenceReward + (1-alpha) * OverlapReward + if Nr != 0: + return score/Nr, ExistenceReward/Nr, OverlapReward/Nr + else: + return 0,0,0 + + def range_convers_new(self, label): + ''' + input: arrays of binary values + output: list of ordered pair [[a0,b0], [a1,b1]... 
] of the inputs + ''' + anomaly_starts = np.where(np.diff(label) == 1)[0] + 1 + anomaly_ends, = np.where(np.diff(label) == -1) + if len(anomaly_ends): + if not len(anomaly_starts) or anomaly_ends[0] < anomaly_starts[0]: + # we started with an anomaly, so the start of the first anomaly is the start of the labels + anomaly_starts = np.concatenate([[0], anomaly_starts]) + if len(anomaly_starts): + if not len(anomaly_ends) or anomaly_ends[-1] < anomaly_starts[-1]: + # we ended on an anomaly, so the end of the last anomaly is the end of the labels + anomaly_ends = np.concatenate([anomaly_ends, [len(label) - 1]]) + return list(zip(anomaly_starts, anomaly_ends)) + + def existence_reward(self, labels, preds): + ''' + labels: list of ordered pair + preds predicted data + ''' + + score = 0 + for i in labels: + if preds[i[0]:i[1]+1].any(): + score += 1 + return score + + def num_nonzero_segments(self, x): + count=0 + if x[0]>0: + count+=1 + for i in range(1, len(x)): + if x[i]>0 and x[i-1]==0: + count+=1 + return count + + def extend_postive_range(self, x, window=5): + label = x.copy().astype(float) + L = self.range_convers_new(label) # index of non-zero segments + length = len(label) + for k in range(len(L)): + s = L[k][0] + e = L[k][1] + + + x1 = np.arange(e,min(e+window//2,length)) + label[x1] += np.sqrt(1 - (x1-e)/(window)) + + x2 = np.arange(max(s-window//2,0),s) + label[x2] += np.sqrt(1 - (s-x2)/(window)) + + label = np.minimum(np.ones(length), label) + return label + + def extend_postive_range_individual(self, x, percentage=0.2): + label = x.copy().astype(float) + L = self.range_convers_new(label) # index of non-zero segments + length = len(label) + for k in range(len(L)): + s = L[k][0] + e = L[k][1] + + l0 = int((e-s+1)*percentage) + + x1 = np.arange(e,min(e+l0,length)) + label[x1] += np.sqrt(1 - (x1-e)/(2*l0)) + + x2 = np.arange(max(s-l0,0),s) + label[x2] += np.sqrt(1 - (s-x2)/(2*l0)) + + label = np.minimum(np.ones(length), label) + return label + + def TPR_FPR_RangeAUC(self, labels, pred, P, L): + indices = np.where(labels == 1)[0] + product = labels * pred + TP = np.sum(product) + newlabels = product.copy() + newlabels[indices] = 1 + + # recall = min(TP/P,1) + P_new = (P + np.sum(newlabels)) / 2 # so TPR is neither large nor small + # P_new = np.sum(labels) + recall = min(TP / P_new, 1) + # recall = TP/np.sum(labels) + # print('recall '+str(recall)) + + existence = 0 + for seg in L: + if np.sum(product[seg[0]:(seg[1] + 1)]) > 0: # if newlabels>0, that segment must contained + existence += 1 + + existence_ratio = existence / len(L) + # print(existence_ratio) + + # TPR_RangeAUC = np.sqrt(recall*existence_ratio) + # print(existence_ratio) + TPR_RangeAUC = recall * existence_ratio + + FP = np.sum(pred) - TP + # TN = np.sum((1-pred) * (1-labels)) + + # FPR_RangeAUC = FP/(FP+TN) + N_new = len(labels) - P_new + FPR_RangeAUC = FP / N_new + + Precision_RangeAUC = TP / np.sum(pred) + + return TPR_RangeAUC, FPR_RangeAUC, Precision_RangeAUC + + def RangeAUC(self, labels, score, window=0, percentage=0, plot_ROC=False, AUC_type='window'): + # AUC_type='window'/'percentage' + score_sorted = -np.sort(-score) + + P = np.sum(labels) + # print(np.sum(labels)) + if AUC_type == 'window': + labels = self.extend_postive_range(labels, window=window) + else: + labels = self.extend_postive_range_individual(labels, percentage=percentage) + + # print(np.sum(labels)) + L = self.range_convers_new(labels) + TPR_list = [0] + FPR_list = [0] + Precision_list = [1] + + for i in np.linspace(0, len(score) - 1, 
250).astype(int): + threshold = score_sorted[i] + # print('thre='+str(threshold)) + pred = score >= threshold + TPR, FPR, Precision = self.TPR_FPR_RangeAUC(labels, pred, P, L) + + TPR_list.append(TPR) + FPR_list.append(FPR) + Precision_list.append(Precision) + + TPR_list.append(1) + FPR_list.append(1) # otherwise, range-AUC will stop earlier than (1,1) + + tpr = np.array(TPR_list) + fpr = np.array(FPR_list) + prec = np.array(Precision_list) + + width = fpr[1:] - fpr[:-1] + height = (tpr[1:] + tpr[:-1]) / 2 + AUC_range = np.sum(width * height) + + width_PR = tpr[1:-1] - tpr[:-2] + height_PR = prec[1:] + AP_range = np.sum(width_PR * height_PR) + + if plot_ROC: + return AUC_range, AP_range, fpr, tpr, prec + + return AUC_range + + def range_convers_new(self, label): + ''' + input: arrays of binary values + output: list of ordered pair [[a0,b0], [a1,b1]... ] of the inputs + ''' + anomaly_starts = np.where(np.diff(label) == 1)[0] + 1 + anomaly_ends, = np.where(np.diff(label) == -1) + if len(anomaly_ends): + if not len(anomaly_starts) or anomaly_ends[0] < anomaly_starts[0]: + # we started with an anomaly, so the start of the first anomaly is the start of the labels + anomaly_starts = np.concatenate([[0], anomaly_starts]) + if len(anomaly_starts): + if not len(anomaly_ends) or anomaly_ends[-1] < anomaly_starts[-1]: + # we ended on an anomaly, so the end of the last anomaly is the end of the labels + anomaly_ends = np.concatenate([anomaly_ends, [len(label) - 1]]) + return list(zip(anomaly_starts, anomaly_ends)) + + def new_sequence(self, label, sequence_original, window): + a = max(sequence_original[0][0] - window // 2, 0) + sequence_new = [] + for i in range(len(sequence_original) - 1): + if sequence_original[i][1] + window // 2 < sequence_original[i + 1][0] - window // 2: + sequence_new.append((a, sequence_original[i][1] + window // 2)) + a = sequence_original[i + 1][0] - window // 2 + sequence_new.append((a, min(sequence_original[len(sequence_original) - 1][1] + window // 2, len(label) - 1))) + return sequence_new + + def sequencing(self, x, L, window=5): + label = x.copy().astype(float) + length = len(label) + + for k in range(len(L)): + s = L[k][0] + e = L[k][1] + + x1 = np.arange(e + 1, min(e + window // 2 + 1, length)) + label[x1] += np.sqrt(1 - (x1 - e) / (window)) + + x2 = np.arange(max(s - window // 2, 0), s) + label[x2] += np.sqrt(1 - (s - x2) / (window)) + + label = np.minimum(np.ones(length), label) + return label + + # TPR_FPR_window + def RangeAUC_volume_opt(self, labels_original, score, windowSize, thre=250): + window_3d = np.arange(0, windowSize + 1, 1) + P = np.sum(labels_original) + seq = self.range_convers_new(labels_original) + l = self.new_sequence(labels_original, seq, windowSize) + + score_sorted = -np.sort(-score) + + tpr_3d = np.zeros((windowSize + 1, thre + 2)) + fpr_3d = np.zeros((windowSize + 1, thre + 2)) + prec_3d = np.zeros((windowSize + 1, thre + 1)) + + auc_3d = np.zeros(windowSize + 1) + ap_3d = np.zeros(windowSize + 1) + + tp = np.zeros(thre) + N_pred = np.zeros(thre) + + for k, i in enumerate(np.linspace(0, len(score) - 1, thre).astype(int)): + threshold = score_sorted[i] + pred = score >= threshold + N_pred[k] = np.sum(pred) + + for window in window_3d: + + labels_extended = self.sequencing(labels_original, seq, window) + L = self.new_sequence(labels_extended, seq, window) + + TF_list = np.zeros((thre + 2, 2)) + Precision_list = np.ones(thre + 1) + j = 0 + + for i in np.linspace(0, len(score) - 1, thre).astype(int): + threshold = score_sorted[i] + pred = score 
>= threshold + labels = labels_extended.copy() + existence = 0 + + for seg in L: + labels[seg[0]:seg[1] + 1] = labels_extended[seg[0]:seg[1] + 1] * pred[seg[0]:seg[1] + 1] + if (pred[seg[0]:(seg[1] + 1)] > 0).any(): + existence += 1 + for seg in seq: + labels[seg[0]:seg[1] + 1] = 1 + + TP = 0 + N_labels = 0 + for seg in l: + TP += np.dot(labels[seg[0]:seg[1] + 1], pred[seg[0]:seg[1] + 1]) + N_labels += np.sum(labels[seg[0]:seg[1] + 1]) + + TP += tp[j] + FP = N_pred[j] - TP + + existence_ratio = existence / len(L) + + P_new = (P + N_labels) / 2 + recall = min(TP / P_new, 1) + + TPR = recall * existence_ratio + N_new = len(labels) - P_new + FPR = FP / N_new + + Precision = TP / N_pred[j] + + j += 1 + TF_list[j] = [TPR, FPR] + Precision_list[j] = Precision + + TF_list[j + 1] = [1, 1] # otherwise, range-AUC will stop earlier than (1,1) + + tpr_3d[window] = TF_list[:, 0] + fpr_3d[window] = TF_list[:, 1] + prec_3d[window] = Precision_list + + width = TF_list[1:, 1] - TF_list[:-1, 1] + height = (TF_list[1:, 0] + TF_list[:-1, 0]) / 2 + AUC_range = np.dot(width, height) + auc_3d[window] = (AUC_range) + + width_PR = TF_list[1:-1, 0] - TF_list[:-2, 0] + height_PR = Precision_list[1:] + + AP_range = np.dot(width_PR, height_PR) + ap_3d[window] = AP_range + + return tpr_3d, fpr_3d, prec_3d, window_3d, sum(auc_3d) / len(window_3d), sum(ap_3d) / len(window_3d) + + def RangeAUC_volume_opt_mem(self, labels_original, score, windowSize, thre=250): + window_3d = np.arange(0, windowSize + 1, 1) + P = np.sum(labels_original) + seq = self.range_convers_new(labels_original) + l = self.new_sequence(labels_original, seq, windowSize) + + score_sorted = -np.sort(-score) + + tpr_3d = np.zeros((windowSize + 1, thre + 2)) + fpr_3d = np.zeros((windowSize + 1, thre + 2)) + prec_3d = np.zeros((windowSize + 1, thre + 1)) + + auc_3d = np.zeros(windowSize + 1) + ap_3d = np.zeros(windowSize + 1) + + tp = np.zeros(thre) + N_pred = np.zeros(thre) + p = np.zeros((thre, len(score))) + + for k, i in enumerate(np.linspace(0, len(score) - 1, thre).astype(int)): + threshold = score_sorted[i] + pred = score >= threshold + p[k] = pred + N_pred[k] = np.sum(pred) + + for window in window_3d: + labels_extended = self.sequencing(labels_original, seq, window) + L = self.new_sequence(labels_extended, seq, window) + + TF_list = np.zeros((thre + 2, 2)) + Precision_list = np.ones(thre + 1) + j = 0 + + for i in np.linspace(0, len(score) - 1, thre).astype(int): + labels = labels_extended.copy() + existence = 0 + + for seg in L: + labels[seg[0]:seg[1] + 1] = labels_extended[seg[0]:seg[1] + 1] * p[j][seg[0]:seg[1] + 1] + if (p[j][seg[0]:(seg[1] + 1)] > 0).any(): + existence += 1 + for seg in seq: + labels[seg[0]:seg[1] + 1] = 1 + + N_labels = 0 + TP = 0 + for seg in l: + TP += np.dot(labels[seg[0]:seg[1] + 1], p[j][seg[0]:seg[1] + 1]) + N_labels += np.sum(labels[seg[0]:seg[1] + 1]) + + TP += tp[j] + FP = N_pred[j] - TP + + existence_ratio = existence / len(L) + + P_new = (P + N_labels) / 2 + recall = min(TP / P_new, 1) + + TPR = recall * existence_ratio + + N_new = len(labels) - P_new + FPR = FP / N_new + Precision = TP / N_pred[j] + j += 1 + + TF_list[j] = [TPR, FPR] + Precision_list[j] = Precision + + TF_list[j + 1] = [1, 1] + tpr_3d[window] = TF_list[:, 0] + fpr_3d[window] = TF_list[:, 1] + prec_3d[window] = Precision_list + + width = TF_list[1:, 1] - TF_list[:-1, 1] + height = (TF_list[1:, 0] + TF_list[:-1, 0]) / 2 + AUC_range = np.dot(width, height) + auc_3d[window] = (AUC_range) + + width_PR = TF_list[1:-1, 0] - TF_list[:-2, 0] + height_PR = 
Precision_list[1:] + AP_range = np.dot(width_PR, height_PR) + ap_3d[window] = (AP_range) + return tpr_3d, fpr_3d, prec_3d, window_3d, sum(auc_3d) / len(window_3d), sum(ap_3d) / len(window_3d) + + + def metric_VUS_pred(self, labels, preds, windowSize): + window_3d = np.arange(0, windowSize + 1, 1) + P = np.sum(labels) + seq = self.range_convers_new(labels) + l = self.new_sequence(labels, seq, windowSize) + + recall_3d = np.zeros((windowSize + 1)) + prec_3d = np.zeros((windowSize + 1)) + f_3d = np.zeros((windowSize + 1)) + + N_pred = np.sum(preds) + + for window in window_3d: + + labels_extended = self.sequencing(labels, seq, window) + L = self.new_sequence(labels_extended, seq, window) + + labels = labels_extended.copy() + existence = 0 + + for seg in L: + labels[seg[0]:seg[1] + 1] = labels_extended[seg[0]:seg[1] + 1] * preds[seg[0]:seg[1] + 1] + if (preds[seg[0]:(seg[1] + 1)] > 0).any(): + existence += 1 + for seg in seq: + labels[seg[0]:seg[1] + 1] = 1 + + TP = 0 + N_labels = 0 + for seg in l: + TP += np.dot(labels[seg[0]:seg[1] + 1], preds[seg[0]:seg[1] + 1]) + N_labels += np.sum(labels[seg[0]:seg[1] + 1]) + + P_new = (P + N_labels) / 2 + recall = min(TP / P_new, 1) + Precision = TP / N_pred + + recall_3d[window] = recall + prec_3d[window] = Precision + f_3d[window] = 2 * Precision * recall / (Precision + recall) if (Precision + recall) > 0 else 0 + return sum(recall_3d) / len(window_3d), sum(prec_3d) / len(window_3d), sum(f_3d) / len(window_3d) + + # def metric_F1_T_gpu_corrected(self, labels, scores, device='cuda', batch_size=50): + # """ + # GPU-accelerated F1_T that maintains exact compatibility with CPU version + # Only the threshold generation and prediction computation is done on GPU + # The actual metric calculation uses your original CPU functions + # """ + # if not torch.cuda.is_available(): + # print("CUDA not available, falling back to CPU implementation") + # return self.metric_F1_T(labels, scores) + # + # print(f"Computing F1_T on {device} (corrected version)") + # start_time = time.time() + # + # # Keep original data types for compatibility + # labels_np = np.array(labels) + # scores_np = np.array(scores) + # + # # Use GPU only for threshold generation + # scores_gpu = torch.tensor(scores_np, dtype=torch.float32, device=device) + # n_splits = 1000 + # p_values = torch.linspace(0.0, 1.0, steps=n_splits, device=device) + # thresholds_gpu = torch.quantile(scores_gpu, p_values) + # thresholds = thresholds_gpu.cpu().numpy() + # + # # Convert to torch tensors for CPU computation (matching original) + # labels_torch = torch.tensor(labels_np, dtype=torch.int) + # scores_torch = torch.tensor(scores_np, dtype=torch.float) + # + # # Compute label ranges once + # label_ranges = self.compute_window_indices(labels_torch) + # + # # Process thresholds in batches but use original metric calculation + # precision_list = [] + # recall_list = [] + # + # if batch_size is None: + # batch_size = 50 # Default batch size + # + # beta = 1 + # predictions = torch.empty_like(scores_torch, dtype=torch.long) + # + # for i in tqdm(range(0, n_splits, batch_size), + # desc="Computing metrics (corrected)"): + # end_idx = min(i + batch_size, n_splits) + # + # batch_precisions = [] + # batch_recalls = [] + # + # for j in range(i, end_idx): + # threshold = thresholds[j] + # + # # Compute predictions + # torch.greater(scores_torch, threshold, out=predictions) + # + # # Use your original ts_precision_and_recall function + # prec, rec = self.ts_precision_and_recall( + # labels_torch, + # predictions, + # 
alpha=0, + # recall_cardinality_fn=improved_cardinality_fn, + # anomaly_ranges=label_ranges, + # weighted_precision=True, + # ) + # + # # Handle edge case + # if prec == 0 and rec == 0: + # rec = 1 + # + # batch_precisions.append(prec) + # batch_recalls.append(rec) + # + # precision_list.extend(batch_precisions) + # recall_list.extend(batch_recalls) + # + # # Convert to tensors for final computation + # precision = torch.tensor(precision_list, dtype=torch.float) + # recall = torch.tensor(recall_list, dtype=torch.float) + # + # # Compute F-scores + # f_scores = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall) + # + # # Find best threshold + # best_idx = torch.argmax(f_scores) + # best_threshold = thresholds[best_idx] + # + # # Compute accuracy + # best_predictions = scores_np > best_threshold + # accuracy = np.mean(best_predictions == labels_np) + # + # elapsed = time.time() - start_time + # print(f"F1_T computed in {elapsed:.2f}s") + # + # return { + # 'F1_T': f_scores[best_idx].item(), + # 'P_T': precision[best_idx].item(), + # 'R_T': recall[best_idx].item(), + # 'thre_T': best_threshold, + # 'ACC_T': accuracy + # } + # + # def metric_F1_T_parallel_cpu(self, labels, scores, num_workers=8): + # """ + # CPU-parallel version that matches the original exactly + # Uses multiprocessing to speed up threshold evaluation + # """ + # from concurrent.futures import ProcessPoolExecutor + # import multiprocessing as mp + # + # print(f"Computing F1_T with {num_workers} CPU workers") + # start_time = time.time() + # + # # Convert to torch tensors + # labels = torch.tensor(labels, dtype=torch.int) + # scores = torch.tensor(scores, dtype=torch.float) + # + # # Generate thresholds + # n_splits = 1000 + # p_values = torch.linspace(0.0, 1.0, steps=n_splits) + # thresholds = torch.quantile(scores, p_values) + # + # # Compute label ranges once + # label_ranges = self.compute_window_indices(labels) + # + # # Split thresholds for parallel processing + # threshold_chunks = torch.chunk(thresholds, num_workers) + # + # # Process in parallel + # beta = 1 + # with ProcessPoolExecutor(max_workers=num_workers) as executor: + # futures = [] + # for chunk in threshold_chunks: + # future = executor.submit( + # self._compute_f1t_chunk, + # chunk, labels, scores, beta, label_ranges, True + # ) + # futures.append(future) + # + # # Collect results + # all_results = [] + # for future in tqdm(as_completed(futures), + # total=len(futures), + # desc="Processing chunks"): + # chunk_results = future.result() + # all_results.extend(chunk_results) + # + # # Find best result + # best_result = max(all_results, key=lambda x: x['f_score']) + # + # # Compute accuracy + # best_predictions = scores > best_result['threshold'] + # accuracy = torch.mean((best_predictions == labels).float()).item() + # + # elapsed = time.time() - start_time + # print(f"F1_T computed in {elapsed:.2f}s") + # + # return { + # 'F1_T': best_result['f_score'], + # 'P_T': best_result['precision'], + # 'R_T': best_result['recall'], + # 'thre_T': best_result['threshold'], + # 'ACC_T': accuracy + # } + # + # def metric_F1_T_hybrid(self, labels, scores, device='cuda'): + # """ + # Hybrid approach: GPU for threshold generation and prediction, + # CPU parallel for metric calculation + # """ + # if not torch.cuda.is_available(): + # return self.metric_F1_T_parallel_cpu(labels, scores) + # + # print(f"Computing F1_T with hybrid GPU/CPU approach") + # start_time = time.time() + # + # # Generate thresholds on GPU (fast) + # labels_gpu = 
torch.tensor(labels, dtype=torch.int32, device=device) + # scores_gpu = torch.tensor(scores, dtype=torch.float32, device=device) + # + # n_splits = 1000 + # p_values = torch.linspace(0.0, 1.0, steps=n_splits, device=device) + # thresholds_gpu = torch.quantile(scores_gpu, p_values) + # + # # Generate all predictions on GPU at once (if memory allows) + # try: + # # This creates a matrix of shape (n_thresholds, n_samples) + # all_predictions_gpu = scores_gpu.unsqueeze(0) > thresholds_gpu.unsqueeze(1) + # all_predictions = all_predictions_gpu.cpu().long() + # thresholds = thresholds_gpu.cpu() + # print(" Generated all predictions on GPU") + # except RuntimeError as e: + # if "out of memory" in str(e): + # print(" Not enough GPU memory, falling back to batched approach") + # return self.metric_F1_T_gpu_corrected(labels, scores, batch_size=50) + # else: + # raise e + # + # # Move back to CPU for metric calculation + # labels_cpu = torch.tensor(labels, dtype=torch.int) + # scores_cpu = torch.tensor(scores, dtype=torch.float) + # + # # Compute label ranges + # label_ranges = self.compute_window_indices(labels_cpu) + # + # # Parallel CPU computation of metrics + # beta = 1 + # from concurrent.futures import ThreadPoolExecutor + # + # def compute_single_threshold(idx): + # predictions = all_predictions[idx] + # + # prec, rec = self.ts_precision_and_recall( + # labels_cpu, + # predictions, + # alpha=0, + # recall_cardinality_fn=improved_cardinality_fn, + # anomaly_ranges=label_ranges, + # weighted_precision=True, + # ) + # + # if prec == 0 and rec == 0: + # rec = 1 + # + # f_score = (1 + beta ** 2) * prec * rec / (beta ** 2 * prec + rec) + # + # return { + # 'idx': idx, + # 'f_score': f_score, + # 'precision': prec, + # 'recall': rec, + # 'threshold': thresholds[idx].item() + # } + # + # # Process with thread pool + # with ThreadPoolExecutor(max_workers=8) as executor: + # futures = [executor.submit(compute_single_threshold, i) + # for i in range(n_splits)] + # + # results = [] + # for future in tqdm(as_completed(futures), + # total=n_splits, + # desc="Computing metrics"): + # results.append(future.result()) + # + # # Find best result + # best_result = max(results, key=lambda x: x['f_score']) + # + # # Compute accuracy + # best_predictions = scores_cpu > best_result['threshold'] + # accuracy = torch.mean((best_predictions == labels_cpu).float()).item() + # + # elapsed = time.time() - start_time + # print(f"F1_T computed in {elapsed:.2f}s") + # + # return { + # 'F1_T': best_result['f_score'], + # 'P_T': best_result['precision'], + # 'R_T': best_result['recall'], + # 'thre_T': best_result['threshold'], + # 'ACC_T': accuracy + # } + # + # def metric_F1_T_optimized(self, labels, scores, num_workers=None): + # """ + # Optimized version using the best strategies from our tests + # """ + # if num_workers is None: + # num_workers = min(mp.cpu_count(), 8) + # + # print(f"Computing F1_T (optimized) with {num_workers} workers") + # start_time = time.time() + # + # # Convert to torch tensors + # labels = torch.tensor(labels, dtype=torch.int) + # scores = torch.tensor(scores, dtype=torch.float) + # + # # Generate thresholds + # n_splits = 1000 + # p_values = torch.linspace(0.0, 1.0, steps=n_splits) + # thresholds = torch.quantile(scores, p_values) + # + # # Pre-compute label ranges once + # label_ranges = self.compute_window_indices(labels) + # + # # Pre-generate all predictions at once (memory efficient) + # print("Pre-computing predictions...") + # predictions_list = [] + # for i in range(0, n_splits, 100): # 
Process in chunks to save memory + # end_idx = min(i + 100, n_splits) + # batch_thresholds = thresholds[i:end_idx] + # # Create boolean predictions then convert to long + # batch_preds = (scores.unsqueeze(0) > batch_thresholds.unsqueeze(1)).long() # FIX: Convert to long + # predictions_list.append(batch_preds) + # + # all_predictions = torch.cat(predictions_list, dim=0) + # print(f"Predictions ready, computing metrics...") + # + # # Define worker function + # def compute_metrics_batch(indices): + # results = [] + # for idx in indices: + # predictions = all_predictions[idx] + # + # prec, rec = self.ts_precision_and_recall( + # labels, + # predictions, + # alpha=0, + # recall_cardinality_fn=improved_cardinality_fn, + # anomaly_ranges=label_ranges, + # weighted_precision=True, + # ) + # + # if prec == 0 and rec == 0: + # rec = 1 + # + # f_score = 2 * prec * rec / (prec + rec) + # + # results.append({ + # 'idx': idx, + # 'f_score': f_score, + # 'precision': prec, + # 'recall': rec, + # 'threshold': thresholds[idx].item() + # }) + # + # return results + # + # # Split indices for workers + # indices = list(range(n_splits)) + # chunk_size = len(indices) // num_workers + # if chunk_size == 0: + # chunk_size = 1 + # index_chunks = [indices[i:i + chunk_size] for i in range(0, len(indices), chunk_size)] + # + # # Process with thread pool (better for this workload than process pool) + # all_results = [] + # with ThreadPoolExecutor(max_workers=num_workers) as executor: + # futures = [executor.submit(compute_metrics_batch, chunk) for chunk in index_chunks] + # + # completed = 0 + # for future in as_completed(futures): + # all_results.extend(future.result()) + # completed += 1 + # print(f"Progress: {completed}/{len(futures)} chunks completed", end='\r') + # + # print() # New line after progress + # + # # Find best result + # best_result = max(all_results, key=lambda x: x['f_score']) + # + # # Compute accuracy + # best_predictions = scores > best_result['threshold'] + # accuracy = torch.mean((best_predictions == labels).float()).item() + # + # elapsed = time.time() - start_time + # print(f"F1_T computed in {elapsed:.2f}s") + # + # return { + # 'F1_T': best_result['f_score'], + # 'P_T': best_result['precision'], + # 'R_T': best_result['recall'], + # 'thre_T': best_result['threshold'], + # 'ACC_T': accuracy + # } + # + # def metric_F1_T_sampling(self, labels, scores, sample_rate=0.2): + # """ + # Fast approximation by sampling thresholds + # Good for quick estimates or hyperparameter tuning + # """ + # print(f"Computing F1_T with threshold sampling (rate={sample_rate})") + # start_time = time.time() + # + # # Convert to torch tensors + # labels = torch.tensor(labels, dtype=torch.int) + # scores = torch.tensor(scores, dtype=torch.float) + # + # # Generate fewer thresholds + # n_splits = int(1000 * sample_rate) + # p_values = torch.linspace(0.0, 1.0, steps=n_splits) + # thresholds = torch.quantile(scores, p_values) + # + # # Rest is same as original + # precision = torch.empty_like(thresholds, dtype=torch.float) + # recall = torch.empty_like(thresholds, dtype=torch.float) + # predictions = torch.empty_like(scores, dtype=torch.long) + # + # label_ranges = self.compute_window_indices(labels) + # beta = 1 + # + # for i, t in enumerate(thresholds): + # torch.greater(scores, t, out=predictions) + # prec, rec = self.ts_precision_and_recall( + # labels, + # predictions, + # alpha=0, + # recall_cardinality_fn=improved_cardinality_fn, + # anomaly_ranges=label_ranges, + # weighted_precision=True, + # ) + # + # if prec 
== 0 and rec == 0: + # rec = 1 + # + # precision[i] = prec + # recall[i] = rec + # + # f_score = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall) + # max_score_index = torch.argmax(f_score) + # + # elapsed = time.time() - start_time + # print(f"F1_T computed in {elapsed:.2f}s (approximate)") + # + # return { + # 'F1_T': f_score[max_score_index].item(), + # 'P_T': precision[max_score_index].item(), + # 'R_T': recall[max_score_index].item(), + # 'thre_T': thresholds[max_score_index].item(), + # 'ACC_T': sklearn.metrics.accuracy_score(labels, scores > thresholds[max_score_index]) + # } + # + # def metric_F1_T_chunked(self, labels, scores, chunk_size=50, num_workers=4): + # """ + # Simple chunked parallel processing without pre-computing all predictions + # More memory efficient and often faster + # """ + # from concurrent.futures import ProcessPoolExecutor + # import multiprocessing as mp + # + # print(f"Computing F1_T (chunked) with {num_workers} workers, chunk_size={chunk_size}") + # start_time = time.time() + # + # # Convert to torch tensors + # labels_t = torch.tensor(labels, dtype=torch.int) + # scores_t = torch.tensor(scores, dtype=torch.float) + # + # # Generate thresholds + # n_splits = 1000 + # p_values = torch.linspace(0.0, 1.0, steps=n_splits) + # thresholds = torch.quantile(scores_t, p_values).numpy() + # + # # Convert back to numpy for pickling + # labels_np = labels_t.numpy() + # scores_np = scores_t.numpy() + # + # # Helper function for parallel processing + # def process_chunk(args): + # chunk_thresholds, labels_local, scores_local = args + # results = [] + # + # # Convert back to torch tensors in worker + # labels_tensor = torch.tensor(labels_local, dtype=torch.int) + # scores_tensor = torch.tensor(scores_local, dtype=torch.float) + # predictions = torch.empty_like(scores_tensor, dtype=torch.long) + # + # # Compute label ranges in worker + # label_ranges_local = self.compute_window_indices(labels_tensor) + # + # for threshold in chunk_thresholds: + # torch.greater(scores_tensor, threshold, out=predictions) + # + # prec, rec = self.ts_precision_and_recall( + # labels_tensor, + # predictions, + # alpha=0, + # recall_cardinality_fn=improved_cardinality_fn, + # anomaly_ranges=label_ranges_local, + # weighted_precision=True, + # ) + # + # if prec == 0 and rec == 0: + # rec = 1 + # + # f_score = 2 * prec * rec / (prec + rec) + # + # results.append({ + # 'f_score': f_score, + # 'precision': prec, + # 'recall': rec, + # 'threshold': threshold + # }) + # + # return results + # + # # Create chunks of thresholds + # threshold_chunks = [thresholds[i:i + chunk_size] + # for i in range(0, len(thresholds), chunk_size)] + # + # # Prepare arguments for workers + # chunk_args = [(chunk, labels_np, scores_np) for chunk in threshold_chunks] + # + # # Process in parallel + # all_results = [] + # with ProcessPoolExecutor(max_workers=num_workers) as executor: + # for i, result_chunk in enumerate(executor.map(process_chunk, chunk_args)): + # all_results.extend(result_chunk) + # print(f"Progress: {(i + 1) * chunk_size}/{n_splits} thresholds processed", end='\r') + # + # print() # New line + # + # # Find best result + # best_result = max(all_results, key=lambda x: x['f_score']) + # + # # Compute accuracy + # best_predictions = scores_np > best_result['threshold'] + # accuracy = np.mean(best_predictions == labels_np) + # + # elapsed = time.time() - start_time + # print(f"F1_T computed in {elapsed:.2f}s") + # + # return { + # 'F1_T': best_result['f_score'], + # 'P_T': 
best_result['precision'], + # 'R_T': best_result['recall'], + # 'thre_T': best_result['threshold'], + # 'ACC_T': accuracy + # } + + # def metric_F1_T_optimized(self, labels, scores, num_workers=None): + # """ + # Optimized version using the best strategies from our tests + # """ + # if num_workers is None: + # num_workers = min(mp.cpu_count(), 8) + # + # print(f"Computing F1_T (optimized) with {num_workers} workers") + # start_time = time.time() + # + # # Convert to torch tensors + # labels = torch.tensor(labels, dtype=torch.int) + # scores = torch.tensor(scores, dtype=torch.float) + # + # # Generate thresholds + # n_splits = 1000 + # p_values = torch.linspace(0.0, 1.0, steps=n_splits) + # thresholds = torch.quantile(scores, p_values) + # + # # Pre-compute label ranges once + # label_ranges = self.compute_window_indices(labels) + # + # # Pre-generate all predictions at once (memory efficient) + # print("Pre-computing predictions...") + # predictions_list = [] + # for i in range(0, n_splits, 100): # Process in chunks to save memory + # end_idx = min(i + 100, n_splits) + # batch_thresholds = thresholds[i:end_idx] + # # Create boolean predictions then convert to long + # batch_preds = (scores.unsqueeze(0) > batch_thresholds.unsqueeze(1)).long() # FIX: Convert to long + # predictions_list.append(batch_preds) + # + # all_predictions = torch.cat(predictions_list, dim=0) + # print(f"Predictions ready, computing metrics...") + # + # # Define worker function + # def compute_metrics_batch(indices): + # results = [] + # for idx in indices: + # predictions = all_predictions[idx] + # + # prec, rec = self.ts_precision_and_recall( + # labels, + # predictions, + # alpha=0, + # recall_cardinality_fn=improved_cardinality_fn, + # anomaly_ranges=label_ranges, + # weighted_precision=True, + # ) + # + # if prec == 0 and rec == 0: + # rec = 1 + # + # f_score = 2 * prec * rec / (prec + rec) + # + # results.append({ + # 'idx': idx, + # 'f_score': f_score, + # 'precision': prec, + # 'recall': rec, + # 'threshold': thresholds[idx].item() + # }) + # + # return results + # + # # Split indices for workers + # indices = list(range(n_splits)) + # chunk_size = len(indices) // num_workers + # if chunk_size == 0: + # chunk_size = 1 + # index_chunks = [indices[i:i + chunk_size] for i in range(0, len(indices), chunk_size)] + # + # # Process with thread pool (better for this workload than process pool) + # all_results = [] + # with ThreadPoolExecutor(max_workers=num_workers) as executor: + # futures = [executor.submit(compute_metrics_batch, chunk) for chunk in index_chunks] + # + # completed = 0 + # for future in as_completed(futures): + # all_results.extend(future.result()) + # completed += 1 + # print(f"Progress: {completed}/{len(futures)} chunks completed", end='\r') + # + # print() # New line after progress + # + # # Find best result + # best_result = max(all_results, key=lambda x: x['f_score']) + # + # # Compute accuracy + # best_predictions = scores > best_result['threshold'] + # accuracy = torch.mean((best_predictions == labels).float()).item() + # + # elapsed = time.time() - start_time + # print(f"F1_T computed in {elapsed:.2f}s") + # + # return { + # 'F1_T': best_result['f_score'], + # 'P_T': best_result['precision'], + # 'R_T': best_result['recall'], + # 'thre_T': best_result['threshold'], + # 'ACC_T': accuracy + # } + # + # def metric_F1_T_sampling(self, labels, scores, sample_rate=0.2): + # """ + # Fast approximation by sampling thresholds + # Good for quick estimates or hyperparameter tuning + # """ + # 
print(f"Computing F1_T with threshold sampling (rate={sample_rate})") + # start_time = time.time() + # + # # Convert to torch tensors + # labels = torch.tensor(labels, dtype=torch.int) + # scores = torch.tensor(scores, dtype=torch.float) + # + # # Generate fewer thresholds + # n_splits = int(1000 * sample_rate) + # p_values = torch.linspace(0.0, 1.0, steps=n_splits) + # thresholds = torch.quantile(scores, p_values) + # + # # Rest is same as original + # precision = torch.empty_like(thresholds, dtype=torch.float) + # recall = torch.empty_like(thresholds, dtype=torch.float) + # predictions = torch.empty_like(scores, dtype=torch.long) # FIX: Ensure long type + # + # label_ranges = self.compute_window_indices(labels) + # beta = 1 + # + # for i, t in enumerate(thresholds): + # torch.greater(scores, t, out=predictions) + # prec, rec = self.ts_precision_and_recall( + # labels, + # predictions, + # alpha=0, + # recall_cardinality_fn=improved_cardinality_fn, + # anomaly_ranges=label_ranges, + # weighted_precision=True, + # ) + # + # if prec == 0 and rec == 0: + # rec = 1 + # + # precision[i] = prec + # recall[i] = rec + # + # f_score = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall) + # max_score_index = torch.argmax(f_score) + # + # # Calculate accuracy + # best_predictions = (scores > thresholds[max_score_index]).long() + # accuracy = torch.mean((best_predictions == labels).float()).item() + # + # elapsed = time.time() - start_time + # print(f"F1_T computed in {elapsed:.2f}s (approximate)") + # + # return { + # 'F1_T': f_score[max_score_index].item(), + # 'P_T': precision[max_score_index].item(), + # 'R_T': recall[max_score_index].item(), + # 'thre_T': thresholds[max_score_index].item(), + # 'ACC_T': accuracy + # } + # + # def metric_F1_T_chunked(self, labels, scores, chunk_size=50, num_workers=4): + # """ + # Simple chunked parallel processing with detailed progress bar + # """ + # from concurrent.futures import ProcessPoolExecutor, as_completed + # from tqdm import tqdm + # import multiprocessing as mp + # + # print(f"Computing F1_T (chunked) with {num_workers} workers, chunk_size={chunk_size}") + # start_time = time.time() + # + # # Convert to torch tensors + # labels_t = torch.tensor(labels, dtype=torch.int) + # scores_t = torch.tensor(scores, dtype=torch.float) + # + # # Generate thresholds + # n_splits = 1000 + # p_values = torch.linspace(0.0, 1.0, steps=n_splits) + # thresholds = torch.quantile(scores_t, p_values).numpy() + # + # # Convert back to numpy for pickling + # labels_np = labels_t.numpy() + # scores_np = scores_t.numpy() + # + # # Create chunks of thresholds + # threshold_chunks = [thresholds[i:i + chunk_size] + # for i in range(0, len(thresholds), chunk_size)] + # + # total_chunks = len(threshold_chunks) + # print(f"Split {n_splits} thresholds into {total_chunks} chunks") + # + # # Process in parallel with progress bar + # all_results = [] + # + # # Method 1: Using executor.map with tqdm + # with ProcessPoolExecutor(max_workers=num_workers) as executor: + # with tqdm(total=n_splits, desc="Processing F1_T thresholds", unit="threshold", colour="blue") as pbar: + # # Prepare arguments + # chunk_args = [(chunk, labels_np, scores_np) for chunk in threshold_chunks] + # + # # Process and update progress bar + # for i, result_chunk in enumerate(executor.map(self._process_f1t_chunk, chunk_args)): + # all_results.extend(result_chunk) + # pbar.update(len(threshold_chunks[i])) # Update by number of thresholds in chunk + # pbar.set_postfix({ + # 'chunk': f"{i + 
1}/{total_chunks}", + # 'results': len(all_results) + # }) + # + # # Find best result + # best_result = max(all_results, key=lambda x: x['f_score']) + # + # # Compute accuracy + # best_predictions = scores_np > best_result['threshold'] + # accuracy = np.mean(best_predictions == labels_np) + # + # elapsed = time.time() - start_time + # print(f"✓ F1_T computed in {elapsed:.2f}s") + # print(f" Best F1: {best_result['f_score']:.4f} at threshold {best_result['threshold']:.4f}") + # + # return { + # 'F1_T': best_result['f_score'], + # 'P_T': best_result['precision'], + # 'R_T': best_result['recall'], + # 'thre_T': best_result['threshold'], + # 'ACC_T': accuracy + # } + # + # @staticmethod + # def _process_f1t_chunk(args): + # """ + # Static method to process a chunk of thresholds for F1_T metrics. + # This can be pickled for multiprocessing. + # """ + # chunk_thresholds, labels_local, scores_local = args + # results = [] + # + # # Convert back to torch tensors in worker + # labels_tensor = torch.tensor(labels_local, dtype=torch.int) + # scores_tensor = torch.tensor(scores_local, dtype=torch.float) + # predictions = torch.empty_like(scores_tensor, dtype=torch.long) + # + # # Compute label ranges in worker + # # We need to create a basic_metricor instance to access methods + # grader = basic_metricor() + # label_ranges_local = grader.compute_window_indices(labels_tensor) + # + # for threshold in chunk_thresholds: + # torch.greater(scores_tensor, threshold, out=predictions) + # + # prec, rec = grader.ts_precision_and_recall( + # labels_tensor, + # predictions, + # alpha=0, + # recall_cardinality_fn=improved_cardinality_fn, + # anomaly_ranges=label_ranges_local, + # weighted_precision=True, + # ) + # + # if prec == 0 and rec == 0: + # rec = 1 + # + # f_score = 2 * prec * rec / (prec + rec) + # + # results.append({ + # 'f_score': f_score, + # 'precision': prec, + # 'recall': rec, + # 'threshold': threshold + # }) + # + # return results + + def metric_Affiliation_optimized(self, label, score, num_workers=None): + """ + Optimized version with ThreadPool and better chunking + """ + if num_workers is None: + num_workers = min(mp.cpu_count(), 8) + + print(f"Computing Affiliation (optimized) with {num_workers} workers") + start_time = time.time() + + from .affiliation.generics import convert_vector_to_events + from .affiliation.metrics import pr_from_events + + # Pre-compute ground truth events once + events_gt = convert_vector_to_events(label) + Trange = (0, len(label)) + + # Generate p-values and thresholds + p_values = np.linspace(0.8, 1, 300) + + # Pre-compute all thresholds + thresholds = np.quantile(score, p_values) + + # Pre-compute all predictions + print("Pre-computing predictions...") + all_predictions = [] + for threshold in thresholds: + preds = (score > threshold).astype(int) + all_predictions.append(preds) + + print("Computing affiliation metrics...") + + # Function to process a batch of indices + def compute_metrics_batch(indices): + results = [] + for idx in indices: + preds = all_predictions[idx] + + events_pred = convert_vector_to_events(preds) + affiliation_metrics = pr_from_events(events_pred, events_gt, Trange) + + prec = affiliation_metrics['Affiliation_Precision'] + rec = affiliation_metrics['Affiliation_Recall'] + + if prec + rec > 0: + f1 = 2 * prec * rec / (prec + rec + self.eps) + else: + f1 = 0.0 + + results.append({ + 'f1': f1, + 'precision': prec, + 'recall': rec, + 'p_value': p_values[idx], + 'threshold': thresholds[idx] + }) + + return results + + # Split indices for workers 
+ indices = list(range(len(p_values))) + chunk_size = len(indices) // num_workers + if chunk_size == 0: + chunk_size = 1 + index_chunks = [indices[i:i + chunk_size] for i in range(0, len(indices), chunk_size)] + + # Process with thread pool + all_results = [] + with ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [executor.submit(compute_metrics_batch, chunk) for chunk in index_chunks] + + completed = 0 + for future in as_completed(futures): + all_results.extend(future.result()) + completed += 1 + print(f"Progress: {completed}/{len(futures)} chunks completed", end='\r') + + print() # New line + + # Find best result + best_result = max(all_results, key=lambda x: x['f1']) + + elapsed = time.time() - start_time + print(f"Affiliation computed in {elapsed:.2f}s") + + return best_result['f1'], best_result['precision'], best_result['recall'] + + def metric_Affiliation_chunked(self, label, score, chunk_size=30, num_workers=4): + """ + Simple chunked parallel processing + """ + print(f"Computing Affiliation (chunked) with {num_workers} workers, chunk_size={chunk_size}") + start_time = time.time() + + # Generate p-values + p_values = np.linspace(0.8, 1, 300) + + # Create chunks of p-values + p_value_chunks = [p_values[i:i + chunk_size] + for i in range(0, len(p_values), chunk_size)] + + # Prepare arguments for workers + chunk_args = [(chunk, label, score) for chunk in p_value_chunks] + + # Process in parallel + all_results = [] + with ProcessPoolExecutor(max_workers=num_workers) as executor: + for i, result_chunk in enumerate(executor.map(self._process_affiliation_chunk, chunk_args)): + all_results.extend(result_chunk) + print(f"Progress: {(i + 1) * chunk_size}/{len(p_values)} thresholds processed", end='\r') + + print() # New line + + # Find best result + best_result = max(all_results, key=lambda x: x['f1']) + + elapsed = time.time() - start_time + print(f"Affiliation computed in {elapsed:.2f}s") + + return best_result['f1'], best_result['precision'], best_result['recall'] + + def _compute_affiliation_chunk(self, p_values_chunk, score, label, eps=1e-7): + """ + Process a chunk of p-values for affiliation metrics + """ + from .affiliation.generics import convert_vector_to_events + from .affiliation.metrics import pr_from_events + + # Ensure proper data types to avoid float/integer issues + label = np.asarray(label, dtype=int) + score = np.asarray(score, dtype=float) + + # Convert ground truth to events once for this chunk + events_gt = convert_vector_to_events(label) + Trange = (0, len(label)) + + chunk_results = [] + for p in p_values_chunk: + threshold = np.quantile(score, p) + preds_loop = (score > threshold).astype(int) + + events_pred = convert_vector_to_events(preds_loop) + affiliation_metrics = pr_from_events(events_pred, events_gt, Trange) + + Affiliation_Precision = affiliation_metrics['Affiliation_Precision'] + Affiliation_Recall = affiliation_metrics['Affiliation_Recall'] + + denominator = Affiliation_Precision + Affiliation_Recall + if denominator > 0: + Affiliation_F = 2 * Affiliation_Precision * Affiliation_Recall / (denominator + eps) + else: + Affiliation_F = 0.0 + + chunk_results.append({ + 'f1': Affiliation_F, + 'precision': Affiliation_Precision, + 'recall': Affiliation_Recall, + 'p_value': p, + 'threshold': threshold + }) + + return chunk_results + + def _compute_affiliation_parallel(self, label, score, num_workers=8): + """ + Parallel computation with progress bar + """ + print(f"Computing Affiliation (parallel) with {num_workers} workers") + start_time = 
time.time() + + # Generate p-values + p_values = np.linspace(0.8, 1, 300) + total_thresholds = len(p_values) + + # Split p-values into chunks for parallel processing + p_value_chunks = np.array_split(p_values, num_workers) + + # Process chunks in parallel with progress bar + with ProcessPoolExecutor(max_workers=num_workers) as executor: + # Submit all tasks and track chunk sizes + futures = {} + for i, chunk in enumerate(p_value_chunks): + future = executor.submit(self._compute_affiliation_chunk, chunk, score, label) + futures[future] = len(chunk) + + # Collect results with progress bar + all_results = [] + with tqdm( + total=total_thresholds, + desc="Computing affiliation metrics", + unit="threshold", + colour="green" + ) as pbar: + for future in as_completed(futures): + chunk_results = future.result() + all_results.extend(chunk_results) + # Update by the number of thresholds processed in this chunk + pbar.update(futures[future]) + + # Find best result + best_result = max(all_results, key=lambda x: x['f1']) + + elapsed = time.time() - start_time + print(f"Affiliation computed in {elapsed:.2f}s") + + return best_result['f1'], best_result['precision'], best_result['recall'] +
+ @staticmethod + def _process_affiliation_chunk(args): + """ + Static method to process a chunk of p-values for affiliation metrics. + This can be pickled for multiprocessing. + """ + chunk_p_values, label_local, score_local = args + from .affiliation.generics import convert_vector_to_events + from .affiliation.metrics import pr_from_events + + # Convert ground truth to events once for this chunk + events_gt = convert_vector_to_events(label_local) + Trange = (0, len(label_local)) + + results = [] + for p in chunk_p_values: + threshold = np.quantile(score_local, p) + preds = (score_local > threshold).astype(int) + + events_pred = convert_vector_to_events(preds) + affiliation_metrics = pr_from_events(events_pred, events_gt, Trange) + + prec = affiliation_metrics['Affiliation_Precision'] + rec = affiliation_metrics['Affiliation_Recall'] + + if prec + rec > 0: + f1 = 2 * prec * rec / (prec + rec + 1e-7) + else: + f1 = 0.0 + + results.append({ + 'f1': f1, + 'precision': prec, + 'recall': rec, + 'p_value': p, + 'threshold': threshold + }) + + return results + + def metric_Affiliation_sampling(self, label, score, sample_rate=0.2): + """ + Fast approximation by sampling thresholds + """ + print(f"Computing Affiliation with threshold sampling (rate={sample_rate})") + start_time = time.time() + + from .affiliation.generics import convert_vector_to_events + from .affiliation.metrics import pr_from_events + + # Convert ground truth to events once + events_gt = convert_vector_to_events(label) + Trange = (0, len(label)) + + # Generate fewer p-values + n_samples = int(300 * sample_rate) + p_values = np.linspace(0.8, 1, n_samples) + + results = [] + for p in tqdm(p_values, desc="Sampling affiliation", unit="threshold"): + threshold = np.quantile(score, p) + preds = (score > threshold).astype(int) + + events_pred = convert_vector_to_events(preds) + affiliation_metrics = pr_from_events(events_pred, events_gt, Trange) + + prec = affiliation_metrics['Affiliation_Precision'] + rec = affiliation_metrics['Affiliation_Recall'] + + if prec + rec > 0: + f1 = 2 * prec * rec / (prec + rec + self.eps) + else: + f1 = 0.0 + + results.append({ + 'f1': f1, + 'precision': prec, + 'recall': rec, + 'p_value': p, + 'threshold': threshold + }) + + # Find best result + best_result =
max(results, key=lambda x: x['f1']) + + elapsed = time.time() - start_time + print(f"Affiliation computed in {elapsed:.2f}s (approximate)") + + return best_result['f1'], best_result['precision'], best_result['recall'] + + def metric_standard_F1_chunked(self, true_labels, anomaly_scores, threshold=None, chunk_size=50, num_workers=4): + """ + Optimized chunked parallel version of metric_standard_F1. + + Calculate F1, Precision, Recall using parallel threshold processing. + + Args: + true_labels: np.ndarray, ground truth binary labels (0=normal, 1=anomaly) + anomaly_scores: np.ndarray, anomaly scores (continuous values) + threshold: float, optional. If None, will use optimal threshold based on F1 score + chunk_size: int, number of thresholds to process in each chunk + num_workers: int, number of parallel workers + + Returns: + dict: Dictionary containing various metrics + """ + # If threshold is provided, use original method + if threshold is not None: + return self.metric_standard_F1(true_labels, anomaly_scores, threshold) + + print(f"Computing standard F1 (chunked) with {num_workers} workers, chunk_size={chunk_size}") + start_time = time.time() + + # Generate thresholds + thresholds = np.linspace(0.5, 1, 500) + total_thresholds = len(thresholds) + + # Create chunks of thresholds + threshold_chunks = [thresholds[i:i + chunk_size] + for i in range(0, len(thresholds), chunk_size)] + + print(f"Split {total_thresholds} thresholds into {len(threshold_chunks)} chunks") + + # Process in parallel + all_results = [] + + with ProcessPoolExecutor(max_workers=num_workers) as executor: + with tqdm(total=total_thresholds, desc="Processing standard F1 thresholds", unit="threshold", colour="blue") as pbar: + # Prepare arguments + chunk_args = [(chunk, true_labels, anomaly_scores) for chunk in threshold_chunks] + + # Process and update progress bar + for i, result_chunk in enumerate(executor.map(self._process_standard_f1_chunk, chunk_args)): + all_results.extend(result_chunk) + pbar.update(len(threshold_chunks[i])) + pbar.set_postfix({ + 'chunk': f"{i + 1}/{len(threshold_chunks)}", + 'results': len(all_results) + }) + + # Find best result + best_result = max(all_results, key=lambda x: x['f1']) + + elapsed = time.time() - start_time + print(f"✓ Standard F1 computed in {elapsed:.2f}s") + print(f" Best F1: {best_result['f1']:.4f} at threshold {best_result['threshold']:.4f}") + + return { + 'F1': best_result['f1'], + 'Recall': best_result['recall'], + 'Precision': best_result['precision'] + } + + @staticmethod + def _process_standard_f1_chunk(args): + """ + Static method to process a chunk of thresholds for standard F1 metrics. + This can be pickled for multiprocessing. + """ + chunk_thresholds, true_labels, anomaly_scores = args + results = [] + + for t in chunk_thresholds: + threshold = np.quantile(anomaly_scores, t) + predictions = (anomaly_scores >= threshold).astype(int) + + if len(np.unique(predictions)) > 1: # Avoid division by zero + precision, recall, f1, _ = precision_recall_fscore_support( + true_labels, predictions, average='binary', zero_division=0 + ) + else: + precision, recall, f1 = 0.0, 0.0, 0.0 + + results.append({ + 'f1': f1, + 'precision': precision, + 'recall': recall, + 'threshold': threshold, + 'quantile': t + }) + + return results + + def metric_PointF1PA_chunked(self, label, score, preds=None, chunk_size=50, num_workers=4): + """ + Optimized chunked parallel version of metric_PointF1PA. + + Calculate Point F1 with Point Adjustment using parallel threshold processing. 
+ + Args: + label: np.ndarray, ground truth binary labels + score: np.ndarray, anomaly scores + preds: np.ndarray, optional. If provided, use these predictions directly + chunk_size: int, number of thresholds to process in each chunk + num_workers: int, number of parallel workers + + Returns: + dict: Dictionary containing various metrics (same format as original method) + """ + # If predictions are provided, use original method + if preds is not None: + return self.metric_PointF1PA(label, score, preds) + + print(f"Computing PointF1PA (chunked) with {num_workers} workers, chunk_size={chunk_size}") + start_time = time.time() + + # Generate q_values (quantiles) + q_values = np.arange(0.7, 0.99, 0.001) + total_thresholds = len(q_values) + + # Create chunks of q_values + q_value_chunks = [q_values[i:i + chunk_size] + for i in range(0, len(q_values), chunk_size)] + + print(f"Split {total_thresholds} thresholds into {len(q_value_chunks)} chunks") + + # Process in parallel + all_results = [] + + with ProcessPoolExecutor(max_workers=num_workers) as executor: + with tqdm(total=total_thresholds, desc="Processing PointF1PA thresholds", unit="threshold", colour="green") as pbar: + # Prepare arguments + chunk_args = [(chunk, label, score) for chunk in q_value_chunks] + + # Process and update progress bar + for i, result_chunk in enumerate(executor.map(self._process_pointf1pa_chunk, chunk_args)): + all_results.extend(result_chunk) + pbar.update(len(q_value_chunks[i])) + pbar.set_postfix({ + 'chunk': f"{i + 1}/{len(q_value_chunks)}", + 'results': len(all_results) + }) + + # Find best result + best_result = max(all_results, key=lambda x: x['F1_PA']) + + elapsed = time.time() - start_time + print(f"✓ PointF1PA computed in {elapsed:.2f}s") + print(f" Best F1_PA: {best_result['F1_PA']:.4f} at threshold {best_result['thre_PA']:.4f}") + + return best_result + + @staticmethod + def _process_pointf1pa_chunk(args): + """ + Static method to process a chunk of q_values for PointF1PA metrics. + This can be pickled for multiprocessing. + """ + import sklearn.metrics + + chunk_q_values, label, score = args + results = [] + + # Create a basic_metricor instance to access adjustment method + grader = basic_metricor() + + for q in chunk_q_values: + thre = np.quantile(score, q) + pred = (score > thre).astype(int) + adjusted_pred = grader.adjustment(label, pred) + + accuracy = sklearn.metrics.accuracy_score(label, adjusted_pred) + P, R, F1, _ = sklearn.metrics.precision_recall_fscore_support(label, adjusted_pred, average="binary") + + result = { + 'thre_PA': thre, + 'ACC_PA': accuracy, + 'P_PA': P, + 'R_PA': R, + 'F1_PA': F1, + 'quantile': q + } + + results.append(result) + + return results \ No newline at end of file
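Usage note (not part of the diff above): the sketch below shows one way the metrics in this file might be driven end to end. It is illustrative only, under these assumptions: the module is importable as part of the evaluation package so the relative `.affiliation` imports resolve (e.g. run as `python -m evaluation.basic_metrics`); the synthetic labels/scores, RNG seed, and `__main__` guard are additions made for the example, while the method names and return keys follow the definitions above.

if __name__ == "__main__":
    # Illustrative sketch only: synthetic data, small worker count.
    rng = np.random.default_rng(0)
    n = 2000
    labels = np.zeros(n, dtype=int)
    labels[500:550] = 1                 # one synthetic anomaly window
    scores = rng.random(n)
    scores[500:550] += 0.8              # anomalous region scores higher

    grader = basic_metricor()

    # Affiliation F1 on a reduced threshold grid (runs sequentially).
    aff_f1, aff_prec, aff_rec = grader.metric_Affiliation_sampling(labels, scores, sample_rate=0.1)
    print(f"Affiliation: F1={aff_f1:.4f}  P={aff_prec:.4f}  R={aff_rec:.4f}")

    # Point-wise F1 searched over quantile thresholds in parallel worker processes.
    std = grader.metric_standard_F1_chunked(labels, scores, chunk_size=50, num_workers=2)
    print(f"Standard F1: {std['F1']:.4f}  (P={std['Precision']:.4f}, R={std['Recall']:.4f})")

The sampling variant is the cheapest way to smoke-test the affiliation pipeline; the chunked standard-F1 variant exercises the ProcessPoolExecutor path, which is why the example keeps it inside the `__main__` guard.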