# Time_RCD / models / Moirai.py
# Oliver Le
# Initial commit
# d03866e
"""
Moirai model for anomaly detection using zero-shot forecasting.
Adapted from test_anomaly.py approach for TSB-AD framework.
"""
import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
from gluonts.dataset.pandas import PandasDataset
from gluonts.dataset.split import split
from uni2ts.model.moirai import MoiraiForecast, MoiraiModule
from .base import BaseDetector
from ..utils.dataset import MoiraiWindowedDataset
class Moirai(BaseDetector):
    """Zero-shot anomaly detector based on Moirai forecasts.

    The input series is cut into (context, target) windows. For every window
    the target segment is forecast from the context segment, and the squared
    error between the median forecast and the observed target is used as the
    anomaly score.
    """

    def __init__(self,
                 win_size=96,
                 model_path="Salesforce/moirai-1.0-R-small",
                 num_samples=100,
                 device='cuda:0',
                 use_score=False,
                 threshold=0.5):
        """
        Initialize Moirai anomaly detector.

        Args:
            win_size (int): Window size, used both as context length and as
                prediction length.
            model_path (str): Path or hub id of the pretrained Moirai model.
            num_samples (int): Number of forecast samples drawn per window.
            device (str): Requested device; falls back to 'cpu' when CUDA is
                not available.
            use_score (bool): Whether to use raw scores or threshold
                (stored only; not read anywhere in this class).
            threshold (float): Threshold for binary classification
                (stored only; not read anywhere in this class).
        """
        # NOTE(review): BaseDetector.__init__ is not invoked here; confirm
        # whether the base class expects it to be called.
        self.model_name = 'Moirai'
        self.win_size = win_size
        self.model_path = model_path
        self.num_samples = num_samples
        self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
        self.use_score = use_score
        self.threshold = threshold
        self.decision_scores_ = None
        # Pretrained weights are loaded lazily and reused across windows.
        self._module = None
        self._module_source = None

    def fit(self, data):
        """
        Fit the Moirai model and compute anomaly scores.

        The result is stored in ``self.decision_scores_`` (one score per row
        of ``data``). Any exception is reported on stdout and leaves an
        all-zero score vector instead of propagating.

        Args:
            data: Input time series data (1D or 2D numpy array)
        """
        try:
            # Ensure data is (n_samples, n_features)
            if data.ndim == 1:
                data = data.reshape(-1, 1)
            print(f"Moirai: Processing data with shape {data.shape}")

            # Non-overlapping windows; normalization is left to Moirai.
            dataset = MoiraiWindowedDataset(
                data=data,
                win_size=self.win_size,
                step=self.win_size,
                normalize=False,
            )
            print(f"Moirai: Created {len(dataset)} windows")
            if len(dataset) == 0:
                print("Warning: No valid windows created. Data might be too short.")
                self.decision_scores_ = np.zeros(len(data))
                return

            data_loader = DataLoader(
                dataset=dataset,
                batch_size=1,
                shuffle=False,
                drop_last=False,
            )

            all_predictions = []
            all_targets = []
            print("Processing windows with Moirai model...")
            for i, (context, target) in enumerate(
                    tqdm(data_loader, desc="Processing windows", unit="window")):
                # batch_size=1, so drop the leading batch axis.
                context_arr = context.squeeze(0).numpy()
                target_arr = target.squeeze(0).numpy()
                prediction = self._process_window(context_arr, target_arr, i)
                # A successful univariate forecast is 1D while the error
                # fallback is 2D; force a common rank so that a single failed
                # window cannot break the concatenation below.
                all_predictions.append(self._as_2d(prediction))
                all_targets.append(self._as_2d(target_arr))

            if not all_predictions:
                print("Warning: No predictions generated")
                self.decision_scores_ = np.zeros(len(data))
                return

            print("Computing anomaly scores...")
            combined_predictions = np.concatenate(all_predictions, axis=0)
            combined_targets = np.concatenate(all_targets, axis=0)
            anomaly_scores = self._squared_error_scores(
                combined_targets, combined_predictions
            )

            # Pad scores to match original data length
            print("Padding scores to original data length...")
            self.decision_scores_ = self._pad_scores_to_original_length(
                anomaly_scores, len(data), dataset.get_window_info()
            )
        except Exception as e:
            # Deliberate best-effort: report the failure and fall back to zeros.
            print(f"Error in Moirai.fit(): {str(e)}")
            import traceback
            traceback.print_exc()
            self.decision_scores_ = np.zeros(len(data))

    @staticmethod
    def _as_2d(values):
        """Return ``values`` as an array, turning 1D input into one column."""
        values = np.asarray(values)
        if values.ndim == 1:
            return values.reshape(-1, 1)
        return values

    @staticmethod
    def _squared_error_scores(targets, predictions):
        """
        Compute the per-time-step mean squared error across features.

        Args:
            targets: Observed values, 1D or (n_steps, n_features).
            predictions: Forecast values, 1D or (n_steps, n_features).
        Returns:
            1D array with one score per time step.
        """
        if targets.ndim == 1:
            targets = targets.reshape(-1, 1)
        if predictions.ndim == 1:
            predictions = predictions.reshape(-1, 1)
        if targets.shape != predictions.shape:
            print(f"Shape mismatch: targets {targets.shape}, predictions {predictions.shape}")
            # Compare only the features both arrays provide.
            min_features = min(targets.shape[1], predictions.shape[1])
            targets = targets[:, :min_features]
            predictions = predictions[:, :min_features]
        return np.mean((targets - predictions) ** 2, axis=1)

    def _get_module(self):
        """Return the pretrained Moirai module, loading it on first use.

        The module is cached per ``model_path`` so that the weights are not
        re-loaded for every window.
        """
        cached = getattr(self, '_module', None)
        if cached is None or getattr(self, '_module_source', None) != self.model_path:
            self._module = MoiraiModule.from_pretrained(self.model_path)
            self._module_source = self.model_path
        return self._module

    def _process_window(self, context, target, window_index):
        """
        Process a single window following the test_anomaly.py approach.

        Args:
            context: Context data for the window (win_size, n_features)
            target: Target data for the window (win_size, n_features)
            window_index: Index of the current window
        Returns:
            predictions: Median over the forecast samples for the target
                period. On any error, zeros shaped like the target window
                are returned instead.
        """
        try:
            tqdm.write(f"Processing window {window_index + 1}")

            # Ensure 2D shape
            if context.ndim == 1:
                context = context.reshape(-1, 1)
            if target.ndim == 1:
                target = target.reshape(-1, 1)

            # Context followed by target forms the full window.
            full_window = np.vstack([context, target])
            feature_df = pd.DataFrame(full_window)

            if feature_df.shape[1] == 1:
                feature_df.columns = ['target']
                target_col = 'target'
            else:
                # Multivariate: every column is a forecast target.
                feature_df.columns = [f'target_{i}' for i in range(feature_df.shape[1])]
                target_col = feature_df.columns.tolist()

            # Synthetic minutely timestamps; 'min' is the non-deprecated
            # spelling of the former 'T' alias.
            feature_df.index = pd.date_range(
                start=pd.Timestamp('2023-01-01 10:00:00'),
                periods=len(feature_df),
                freq='min',
            )
            feature_df['unique_id'] = window_index

            # Create GluonTS dataset
            moirai_df = feature_df.reset_index().rename(columns={'index': 'timestamp'})
            ds = PandasDataset.from_long_dataframe(
                moirai_df,
                target=target_col,
                item_id="unique_id",
                timestamp="timestamp",
            )

            # Hold out the last win_size steps as the forecast horizon.
            _, test_template = split(ds, offset=-self.win_size)
            test_data = test_template.generate_instances(
                prediction_length=self.win_size,
                windows=1,
                distance=self.win_size,
                max_history=self.win_size,
            )

            model = MoiraiForecast(
                module=self._get_module(),
                prediction_length=self.win_size,
                context_length=self.win_size,
                patch_size="auto",
                num_samples=self.num_samples,
                target_dim=target.shape[1],
                feat_dynamic_real_dim=ds.num_feat_dynamic_real,
                past_feat_dynamic_real_dim=ds.num_past_feat_dynamic_real,
            )

            # Create predictor and generate forecasts
            predictor = model.create_predictor(batch_size=1, device=self.device)
            forecasts = list(predictor.predict(test_data.input))

            # Median over the sample axis
            return np.median(forecasts[0].samples, axis=0)
        except Exception as e:
            print(f"Error processing window {window_index}: {str(e)}")
            # Return zeros as fallback with correct shape
            if target.ndim > 1:
                return np.zeros((self.win_size, target.shape[1]))
            return np.zeros((self.win_size,))

    def _pad_scores_to_original_length(self, scores, original_length, window_info):
        """
        Pad anomaly scores to match the original data length.

        Args:
            scores: Computed anomaly scores from windows (1D; its length must
                be a multiple of the window size)
            original_length: Length of the original input data
            window_info: Mapping with the keys 'win_size' and 'step'
        Returns:
            padded_scores: Scores padded to original length
        """
        padded_scores = np.zeros(original_length)
        win_size = window_info['win_size']
        step = window_info['step']

        # Fill in scores from each window
        score_windows = scores.reshape(-1, win_size)
        for i, score_window in enumerate(
                tqdm(score_windows, desc="Padding scores", unit="window")):
            # Offset by win_size: the first win_size steps are context only.
            start_idx = i * step + win_size
            end_idx = start_idx + win_size
            if end_idx <= original_length:
                padded_scores[start_idx:end_idx] = score_window
            elif start_idx < original_length:
                # Partial window at the end
                remaining = original_length - start_idx
                padded_scores[start_idx:] = score_window[:remaining]

        # Fill beginning (context part) with the first window's average;
        # slicing already copes with fewer than win_size scores.
        if len(scores) > 0:
            padded_scores[:win_size] = np.mean(scores[:win_size])
        return padded_scores

    def decision_function(self, X):
        """
        Not used for zero-shot approach, present for API consistency.

        ``X`` is ignored; the scores computed by the last ``fit`` call are
        returned (``None`` if ``fit`` has not been called).
        """
        return self.decision_scores_