# Time_RCD/evaluation/basic_metrics.py
import torch
from tqdm import tqdm
from sklearn.metrics import precision_recall_fscore_support
from sklearn import metrics
import numpy as np
import math
import copy
import sklearn
from typing import Callable, Dict, Any, Tuple, Optional, List
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from functools import partial
import time
import multiprocessing as mp
def generate_curve(label, score, slidingWindow, version='opt', thre=250):
if version =='opt_mem':
tpr_3d, fpr_3d, prec_3d, window_3d, avg_auc_3d, avg_ap_3d = basic_metricor().RangeAUC_volume_opt_mem(labels_original=label, score=score, windowSize=slidingWindow, thre=thre)
else:
tpr_3d, fpr_3d, prec_3d, window_3d, avg_auc_3d, avg_ap_3d = basic_metricor().RangeAUC_volume_opt(labels_original=label, score=score, windowSize=slidingWindow, thre=thre)
X = np.array(tpr_3d).reshape(1,-1).ravel()
X_ap = np.array(tpr_3d)[:,:-1].reshape(1,-1).ravel()
Y = np.array(fpr_3d).reshape(1,-1).ravel()
W = np.array(prec_3d).reshape(1,-1).ravel()
Z = np.repeat(window_3d, len(tpr_3d[0]))
Z_ap = np.repeat(window_3d, len(tpr_3d[0])-1)
return Y, Z, X, X_ap, W, Z_ap,avg_auc_3d, avg_ap_3d
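# Hedged usage sketch (illustrative only, not part of the original tests): generate_curve expects a
# binary label array and a continuous anomaly score of the same length plus a sliding-window size;
# the last two return values are the averaged Range-AUC and Range-AP over window sizes (the VUS-style
# summaries consumed elsewhere in the evaluation).
#
#   label = np.array([0, 0, 1, 1, 0, 0, 0, 1, 0, 0])
#   score = np.random.rand(len(label))
#   _, _, _, _, _, _, vus_roc, vus_pr = generate_curve(label, score, slidingWindow=2)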
def inverse_proportional_cardinality_fn(cardinality: int, gt_length: int) -> float:
r"""
Cardinality function that assigns an inversely proportional weight to predictions within a single ground-truth
window.
This is the default cardinality function recommended in [Tatbul2018]_.
.. note::
This function leads to a metric that is not recall-consistent! Please see [Wagner2023]_ for more details.
:param cardinality: Number of predicted windows that overlap the ground-truth window in question.
:param gt_length: Length of the ground-truth window (unused).
:return: The cardinality factor :math:`\frac{1}{\text{cardinality}}`.
.. [Tatbul2018] N. Tatbul, T.J. Lee, S. Zdonik, M. Alam, J. Gottschlich.
Precision and recall for time series. Advances in neural information processing systems. 2018;31.
.. [Wagner2023] D. Wagner, T. Michels, F.C.F. Schulz, A. Nair, M. Rudolph, and M. Kloft.
TimeSeAD: Benchmarking Deep Multivariate Time-Series Anomaly Detection.
Transactions on Machine Learning Research (TMLR), (to appear) 2023.
"""
return 1 / max(1, cardinality)
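# Example (hedged sketch): with three predicted windows overlapping a single ground-truth window the
# inverse-proportional factor is 1/3, so fragmented detections are down-weighted.
#
#   inverse_proportional_cardinality_fn(cardinality=3, gt_length=10)  # -> 0.333...
#   inverse_proportional_cardinality_fn(cardinality=0, gt_length=10)  # -> 1.0 (clamped by max(1, .))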
def constant_bias_fn(inputs: torch.Tensor) -> float:
r"""
Compute the overlap size for a constant bias function that assigns the same weight to all positions.
    This function computes
.. math::
\omega(\text{inputs}) = \frac{1}{n} \sum_{i = 1}^{n} \text{inputs}_i,
where :math:`n = \lvert \text{inputs} \rvert`.
.. note::
To improve the runtime of our algorithm, we calculate the overlap :math:`\omega` directly as part of the bias
function.
:param inputs: A 1-D :class:`~torch.Tensor` containing the predictions inside a ground-truth window.
:return: The overlap :math:`\omega`.
"""
return torch.sum(inputs).item() / inputs.shape[0]
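# Example (hedged sketch): for a ground-truth window of length 4 with 3 positions predicted anomalous,
# the constant-bias overlap is 3/4.
#
#   constant_bias_fn(torch.tensor([1, 1, 0, 1]))  # -> 0.75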
def improved_cardinality_fn(cardinality: int, gt_length: int) -> float:
r"""
Recall-consistent cardinality function introduced by [Wagner2023]_ that assigns lower weight to ground-truth windows
that overlap with many predicted windows.
This function computes
.. math::
\left(\frac{\text{gt_length} - 1}{\text{gt_length}}\right)^{\text{cardinality} - 1}.
:param cardinality: Number of predicted windows that overlap the ground-truth window in question.
:param gt_length: Length of the ground-truth window.
:return: The cardinality factor.
"""
return ((gt_length - 1) / gt_length) ** (cardinality - 1)
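# Example (hedged sketch): for a ground-truth window of length 10, a single overlapping prediction keeps
# the full weight, while fragmented predictions are discounted smoothly rather than harshly.
#
#   improved_cardinality_fn(cardinality=1, gt_length=10)  # -> 1.0
#   improved_cardinality_fn(cardinality=4, gt_length=10)  # -> 0.9 ** 3 = 0.729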
class basic_metricor():
def __init__(self, a = 1, probability = True, bias = 'flat', ):
self.a = a
self.probability = probability
self.bias = bias
self.eps = 1e-15
def detect_model(self, model, label, contamination = 0.1, window = 100, is_A = False, is_threshold = True):
if is_threshold:
score = self.scale_threshold(model.decision_scores_, model._mu, model._sigma)
else:
score = self.scale_contamination(model.decision_scores_, contamination = contamination)
if is_A is False:
scoreX = np.zeros(len(score)+window)
scoreX[math.ceil(window/2): len(score)+window - math.floor(window/2)] = score
else:
scoreX = score
self.score_=scoreX
L = self.metric(label, scoreX)
return L
def w(self, AnomalyRange, p):
MyValue = 0
MaxValue = 0
start = AnomalyRange[0]
AnomalyLength = AnomalyRange[1] - AnomalyRange[0] + 1
for i in range(start, start +AnomalyLength):
bi = self.b(i, AnomalyLength)
MaxValue += bi
if i in p:
MyValue += bi
return MyValue/MaxValue
    def Cardinality_factor(self, Anomalyrange, Prange):
        score = 0
        start = Anomalyrange[0]
        end = Anomalyrange[1]
for i in Prange:
if i[0] >= start and i[0] <= end:
score +=1
elif start >= i[0] and start <= i[1]:
score += 1
elif end >= i[0] and end <= i[1]:
score += 1
elif start >= i[0] and end <= i[1]:
score += 1
if score == 0:
return 0
else:
return 1/score
def b(self, i, length):
bias = self.bias
if bias == 'flat':
return 1
elif bias == 'front-end bias':
return length - i + 1
elif bias == 'back-end bias':
return i
else:
if i <= length/2:
return i
else:
return length - i + 1
def scale_threshold(self, score, score_mu, score_sigma):
return (score >= (score_mu + 3*score_sigma)).astype(int)
def _adjust_predicts(self, score, label, threshold=None, pred=None, calc_latency=False):
"""
Calculate adjusted predict labels using given `score`, `threshold` (or given `pred`) and `label`.
Args:
score (np.ndarray): The anomaly score
label (np.ndarray): The ground-truth label
threshold (float): The threshold of anomaly score.
A point is labeled as "anomaly" if its score is higher than the threshold.
            pred (np.ndarray or None): if not None, adjust `pred` directly and ignore `score` and `threshold`.
            calc_latency (bool): if True, also return the average detection latency.
        Returns:
            np.ndarray: adjusted predicted labels (and the latency if `calc_latency` is True)
"""
if len(score) != len(label):
raise ValueError("score and label must have the same length")
score = np.asarray(score)
label = np.asarray(label)
latency = 0
if pred is None:
predict = score > threshold
else:
predict = copy.deepcopy(pred)
actual = label > 0.1
anomaly_state = False
anomaly_count = 0
for i in range(len(score)):
if actual[i] and predict[i] and not anomaly_state:
anomaly_state = True
anomaly_count += 1
for j in range(i, 0, -1):
if not actual[j]:
break
else:
if not predict[j]:
predict[j] = True
latency += 1
elif not actual[i]:
anomaly_state = False
if anomaly_state:
predict[i] = True
if calc_latency:
return predict, latency / (anomaly_count + 1e-4)
else:
return predict
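    # Illustrative sketch of the point-adjust convention implemented above (assumed toy data): once any
    # point inside a ground-truth anomaly segment is detected, the whole segment counts as detected.
    #
    #   label    = [0, 1, 1, 1, 0]
    #   pred     = [0, 0, 1, 0, 0]
    #   adjusted = [0, 1, 1, 1, 0]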
def adjustment(self, gt, pred):
adjusted_pred = np.array(pred)
anomaly_state = False
for i in range(len(gt)):
if gt[i] == 1 and adjusted_pred[i] == 1 and not anomaly_state:
anomaly_state = True
for j in range(i, 0, -1):
if gt[j] == 0:
break
else:
if adjusted_pred[j] == 0:
adjusted_pred[j] = 1
for j in range(i, len(gt)):
if gt[j] == 0:
break
else:
if adjusted_pred[j] == 0:
adjusted_pred[j] = 1
elif gt[i] == 0:
anomaly_state = False
if anomaly_state:
adjusted_pred[i] = 1
return adjusted_pred
def metric_new(self, label, score, preds, plot_ROC=False, alpha=0.2):
        '''input:
            Real labels and anomaly scores from the prediction
        output:
            AUC,
            Precision,
            Recall,
            F-score,
            Point-adjusted F1,
            Range-recall,
            Existence reward,
            Overlap reward,
            Range-precision,
            Range-Fscore,
            Precision@k,
            where k is chosen to be the number of outliers in the real labels
        '''
        if np.sum(label) == 0:
            print('All labels are 0. Labels must contain at least one ground-truth anomaly to calculate the AUC score.')
            return None
        if score is None or np.isnan(score).any():
            print('Score must not be None or contain NaN values.')
            return None
#area under curve
auc = metrics.roc_auc_score(label, score)
        # plot ROC curve
if plot_ROC:
fpr, tpr, thresholds = metrics.roc_curve(label, score)
# display = metrics.RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=auc)
# display.plot()
#precision, recall, F
if preds is None:
preds = score > (np.mean(score)+3*np.std(score))
Precision, Recall, F, Support = metrics.precision_recall_fscore_support(label, preds, zero_division=0)
precision = Precision[1]
recall = Recall[1]
f = F[1]
#point-adjust
adjust_preds = self._adjust_predicts(score, label, pred=preds)
PointF1PA = metrics.f1_score(label, adjust_preds)
#range anomaly
Rrecall, ExistenceReward, OverlapReward = self.range_recall_new(label, preds, alpha)
Rprecision = self.range_recall_new(preds, label, 0)[0]
if Rprecision + Rrecall==0:
Rf=0
else:
Rf = 2 * Rrecall * Rprecision / (Rprecision + Rrecall)
        # top-k: k is the number of true anomalies and the threshold is the (1 - k/n) score quantile
        k = int(np.sum(label))
        threshold = np.percentile(score, 100 * (1 - k / len(label)))
        # precision_at_k = metrics.top_k_accuracy_score(label, score, k)
        p_at_k = np.where(score >= threshold)[0]
        TP_at_k = sum(label[p_at_k])
        precision_at_k = TP_at_k / k
L = [auc, precision, recall, f, PointF1PA, Rrecall, ExistenceReward, OverlapReward, Rprecision, Rf, precision_at_k]
if plot_ROC:
return L, fpr, tpr
return L
def metric_ROC(self, label, score):
return metrics.roc_auc_score(label, score)
def metric_PR(self, label, score):
return metrics.average_precision_score(label, score)
def metric_PointF1(self, label, score, preds=None):
if preds is None:
precision, recall, thresholds = metrics.precision_recall_curve(label, score)
f1_scores = 2 * (precision * recall) / (precision + recall + 0.00001)
F1 = np.max(f1_scores)
threshold = thresholds[np.argmax(f1_scores)]
else:
Precision, Recall, F, Support = metrics.precision_recall_fscore_support(label, preds, zero_division=0)
F1 = F[1]
return F1
def metric_standard_F1(self, true_labels, anomaly_scores, threshold=None):
"""
Calculate F1, Precision, Recall, and other metrics for anomaly detection.
        Args:
            true_labels: np.ndarray, ground truth binary labels (0=normal, 1=anomaly)
            anomaly_scores: np.ndarray, anomaly scores (continuous values)
            threshold: float, optional. If None, the threshold that maximises the F1 score is selected
        Returns:
            dict: Dictionary with keys 'F1', 'Recall', and 'Precision'
"""
# If no threshold provided, find optimal threshold
if threshold is None:
            # The sweep is over quantile levels of the score distribution, not raw score values
            p_values = np.linspace(0, 1, 1500)
            best_f1 = 0
            best_threshold = 0
            for p in tqdm(p_values, total=len(p_values), desc="Finding optimal threshold"):
                threshold = np.quantile(anomaly_scores, p)
predictions = (anomaly_scores >= threshold).astype(int)
if len(np.unique(predictions)) > 1: # Avoid division by zero
precision, recall, f1, _ = precision_recall_fscore_support(
true_labels, predictions, average='binary', zero_division=0
)
# print(f1, t)
if f1 > best_f1:
best_f1 = f1
best_threshold = threshold
threshold = best_threshold
# print("aaa", threshold, best_threshold, best_f1)
# Calculate predictions based on threshold
predictions = (anomaly_scores >= threshold).astype(int)
# Calculate basic metrics
precision, recall, f1, _ = precision_recall_fscore_support(
true_labels, predictions, average='binary', zero_division=0
)
# print(threshold, f1)
return {
'F1': f1,
'Recall': recall,
'Precision': precision, }
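    # Hedged usage sketch (toy names, assuming `label` and `score` are 1-D numpy arrays):
    #
    #   m = basic_metricor()
    #   out = m.metric_standard_F1(true_labels=label, anomaly_scores=score)
    #   print(out['F1'], out['Precision'], out['Recall'])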
def metric_Affiliation(self, label, score, preds=None):
from .affiliation.generics import convert_vector_to_events
from .affiliation.metrics import pr_from_events
# Ensure proper data types to avoid float/integer issues
label = np.asarray(label, dtype=int)
score = np.asarray(score, dtype=float)
# Convert ground truth to events once, outside the loop
events_gt = convert_vector_to_events(label)
if preds is None:
# print("Calculating afiliation metrics using score thresholds.")
p_values = np.linspace(0, 1, 1500)
# print(f"Using {thresholds} thresholds for affiliation metrics.")
Affiliation_scores = []
Affiliation_Precision_scores = []
Affiliation_Recall_scores = []
# print("Score values", score)
for p in tqdm(p_values, total=(len(p_values)), desc="Calculating Affiliation Metrics"):
threshold = np.quantile(score, p)
preds_loop = (score > threshold).astype(int)
events_pred = convert_vector_to_events(preds_loop)
# events_gt is already calculated
Trange = (0, len(preds_loop))
affiliation_metrics = pr_from_events(events_pred, events_gt, Trange)
Affiliation_Precision = affiliation_metrics['Affiliation_Precision']
Affiliation_Recall = affiliation_metrics['Affiliation_Recall']
                # Guard against division by zero when both precision and recall are zero
                denominator = Affiliation_Precision + Affiliation_Recall
                if denominator > 0:
                    Affiliation_F = 2 * Affiliation_Precision * Affiliation_Recall / (denominator + self.eps)
                else:
                    Affiliation_F = 0.0
Affiliation_scores.append(Affiliation_F)
Affiliation_Precision_scores.append(Affiliation_Precision)
Affiliation_Recall_scores.append(Affiliation_Recall)
# Find the best scores after the loop
# print("Here are the Affiliation scores:", Affiliation_scores)
best_index = np.argmax(Affiliation_scores)
# print(f"Best Affiliation F1 score found at index {best_index} with value {Affiliation_scores[best_index]}")
Best_Affiliation_F1 = Affiliation_scores[best_index]
Best_Affiliation_Precision = Affiliation_Precision_scores[best_index]
Best_Affiliation_Recall = Affiliation_Recall_scores[best_index]
else:
print("Using provided predictions for affiliation metrics.")
# This block runs when 'preds' is provided
events_pred = convert_vector_to_events(preds)
Trange = (0, len(preds))
affiliation_metrics = pr_from_events(events_pred, events_gt, Trange)
            # Assign directly to the 'Best_' variables so they exist for the return statement below
Best_Affiliation_Precision = affiliation_metrics['Affiliation_Precision']
Best_Affiliation_Recall = affiliation_metrics['Affiliation_Recall']
Best_Affiliation_F1 = 2 * Best_Affiliation_Precision * Best_Affiliation_Recall / (
Best_Affiliation_Precision + Best_Affiliation_Recall + self.eps)
return Best_Affiliation_F1, Best_Affiliation_Precision, Best_Affiliation_Recall
def metric_RF1(self, label, score, preds=None):
if preds is None:
q_values = np.linspace(0, 1, 1000)
Rf1_scores = []
thresholds = []
for q in tqdm(q_values, total=(len(q_values)), desc="Calculating RF1 Metrics"):
# Calculate prediction
threshold = np.quantile(score, q)
preds = (score > threshold).astype(int)
Rrecall, ExistenceReward, OverlapReward = self.range_recall_new(label, preds, alpha=0.2)
Rprecision = self.range_recall_new(preds, label, 0)[0]
if Rprecision + Rrecall==0:
Rf=0
else:
Rf = 2 * Rrecall * Rprecision / (Rprecision + Rrecall)
Rf1_scores.append(Rf)
thresholds.append(threshold)
RF1_Threshold = thresholds[np.argmax(Rf1_scores)]
RF1 = max(Rf1_scores)
else:
Rrecall, ExistenceReward, OverlapReward = self.range_recall_new(label, preds, alpha=0.2)
Rprecision = self.range_recall_new(preds, label, 0)[0]
if Rprecision + Rrecall==0:
RF1=0
else:
RF1 = 2 * Rrecall * Rprecision / (Rprecision + Rrecall)
return RF1
# def metric_F1_T(self, labels: torch.Tensor, scores: torch.Tensor):
# """
# Computes the F1 score for time series anomaly detection by finding the best threshold.
#
# Args:
# labels (torch.Tensor): Ground truth labels for the time series data.
# scores (torch.Tensor): Anomaly scores predicted by the model.
#
# Returns:
# Tuple[float, Dict[str, Any]]: The best F1 score and a dictionary with additional metrics.
# """
# result = {}
# labels = torch.tensor(labels, dtype=torch.int)
# score = torch.tensor(scores, dtype=torch.float)
# f1, details = self.__best_ts_fbeta_score(labels, score, beta=1,)
# result['thre_T'] = details['threshold']
# result['ACC_T'] = sklearn.metrics.accuracy_score(labels, score > details['threshold'])
# result['P_T'] = details['precision']
# result['R_T'] = details['recall']
# result['F1_T'] = f1
#
# return result
def metric_F1_T(self, labels: torch.Tensor, scores: torch.Tensor, use_parallel=True,
parallel_method='chunked', chunk_size=10, max_workers=8):
"""
Computes the F1 score with optional parallel processing.
Args:
labels: Ground truth labels
scores: Anomaly scores
use_parallel: Whether to use parallel processing (default: True)
parallel_method: Type of parallel processing ('standard' or 'chunked')
chunk_size: Size of chunks for chunked parallel processing
max_workers: Maximum number of worker threads
"""
result = {}
labels = torch.tensor(labels, dtype=torch.int)
score = torch.tensor(scores, dtype=torch.float)
# Choose which method to use
if use_parallel:
if parallel_method == 'chunked':
f1, details = self.__best_ts_fbeta_score_parallel_chunked(
labels, score, beta=1, chunk_size=chunk_size, max_workers=max_workers
)
else: # standard parallel
f1, details = self.__best_ts_fbeta_score_parallel(labels, score, beta=1)
else:
f1, details = self.__best_ts_fbeta_score(labels, score, beta=1)
result['thre_T'] = details['threshold']
result['ACC_T'] = sklearn.metrics.accuracy_score(labels, score > details['threshold'])
result['P_T'] = details['precision']
result['R_T'] = details['recall']
result['F1_T'] = f1
return result
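    # Hedged usage sketch: the chunked threaded sweep is the default; use_parallel=False falls back to
    # the sequential threshold sweep.
    #
    #   res = basic_metricor().metric_F1_T(label, score, use_parallel=True,
    #                                      parallel_method='chunked', chunk_size=10, max_workers=8)
    #   best_threshold, best_f1 = res['thre_T'], res['F1_T']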
def __best_ts_fbeta_score_parallel(self, labels: torch.Tensor, scores: torch.Tensor, beta: float,
recall_cardinality_fn: Callable = improved_cardinality_fn,
weighted_precision: bool = True, n_splits: int = 1500) -> Tuple[
float, Dict[str, Any]]:
"""
Parallel version of best_ts_fbeta_score using ThreadPoolExecutor.
Uses threading instead of multiprocessing to avoid serialization issues
with PyTorch tensors and instance methods.
"""
# Use same parameter range as sequential version for consistency
device = scores.device
p_values = torch.linspace(0, 1.0, steps=n_splits, device=device)
thresholds = torch.quantile(scores, p_values)
label_ranges = self.compute_window_indices(labels)
precision = torch.empty_like(thresholds, dtype=torch.float)
recall = torch.empty_like(thresholds, dtype=torch.float)
def process_single_threshold(idx_threshold_pair):
"""Process a single threshold computation"""
idx, threshold = idx_threshold_pair
# Create predictions for this threshold
predictions = (scores > threshold).long()
# Calculate precision and recall using instance method
prec, rec = self.ts_precision_and_recall(
labels,
predictions,
alpha=0,
recall_cardinality_fn=recall_cardinality_fn,
anomaly_ranges=label_ranges,
weighted_precision=weighted_precision,
)
# Handle edge case to avoid 0/0 in F-score computation
if prec == 0 and rec == 0:
rec = 1
return idx, prec, rec
# Use ThreadPoolExecutor instead of ProcessPoolExecutor
# This allows us to use instance methods and share PyTorch tensors safely
max_workers = min(16, len(thresholds)) # Don't create more threads than thresholds
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all threshold computations
futures = {
executor.submit(process_single_threshold, (i, t)): i
for i, t in enumerate(thresholds)
}
# Collect results as they complete
for future in tqdm(as_completed(futures), total=len(futures),
desc="Calculating F-beta score (parallel)"):
idx, prec, rec = future.result()
precision[idx] = prec
recall[idx] = rec
# Compute F-scores and find the best one
f_score = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall)
max_score_index = torch.argmax(f_score)
return (
f_score[max_score_index].item(),
dict(
threshold=thresholds[max_score_index].item(),
precision=precision[max_score_index].item(),
recall=recall[max_score_index].item(),
),
)
def __best_ts_fbeta_score_parallel_chunked(self, labels: torch.Tensor, scores: torch.Tensor, beta: float,
recall_cardinality_fn: Callable = improved_cardinality_fn,
weighted_precision: bool = True, n_splits: int = 1500,
chunk_size: int = 10, max_workers: int = 8) -> Tuple[float, Dict[str, Any]]:
"""
Chunked parallel version of best_ts_fbeta_score using ThreadPoolExecutor.
This version processes thresholds in chunks to reduce overhead and improve efficiency.
Args:
labels: Ground truth labels
scores: Anomaly scores
beta: Beta parameter for F-beta score
recall_cardinality_fn: Cardinality function for recall calculation
weighted_precision: Whether to use weighted precision
n_splits: Number of threshold splits
chunk_size: Number of thresholds to process in each chunk
max_workers: Maximum number of worker threads
"""
# Use same parameter range as sequential version for consistency
device = scores.device
p_values = torch.linspace(0, 1.0, steps=n_splits, device=device)
thresholds = torch.quantile(scores, p_values)
label_ranges = self.compute_window_indices(labels)
precision = torch.empty_like(thresholds, dtype=torch.float)
recall = torch.empty_like(thresholds, dtype=torch.float)
def process_threshold_chunk(chunk_data):
"""Process a chunk of thresholds"""
chunk_indices, chunk_thresholds = chunk_data
chunk_results = []
# Process each threshold in the chunk
for i, (idx, threshold) in enumerate(zip(chunk_indices, chunk_thresholds)):
# Create predictions for this threshold
predictions = (scores > threshold).long()
# Calculate precision and recall using instance method
prec, rec = self.ts_precision_and_recall(
labels,
predictions,
alpha=0,
recall_cardinality_fn=recall_cardinality_fn,
anomaly_ranges=label_ranges,
weighted_precision=weighted_precision,
)
# Handle edge case to avoid 0/0 in F-score computation
if prec == 0 and rec == 0:
rec = 1
chunk_results.append((idx, prec, rec))
return chunk_results
# Create chunks of threshold indices and values
chunks = []
for i in range(0, len(thresholds), chunk_size):
end_idx = min(i + chunk_size, len(thresholds))
chunk_indices = list(range(i, end_idx))
chunk_thresholds = thresholds[i:end_idx]
chunks.append((chunk_indices, chunk_thresholds))
print(f"Processing {len(thresholds)} thresholds in {len(chunks)} chunks of size ~{chunk_size}")
# Use ThreadPoolExecutor to process chunks in parallel
actual_workers = min(max_workers, len(chunks))
with ThreadPoolExecutor(max_workers=actual_workers) as executor:
# Submit all chunk computations
futures = {
executor.submit(process_threshold_chunk, chunk): i
for i, chunk in enumerate(chunks)
}
# Collect results as they complete
for future in tqdm(as_completed(futures), total=len(futures),
desc=f"Processing {len(chunks)} chunks (chunked parallel)"):
chunk_results = future.result()
# Store results in the appropriate positions
for idx, prec, rec in chunk_results:
precision[idx] = prec
recall[idx] = rec
# Compute F-scores and find the best one
f_score = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall)
max_score_index = torch.argmax(f_score)
return (
f_score[max_score_index].item(),
dict(
threshold=thresholds[max_score_index].item(),
precision=precision[max_score_index].item(),
recall=recall[max_score_index].item(),
),
)
def compute_window_indices(self, binary_labels: torch.Tensor) -> List[Tuple[int, int]]:
"""
Compute a list of indices where anomaly windows begin and end.
:param binary_labels: A 1-D :class:`~torch.Tensor` containing ``1`` for an anomalous time step or ``0`` otherwise.
:return: A list of tuples ``(start, end)`` for each anomaly window in ``binary_labels``, where ``start`` is the
index at which the window starts and ``end`` is the first index after the end of the window.
"""
boundaries = torch.empty_like(binary_labels)
boundaries[0] = 0
boundaries[1:] = binary_labels[:-1]
boundaries *= -1
boundaries += binary_labels
# boundaries will be 1 where a window starts and -1 at the end of a window
indices = torch.nonzero(boundaries, as_tuple=True)[0].tolist()
if len(indices) % 2 != 0:
# Add the last index as the end of a window if appropriate
indices.append(binary_labels.shape[0])
indices = [(indices[i], indices[i + 1]) for i in range(0, len(indices), 2)]
return indices
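    # Example (hedged sketch): each anomaly window is returned as a half-open (start, end) index pair.
    #
    #   basic_metricor().compute_window_indices(torch.tensor([0, 1, 1, 0, 1]))  # -> [(1, 3), (4, 5)]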
def _compute_overlap(self, preds: torch.Tensor, pred_indices: List[Tuple[int, int]],
gt_indices: List[Tuple[int, int]], alpha: float,
bias_fn: Callable, cardinality_fn: Callable,
use_window_weight: bool = False) -> float:
n_gt_windows = len(gt_indices)
n_pred_windows = len(pred_indices)
total_score = 0.0
total_gt_points = 0
i = j = 0
while i < n_gt_windows and j < n_pred_windows:
gt_start, gt_end = gt_indices[i]
window_length = gt_end - gt_start
total_gt_points += window_length
i += 1
cardinality = 0
while j < n_pred_windows and pred_indices[j][1] <= gt_start:
j += 1
while j < n_pred_windows and pred_indices[j][0] < gt_end:
j += 1
cardinality += 1
if cardinality == 0:
# cardinality == 0 means no overlap at all, hence no contribution
continue
# The last predicted window that overlaps our current window could also overlap the next window.
# Therefore, we must consider it again in the next loop iteration.
j -= 1
cardinality_multiplier = cardinality_fn(cardinality, window_length)
prediction_inside_ground_truth = preds[gt_start:gt_end]
# We calculate omega directly in the bias function, because this can greatly improve running time
# for the constant bias, for example.
omega = bias_fn(prediction_inside_ground_truth)
# Either weight evenly across all windows or based on window length
weight = window_length if use_window_weight else 1
# Existence reward (if cardinality > 0 then this is certainly 1)
total_score += alpha * weight
# Overlap reward
total_score += (1 - alpha) * cardinality_multiplier * omega * weight
denom = total_gt_points if use_window_weight else n_gt_windows
return total_score / denom
def ts_precision_and_recall(self, anomalies: torch.Tensor, predictions: torch.Tensor, alpha: float = 0,
recall_bias_fn: Callable[[torch.Tensor], float] = constant_bias_fn,
                                recall_cardinality_fn: Callable[[int, int], float] = inverse_proportional_cardinality_fn,
precision_bias_fn: Optional[Callable] = None,
precision_cardinality_fn: Optional[Callable] = None,
anomaly_ranges: Optional[List[Tuple[int, int]]] = None,
prediction_ranges: Optional[List[Tuple[int, int]]] = None,
weighted_precision: bool = False) -> Tuple[float, float]:
"""
Computes precision and recall for time series as defined in [Tatbul2018]_.
.. note::
The default parameters for this function correspond to the defaults recommended in [Tatbul2018]_. However,
those might not be desirable in most cases, please see [Wagner2023]_ for a detailed discussion.
:param anomalies: Binary 1-D :class:`~torch.Tensor` of shape ``(length,)`` containing the true labels.
:param predictions: Binary 1-D :class:`~torch.Tensor` of shape ``(length,)`` containing the predicted labels.
:param alpha: Weight for existence term in recall.
:param recall_bias_fn: Function that computes the bias term for a given ground-truth window.
        :param recall_cardinality_fn: Function that computes the cardinality factor for a given ground-truth window.
        :param precision_bias_fn: Function that computes the bias term for a given predicted window.
            If ``None``, this will be the same as ``recall_bias_fn``.
        :param precision_cardinality_fn: Function that computes the cardinality factor for a given predicted window.
            If ``None``, this will be the same as ``recall_cardinality_fn``.
:param weighted_precision: If True, the precision score of a predicted window will be weighted with the
length of the window in the final score. Otherwise, each window will have the same weight.
:param anomaly_ranges: A list of tuples ``(start, end)`` for each anomaly window in ``anomalies``, where ``start``
is the index at which the window starts and ``end`` is the first index after the end of the window. This can
be ``None``, in which case the list is computed automatically from ``anomalies``.
:param prediction_ranges: A list of tuples ``(start, end)`` for each anomaly window in ``predictions``, where
``start`` is the index at which the window starts and ``end`` is the first index after the end of the window.
This can be ``None``, in which case the list is computed automatically from ``predictions``.
:return: A tuple consisting of the time-series precision and recall for the given labels.
"""
has_anomalies = torch.any(anomalies > 0).item()
has_predictions = torch.any(predictions > 0).item()
# Catch special cases which would cause a division by zero
if not has_predictions and not has_anomalies:
# In this case, the classifier is perfect, so it makes sense to set precision and recall to 1
return 1, 1
elif not has_predictions or not has_anomalies:
return 0, 0
# Set precision functions to the same as recall functions if they are not given
if precision_bias_fn is None:
precision_bias_fn = recall_bias_fn
if precision_cardinality_fn is None:
precision_cardinality_fn = recall_cardinality_fn
if anomaly_ranges is None:
anomaly_ranges = self.compute_window_indices(anomalies)
if prediction_ranges is None:
prediction_ranges = self.compute_window_indices(predictions)
recall = self._compute_overlap(predictions, prediction_ranges, anomaly_ranges, alpha, recall_bias_fn,
recall_cardinality_fn)
precision = self._compute_overlap(anomalies, anomaly_ranges, prediction_ranges, 0, precision_bias_fn,
precision_cardinality_fn, use_window_weight=weighted_precision)
return precision, recall
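    # Hedged usage sketch (toy tensors): with alpha=0 recall is driven purely by the overlap term,
    # which is the configuration used by the F1_T threshold sweep in this class.
    #
    #   anomalies   = torch.tensor([0, 1, 1, 1, 0, 0])
    #   predictions = torch.tensor([0, 0, 1, 0, 0, 0])
    #   p, r = basic_metricor().ts_precision_and_recall(anomalies, predictions, alpha=0)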
def __best_ts_fbeta_score(self, labels: torch.Tensor, scores: torch.Tensor, beta: float,
recall_cardinality_fn: Callable = improved_cardinality_fn,
weighted_precision: bool = True, n_splits: int = 1500) -> Tuple[float, Dict[str, Any]]:
# Build thresholds from p-values (quantiles/percentiles) of the score distribution
# p_values in [0, 1]; thresholds = percentile(scores, p_values)
device = scores.device
p_values = torch.linspace(0, 1.0, steps=n_splits, device=device)
thresholds = torch.quantile(scores, p_values)
print("Here is the shape of thresholds",thresholds.shape)
precision = torch.empty_like(thresholds, dtype=torch.float)
recall = torch.empty_like(thresholds, dtype=torch.float)
predictions = torch.empty_like(scores, dtype=torch.long)
print("Here is the shape of labels",labels.shape)
print("Here is the shape of scores",scores.shape)
print("Here is the shape of predictions",predictions.shape)
print("Here is the shape of precision",precision.shape)
print("Here is the shape of recall",recall.shape)
label_ranges = self.compute_window_indices(labels)
for i, t in tqdm(enumerate(thresholds), total=len(thresholds),
desc="Calculating F-beta score for thresholds"):
# predictions are 0/1 longs to be compatible with downstream computations
torch.greater(scores, t, out=predictions)
prec, rec = self.ts_precision_and_recall(
labels,
predictions,
alpha=0,
recall_cardinality_fn=recall_cardinality_fn,
anomaly_ranges=label_ranges,
weighted_precision=weighted_precision,
)
# Avoid 0/0 in F-score computation when both prec and rec are 0
if prec == 0 and rec == 0:
rec = 1
precision[i] = prec
recall[i] = rec
f_score = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall)
max_score_index = torch.argmax(f_score)
return (
f_score[max_score_index].item(),
dict(
threshold=thresholds[max_score_index].item(),
precision=precision[max_score_index].item(),
recall=recall[max_score_index].item(),
),
)
def metric_PointF1PA(self, label, score, preds=None):
import sklearn.metrics
best_f1_adjusted = 0
best_result = None
q_values = np.arange(0.7, 0.99, 0.001)
for q in tqdm(q_values, total= len(q_values), desc="Calculating PointF1PA"):
thre = np.quantile(score, q)
result = {}
pred = (score > thre).astype(int)
adjusted_pred = self.adjustment(label, pred)
accuracy = sklearn.metrics.accuracy_score(label, adjusted_pred)
P, R, F1, _ = sklearn.metrics.precision_recall_fscore_support(label, adjusted_pred, average="binary")
result['thre_PA'] = thre
result['ACC_PA'] = accuracy
result['P_PA'] = P
result['R_PA'] = R
result['F1_PA'] = F1
# results.append(pd.DataFrame([result]))
if F1 >= best_f1_adjusted:
best_f1_adjusted = F1
best_result = result
        if best_result is not None:
            return best_result
        else:
            raise RuntimeError("No best result found, check the input data.")
# results_storage['f1_pa'] = pd.concat(results, axis=0).reset_index(drop=True)
def _get_events(self, y_test, outlier=1, normal=0):
events = dict()
label_prev = normal
event = 0 # corresponds to no event
event_start = 0
for tim, label in enumerate(y_test):
if label == outlier:
if label_prev == normal:
event += 1
event_start = tim
else:
if label_prev == outlier:
event_end = tim - 1
events[event] = (event_start, event_end)
label_prev = label
        # Close the final event if the series ends while still inside an anomaly
        if label_prev == outlier:
            event_end = tim
            events[event] = (event_start, event_end)
return events
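    # Example (hedged sketch): events are numbered from 1 and stored with inclusive (start, end) indices.
    #
    #   basic_metricor()._get_events(np.array([0, 1, 1, 0, 0]))  # -> {1: (1, 2)}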
def metric_EventF1PA(self, label, score, preds=None):
from sklearn.metrics import precision_score
true_events = self._get_events(label)
if preds is None:
thresholds = np.linspace(score.min(), score.max(), 100)
EventF1PA_scores = []
for threshold in tqdm(thresholds, total=len(thresholds), desc="Calculating EventF1PA"):
preds = (score > threshold).astype(int)
tp = np.sum([preds[start:end + 1].any() for start, end in true_events.values()])
fn = len(true_events) - tp
rec_e = tp/(tp + fn)
prec_t = precision_score(label, preds)
EventF1PA = 2 * rec_e * prec_t / (rec_e + prec_t + self.eps)
EventF1PA_scores.append(EventF1PA)
EventF1PA_Threshold = thresholds[np.argmax(EventF1PA_scores)]
EventF1PA1 = max(EventF1PA_scores)
else:
tp = np.sum([preds[start:end + 1].any() for start, end in true_events.values()])
fn = len(true_events) - tp
rec_e = tp/(tp + fn)
prec_t = precision_score(label, preds)
EventF1PA1 = 2 * rec_e * prec_t / (rec_e + prec_t + self.eps)
return EventF1PA1
def range_recall_new(self, labels, preds, alpha):
p = np.where(preds == 1)[0] # positions of predicted label==1
range_pred = self.range_convers_new(preds)
range_label = self.range_convers_new(labels)
Nr = len(range_label) # total # of real anomaly segments
ExistenceReward = self.existence_reward(range_label, preds)
OverlapReward = 0
for i in range_label:
OverlapReward += self.w(i, p) * self.Cardinality_factor(i, range_pred)
score = alpha * ExistenceReward + (1-alpha) * OverlapReward
if Nr != 0:
return score/Nr, ExistenceReward/Nr, OverlapReward/Nr
else:
return 0,0,0
def range_convers_new(self, label):
        '''
        input: array of binary values
        output: list of ordered pairs [(a0, b0), (a1, b1), ...] giving the start and end index of each anomaly segment
        '''
anomaly_starts = np.where(np.diff(label) == 1)[0] + 1
anomaly_ends, = np.where(np.diff(label) == -1)
if len(anomaly_ends):
if not len(anomaly_starts) or anomaly_ends[0] < anomaly_starts[0]:
# we started with an anomaly, so the start of the first anomaly is the start of the labels
anomaly_starts = np.concatenate([[0], anomaly_starts])
if len(anomaly_starts):
if not len(anomaly_ends) or anomaly_ends[-1] < anomaly_starts[-1]:
# we ended on an anomaly, so the end of the last anomaly is the end of the labels
anomaly_ends = np.concatenate([anomaly_ends, [len(label) - 1]])
return list(zip(anomaly_starts, anomaly_ends))
def existence_reward(self, labels, preds):
        '''
        labels: list of (start, end) index pairs for the ground-truth anomaly segments
        preds: predicted binary labels
        '''
score = 0
for i in labels:
if preds[i[0]:i[1]+1].any():
score += 1
return score
def num_nonzero_segments(self, x):
count=0
if x[0]>0:
count+=1
for i in range(1, len(x)):
if x[i]>0 and x[i-1]==0:
count+=1
return count
def extend_postive_range(self, x, window=5):
label = x.copy().astype(float)
L = self.range_convers_new(label) # index of non-zero segments
length = len(label)
for k in range(len(L)):
s = L[k][0]
e = L[k][1]
x1 = np.arange(e,min(e+window//2,length))
label[x1] += np.sqrt(1 - (x1-e)/(window))
x2 = np.arange(max(s-window//2,0),s)
label[x2] += np.sqrt(1 - (s-x2)/(window))
label = np.minimum(np.ones(length), label)
return label
def extend_postive_range_individual(self, x, percentage=0.2):
label = x.copy().astype(float)
L = self.range_convers_new(label) # index of non-zero segments
length = len(label)
for k in range(len(L)):
s = L[k][0]
e = L[k][1]
l0 = int((e-s+1)*percentage)
x1 = np.arange(e,min(e+l0,length))
label[x1] += np.sqrt(1 - (x1-e)/(2*l0))
x2 = np.arange(max(s-l0,0),s)
label[x2] += np.sqrt(1 - (s-x2)/(2*l0))
label = np.minimum(np.ones(length), label)
return label
def TPR_FPR_RangeAUC(self, labels, pred, P, L):
indices = np.where(labels == 1)[0]
product = labels * pred
TP = np.sum(product)
newlabels = product.copy()
newlabels[indices] = 1
# recall = min(TP/P,1)
P_new = (P + np.sum(newlabels)) / 2 # so TPR is neither large nor small
# P_new = np.sum(labels)
recall = min(TP / P_new, 1)
# recall = TP/np.sum(labels)
# print('recall '+str(recall))
existence = 0
for seg in L:
            if np.sum(product[seg[0]:(seg[1] + 1)]) > 0:  # the segment counts as detected if any prediction overlaps it
existence += 1
existence_ratio = existence / len(L)
# print(existence_ratio)
# TPR_RangeAUC = np.sqrt(recall*existence_ratio)
# print(existence_ratio)
TPR_RangeAUC = recall * existence_ratio
FP = np.sum(pred) - TP
# TN = np.sum((1-pred) * (1-labels))
# FPR_RangeAUC = FP/(FP+TN)
N_new = len(labels) - P_new
FPR_RangeAUC = FP / N_new
Precision_RangeAUC = TP / np.sum(pred)
return TPR_RangeAUC, FPR_RangeAUC, Precision_RangeAUC
def RangeAUC(self, labels, score, window=0, percentage=0, plot_ROC=False, AUC_type='window'):
# AUC_type='window'/'percentage'
score_sorted = -np.sort(-score)
P = np.sum(labels)
# print(np.sum(labels))
if AUC_type == 'window':
labels = self.extend_postive_range(labels, window=window)
else:
labels = self.extend_postive_range_individual(labels, percentage=percentage)
# print(np.sum(labels))
L = self.range_convers_new(labels)
TPR_list = [0]
FPR_list = [0]
Precision_list = [1]
for i in np.linspace(0, len(score) - 1, 250).astype(int):
threshold = score_sorted[i]
# print('thre='+str(threshold))
pred = score >= threshold
TPR, FPR, Precision = self.TPR_FPR_RangeAUC(labels, pred, P, L)
TPR_list.append(TPR)
FPR_list.append(FPR)
Precision_list.append(Precision)
TPR_list.append(1)
FPR_list.append(1) # otherwise, range-AUC will stop earlier than (1,1)
tpr = np.array(TPR_list)
fpr = np.array(FPR_list)
prec = np.array(Precision_list)
width = fpr[1:] - fpr[:-1]
height = (tpr[1:] + tpr[:-1]) / 2
AUC_range = np.sum(width * height)
width_PR = tpr[1:-1] - tpr[:-2]
height_PR = prec[1:]
AP_range = np.sum(width_PR * height_PR)
if plot_ROC:
return AUC_range, AP_range, fpr, tpr, prec
return AUC_range
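    # Hedged usage sketch (assuming `label`, `score`, and `slidingWindow` as in generate_curve): with
    # plot_ROC=True the method also returns the curves alongside the Range-AUC and Range-AP values.
    #
    #   auc_r, ap_r, fpr, tpr, prec = basic_metricor().RangeAUC(labels=label, score=score,
    #                                                           window=slidingWindow, plot_ROC=True)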
def new_sequence(self, label, sequence_original, window):
a = max(sequence_original[0][0] - window // 2, 0)
sequence_new = []
for i in range(len(sequence_original) - 1):
if sequence_original[i][1] + window // 2 < sequence_original[i + 1][0] - window // 2:
sequence_new.append((a, sequence_original[i][1] + window // 2))
a = sequence_original[i + 1][0] - window // 2
sequence_new.append((a, min(sequence_original[len(sequence_original) - 1][1] + window // 2, len(label) - 1)))
return sequence_new
def sequencing(self, x, L, window=5):
label = x.copy().astype(float)
length = len(label)
for k in range(len(L)):
s = L[k][0]
e = L[k][1]
x1 = np.arange(e + 1, min(e + window // 2 + 1, length))
label[x1] += np.sqrt(1 - (x1 - e) / (window))
x2 = np.arange(max(s - window // 2, 0), s)
label[x2] += np.sqrt(1 - (s - x2) / (window))
label = np.minimum(np.ones(length), label)
return label
# TPR_FPR_window
def RangeAUC_volume_opt(self, labels_original, score, windowSize, thre=250):
window_3d = np.arange(0, windowSize + 1, 1)
P = np.sum(labels_original)
seq = self.range_convers_new(labels_original)
l = self.new_sequence(labels_original, seq, windowSize)
score_sorted = -np.sort(-score)
tpr_3d = np.zeros((windowSize + 1, thre + 2))
fpr_3d = np.zeros((windowSize + 1, thre + 2))
prec_3d = np.zeros((windowSize + 1, thre + 1))
auc_3d = np.zeros(windowSize + 1)
ap_3d = np.zeros(windowSize + 1)
tp = np.zeros(thre)
N_pred = np.zeros(thre)
for k, i in enumerate(np.linspace(0, len(score) - 1, thre).astype(int)):
threshold = score_sorted[i]
pred = score >= threshold
N_pred[k] = np.sum(pred)
for window in window_3d:
labels_extended = self.sequencing(labels_original, seq, window)
L = self.new_sequence(labels_extended, seq, window)
TF_list = np.zeros((thre + 2, 2))
Precision_list = np.ones(thre + 1)
j = 0
for i in np.linspace(0, len(score) - 1, thre).astype(int):
threshold = score_sorted[i]
pred = score >= threshold
labels = labels_extended.copy()
existence = 0
for seg in L:
labels[seg[0]:seg[1] + 1] = labels_extended[seg[0]:seg[1] + 1] * pred[seg[0]:seg[1] + 1]
if (pred[seg[0]:(seg[1] + 1)] > 0).any():
existence += 1
for seg in seq:
labels[seg[0]:seg[1] + 1] = 1
TP = 0
N_labels = 0
for seg in l:
TP += np.dot(labels[seg[0]:seg[1] + 1], pred[seg[0]:seg[1] + 1])
N_labels += np.sum(labels[seg[0]:seg[1] + 1])
TP += tp[j]
FP = N_pred[j] - TP
existence_ratio = existence / len(L)
P_new = (P + N_labels) / 2
recall = min(TP / P_new, 1)
TPR = recall * existence_ratio
N_new = len(labels) - P_new
FPR = FP / N_new
Precision = TP / N_pred[j]
j += 1
TF_list[j] = [TPR, FPR]
Precision_list[j] = Precision
TF_list[j + 1] = [1, 1] # otherwise, range-AUC will stop earlier than (1,1)
tpr_3d[window] = TF_list[:, 0]
fpr_3d[window] = TF_list[:, 1]
prec_3d[window] = Precision_list
width = TF_list[1:, 1] - TF_list[:-1, 1]
height = (TF_list[1:, 0] + TF_list[:-1, 0]) / 2
AUC_range = np.dot(width, height)
auc_3d[window] = (AUC_range)
width_PR = TF_list[1:-1, 0] - TF_list[:-2, 0]
height_PR = Precision_list[1:]
AP_range = np.dot(width_PR, height_PR)
ap_3d[window] = AP_range
return tpr_3d, fpr_3d, prec_3d, window_3d, sum(auc_3d) / len(window_3d), sum(ap_3d) / len(window_3d)
def RangeAUC_volume_opt_mem(self, labels_original, score, windowSize, thre=250):
window_3d = np.arange(0, windowSize + 1, 1)
P = np.sum(labels_original)
seq = self.range_convers_new(labels_original)
l = self.new_sequence(labels_original, seq, windowSize)
score_sorted = -np.sort(-score)
tpr_3d = np.zeros((windowSize + 1, thre + 2))
fpr_3d = np.zeros((windowSize + 1, thre + 2))
prec_3d = np.zeros((windowSize + 1, thre + 1))
auc_3d = np.zeros(windowSize + 1)
ap_3d = np.zeros(windowSize + 1)
tp = np.zeros(thre)
N_pred = np.zeros(thre)
p = np.zeros((thre, len(score)))
for k, i in enumerate(np.linspace(0, len(score) - 1, thre).astype(int)):
threshold = score_sorted[i]
pred = score >= threshold
p[k] = pred
N_pred[k] = np.sum(pred)
for window in window_3d:
labels_extended = self.sequencing(labels_original, seq, window)
L = self.new_sequence(labels_extended, seq, window)
TF_list = np.zeros((thre + 2, 2))
Precision_list = np.ones(thre + 1)
j = 0
for i in np.linspace(0, len(score) - 1, thre).astype(int):
labels = labels_extended.copy()
existence = 0
for seg in L:
labels[seg[0]:seg[1] + 1] = labels_extended[seg[0]:seg[1] + 1] * p[j][seg[0]:seg[1] + 1]
if (p[j][seg[0]:(seg[1] + 1)] > 0).any():
existence += 1
for seg in seq:
labels[seg[0]:seg[1] + 1] = 1
N_labels = 0
TP = 0
for seg in l:
TP += np.dot(labels[seg[0]:seg[1] + 1], p[j][seg[0]:seg[1] + 1])
N_labels += np.sum(labels[seg[0]:seg[1] + 1])
TP += tp[j]
FP = N_pred[j] - TP
existence_ratio = existence / len(L)
P_new = (P + N_labels) / 2
recall = min(TP / P_new, 1)
TPR = recall * existence_ratio
N_new = len(labels) - P_new
FPR = FP / N_new
Precision = TP / N_pred[j]
j += 1
TF_list[j] = [TPR, FPR]
Precision_list[j] = Precision
TF_list[j + 1] = [1, 1]
tpr_3d[window] = TF_list[:, 0]
fpr_3d[window] = TF_list[:, 1]
prec_3d[window] = Precision_list
width = TF_list[1:, 1] - TF_list[:-1, 1]
height = (TF_list[1:, 0] + TF_list[:-1, 0]) / 2
AUC_range = np.dot(width, height)
auc_3d[window] = (AUC_range)
width_PR = TF_list[1:-1, 0] - TF_list[:-2, 0]
height_PR = Precision_list[1:]
AP_range = np.dot(width_PR, height_PR)
ap_3d[window] = (AP_range)
return tpr_3d, fpr_3d, prec_3d, window_3d, sum(auc_3d) / len(window_3d), sum(ap_3d) / len(window_3d)
def metric_VUS_pred(self, labels, preds, windowSize):
window_3d = np.arange(0, windowSize + 1, 1)
P = np.sum(labels)
seq = self.range_convers_new(labels)
l = self.new_sequence(labels, seq, windowSize)
recall_3d = np.zeros((windowSize + 1))
prec_3d = np.zeros((windowSize + 1))
f_3d = np.zeros((windowSize + 1))
N_pred = np.sum(preds)
for window in window_3d:
labels_extended = self.sequencing(labels, seq, window)
L = self.new_sequence(labels_extended, seq, window)
labels = labels_extended.copy()
existence = 0
for seg in L:
labels[seg[0]:seg[1] + 1] = labels_extended[seg[0]:seg[1] + 1] * preds[seg[0]:seg[1] + 1]
if (preds[seg[0]:(seg[1] + 1)] > 0).any():
existence += 1
for seg in seq:
labels[seg[0]:seg[1] + 1] = 1
TP = 0
N_labels = 0
for seg in l:
TP += np.dot(labels[seg[0]:seg[1] + 1], preds[seg[0]:seg[1] + 1])
N_labels += np.sum(labels[seg[0]:seg[1] + 1])
P_new = (P + N_labels) / 2
recall = min(TP / P_new, 1)
Precision = TP / N_pred
recall_3d[window] = recall
prec_3d[window] = Precision
f_3d[window] = 2 * Precision * recall / (Precision + recall) if (Precision + recall) > 0 else 0
return sum(recall_3d) / len(window_3d), sum(prec_3d) / len(window_3d), sum(f_3d) / len(window_3d)
# def metric_F1_T_gpu_corrected(self, labels, scores, device='cuda', batch_size=50):
# """
# GPU-accelerated F1_T that maintains exact compatibility with CPU version
# Only the threshold generation and prediction computation is done on GPU
# The actual metric calculation uses your original CPU functions
# """
# if not torch.cuda.is_available():
# print("CUDA not available, falling back to CPU implementation")
# return self.metric_F1_T(labels, scores)
#
# print(f"Computing F1_T on {device} (corrected version)")
# start_time = time.time()
#
# # Keep original data types for compatibility
# labels_np = np.array(labels)
# scores_np = np.array(scores)
#
# # Use GPU only for threshold generation
# scores_gpu = torch.tensor(scores_np, dtype=torch.float32, device=device)
# n_splits = 1000
# p_values = torch.linspace(0.0, 1.0, steps=n_splits, device=device)
# thresholds_gpu = torch.quantile(scores_gpu, p_values)
# thresholds = thresholds_gpu.cpu().numpy()
#
# # Convert to torch tensors for CPU computation (matching original)
# labels_torch = torch.tensor(labels_np, dtype=torch.int)
# scores_torch = torch.tensor(scores_np, dtype=torch.float)
#
# # Compute label ranges once
# label_ranges = self.compute_window_indices(labels_torch)
#
# # Process thresholds in batches but use original metric calculation
# precision_list = []
# recall_list = []
#
# if batch_size is None:
# batch_size = 50 # Default batch size
#
# beta = 1
# predictions = torch.empty_like(scores_torch, dtype=torch.long)
#
# for i in tqdm(range(0, n_splits, batch_size),
# desc="Computing metrics (corrected)"):
# end_idx = min(i + batch_size, n_splits)
#
# batch_precisions = []
# batch_recalls = []
#
# for j in range(i, end_idx):
# threshold = thresholds[j]
#
# # Compute predictions
# torch.greater(scores_torch, threshold, out=predictions)
#
# # Use your original ts_precision_and_recall function
# prec, rec = self.ts_precision_and_recall(
# labels_torch,
# predictions,
# alpha=0,
# recall_cardinality_fn=improved_cardinality_fn,
# anomaly_ranges=label_ranges,
# weighted_precision=True,
# )
#
# # Handle edge case
# if prec == 0 and rec == 0:
# rec = 1
#
# batch_precisions.append(prec)
# batch_recalls.append(rec)
#
# precision_list.extend(batch_precisions)
# recall_list.extend(batch_recalls)
#
# # Convert to tensors for final computation
# precision = torch.tensor(precision_list, dtype=torch.float)
# recall = torch.tensor(recall_list, dtype=torch.float)
#
# # Compute F-scores
# f_scores = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall)
#
# # Find best threshold
# best_idx = torch.argmax(f_scores)
# best_threshold = thresholds[best_idx]
#
# # Compute accuracy
# best_predictions = scores_np > best_threshold
# accuracy = np.mean(best_predictions == labels_np)
#
# elapsed = time.time() - start_time
# print(f"F1_T computed in {elapsed:.2f}s")
#
# return {
# 'F1_T': f_scores[best_idx].item(),
# 'P_T': precision[best_idx].item(),
# 'R_T': recall[best_idx].item(),
# 'thre_T': best_threshold,
# 'ACC_T': accuracy
# }
#
# def metric_F1_T_parallel_cpu(self, labels, scores, num_workers=8):
# """
# CPU-parallel version that matches the original exactly
# Uses multiprocessing to speed up threshold evaluation
# """
# from concurrent.futures import ProcessPoolExecutor
# import multiprocessing as mp
#
# print(f"Computing F1_T with {num_workers} CPU workers")
# start_time = time.time()
#
# # Convert to torch tensors
# labels = torch.tensor(labels, dtype=torch.int)
# scores = torch.tensor(scores, dtype=torch.float)
#
# # Generate thresholds
# n_splits = 1000
# p_values = torch.linspace(0.0, 1.0, steps=n_splits)
# thresholds = torch.quantile(scores, p_values)
#
# # Compute label ranges once
# label_ranges = self.compute_window_indices(labels)
#
# # Split thresholds for parallel processing
# threshold_chunks = torch.chunk(thresholds, num_workers)
#
# # Process in parallel
# beta = 1
# with ProcessPoolExecutor(max_workers=num_workers) as executor:
# futures = []
# for chunk in threshold_chunks:
# future = executor.submit(
# self._compute_f1t_chunk,
# chunk, labels, scores, beta, label_ranges, True
# )
# futures.append(future)
#
# # Collect results
# all_results = []
# for future in tqdm(as_completed(futures),
# total=len(futures),
# desc="Processing chunks"):
# chunk_results = future.result()
# all_results.extend(chunk_results)
#
# # Find best result
# best_result = max(all_results, key=lambda x: x['f_score'])
#
# # Compute accuracy
# best_predictions = scores > best_result['threshold']
# accuracy = torch.mean((best_predictions == labels).float()).item()
#
# elapsed = time.time() - start_time
# print(f"F1_T computed in {elapsed:.2f}s")
#
# return {
# 'F1_T': best_result['f_score'],
# 'P_T': best_result['precision'],
# 'R_T': best_result['recall'],
# 'thre_T': best_result['threshold'],
# 'ACC_T': accuracy
# }
#
# def metric_F1_T_hybrid(self, labels, scores, device='cuda'):
# """
# Hybrid approach: GPU for threshold generation and prediction,
# CPU parallel for metric calculation
# """
# if not torch.cuda.is_available():
# return self.metric_F1_T_parallel_cpu(labels, scores)
#
# print(f"Computing F1_T with hybrid GPU/CPU approach")
# start_time = time.time()
#
# # Generate thresholds on GPU (fast)
# labels_gpu = torch.tensor(labels, dtype=torch.int32, device=device)
# scores_gpu = torch.tensor(scores, dtype=torch.float32, device=device)
#
# n_splits = 1000
# p_values = torch.linspace(0.0, 1.0, steps=n_splits, device=device)
# thresholds_gpu = torch.quantile(scores_gpu, p_values)
#
# # Generate all predictions on GPU at once (if memory allows)
# try:
# # This creates a matrix of shape (n_thresholds, n_samples)
# all_predictions_gpu = scores_gpu.unsqueeze(0) > thresholds_gpu.unsqueeze(1)
# all_predictions = all_predictions_gpu.cpu().long()
# thresholds = thresholds_gpu.cpu()
# print(" Generated all predictions on GPU")
# except RuntimeError as e:
# if "out of memory" in str(e):
# print(" Not enough GPU memory, falling back to batched approach")
# return self.metric_F1_T_gpu_corrected(labels, scores, batch_size=50)
# else:
# raise e
#
# # Move back to CPU for metric calculation
# labels_cpu = torch.tensor(labels, dtype=torch.int)
# scores_cpu = torch.tensor(scores, dtype=torch.float)
#
# # Compute label ranges
# label_ranges = self.compute_window_indices(labels_cpu)
#
# # Parallel CPU computation of metrics
# beta = 1
# from concurrent.futures import ThreadPoolExecutor
#
# def compute_single_threshold(idx):
# predictions = all_predictions[idx]
#
# prec, rec = self.ts_precision_and_recall(
# labels_cpu,
# predictions,
# alpha=0,
# recall_cardinality_fn=improved_cardinality_fn,
# anomaly_ranges=label_ranges,
# weighted_precision=True,
# )
#
# if prec == 0 and rec == 0:
# rec = 1
#
# f_score = (1 + beta ** 2) * prec * rec / (beta ** 2 * prec + rec)
#
# return {
# 'idx': idx,
# 'f_score': f_score,
# 'precision': prec,
# 'recall': rec,
# 'threshold': thresholds[idx].item()
# }
#
# # Process with thread pool
# with ThreadPoolExecutor(max_workers=8) as executor:
# futures = [executor.submit(compute_single_threshold, i)
# for i in range(n_splits)]
#
# results = []
# for future in tqdm(as_completed(futures),
# total=n_splits,
# desc="Computing metrics"):
# results.append(future.result())
#
# # Find best result
# best_result = max(results, key=lambda x: x['f_score'])
#
# # Compute accuracy
# best_predictions = scores_cpu > best_result['threshold']
# accuracy = torch.mean((best_predictions == labels_cpu).float()).item()
#
# elapsed = time.time() - start_time
# print(f"F1_T computed in {elapsed:.2f}s")
#
# return {
# 'F1_T': best_result['f_score'],
# 'P_T': best_result['precision'],
# 'R_T': best_result['recall'],
# 'thre_T': best_result['threshold'],
# 'ACC_T': accuracy
# }
#
# def metric_F1_T_optimized(self, labels, scores, num_workers=None):
# """
# Optimized version using the best strategies from our tests
# """
# if num_workers is None:
# num_workers = min(mp.cpu_count(), 8)
#
# print(f"Computing F1_T (optimized) with {num_workers} workers")
# start_time = time.time()
#
# # Convert to torch tensors
# labels = torch.tensor(labels, dtype=torch.int)
# scores = torch.tensor(scores, dtype=torch.float)
#
# # Generate thresholds
# n_splits = 1000
# p_values = torch.linspace(0.0, 1.0, steps=n_splits)
# thresholds = torch.quantile(scores, p_values)
#
# # Pre-compute label ranges once
# label_ranges = self.compute_window_indices(labels)
#
# # Pre-generate all predictions at once (memory efficient)
# print("Pre-computing predictions...")
# predictions_list = []
# for i in range(0, n_splits, 100): # Process in chunks to save memory
# end_idx = min(i + 100, n_splits)
# batch_thresholds = thresholds[i:end_idx]
# # Create boolean predictions then convert to long
# batch_preds = (scores.unsqueeze(0) > batch_thresholds.unsqueeze(1)).long() # FIX: Convert to long
# predictions_list.append(batch_preds)
#
# all_predictions = torch.cat(predictions_list, dim=0)
# print(f"Predictions ready, computing metrics...")
#
# # Define worker function
# def compute_metrics_batch(indices):
# results = []
# for idx in indices:
# predictions = all_predictions[idx]
#
# prec, rec = self.ts_precision_and_recall(
# labels,
# predictions,
# alpha=0,
# recall_cardinality_fn=improved_cardinality_fn,
# anomaly_ranges=label_ranges,
# weighted_precision=True,
# )
#
# if prec == 0 and rec == 0:
# rec = 1
#
# f_score = 2 * prec * rec / (prec + rec)
#
# results.append({
# 'idx': idx,
# 'f_score': f_score,
# 'precision': prec,
# 'recall': rec,
# 'threshold': thresholds[idx].item()
# })
#
# return results
#
# # Split indices for workers
# indices = list(range(n_splits))
# chunk_size = len(indices) // num_workers
# if chunk_size == 0:
# chunk_size = 1
# index_chunks = [indices[i:i + chunk_size] for i in range(0, len(indices), chunk_size)]
#
# # Process with thread pool (better for this workload than process pool)
# all_results = []
# with ThreadPoolExecutor(max_workers=num_workers) as executor:
# futures = [executor.submit(compute_metrics_batch, chunk) for chunk in index_chunks]
#
# completed = 0
# for future in as_completed(futures):
# all_results.extend(future.result())
# completed += 1
# print(f"Progress: {completed}/{len(futures)} chunks completed", end='\r')
#
# print() # New line after progress
#
# # Find best result
# best_result = max(all_results, key=lambda x: x['f_score'])
#
# # Compute accuracy
# best_predictions = scores > best_result['threshold']
# accuracy = torch.mean((best_predictions == labels).float()).item()
#
# elapsed = time.time() - start_time
# print(f"F1_T computed in {elapsed:.2f}s")
#
# return {
# 'F1_T': best_result['f_score'],
# 'P_T': best_result['precision'],
# 'R_T': best_result['recall'],
# 'thre_T': best_result['threshold'],
# 'ACC_T': accuracy
# }
#
# def metric_F1_T_sampling(self, labels, scores, sample_rate=0.2):
# """
# Fast approximation by sampling thresholds
# Good for quick estimates or hyperparameter tuning
# """
# print(f"Computing F1_T with threshold sampling (rate={sample_rate})")
# start_time = time.time()
#
# # Convert to torch tensors
# labels = torch.tensor(labels, dtype=torch.int)
# scores = torch.tensor(scores, dtype=torch.float)
#
# # Generate fewer thresholds
# n_splits = int(1000 * sample_rate)
# p_values = torch.linspace(0.0, 1.0, steps=n_splits)
# thresholds = torch.quantile(scores, p_values)
#
# # Rest is same as original
# precision = torch.empty_like(thresholds, dtype=torch.float)
# recall = torch.empty_like(thresholds, dtype=torch.float)
# predictions = torch.empty_like(scores, dtype=torch.long) # FIX: Ensure long type
#
# label_ranges = self.compute_window_indices(labels)
# beta = 1
#
# for i, t in enumerate(thresholds):
# torch.greater(scores, t, out=predictions)
# prec, rec = self.ts_precision_and_recall(
# labels,
# predictions,
# alpha=0,
# recall_cardinality_fn=improved_cardinality_fn,
# anomaly_ranges=label_ranges,
# weighted_precision=True,
# )
#
# if prec == 0 and rec == 0:
# rec = 1
#
# precision[i] = prec
# recall[i] = rec
#
# f_score = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall)
# max_score_index = torch.argmax(f_score)
#
# # Calculate accuracy
# best_predictions = (scores > thresholds[max_score_index]).long()
# accuracy = torch.mean((best_predictions == labels).float()).item()
#
# elapsed = time.time() - start_time
# print(f"F1_T computed in {elapsed:.2f}s (approximate)")
#
# return {
# 'F1_T': f_score[max_score_index].item(),
# 'P_T': precision[max_score_index].item(),
# 'R_T': recall[max_score_index].item(),
# 'thre_T': thresholds[max_score_index].item(),
# 'ACC_T': accuracy
# }
#
# def metric_F1_T_chunked(self, labels, scores, chunk_size=50, num_workers=4):
# """
# Simple chunked parallel processing with detailed progress bar
# """
# from concurrent.futures import ProcessPoolExecutor, as_completed
# from tqdm import tqdm
# import multiprocessing as mp
#
# print(f"Computing F1_T (chunked) with {num_workers} workers, chunk_size={chunk_size}")
# start_time = time.time()
#
# # Convert to torch tensors
# labels_t = torch.tensor(labels, dtype=torch.int)
# scores_t = torch.tensor(scores, dtype=torch.float)
#
# # Generate thresholds
# n_splits = 1000
# p_values = torch.linspace(0.0, 1.0, steps=n_splits)
# thresholds = torch.quantile(scores_t, p_values).numpy()
#
# # Convert back to numpy for pickling
# labels_np = labels_t.numpy()
# scores_np = scores_t.numpy()
#
# # Create chunks of thresholds
# threshold_chunks = [thresholds[i:i + chunk_size]
# for i in range(0, len(thresholds), chunk_size)]
#
# total_chunks = len(threshold_chunks)
# print(f"Split {n_splits} thresholds into {total_chunks} chunks")
#
# # Process in parallel with progress bar
# all_results = []
#
# # Method 1: Using executor.map with tqdm
# with ProcessPoolExecutor(max_workers=num_workers) as executor:
# with tqdm(total=n_splits, desc="Processing F1_T thresholds", unit="threshold", colour="blue") as pbar:
# # Prepare arguments
# chunk_args = [(chunk, labels_np, scores_np) for chunk in threshold_chunks]
#
# # Process and update progress bar
# for i, result_chunk in enumerate(executor.map(self._process_f1t_chunk, chunk_args)):
# all_results.extend(result_chunk)
# pbar.update(len(threshold_chunks[i])) # Update by number of thresholds in chunk
# pbar.set_postfix({
# 'chunk': f"{i + 1}/{total_chunks}",
# 'results': len(all_results)
# })
#
# # Find best result
# best_result = max(all_results, key=lambda x: x['f_score'])
#
# # Compute accuracy
# best_predictions = scores_np > best_result['threshold']
# accuracy = np.mean(best_predictions == labels_np)
#
# elapsed = time.time() - start_time
# print(f"✓ F1_T computed in {elapsed:.2f}s")
# print(f" Best F1: {best_result['f_score']:.4f} at threshold {best_result['threshold']:.4f}")
#
# return {
# 'F1_T': best_result['f_score'],
# 'P_T': best_result['precision'],
# 'R_T': best_result['recall'],
# 'thre_T': best_result['threshold'],
# 'ACC_T': accuracy
# }
#
# @staticmethod
# def _process_f1t_chunk(args):
# """
# Static method to process a chunk of thresholds for F1_T metrics.
# This can be pickled for multiprocessing.
# """
# chunk_thresholds, labels_local, scores_local = args
# results = []
#
# # Convert back to torch tensors in worker
# labels_tensor = torch.tensor(labels_local, dtype=torch.int)
# scores_tensor = torch.tensor(scores_local, dtype=torch.float)
# predictions = torch.empty_like(scores_tensor, dtype=torch.long)
#
# # Compute label ranges in worker
# # We need to create a basic_metricor instance to access methods
# grader = basic_metricor()
# label_ranges_local = grader.compute_window_indices(labels_tensor)
#
# for threshold in chunk_thresholds:
# torch.greater(scores_tensor, threshold, out=predictions)
#
# prec, rec = grader.ts_precision_and_recall(
# labels_tensor,
# predictions,
# alpha=0,
# recall_cardinality_fn=improved_cardinality_fn,
# anomaly_ranges=label_ranges_local,
# weighted_precision=True,
# )
#
# if prec == 0 and rec == 0:
# rec = 1
#
# f_score = 2 * prec * rec / (prec + rec)
#
# results.append({
# 'f_score': f_score,
# 'precision': prec,
# 'recall': rec,
# 'threshold': threshold
# })
#
# return results
def metric_Affiliation_optimized(self, label, score, num_workers=None):
"""
Affiliation-based F1 over a sweep of score-quantile thresholds: predictions for every
threshold are pre-computed once, then scored in parallel with a thread pool.
Returns the best (F1, precision, recall) triple found.
"""
if num_workers is None:
num_workers = min(mp.cpu_count(), 8)
print(f"Computing Affiliation (optimized) with {num_workers} workers")
start_time = time.time()
from .affiliation.generics import convert_vector_to_events
from .affiliation.metrics import pr_from_events
# Pre-compute ground truth events once
events_gt = convert_vector_to_events(label)
Trange = (0, len(label))
# Generate p-values and thresholds
p_values = np.linspace(0.8, 1, 300)
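# Note: searching quantile levels in [0.8, 1] only considers thresholds in the top 20% of
# scores, which implicitly assumes anomalies are rare; widen the range if a higher anomaly
# ratio is expected.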
# Pre-compute all thresholds
thresholds = np.quantile(score, p_values)
# Pre-compute all predictions
print("Pre-computing predictions...")
all_predictions = []
for threshold in thresholds:
preds = (score > threshold).astype(int)
all_predictions.append(preds)
print("Computing affiliation metrics...")
# Function to process a batch of indices
def compute_metrics_batch(indices):
results = []
for idx in indices:
preds = all_predictions[idx]
events_pred = convert_vector_to_events(preds)
affiliation_metrics = pr_from_events(events_pred, events_gt, Trange)
prec = affiliation_metrics['Affiliation_Precision']
rec = affiliation_metrics['Affiliation_Recall']
if prec + rec > 0:
f1 = 2 * prec * rec / (prec + rec + self.eps)
else:
f1 = 0.0
results.append({
'f1': f1,
'precision': prec,
'recall': rec,
'p_value': p_values[idx],
'threshold': thresholds[idx]
})
return results
# Split indices for workers
indices = list(range(len(p_values)))
chunk_size = len(indices) // num_workers
if chunk_size == 0:
chunk_size = 1
index_chunks = [indices[i:i + chunk_size] for i in range(0, len(indices), chunk_size)]
# Process with thread pool
all_results = []
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(compute_metrics_batch, chunk) for chunk in index_chunks]
completed = 0
for future in as_completed(futures):
all_results.extend(future.result())
completed += 1
print(f"Progress: {completed}/{len(futures)} chunks completed", end='\r')
print() # New line
# Find best result
best_result = max(all_results, key=lambda x: x['f1'])
elapsed = time.time() - start_time
print(f"Affiliation computed in {elapsed:.2f}s")
return best_result['f1'], best_result['precision'], best_result['recall']
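# Usage sketch (illustrative only; `y_true` and `y_score` are hypothetical 1-D numpy arrays
# of binary labels and anomaly scores, not names defined in this module):
#
#   grader = basic_metricor()
#   aff_f1, aff_prec, aff_rec = grader.metric_Affiliation_optimized(y_true, y_score, num_workers=4)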
def metric_Affiliation_chunked(self, label, score, chunk_size=30, num_workers=4):
"""
Affiliation-based F1 via chunked parallel processing: chunks of quantile levels are
dispatched to a process pool. Returns the best (F1, precision, recall) triple found.
"""
print(f"Computing Affiliation (chunked) with {num_workers} workers, chunk_size={chunk_size}")
start_time = time.time()
# Generate p-values
p_values = np.linspace(0.8, 1, 300)
# Create chunks of p-values
p_value_chunks = [p_values[i:i + chunk_size]
for i in range(0, len(p_values), chunk_size)]
# Prepare arguments for workers
chunk_args = [(chunk, label, score) for chunk in p_value_chunks]
# Process in parallel
all_results = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
for i, result_chunk in enumerate(executor.map(self._process_affiliation_chunk, chunk_args)):
all_results.extend(result_chunk)
print(f"Progress: {(i + 1) * chunk_size}/{len(p_values)} thresholds processed", end='\r')
print() # New line
# Find best result
best_result = max(all_results, key=lambda x: x['f1'])
elapsed = time.time() - start_time
print(f"Affiliation computed in {elapsed:.2f}s")
return best_result['f1'], best_result['precision'], best_result['recall']
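# Design note: this chunked variant uses a ProcessPoolExecutor, so the label/score arrays are
# pickled once per chunk but the workers bypass the GIL; the thread-pool variant above avoids
# that copying at the cost of sharing the GIL. Which one is faster depends on series length and
# on how much of pr_from_events runs in native code, so treat this as a heuristic, not a rule.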
def _compute_affiliation_chunk(self, p_values_chunk, score, label, eps=1e-7):
"""
Process a chunk of p-values for affiliation metrics
"""
from .affiliation.generics import convert_vector_to_events
from .affiliation.metrics import pr_from_events
# Ensure proper data types to avoid float/integer issues
label = np.asarray(label, dtype=int)
score = np.asarray(score, dtype=float)
# Convert ground truth to events once for this chunk
events_gt = convert_vector_to_events(label)
Trange = (0, len(label))
chunk_results = []
for p in p_values_chunk:
threshold = np.quantile(score, p)
preds_loop = (score > threshold).astype(int)
events_pred = convert_vector_to_events(preds_loop)
affiliation_metrics = pr_from_events(events_pred, events_gt, Trange)
Affiliation_Precision = affiliation_metrics['Affiliation_Precision']
Affiliation_Recall = affiliation_metrics['Affiliation_Recall']
denominator = Affiliation_Precision + Affiliation_Recall
if denominator > 0:
Affiliation_F = 2 * Affiliation_Precision * Affiliation_Recall / (denominator + eps)
else:
Affiliation_F = 0.0
chunk_results.append({
'f1': Affiliation_F,
'precision': Affiliation_Precision,
'recall': Affiliation_Recall,
'p_value': p,
'threshold': threshold
})
return chunk_results
def _compute_affiliation_parallel(self, label, score, num_workers=8):
"""
Affiliation-based F1 computed across a process pool, with a tqdm progress bar over
thresholds. Returns the best (F1, precision, recall) triple found.
"""
print(f"Computing Affiliation (parallel) with {num_workers} workers")
start_time = time.time()
# Generate p-values
p_values = np.linspace(0.8, 1, 300)
total_thresholds = len(p_values)
# Split p-values into chunks for parallel processing
p_value_chunks = np.array_split(p_values, num_workers)
# Process chunks in parallel with progress bar
with ProcessPoolExecutor(max_workers=num_workers) as executor:
# Submit all tasks and track chunk sizes
futures = {}
for i, chunk in enumerate(p_value_chunks):
future = executor.submit(self._compute_affiliation_chunk, chunk, score, label)
futures[future] = len(chunk)
# Collect results with progress bar
all_results = []
with tqdm(
total=total_thresholds,
desc="Computing affiliation metrics",
unit="threshold",
colour="green"
) as pbar:
for future in as_completed(futures):
chunk_results = future.result()
all_results.extend(chunk_results)
# Update by the number of thresholds processed in this chunk
pbar.update(futures[future])
# Find best result
best_result = max(all_results, key=lambda x: x['f1'])
elapsed = time.time() - start_time
print(f"Affiliation computed in {elapsed:.2f}s")
return best_result['f1'], best_result['precision'], best_result['recall']
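# Usage sketch for the parallel helper (illustrative only; `y_true`/`y_score` are hypothetical
# arrays as above); it returns the same (f1, precision, recall) triple as the public variants:
#
#   aff_f1, aff_prec, aff_rec = basic_metricor()._compute_affiliation_parallel(y_true, y_score, num_workers=8)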
@staticmethod
def _process_affiliation_chunk(args):
"""
Static method to process a chunk of p-values for affiliation metrics.
This can be pickled for multiprocessing.
"""
chunk_p_values, label_local, score_local = args
from .affiliation.generics import convert_vector_to_events
from .affiliation.metrics import pr_from_events
# Ensure consistent dtypes (mirrors _compute_affiliation_chunk) before event conversion
label_local = np.asarray(label_local, dtype=int)
score_local = np.asarray(score_local, dtype=float)
# Convert ground truth to events once for this chunk
events_gt = convert_vector_to_events(label_local)
Trange = (0, len(label_local))
results = []
for p in chunk_p_values:
threshold = np.quantile(score_local, p)
preds = (score_local > threshold).astype(int)
events_pred = convert_vector_to_events(preds)
affiliation_metrics = pr_from_events(events_pred, events_gt, Trange)
prec = affiliation_metrics['Affiliation_Precision']
rec = affiliation_metrics['Affiliation_Recall']
if prec + rec > 0:
f1 = 2 * prec * rec / (prec + rec + 1e-7)
else:
f1 = 0.0
results.append({
'f1': f1,
'precision': prec,
'recall': rec,
'p_value': p,
'threshold': threshold
})
return results
def metric_Affiliation_sampling(self, label, score, sample_rate=0.2):
"""
Fast approximation by sampling thresholds
"""
print(f"Computing Affiliation with threshold sampling (rate={sample_rate})")
start_time = time.time()
from .affiliation.generics import convert_vector_to_events
from .affiliation.metrics import pr_from_events
# Convert ground truth to events once
events_gt = convert_vector_to_events(label)
Trange = (0, len(label))
# Generate fewer p-values
n_samples = int(300 * sample_rate)
p_values = np.linspace(0.8, 1, n_samples)
results = []
for p in tqdm(p_values, desc="Sampling affiliation", unit="threshold"):
threshold = np.quantile(score, p)
preds = (score > threshold).astype(int)
events_pred = convert_vector_to_events(preds)
affiliation_metrics = pr_from_events(events_pred, events_gt, Trange)
prec = affiliation_metrics['Affiliation_Precision']
rec = affiliation_metrics['Affiliation_Recall']
if prec + rec > 0:
f1 = 2 * prec * rec / (prec + rec + self.eps)
else:
f1 = 0.0
results.append({
'f1': f1,
'precision': prec,
'recall': rec,
'p_value': p,
'threshold': threshold
})
# Find best result
best_result = max(results, key=lambda x: x['f1'])
elapsed = time.time() - start_time
print(f"Affiliation computed in {elapsed:.2f}s (approximate)")
return best_result['f1'], best_result['precision'], best_result['recall']
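# Usage sketch (illustrative only; `y_true`/`y_score` are hypothetical arrays as above). With
# sample_rate=0.2 only 60 of the 300 quantile levels are evaluated, so the optimum is approximate:
#
#   aff_f1, aff_prec, aff_rec = basic_metricor().metric_Affiliation_sampling(y_true, y_score, sample_rate=0.2)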
def metric_standard_F1_chunked(self, true_labels, anomaly_scores, threshold=None, chunk_size=50, num_workers=4):
"""
Optimized chunked parallel version of metric_standard_F1.
Calculate F1, Precision, Recall using parallel threshold processing.
Args:
true_labels: np.ndarray, ground truth binary labels (0=normal, 1=anomaly)
anomaly_scores: np.ndarray, anomaly scores (continuous values)
threshold: float, optional. If None, will use optimal threshold based on F1 score
chunk_size: int, number of thresholds to process in each chunk
num_workers: int, number of parallel workers
Returns:
dict: Dictionary containing various metrics
"""
# If threshold is provided, use original method
if threshold is not None:
return self.metric_standard_F1(true_labels, anomaly_scores, threshold)
print(f"Computing standard F1 (chunked) with {num_workers} workers, chunk_size={chunk_size}")
start_time = time.time()
# Generate quantile levels (each worker converts a level to a score threshold via np.quantile)
thresholds = np.linspace(0.5, 1, 500)
total_thresholds = len(thresholds)
# Create chunks of thresholds
threshold_chunks = [thresholds[i:i + chunk_size]
for i in range(0, len(thresholds), chunk_size)]
print(f"Split {total_thresholds} thresholds into {len(threshold_chunks)} chunks")
# Process in parallel
all_results = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
with tqdm(total=total_thresholds, desc="Processing standard F1 thresholds", unit="threshold", colour="blue") as pbar:
# Prepare arguments
chunk_args = [(chunk, true_labels, anomaly_scores) for chunk in threshold_chunks]
# Process and update progress bar
for i, result_chunk in enumerate(executor.map(self._process_standard_f1_chunk, chunk_args)):
all_results.extend(result_chunk)
pbar.update(len(threshold_chunks[i]))
pbar.set_postfix({
'chunk': f"{i + 1}/{len(threshold_chunks)}",
'results': len(all_results)
})
# Find best result
best_result = max(all_results, key=lambda x: x['f1'])
elapsed = time.time() - start_time
print(f"✓ Standard F1 computed in {elapsed:.2f}s")
print(f" Best F1: {best_result['f1']:.4f} at threshold {best_result['threshold']:.4f}")
return {
'F1': best_result['f1'],
'Recall': best_result['recall'],
'Precision': best_result['precision']
}
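# Usage sketch (illustrative only; `y_true`/`y_score` are hypothetical numpy arrays). With
# threshold=None the method sweeps 500 quantile levels in [0.5, 1]; an explicit threshold
# falls back to the single-threshold metric_standard_F1:
#
#   std_metrics = basic_metricor().metric_standard_F1_chunked(y_true, y_score, num_workers=4)
#   print(std_metrics['F1'], std_metrics['Precision'], std_metrics['Recall'])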
@staticmethod
def _process_standard_f1_chunk(args):
"""
Static method to process a chunk of thresholds for standard F1 metrics.
This can be pickled for multiprocessing.
"""
chunk_thresholds, true_labels, anomaly_scores = args
results = []
for t in chunk_thresholds:
threshold = np.quantile(anomaly_scores, t)
predictions = (anomaly_scores >= threshold).astype(int)
if len(np.unique(predictions)) > 1:  # Skip degenerate all-normal / all-anomaly predictions
precision, recall, f1, _ = precision_recall_fscore_support(
true_labels, predictions, average='binary', zero_division=0
)
else:
precision, recall, f1 = 0.0, 0.0, 0.0
results.append({
'f1': f1,
'precision': precision,
'recall': recall,
'threshold': threshold,
'quantile': t
})
return results
def metric_PointF1PA_chunked(self, label, score, preds=None, chunk_size=50, num_workers=4):
"""
Optimized chunked parallel version of metric_PointF1PA.
Calculate Point F1 with Point Adjustment using parallel threshold processing.
Args:
label: np.ndarray, ground truth binary labels
score: np.ndarray, anomaly scores
preds: np.ndarray, optional. If provided, use these predictions directly
chunk_size: int, number of thresholds to process in each chunk
num_workers: int, number of parallel workers
Returns:
dict: Dictionary containing various metrics (same format as original method)
"""
# If predictions are provided, use original method
if preds is not None:
return self.metric_PointF1PA(label, score, preds)
print(f"Computing PointF1PA (chunked) with {num_workers} workers, chunk_size={chunk_size}")
start_time = time.time()
# Generate q_values (quantiles)
q_values = np.arange(0.7, 0.99, 0.001)
total_thresholds = len(q_values)
# Create chunks of q_values
q_value_chunks = [q_values[i:i + chunk_size]
for i in range(0, len(q_values), chunk_size)]
print(f"Split {total_thresholds} thresholds into {len(q_value_chunks)} chunks")
# Process in parallel
all_results = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
with tqdm(total=total_thresholds, desc="Processing PointF1PA thresholds", unit="threshold", colour="green") as pbar:
# Prepare arguments
chunk_args = [(chunk, label, score) for chunk in q_value_chunks]
# Process and update progress bar
for i, result_chunk in enumerate(executor.map(self._process_pointf1pa_chunk, chunk_args)):
all_results.extend(result_chunk)
pbar.update(len(q_value_chunks[i]))
pbar.set_postfix({
'chunk': f"{i + 1}/{len(q_value_chunks)}",
'results': len(all_results)
})
# Find best result
best_result = max(all_results, key=lambda x: x['F1_PA'])
elapsed = time.time() - start_time
print(f"✓ PointF1PA computed in {elapsed:.2f}s")
print(f" Best F1_PA: {best_result['F1_PA']:.4f} at threshold {best_result['thre_PA']:.4f}")
return best_result
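# Usage sketch (illustrative only; `y_true`/`y_score` are hypothetical numpy arrays). The return
# value is the dict for the best-F1 threshold, with keys 'F1_PA', 'P_PA', 'R_PA', 'ACC_PA',
# 'thre_PA' and 'quantile':
#
#   pa_metrics = basic_metricor().metric_PointF1PA_chunked(y_true, y_score, num_workers=4)
#   print(pa_metrics['F1_PA'], pa_metrics['thre_PA'])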
@staticmethod
def _process_pointf1pa_chunk(args):
"""
Static method to process a chunk of q_values for PointF1PA metrics.
This can be pickled for multiprocessing.
"""
import sklearn.metrics
chunk_q_values, label, score = args
results = []
# Create a basic_metricor instance to access adjustment method
grader = basic_metricor()
for q in chunk_q_values:
thre = np.quantile(score, q)
pred = (score > thre).astype(int)
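# Point adjustment (standard PA convention): if any point inside a ground-truth anomaly
# segment is predicted anomalous, the whole segment is counted as detected before scoring.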
adjusted_pred = grader.adjustment(label, pred)
accuracy = sklearn.metrics.accuracy_score(label, adjusted_pred)
P, R, F1, _ = sklearn.metrics.precision_recall_fscore_support(label, adjusted_pred, average="binary")
result = {
'thre_PA': thre,
'ACC_PA': accuracy,
'P_PA': P,
'R_PA': R,
'F1_PA': F1,
'quantile': q
}
results.append(result)
return results