""" Moirai model for anomaly detection using zero-shot forecasting. Adapted from test_anomaly.py approach for TSB-AD framework. """ import numpy as np import pandas as pd import torch from torch.utils.data import DataLoader from tqdm import tqdm import warnings warnings.filterwarnings('ignore') from gluonts.dataset.pandas import PandasDataset from gluonts.dataset.split import split from uni2ts.model.moirai import MoiraiForecast, MoiraiModule from .base import BaseDetector from ..utils.dataset import MoiraiWindowedDataset class Moirai(BaseDetector): def __init__(self, win_size=96, model_path="Salesforce/moirai-1.0-R-small", num_samples=100, device='cuda:0', use_score=False, threshold=0.5): """ Initialize Moirai anomaly detector. Args: win_size (int): Window size for context and prediction model_path (str): Path to pretrained Moirai model num_samples (int): Number of forecast samples device (str): Device to run model on use_score (bool): Whether to use raw scores or threshold threshold (float): Threshold for binary classification """ self.model_name = 'Moirai' self.win_size = win_size self.model_path = model_path self.num_samples = num_samples self.device = torch.device(device if torch.cuda.is_available() else 'cpu') self.use_score = use_score self.threshold = threshold self.decision_scores_ = None def fit(self, data): """ Fit the Moirai model and compute anomaly scores. Args: data: Input time series data (1D or 2D numpy array) """ try: # Ensure data is in the right format if data.ndim == 1: data = data.reshape(-1, 1) print(f"Moirai: Processing data with shape {data.shape}") # Create windowed dataset following test_anomaly.py pattern dataset = MoiraiWindowedDataset( data=data, win_size=self.win_size, step=self.win_size, # Non-overlapping windows normalize=False # Let Moirai handle normalization ) print(f"Moirai: Created {len(dataset)} windows") if len(dataset) == 0: print("Warning: No valid windows created. 


class Moirai(BaseDetector):
    def __init__(self, win_size=96, model_path="Salesforce/moirai-1.0-R-small",
                 num_samples=100, device='cuda:0', use_score=False, threshold=0.5):
        """
        Initialize the Moirai anomaly detector.

        Args:
            win_size (int): Window size for both context and prediction
            model_path (str): Path to the pretrained Moirai model
            num_samples (int): Number of forecast samples
            device (str): Device to run the model on
            use_score (bool): Whether to use raw scores or the threshold
            threshold (float): Threshold for binary classification
        """
        self.model_name = 'Moirai'
        self.win_size = win_size
        self.model_path = model_path
        self.num_samples = num_samples
        self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
        self.use_score = use_score
        self.threshold = threshold
        self.decision_scores_ = None
        self._module = None  # pretrained MoiraiModule, loaded lazily on first use

    def fit(self, data):
        """
        Compute anomaly scores via zero-shot forecasting (no training is performed).

        Args:
            data: Input time series data (1D or 2D numpy array)
        """
        try:
            # Ensure data is 2D: (n_timesteps, n_features)
            if data.ndim == 1:
                data = data.reshape(-1, 1)

            print(f"Moirai: Processing data with shape {data.shape}")

            # Create windowed dataset following the test_anomaly.py pattern
            dataset = MoiraiWindowedDataset(
                data=data,
                win_size=self.win_size,
                step=self.win_size,  # Non-overlapping windows
                normalize=False      # Let Moirai handle normalization
            )

            print(f"Moirai: Created {len(dataset)} windows")

            if len(dataset) == 0:
                print("Warning: No valid windows created. Data might be too short.")
                self.decision_scores_ = np.zeros(len(data))
                return

            # Process each window using a DataLoader (as in test_anomaly.py)
            data_loader = DataLoader(
                dataset=dataset,
                batch_size=1,
                shuffle=False,
                drop_last=False
            )

            all_predictions = []
            all_targets = []

            print("Processing windows with Moirai model...")
            # Progress bar over window processing
            for i, (context, target) in enumerate(tqdm(data_loader, desc="Processing windows", unit="window")):
                # Forecast a single window following the test_anomaly.py pattern
                scores = self._process_window(context.squeeze(0).numpy(),
                                              target.squeeze(0).numpy(), i)
                all_predictions.append(scores)
                all_targets.append(target.squeeze(0).numpy())

            # Combine all predictions
            if all_predictions:
                print("Computing anomaly scores...")
                # Concatenate predictions along the time dimension
                combined_predictions = np.concatenate(all_predictions, axis=0)
                combined_targets = np.concatenate(all_targets, axis=0)

                # Compute anomaly scores as squared prediction error
                if combined_targets.ndim == 1 or combined_predictions.ndim == 1:
                    # Univariate case, or predictions came back 1D
                    if combined_targets.ndim != combined_predictions.ndim:
                        # Bring both arrays to the same number of dimensions
                        if combined_predictions.ndim == 1 and combined_targets.ndim == 2:
                            combined_predictions = combined_predictions.reshape(-1, 1)
                        elif combined_targets.ndim == 1 and combined_predictions.ndim == 2:
                            combined_targets = combined_targets.reshape(-1, 1)

                    if combined_targets.shape != combined_predictions.shape:
                        print(f"Shape mismatch: targets {combined_targets.shape}, "
                              f"predictions {combined_predictions.shape}")
                        # Fall back to the first feature if shapes still disagree
                        if combined_targets.ndim == 2:
                            combined_targets = combined_targets[:, 0]
                        if combined_predictions.ndim == 2:
                            combined_predictions = combined_predictions[:, 0]

                    anomaly_scores = (combined_targets - combined_predictions) ** 2
                    if anomaly_scores.ndim == 2:
                        anomaly_scores = np.mean(anomaly_scores, axis=1)
                else:
                    # Multivariate case: mean squared error across features
                    if combined_targets.shape != combined_predictions.shape:
                        print(f"Shape mismatch: targets {combined_targets.shape}, "
                              f"predictions {combined_predictions.shape}")
                        # Keep only the features present in both arrays
                        min_features = min(combined_targets.shape[1], combined_predictions.shape[1])
                        combined_targets = combined_targets[:, :min_features]
                        combined_predictions = combined_predictions[:, :min_features]

                    anomaly_scores = np.mean((combined_targets - combined_predictions) ** 2, axis=1)

                # Pad scores to match the original data length
                print("Padding scores to original data length...")
                self.decision_scores_ = self._pad_scores_to_original_length(
                    anomaly_scores, len(data), dataset.get_window_info()
                )
            else:
                print("Warning: No predictions generated")
                self.decision_scores_ = np.zeros(len(data))

        except Exception as e:
            print(f"Error in Moirai.fit(): {str(e)}")
            import traceback
            traceback.print_exc()
            self.decision_scores_ = np.zeros(len(data))

    def _process_window(self, context, target, window_index):
        """
        Process a single window following the test_anomaly.py approach.
        Args:
            context: Context data for the window (win_size, n_features)
            target: Target data for the window (win_size, n_features)
            window_index: Index of the current window

        Returns:
            predictions: Forecasted values for the target period
        """
        try:
            # Log the current window without breaking the tqdm progress bar
            tqdm.write(f"Processing window {window_index + 1}")

            # Ensure 2D shape
            if context.ndim == 1:
                context = context.reshape(-1, 1)
            if target.ndim == 1:
                target = target.reshape(-1, 1)

            # Combine context and target into the full window (following test_anomaly.py)
            full_window = np.vstack([context, target])

            # Create DataFrame
            feature_df = pd.DataFrame(full_window)

            # Handle univariate and multivariate data differently
            if feature_df.shape[1] == 1:
                feature_df.columns = ['target']
                target_col = 'target'
                feature_cols = []
            else:
                # Multivariate: treat every feature column as a target
                feature_df.columns = [f'target_{i}' for i in range(feature_df.shape[1])]
                target_col = feature_df.columns.tolist()
                feature_cols = []

            # Add a synthetic timestamp index and a unique_id
            timestamp_range = pd.date_range(
                start=pd.Timestamp('2023-01-01 10:00:00'),
                periods=len(feature_df),
                freq='T'
            )
            feature_df.index = timestamp_range
            feature_df['unique_id'] = window_index

            # Create the GluonTS dataset
            moirai_df = feature_df.reset_index().rename(columns={'index': 'timestamp'})

            if isinstance(target_col, list):
                # Multivariate case: multiple target columns
                ds = PandasDataset.from_long_dataframe(
                    moirai_df,
                    target=target_col,
                    item_id="unique_id",
                    timestamp="timestamp",
                )
            else:
                # Univariate case
                if feature_cols:
                    ds = PandasDataset.from_long_dataframe(
                        moirai_df,
                        target=target_col,
                        item_id="unique_id",
                        timestamp="timestamp",
                        feat_dynamic_real=feature_cols,
                    )
                else:
                    ds = PandasDataset.from_long_dataframe(
                        moirai_df,
                        target=target_col,
                        item_id="unique_id",
                        timestamp="timestamp",
                    )

            # Split the dataset (following test_anomaly.py): the last win_size
            # steps become the forecast target
            test_size = self.win_size
            _, test_template = split(ds, offset=-test_size)
            test_data = test_template.generate_instances(
                prediction_length=self.win_size,
                windows=1,
                distance=self.win_size,
                max_history=self.win_size,
            )

            # Load the pretrained module once and cache it; reloading the
            # weights for every window would be needlessly slow
            if self._module is None:
                self._module = MoiraiModule.from_pretrained(self.model_path)

            # Build the forecasting model; the target dimension follows the
            # number of features
            target_dim = target.shape[1] if target.ndim > 1 else 1
            model = MoiraiForecast(
                module=self._module,
                prediction_length=self.win_size,
                context_length=self.win_size,
                patch_size="auto",
                num_samples=self.num_samples,
                target_dim=target_dim,
                feat_dynamic_real_dim=ds.num_feat_dynamic_real,
                past_feat_dynamic_real_dim=ds.num_past_feat_dynamic_real,
            )

            # Create the predictor and generate forecasts
            predictor = model.create_predictor(batch_size=1, device=self.device)
            forecasts = list(predictor.predict(test_data.input))

            # Use the median of the forecast samples (following test_anomaly.py)
            predictions = np.median(forecasts[0].samples, axis=0)
            return predictions

        except Exception as e:
            print(f"Error processing window {window_index}: {str(e)}")
            # Fall back to zeros with the correct shape
            target_shape = (self.win_size, target.shape[1]) if target.ndim > 1 else (self.win_size,)
            return np.zeros(target_shape)

    def _pad_scores_to_original_length(self, scores, original_length, window_info):
        """
        Pad anomaly scores to match the original data length.
        Args:
            scores: Computed anomaly scores from the windows
            original_length: Length of the original input data
            window_info: Information about the windowing strategy

        Returns:
            padded_scores: Scores padded to the original length
        """
        padded_scores = np.zeros(original_length)
        win_size = window_info['win_size']
        step = window_info['step']

        # Fill in scores from each window
        score_windows = scores.reshape(-1, win_size)
        for i, score_window in enumerate(tqdm(score_windows, desc="Padding scores", unit="window")):
            start_idx = i * step + win_size  # Offset by win_size (the context part)
            end_idx = start_idx + win_size

            if end_idx <= original_length:
                padded_scores[start_idx:end_idx] = score_window
            elif start_idx < original_length:
                # Partial window at the end
                remaining = original_length - start_idx
                padded_scores[start_idx:] = score_window[:remaining]

        # Fill the beginning (context part) with the first window's average
        if len(scores) > 0:
            first_score = np.mean(scores[:win_size]) if len(scores) >= win_size else np.mean(scores)
            padded_scores[:win_size] = first_score

        return padded_scores

    def decision_function(self, X):
        """
        Not used for the zero-shot approach; present for API consistency.
        """
        return self.decision_scores_
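

# Example usage: a minimal sketch with synthetic data. In the TSB-AD framework
# the detector is normally driven by the benchmark runner; the window size,
# sample count, and injected anomaly below are illustrative choices only.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    series = rng.normal(size=(512, 1))
    series[200:210] += 5.0  # inject a simple level-shift anomaly

    detector = Moirai(win_size=64, num_samples=50, device='cuda:0')
    detector.fit(series)

    scores = detector.decision_scores_
    print("Top-5 most anomalous indices:", np.argsort(scores)[-5:])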