import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import numpy as np
from scipy.signal import argrelextrema
from statsmodels.tsa.stattools import acf

from .basic_metrics import basic_metricor, generate_curve


# ============== Parallelized Affiliation ==============
def _compute_auc_roc(labels, score):
    grader = basic_metricor()
    try:
        return grader.metric_ROC(labels, score)
    except Exception:
        return 0.0


def _compute_auc_pr(labels, score):
    grader = basic_metricor()
    try:
        return grader.metric_PR(labels, score)
    except Exception:
        return 0.0


def _compute_vus(labels, score, slidingWindow, version):
    try:
        _, _, _, _, _, _, VUS_ROC, VUS_PR = generate_curve(labels.astype(int), score, slidingWindow, version)
        return VUS_ROC, VUS_PR
    except Exception:
        return 0.0, 0.0


def _compute_pointf1(labels, score):
    grader = basic_metricor()
    try:
        return grader.metric_standard_F1_chunked(
            true_labels=labels,
            anomaly_scores=score,
            chunk_size=25,   # Process 25 thresholds per chunk
            num_workers=4    # Use 4 parallel workers
        )
    except Exception:
        return {'F1': 0.0, 'Precision': 0.0, 'Recall': 0.0}


def _compute_pointf1pa(labels, score):
    grader = basic_metricor()
    try:
        return grader.metric_PointF1PA_chunked(
            label=labels,
            score=score,
            chunk_size=30,   # Process 30 quantiles per chunk
            num_workers=6    # Use 6 parallel workers
        )
    except Exception:
        return {'F1_PA': 0.0, 'P_PA': 0.0, 'R_PA': 0.0}


def _compute_affiliation(labels, score):
    grader = basic_metricor()
    try:
        return grader.metric_Affiliation(labels, score)
    except Exception:
        return 0.0, 0.0, 0.0


def _compute_t_score(labels, score):
    grader = basic_metricor()
    try:
        return grader.metric_F1_T(labels, score)
    except Exception:
        return {'F1_T': 0.0, 'P_T': 0.0, 'R_T': 0.0}


def _compute_f1_t(labels, score):
    grader = basic_metricor()
    try:
        # Use the non-parallel path here to avoid pickling issues inside thread workers:
        # metric_F1_T(use_parallel=False) runs in-process and returns a dict
        return grader.metric_F1_T(labels, score, use_parallel=False)
    except Exception:
        # Always return a dict to keep downstream code consistent
        return {'F1_T': 0.0, 'P_T': 0.0, 'R_T': 0.0}


def _run_task(func, args):
    return func(*args)


def get_metrics_optimized(score, labels, slidingWindow=100, pred=None, version='opt', thre=250):
    """
    Fully optimized metrics computation with proper parallelization.
    """
    metrics = {}
    start_total = time.time()

    # Ensure proper data types to avoid float/integer issues
    labels = np.asarray(labels, dtype=int)
    score = np.asarray(score, dtype=float)

    # Determine the number of workers based on CPU count and workload
    n_cores = multiprocessing.cpu_count()

    # Threshold-iterating functions (Affiliation and F1_T) have heavy loops,
    # so give them more workers while leaving a couple of cores for the system.
    heavy_workers = min(n_cores - 2, 8)
    # For simple metrics
    light_workers = min(n_cores // 2, 8)

    print(f"Using {heavy_workers} workers for heavy metrics, {light_workers} for light metrics")

    # Start the heavy computation first (it takes longest)
    print("Starting heavy computation (Affiliation)...")
    heavy_start = time.time()

    grader = basic_metricor()

    with ProcessPoolExecutor(max_workers=2) as main_executor:
        # Launch the heaviest computation with its own internal parallelization
        affiliation_future = main_executor.submit(
            grader._compute_affiliation_parallel,
            labels,
            score,
            num_workers=heavy_workers
        )
        # t_score_future = main_executor.submit(
        #     grader.metric_F1_T_fast,
        #     labels,
        #     score,
        #     num_workers=heavy_workers*2
        # )

        # While the heavy computation is running, compute light metrics
        print("Computing light metrics in parallel...")
        light_start = time.time()

        with ThreadPoolExecutor(max_workers=light_workers) as light_executor:
            light_futures = {
                'auc_roc': light_executor.submit(_compute_auc_roc, labels, score),
                'auc_pr': light_executor.submit(_compute_auc_pr, labels, score),
                'vus': light_executor.submit(_compute_vus, labels, score, slidingWindow, version),
                'pointf1': light_executor.submit(_compute_pointf1, labels, score),
                'pointf1pa': light_executor.submit(_compute_pointf1pa, labels, score),
                'f1_t': light_executor.submit(_compute_f1_t, labels, score)
            }

            # Collect light metric results as they complete
            light_results = {}
            for name, future in light_futures.items():
                try:
                    light_results[name] = future.result()
                    print(f"  ✓ {name} completed")
                except Exception as e:
                    print(f"  ✗ {name} failed: {e}")
                    light_results[name] = None

        print(f"Light metrics completed in {time.time() - light_start:.2f}s")

        # Wait for the heavy computation to complete
        print("Waiting for heavy computation...")
        try:
            Affiliation_F, Affiliation_P, Affiliation_R = affiliation_future.result()
            print("  ✓ Affiliation completed")
        except Exception as e:
            print(f"  ✗ Affiliation failed: {e}")
            Affiliation_F, Affiliation_P, Affiliation_R = 0.0, 0.0, 0.0

        # try:
        #     T_score = t_score_future.result()
        #     print("  ✓ F1_T completed")
        # except Exception as e:
        #     print(f"  ✗ F1_T failed: {e}")
        #     T_score = {'F1_T': 0.0, 'P_T': 0.0, 'R_T': 0.0}

        print(f"Heavy metrics completed in {time.time() - heavy_start:.2f}s")

    # Unpack light results (a failed future leaves None, so fall back to defaults)
    AUC_ROC = light_results.get('auc_roc', 0.0)
    AUC_PR = light_results.get('auc_pr', 0.0)

    VUS_result = light_results.get('vus', (0.0, 0.0))
    if isinstance(VUS_result, tuple):
        VUS_ROC, VUS_PR = VUS_result
    else:
        VUS_ROC, VUS_PR = 0.0, 0.0

    PointF1 = light_results.get('pointf1') or {'F1': 0.0, 'Precision': 0.0, 'Recall': 0.0}
    PointF1PA = light_results.get('pointf1pa') or {'F1_PA': 0.0, 'P_PA': 0.0, 'R_PA': 0.0}

    T_score = light_results.get('f1_t') or {'F1_T': 0.0, 'P_T': 0.0, 'R_T': 0.0}
    # Safeguard: if upstream returned a tuple (e.g., from an older fallback), coerce to dict
    if isinstance(T_score, tuple):
        try:
            T_score = {'F1_T': T_score[0], 'P_T': T_score[1], 'R_T': T_score[2]}
        except Exception:
            T_score = {'F1_T': 0.0, 'P_T': 0.0, 'R_T': 0.0}

    # Build final metrics dictionary
    metrics['AUC-PR'] = AUC_PR
    metrics['AUC-ROC'] = AUC_ROC
    metrics['VUS-PR'] = VUS_PR
    metrics['VUS-ROC'] = VUS_ROC

    metrics['Standard-F1'] = PointF1.get('F1', 0.0)
    metrics['Standard-Precision'] = PointF1.get('Precision', 0.0)
    metrics['Standard-Recall'] = PointF1.get('Recall', 0.0)

    metrics['PA-F1'] = PointF1PA.get('F1_PA', 0.0)
    metrics['PA-Precision'] = PointF1PA.get('P_PA', 0.0)
    metrics['PA-Recall'] = PointF1PA.get('R_PA', 0.0)

    metrics['Affiliation-F'] = Affiliation_F
    metrics['Affiliation-P'] = Affiliation_P
    metrics['Affiliation-R'] = Affiliation_R

    metrics['F1_T'] = T_score.get('F1_T', 0.0)
    metrics['Precision_T'] = T_score.get('P_T', 0.0)
    metrics['Recall_T'] = T_score.get('R_T', 0.0)

    print(f"\nTotal computation time: {time.time() - start_total:.2f}s")
    return metrics
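
# Illustrative usage sketch (an assumption, not part of the original module):
# shows how get_metrics_optimized would be called. The synthetic score/label
# arrays below stand in for real detector output; any equal-length 1-D arrays
# of anomaly scores and binary labels should work.
def _demo_get_metrics_optimized(n=2000, seed=0):
    rng = np.random.default_rng(seed)
    labels = np.zeros(n, dtype=int)
    labels[500:550] = 1                   # one injected anomalous segment
    score = rng.random(n)                 # random scores stand in for a detector
    score[500:550] += 0.5                 # make the anomalous region score higher
    metrics = get_metrics_optimized(score, labels, slidingWindow=100)
    for name, value in metrics.items():
        print(f"{name}: {value:.4f}")
    return metrics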
def get_metrics(score, labels, slidingWindow=100, pred=None, version='opt', thre=250):
    metrics = {}

    # Ensure proper data types to avoid float/integer issues
    labels = np.asarray(labels, dtype=int)
    score = np.asarray(score, dtype=float)

    '''
    Threshold Independent
    '''
    grader = basic_metricor()
    # AUC_ROC, Precision, Recall, PointF1, PointF1PA, Rrecall, ExistenceReward, OverlapReward, Rprecision, RF, Precision_at_k = grader.metric_new(labels, score, pred, plot_ROC=False)
    try:
        AUC_ROC = grader.metric_ROC(labels, score)
    except Exception:
        AUC_ROC = 0.0
    try:
        AUC_PR = grader.metric_PR(labels, score)
    except Exception:
        AUC_PR = 0.0

    # R_AUC_ROC, R_AUC_PR, _, _, _ = grader.RangeAUC(labels=labels, score=score, window=slidingWindow, plot_ROC=True)
    try:
        _, _, _, _, _, _, VUS_ROC, VUS_PR = generate_curve(labels.astype(int), score, slidingWindow, version)
    except Exception:
        VUS_ROC, VUS_PR = 0.0, 0.0

    '''
    Threshold Dependent
    if pred is None --> use the oracle threshold
    '''
    PointF1 = grader.metric_standard_F1(labels, score)
    PointF1PA = grader.metric_PointF1PA(labels, score)
    # EventF1PA = grader.metric_EventF1PA(labels, score)
    # RF1 = grader.metric_RF1(labels, score)
    try:
        Affiliation_F, Affiliation_P, Affiliation_R = grader.metric_Affiliation(labels, score)
    except Exception:
        Affiliation_F, Affiliation_P, Affiliation_R = 0.0, 0.0, 0.0
    T_score = grader.metric_F1_T(labels, score)

    metrics['AUC-PR'] = AUC_PR
    metrics['AUC-ROC'] = AUC_ROC
    metrics['VUS-PR'] = VUS_PR
    metrics['VUS-ROC'] = VUS_ROC

    metrics['Standard-F1'] = PointF1['F1']
    metrics['Standard-Precision'] = PointF1['Precision']
    metrics['Standard-Recall'] = PointF1['Recall']

    metrics['PA-F1'] = PointF1PA['F1_PA']
    metrics['PA-Precision'] = PointF1PA['P_PA']
    metrics['PA-Recall'] = PointF1PA['R_PA']

    # metrics['Event-based-F1'] = EventF1PA
    # metrics['R-based-F1'] = RF1

    metrics['Affiliation-F'] = Affiliation_F
    metrics['Affiliation-P'] = Affiliation_P
    metrics['Affiliation-R'] = Affiliation_R

    metrics['F1_T'] = T_score['F1_T']
    metrics['Precision_T'] = T_score['P_T']
    metrics['Recall_T'] = T_score['R_T']

    return metrics
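
# Minimal sketch of the "oracle threshold" idea mentioned in get_metrics above
# (illustration only; the actual selection happens inside basic_metricor).
# Each candidate threshold binarizes the scores, and the threshold giving the
# best point-wise F1 against the labels is the oracle choice.
def _oracle_threshold_sketch(labels, score, n_thresholds=100):
    thresholds = np.linspace(score.min(), score.max(), n_thresholds)
    best_f1, best_thr = 0.0, thresholds[0]
    for thr in thresholds:
        pred = (score >= thr).astype(int)
        tp = np.sum((pred == 1) & (labels == 1))
        fp = np.sum((pred == 1) & (labels == 0))
        fn = np.sum((pred == 0) & (labels == 1))
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
        if f1 > best_f1:
            best_f1, best_thr = f1, thr
    return best_thr, best_f1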
def get_metrics_pred(score, labels, pred, slidingWindow=100):
    metrics = {}

    # Ensure proper data types to avoid float/integer issues
    labels = np.asarray(labels, dtype=int)
    score = np.asarray(score, dtype=float)
    pred = np.asarray(pred, dtype=int)

    grader = basic_metricor()

    PointF1 = grader.standard_F1(labels, score, preds=pred)
    PointF1PA = grader.metric_PointF1PA(labels, score, preds=pred)
    EventF1PA = grader.metric_EventF1PA(labels, score, preds=pred)
    RF1 = grader.metric_RF1(labels, score, preds=pred)
    Affiliation_F, Affiliation_P, Affiliation_R = grader.metric_Affiliation(labels, score, preds=pred)
    VUS_R, VUS_P, VUS_F = grader.metric_VUS_pred(labels, preds=pred, windowSize=slidingWindow)

    metrics['Standard-F1'] = PointF1['F1']
    metrics['Standard-Precision'] = PointF1['Precision']
    metrics['Standard-Recall'] = PointF1['Recall']
    metrics['PA-F1'] = PointF1PA
    metrics['Event-based-F1'] = EventF1PA
    metrics['R-based-F1'] = RF1
    metrics['Affiliation-F'] = Affiliation_F
    metrics['Affiliation-P'] = Affiliation_P
    metrics['Affiliation-R'] = Affiliation_R
    metrics['VUS-Recall'] = VUS_R
    metrics['VUS-Precision'] = VUS_P
    metrics['VUS-F'] = VUS_F

    return metrics
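
# Illustrative usage sketch (an assumption, not part of the original module):
# get_metrics_pred evaluates an already-binarized prediction. The arrays are
# synthetic; in practice `pred` would come from thresholding detector scores.
def _demo_get_metrics_pred(n=1000, seed=0):
    rng = np.random.default_rng(seed)
    labels = np.zeros(n, dtype=int)
    labels[200:230] = 1
    score = rng.random(n)
    score[200:230] += 0.5
    pred = (score > 0.8).astype(int)      # a fixed threshold stands in for a tuned one
    return get_metrics_pred(score, labels, pred, slidingWindow=100)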
def find_length_rank(data, rank=1):
    data = data.squeeze()
    if len(data.shape) > 1:
        return 0
    if rank == 0:
        return 1

    data = data[: min(20000, len(data))]

    base = 3
    auto_corr = acf(data, nlags=400, fft=True)[base:]

    # plot_acf(data, lags=400, fft=True)
    # plt.xlabel('Lags')
    # plt.ylabel('Autocorrelation')
    # plt.title('Autocorrelation Function (ACF)')
    # plt.savefig('/data/liuqinghua/code/ts/TSAD-AutoML/AutoAD_Solution/candidate_pool/cd_diagram/ts_acf.png')

    local_max = argrelextrema(auto_corr, np.greater)[0]
    try:
        # Indices of the local maxima, sorted by autocorrelation value in descending order
        sorted_local_max = np.argsort([auto_corr[lcm] for lcm in local_max])[::-1]
        max_local_max = sorted_local_max[0]  # Default: the strongest local maximum
        if rank == 1:
            max_local_max = sorted_local_max[0]
        if rank == 2:
            for i in sorted_local_max[1:]:
                if i > sorted_local_max[0]:
                    max_local_max = i
                    break
        if rank == 3:
            id_tmp = 1
            for i in sorted_local_max[1:]:
                if i > sorted_local_max[0]:
                    id_tmp = i
                    break
            for i in sorted_local_max[id_tmp:]:
                if i > sorted_local_max[id_tmp]:
                    max_local_max = i
                    break
        if local_max[max_local_max] < 3 or local_max[max_local_max] > 300:
            return 125
        return local_max[max_local_max] + base
    except Exception:
        return 125
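
# Illustrative usage sketch (an assumption, not part of the original module):
# find_length_rank picks a dominant period from the ACF's local maxima, so a
# clean sine wave with period 50 should return a value close to 50 for rank=1.
def _demo_find_length_rank(seed=0):
    rng = np.random.default_rng(seed)
    t = np.arange(5000)
    sine = np.sin(2 * np.pi * t / 50) + 0.1 * rng.standard_normal(len(t))
    return find_length_rank(sine, rank=1)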