# AI-OMS-Analyze/scripts/anomaly.py
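"""Anomaly detection for outage management (OMS) records.

Engineers features from raw outage rows, then flags anomalies with
IsolationForest, LocalOutlierFactor, an optional TensorFlow autoencoder,
or an ensemble, and attaches per-row z-score explanations.
"""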
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from typing import Tuple
try:
import tensorflow as tf
from tensorflow.keras import layers, models
TF_AVAILABLE = True
except ImportError:
TF_AVAILABLE = False
def parse_datetime_cols(df: pd.DataFrame) -> pd.DataFrame:
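    """Parse the raw timestamp columns (day-month-year format) into
    `<col>_dt` datetime columns; unparseable values become NaT."""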
for c in ['OutageDateTime','FirstRestoDateTime','LastRestoDateTime']:
if c in df.columns:
df[c+'_dt'] = pd.to_datetime(df[c], format='%d-%m-%Y %H:%M:%S', errors='coerce')
return df
def feature_engineer(df: pd.DataFrame) -> pd.DataFrame:
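    """Derive model features: outage duration in minutes, numeric casts
    (`<col>_num`), hour of day, device-type frequency encoding, and x/y
    coordinates parsed from OpDeviceXYcoord."""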
df = df.copy()
df = parse_datetime_cols(df)
# Duration in minutes between outage and last restore
if 'OutageDateTime_dt' in df.columns and 'LastRestoDateTime_dt' in df.columns:
df['duration_min'] = (df['LastRestoDateTime_dt'] - df['OutageDateTime_dt']).dt.total_seconds() / 60.0
else:
df['duration_min'] = np.nan
    # Cast known numeric columns (unparseable values become NaN)
for col in ['Load(MW)','Capacity(kVA)','FirstStepDuration','LastStepDuration','AffectedCustomer']:
if col in df.columns:
df[col+'_num'] = pd.to_numeric(df[col], errors='coerce')
else:
df[col+'_num'] = np.nan
# time of day
if 'OutageDateTime_dt' in df.columns:
df['hour'] = df['OutageDateTime_dt'].dt.hour
else:
df['hour'] = np.nan
    # Device type: frequency encoding (count of occurrences of each type)
if 'OpDeviceType' in df.columns:
freq = df['OpDeviceType'].fillna('NA').value_counts()
df['device_freq'] = df['OpDeviceType'].map(lambda x: freq.get(x,0))
else:
df['device_freq'] = 0
# coordinates
if 'OpDeviceXYcoord' in df.columns:
def parse_xy(s):
try:
s = str(s).strip().strip('"')
x,y = s.split(',')
return float(x), float(y)
except Exception:
return (np.nan, np.nan)
xs, ys = zip(*df['OpDeviceXYcoord'].map(parse_xy))
df['x'] = xs
df['y'] = ys
else:
df['x'] = np.nan
df['y'] = np.nan
return df
def build_feature_matrix(df: pd.DataFrame) -> Tuple[np.ndarray, list, pd.DataFrame, StandardScaler]:
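    """Build a standardized feature matrix from raw outage records.

    Returns (scaled matrix, feature names, engineered DataFrame, fitted scaler).
    """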
df_fe = feature_engineer(df)
features = ['duration_min','Load(MW)_num','AffectedCustomer_num','hour','device_freq','x','y']
X = df_fe[features].copy()
    # Fill NaN with the column median; fall back to 0 for all-NaN columns
    # (e.g. when an input column is missing entirely), so StandardScaler
    # never sees NaN
    X = X.fillna(X.median()).fillna(0.0)
scaler = StandardScaler()
Xs = scaler.fit_transform(X)
return Xs, features, df_fe, scaler
def run_isolation_forest(X: np.ndarray, contamination: float = 0.05, random_state: int = 42):
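    """Fit an IsolationForest; returns (preds, scores) where preds follow the
    sklearn convention (-1 = outlier, 1 = inlier) and lower scores are more
    anomalous."""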
iso = IsolationForest(contamination=contamination, random_state=random_state)
preds = iso.fit_predict(X)
# IsolationForest returns -1 for outliers
scores = iso.decision_function(X)
return preds, scores
def run_lof(X: np.ndarray, contamination: float = 0.05, n_neighbors: int = 20):
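    """Fit a LocalOutlierFactor; returns (preds, negative outlier factors)."""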
lof = LocalOutlierFactor(n_neighbors=n_neighbors, contamination=contamination)
preds = lof.fit_predict(X)
    # negative_outlier_factor_: the lower the value, the more abnormal the point
scores = lof.negative_outlier_factor_
return preds, scores
def run_autoencoder(X: np.ndarray, contamination: float = 0.05, latent_dim: int = 4, epochs: int = 50, batch_size: int = 32):
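    """Train a small dense autoencoder on X and flag the rows with the largest
    reconstruction error (the top `contamination` fraction) as outliers.
    Requires TensorFlow."""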
if not TF_AVAILABLE:
raise ImportError("TensorFlow not available. Install tensorflow to use autoencoder.")
input_dim = X.shape[1]
# Build autoencoder
encoder = models.Sequential([
layers.Input(shape=(input_dim,)),
layers.Dense(16, activation='relu'),
layers.Dense(latent_dim, activation='relu')
])
decoder = models.Sequential([
layers.Input(shape=(latent_dim,)),
layers.Dense(16, activation='relu'),
layers.Dense(input_dim, activation='linear')
])
autoencoder = models.Sequential([encoder, decoder])
autoencoder.compile(optimizer='adam', loss='mse')
# Train
autoencoder.fit(X, X, epochs=epochs, batch_size=batch_size, verbose=0, validation_split=0.1)
# Reconstruction error
reconstructed = autoencoder.predict(X, verbose=0)
mse = np.mean((X - reconstructed)**2, axis=1)
    # Threshold so that the top `contamination` fraction of reconstruction
    # errors is flagged; -1 = outlier, 1 = inlier (sklearn convention)
    threshold = np.percentile(mse, (1 - contamination) * 100)
    preds = np.where(mse > threshold, -1, 1)
    return preds, mse
def explain_anomalies(df_fe: pd.DataFrame, explain_features=None):
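    """Compute per-row z-scores over `explain_features` (numeric columns) and
    build a short textual explanation from each row's top-3 absolute z-scores."""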
    if explain_features is None:
        explain_features = ['duration_min','Load(MW)_num','AffectedCustomer_num','hour','device_freq']
    df_num = df_fe[explain_features].astype(float)
    df_num = df_num.fillna(df_num.median())
    means = df_num.mean()
    stds = df_num.std().replace(0, 1.0)  # guard against zero-variance columns
    z = (df_num - means) / stds
# create explanation string for each row: top 3 absolute z-scores
explanations = []
for i, row in z.iterrows():
abs_row = row.abs()
top = abs_row.sort_values(ascending=False).head(3)
parts = []
for feat in top.index:
val = row[feat]
            sign = 'high' if val > 0 else 'low' if val < 0 else 'normal'
parts.append(f"{feat} {sign} (z={val:.2f})")
explanations.append('; '.join(parts))
return z, explanations
def detect_anomalies(df: pd.DataFrame, contamination: float = 0.05, algorithm: str = 'both') -> pd.DataFrame:
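    """Detect anomalous outage records.

    algorithm: 'iso', 'lof', 'both' (flag only when both detectors agree), or
    'autoencoder'. Returns the input rows plus model predictions/scores, a
    boolean final_flag, per-feature z-scores, and an explanation string.
    """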
Xs, features, df_fe, scaler = build_feature_matrix(df)
if algorithm == 'autoencoder':
preds, scores = run_autoencoder(Xs, contamination=contamination)
res = df.copy().reset_index(drop=True)
res['auto_pred'] = preds
res['auto_score'] = scores
res['final_flag'] = res['auto_pred'] == -1
else:
preds_iso, scores_iso = run_isolation_forest(Xs, contamination=contamination)
preds_lof, scores_lof = run_lof(Xs, contamination=contamination)
res = df.copy().reset_index(drop=True)
res['iso_pred'] = preds_iso
res['iso_score'] = scores_iso
res['lof_pred'] = preds_lof
res['lof_score'] = scores_lof
        # Ensemble: flag a row only when both detectors mark it as an outlier (-1)
        res['ensemble_flag'] = ((res['iso_pred'] == -1) & (res['lof_pred'] == -1))
        # Choose final_flag according to the requested algorithm ('iso', 'lof', or 'both')
if algorithm == 'iso':
res['final_flag'] = res['iso_pred'] == -1
elif algorithm == 'lof':
res['final_flag'] = res['lof_pred'] == -1
else:
res['final_flag'] = res['ensemble_flag']
# explainability (same for all)
z_df, explanations = explain_anomalies(df_fe)
# attach z-scores for explain features
for col in z_df.columns:
res[f'z_{col}'] = z_df[col].values
res['explanation'] = explanations
return res
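
# Illustrative usage: a minimal sketch on made-up outage records. The column
# names follow the fields referenced above; the sample values are invented
# purely for demonstration (the last row is a deliberate 24-hour outage).
if __name__ == '__main__':
    sample = pd.DataFrame({
        'OutageDateTime':    ['01-01-2024 08:00:00', '01-01-2024 09:15:00', '02-01-2024 10:00:00',
                              '02-01-2024 18:30:00', '03-01-2024 07:45:00'],
        'LastRestoDateTime': ['01-01-2024 08:40:00', '01-01-2024 09:50:00', '02-01-2024 10:35:00',
                              '02-01-2024 19:05:00', '04-01-2024 07:45:00'],
        'Load(MW)':          ['1.2', '0.9', '1.5', '1.1', '40.0'],
        'AffectedCustomer':  ['120', '95', '150', '110', '9800'],
        'OpDeviceType':      ['Recloser', 'Recloser', 'Fuse', 'Recloser', 'Breaker'],
        'OpDeviceXYcoord':   ['100.5,13.7', '100.6,13.8', '100.4,13.6', '100.5,13.9', '101.9,15.2'],
    })
    result = detect_anomalies(sample, contamination=0.2, algorithm='both')
    print(result[['iso_pred', 'lof_pred', 'final_flag', 'explanation']])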