"""
Moirai model wrapper for anomaly detection
Adapted from the test_anomaly.py implementation
"""
import numpy as np
import pandas as pd
import torch
from tqdm import tqdm
import warnings

# Suppress noisy warnings before importing gluonts/uni2ts
warnings.filterwarnings('ignore')
from gluonts.dataset.pandas import PandasDataset
from gluonts.dataset.split import split
from uni2ts.model.moirai.forecast import MoiraiForecast, MoiraiModule
from .base import BaseDetector
class Moirai(BaseDetector):
def __init__(self,
win_size=96,
model_path="Salesforce/moirai-1.0-R-small",
num_samples=100,
device='cuda:0',
use_score=False,
threshold=0.5):
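        """
        Parameters
        ----------
        win_size : int
            Forecast horizon; each processed window spans 2 * win_size points.
        model_path : str
            HuggingFace model id or local path of the pretrained Moirai module.
        num_samples : int
            Number of forecast samples drawn per prediction.
        device : str
            Torch device; falls back to CPU if CUDA is unavailable.
        use_score : bool
            If True, feed raw scores back as labels for the next window;
            otherwise feed thresholded binary labels.
        threshold : float
            Score threshold applied when use_score is False.
        """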
self.model_name = 'Moirai'
self.win_size = win_size
self.model_path = model_path
self.num_samples = num_samples
self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
self.use_score = use_score
self.threshold = threshold
self.decision_scores_ = None
def fit(self, data):
"""
Fit Moirai on the data and compute anomaly scores using zero-shot approach
This implementation follows the exact windowing logic from the data loaders
"""
print(f"Moirai zero-shot anomaly detection on data shape: {data.shape}")
# Handle univariate data (ensure 2D shape)
if data.ndim == 1:
data = data.reshape(-1, 1)
# Check if we have enough data
if data.shape[0] < 2 * self.win_size:
raise ValueError(f"Data length ({data.shape[0]}) is less than required minimum (2 * win_size = {2 * self.win_size})")
all_moirai_preds = []
last_pred_label = None
# Create sliding windows following the data loader pattern
# For testing, we use stride = win_size (non-overlapping windows like in data loaders)
num_windows = (data.shape[0] - 2 * self.win_size) // self.win_size + 1
for i in tqdm(range(num_windows), desc="Processing windows"):
# Extract window following data loader logic
start_idx = i * self.win_size
end_idx = start_idx + 2 * self.win_size
if end_idx > data.shape[0]:
break
# Get the 2*win_size window (this matches batch_x from data loader)
window_data = data[start_idx:end_idx] # Shape: (2*win_size, n_features)
# Create synthetic labels (all zeros initially, replaced by predictions)
label = np.zeros(window_data.shape[0])
# Replace the first win_size labels with last prediction if not first window
if i != 0 and last_pred_label is not None:
label[:self.win_size] = last_pred_label
# Convert to DataFrame format required by GluonTS
# Handle both univariate and multivariate data
if window_data.shape[1] == 1:
# Univariate case
feature = pd.DataFrame(window_data, columns=['value'])
else:
# Multivariate case
feature = pd.DataFrame(window_data)
feature.columns = [f'feature_{j}' for j in range(feature.shape[1])]
label_df = pd.DataFrame(label, columns=['label'])
df = pd.concat([feature, label_df], axis=1)
# Add timestamp and unique_id
            new_index = pd.date_range(
                start=pd.Timestamp('2023-01-01 10:00:00'),
                periods=len(df),
                freq='min'  # minute frequency ('T' is a deprecated alias)
            )
new_index_iso = new_index.strftime('%Y-%m-%d %H:%M:%S')
df.insert(0, 'Timestamp', new_index_iso)
df['unique_id'] = 0
moirai_df = df.set_index('Timestamp')
# Create GluonTS dataset
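            # The (pseudo-)label series is the forecast target and the raw
            # features enter as dynamic real covariates, so Moirai effectively
            # predicts an anomaly indicator from the observed signal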
feat_cols = feature.columns.tolist()
ds = PandasDataset.from_long_dataframe(
moirai_df,
target="label",
item_id="unique_id",
feat_dynamic_real=feat_cols,
)
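            # Hold out the last win_size rows as the forecast target; the first
            # win_size rows supply the model's context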
test_size = self.win_size
_, test_template = split(ds, offset=-test_size)
test_data = test_template.generate_instances(
prediction_length=self.win_size,
windows=1,
distance=self.win_size,
max_history=self.win_size,
)
# Create Moirai model (recreate for each window to avoid memory issues)
model = MoiraiForecast(
module=MoiraiModule.from_pretrained(self.model_path),
prediction_length=self.win_size,
context_length=self.win_size,
patch_size="auto",
num_samples=self.num_samples,
target_dim=1,
feat_dynamic_real_dim=ds.num_feat_dynamic_real,
past_feat_dynamic_real_dim=ds.num_past_feat_dynamic_real,
)
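            # NOTE: from_pretrained reloads the weights on every window; hoisting
            # the module above the loop would be much faster if memory allows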
try:
predictor = model.create_predictor(batch_size=1, device=self.device)
forecasts = predictor.predict(test_data.input)
forecasts = list(forecasts)
moirai_preds = np.median(forecasts[0].samples, axis=0)
all_moirai_preds.append(moirai_preds)
# Update last prediction for next window
if self.use_score:
last_pred_label = moirai_preds
else:
last_pred_label = (moirai_preds >= self.threshold).astype(int)
except Exception as e:
print(f"Error processing window {i}: {e}")
# Use zeros as fallback
moirai_preds = np.zeros(self.win_size)
all_moirai_preds.append(moirai_preds)
last_pred_label = moirai_preds
# Concatenate all predictions
if all_moirai_preds:
all_moirai_preds = np.concatenate(all_moirai_preds, axis=0)
else:
all_moirai_preds = np.zeros(0)
# Create scores array that matches the original data length
# This follows the pattern from data loaders: each window predicts win_size points
padded_scores = np.zeros(data.shape[0])
        if len(all_moirai_preds) > 0:
            # Map predictions back to original indices: window i forecasts the
            # points [win_size + i*win_size, win_size + (i+1)*win_size).
            # Each appended chunk is exactly win_size long, so slicing on that
            # boundary is safer than np.array_split against num_windows, which
            # would misalign if fewer windows were actually processed.
            for i in range(len(all_moirai_preds) // self.win_size):
                pred_window = all_moirai_preds[i * self.win_size:(i + 1) * self.win_size]
                start_pred_idx = self.win_size + i * self.win_size
                end_pred_idx = min(start_pred_idx + len(pred_window), data.shape[0])
                padded_scores[start_pred_idx:end_pred_idx] = pred_window[:end_pred_idx - start_pred_idx]
            # The first win_size points are never forecast; backfill them with
            # the first predicted score so the output covers the full series
            if self.win_size < len(padded_scores):
                padded_scores[:self.win_size] = all_moirai_preds[0]
self.decision_scores_ = padded_scores
print(f"Generated anomaly scores shape: {self.decision_scores_.shape}")
return self
def decision_function(self, X):
"""
Return anomaly scores for X
"""
if self.decision_scores_ is None:
raise ValueError("Model must be fitted before calling decision_function")
return self.decision_scores_[:len(X)]
def zero_shot(self, data):
"""
Zero-shot anomaly detection
"""
self.fit(data)
return self.decision_scores_
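

if __name__ == "__main__":
    # Minimal usage sketch on synthetic data (an illustration, not part of the
    # original pipeline). Assumes the uni2ts/gluonts dependencies are installed
    # and the pretrained weights can be downloaded. Run from the package root,
    # e.g. `python -m Time_RCD.models.Moirai_old`, so the relative import resolves.
    rng = np.random.default_rng(0)
    series = rng.normal(size=(512, 1)).astype(np.float32)
    series[300:310] += 5.0  # inject a simple level-shift "anomaly"

    detector = Moirai(win_size=96, num_samples=20, device='cpu')
    scores = detector.zero_shot(series)
    print(f"scores shape: {scores.shape}, max score: {scores.max():.3f}")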