"""
Moirai model wrapper for anomaly detection
Adapted from the test_anomaly.py implementation
"""

import numpy as np
import pandas as pd
import torch
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

from gluonts.dataset.pandas import PandasDataset
from gluonts.dataset.split import split
from uni2ts.model.moirai.forecast import MoiraiForecast, MoiraiModule

from .base import BaseDetector


class Moirai(BaseDetector):
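    """
    Zero-shot anomaly detector built around the pretrained Moirai forecaster.

    Parameters
    ----------
    win_size : int
        Prediction window length; each processed segment covers 2 * win_size points
        (win_size of context followed by win_size of points to score).
    model_path : str
        Hugging Face model id or local path of the pretrained Moirai weights.
    num_samples : int
        Number of forecast samples drawn per window; their median is used as the score.
    device : str
        Torch device string; falls back to 'cpu' when CUDA is unavailable.
    use_score : bool
        If True, feed the previous window's raw median scores back in as labels;
        otherwise threshold them to 0/1 first.
    threshold : float
        Cutoff applied to the scores when use_score is False.
    """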
    def __init__(self, 
                 win_size=96,
                 model_path="Salesforce/moirai-1.0-R-small",
                 num_samples=100,
                 device='cuda:0',
                 use_score=False,
                 threshold=0.5):

        self.model_name = 'Moirai'
        self.win_size = win_size
        self.model_path = model_path
        self.num_samples = num_samples
        self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
        self.use_score = use_score
        self.threshold = threshold
        self.decision_scores_ = None

    def fit(self, data):
        """
        Fit Moirai on the data and compute anomaly scores using zero-shot approach
        This implementation follows the exact windowing logic from the data loaders
        """
        print(f"Moirai zero-shot anomaly detection on data shape: {data.shape}")
        
        # Handle univariate data (ensure 2D shape)
        if data.ndim == 1:
            data = data.reshape(-1, 1)
        
        # Check if we have enough data
        if data.shape[0] < 2 * self.win_size:
            raise ValueError(f"Data length ({data.shape[0]}) is less than required minimum (2 * win_size = {2 * self.win_size})")
        
        all_target = []
        all_moirai_preds = []
        last_pred_label = None

        # Load the pretrained Moirai weights once; only the lightweight forecast
        # wrapper is rebuilt per window inside the loop below
        moirai_module = MoiraiModule.from_pretrained(self.model_path)

        # Create sliding windows following the data loader pattern
        # For testing, we use stride = win_size (non-overlapping windows, as in the data loaders)
        num_windows = (data.shape[0] - 2 * self.win_size) // self.win_size + 1
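        # e.g. with 1,000 points and win_size = 96 (illustrative numbers only):
        # num_windows = (1000 - 192) // 96 + 1 = 9, covering input segments
        # [0, 192), [96, 288), ..., [768, 960)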
        
        for i in tqdm(range(num_windows), desc="Processing windows"):
            # Extract window following data loader logic
            start_idx = i * self.win_size
            end_idx = start_idx + 2 * self.win_size
            
            if end_idx > data.shape[0]:
                break
                
            # Get the 2*win_size window (this matches batch_x from data loader)
            window_data = data[start_idx:end_idx]  # Shape: (2*win_size, n_features)
            
            # Create synthetic labels (all zeros initially, replaced by predictions)
            label = np.zeros(window_data.shape[0])
            
            # For all but the first window, seed the first win_size labels with the
            # previous window's prediction
            if i != 0 and last_pred_label is not None:
                label[:self.win_size] = last_pred_label
            
            # Convert to DataFrame format required by GluonTS
            # Handle both univariate and multivariate data
            if window_data.shape[1] == 1:
                # Univariate case
                feature = pd.DataFrame(window_data, columns=['value'])
            else:
                # Multivariate case
                feature = pd.DataFrame(window_data)
                feature.columns = [f'feature_{j}' for j in range(feature.shape[1])]
            
            label_df = pd.DataFrame(label, columns=['label'])
            df = pd.concat([feature, label_df], axis=1)
            
            # Add timestamp and unique_id
            new_index = pd.date_range(
                start=pd.Timestamp('2023-01-01 10:00:00'), 
                periods=len(df), 
                freq='T'
            )
            new_index_iso = new_index.strftime('%Y-%m-%d %H:%M:%S')
            df.insert(0, 'Timestamp', new_index_iso)
            df['unique_id'] = 0
            moirai_df = df.set_index('Timestamp')

            # Create GluonTS dataset
            feat_cols = feature.columns.tolist()
            ds = PandasDataset.from_long_dataframe(
                moirai_df,
                target="label",
                item_id="unique_id",
                feat_dynamic_real=feat_cols,
            )

            test_size = self.win_size
            _, test_template = split(ds, offset=-test_size)

            test_data = test_template.generate_instances(
                prediction_length=self.win_size,
                windows=1,
                distance=self.win_size,
                max_history=self.win_size,
            )

            # Rebuild the lightweight forecast wrapper for this window; the pretrained
            # module loaded above is reused so the weights are not reloaded every iteration
            model = MoiraiForecast(
                module=moirai_module,
                prediction_length=self.win_size,
                context_length=self.win_size,
                patch_size="auto",
                num_samples=self.num_samples,
                target_dim=1,
                feat_dynamic_real_dim=ds.num_feat_dynamic_real,
                past_feat_dynamic_real_dim=ds.num_past_feat_dynamic_real,
            )

            try:
                predictor = model.create_predictor(batch_size=1, device=self.device)
                forecasts = predictor.predict(test_data.input)
                forecasts = list(forecasts)

                moirai_preds = np.median(forecasts[0].samples, axis=0)
                all_moirai_preds.append(moirai_preds)
                
                # Collect the ground-truth label targets (kept for debugging only;
                # not used when computing the anomaly scores)
                for item in test_data.label:
                    all_target.extend(item['target'])
                
                # Update last prediction for next window
                if self.use_score:
                    last_pred_label = moirai_preds
                else:
                    last_pred_label = (moirai_preds >= self.threshold).astype(int)
                    
            except Exception as e:
                print(f"Error processing window {i}: {e}")
                # Use zeros as fallback
                moirai_preds = np.zeros(self.win_size)
                all_moirai_preds.append(moirai_preds)
                last_pred_label = moirai_preds

        # Concatenate all predictions
        if all_moirai_preds:
            all_moirai_preds = np.concatenate(all_moirai_preds, axis=0)
        else:
            all_moirai_preds = np.zeros(0)
        
        # Create scores array that matches the original data length
        # This follows the pattern from data loaders: each window predicts win_size points
        padded_scores = np.zeros(data.shape[0])
        
        if len(all_moirai_preds) > 0:
            # Map predictions back to original data indices: window i scores the
            # win_size points starting at index (i + 1) * win_size
            for i, pred_window in enumerate(np.array_split(all_moirai_preds, num_windows)):
                if len(pred_window) > 0:
                    start_pred_idx = self.win_size + i * self.win_size  # skip the initial context window
                    end_pred_idx = min(start_pred_idx + len(pred_window), data.shape[0])
                    actual_len = end_pred_idx - start_pred_idx
                    padded_scores[start_pred_idx:end_pred_idx] = pred_window[:actual_len]

            # The first win_size points are never predicted; backfill them with the
            # first predicted score
            if self.win_size < len(padded_scores):
                padded_scores[:self.win_size] = all_moirai_preds[0]
        
        self.decision_scores_ = padded_scores
        print(f"Generated anomaly scores shape: {self.decision_scores_.shape}")
        return self

    def decision_function(self, X):
        """
        Return anomaly scores for X
        """
        if self.decision_scores_ is None:
            raise ValueError("Model must be fitted before calling decision_function")
        return self.decision_scores_[:len(X)]

    def zero_shot(self, data):
        """
        Run zero-shot anomaly detection on data and return the anomaly scores
        """
        self.fit(data)
        return self.decision_scores_
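

# ---------------------------------------------------------------------------
# Example usage (a minimal sketch; not executed on import). Assumes this module
# lives in a package alongside `base.py` and that `data` is a NumPy array of
# shape (n_timesteps,) or (n_timesteps, n_features). The import path and
# parameter values below are illustrative only.
#
#     from .moirai import Moirai  # hypothetical path; adjust to your package layout
#
#     detector = Moirai(win_size=96, num_samples=100, device='cuda:0')
#     scores = detector.zero_shot(data)          # same as detector.fit(data).decision_scores_
#     anomalies = scores >= detector.threshold   # binary anomaly flags
# ---------------------------------------------------------------------------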