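"""Anomaly detection utilities for power-outage records.

Flags unusual outage events using IsolationForest, LocalOutlierFactor,
an optional Keras autoencoder, or an IsolationForest+LOF ensemble, and
attaches per-row z-score explanations for the features used.
"""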
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from typing import Tuple
try:
    import tensorflow as tf
    from tensorflow.keras import layers, models
    TF_AVAILABLE = True
except ImportError:
    TF_AVAILABLE = False
def parse_datetime_cols(df: pd.DataFrame) -> pd.DataFrame:
    for c in ['OutageDateTime', 'FirstRestoDateTime', 'LastRestoDateTime']:
        if c in df.columns:
            df[c + '_dt'] = pd.to_datetime(df[c], format='%d-%m-%Y %H:%M:%S', errors='coerce')
    return df
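# Illustrative note: with the '%d-%m-%Y %H:%M:%S' format above, a value such
# as "31-12-2024 23:59:59" parses cleanly, while anything malformed becomes
# NaT because errors='coerce'.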
def feature_engineer(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df = parse_datetime_cols(df)
    # Duration in minutes between outage and last restore
    if 'OutageDateTime_dt' in df.columns and 'LastRestoDateTime_dt' in df.columns:
        df['duration_min'] = (df['LastRestoDateTime_dt'] - df['OutageDateTime_dt']).dt.total_seconds() / 60.0
    else:
        df['duration_min'] = np.nan
    # Coerce load/capacity/duration/customer columns to numeric
    for col in ['Load(MW)', 'Capacity(kVA)', 'FirstStepDuration', 'LastStepDuration', 'AffectedCustomer']:
        if col in df.columns:
            df[col + '_num'] = pd.to_numeric(df[col], errors='coerce')
        else:
            df[col + '_num'] = np.nan
    # Time of day
    if 'OutageDateTime_dt' in df.columns:
        df['hour'] = df['OutageDateTime_dt'].dt.hour
    else:
        df['hour'] = np.nan
    # Device type encoded by its frequency (simpler than one-hot)
    if 'OpDeviceType' in df.columns:
        freq = df['OpDeviceType'].fillna('NA').value_counts()
        # Map over the same NaN-filled values that were counted, so missing
        # device types get the 'NA' frequency instead of 0
        df['device_freq'] = df['OpDeviceType'].fillna('NA').map(lambda x: freq.get(x, 0))
    else:
        df['device_freq'] = 0
    # Coordinates: parse "x,y" strings into separate numeric columns
    if 'OpDeviceXYcoord' in df.columns:
        def parse_xy(s):
            try:
                s = str(s).strip().strip('"')
                x, y = s.split(',')
                return float(x), float(y)
            except Exception:
                return (np.nan, np.nan)
        xs, ys = zip(*df['OpDeviceXYcoord'].map(parse_xy))
        df['x'] = xs
        df['y'] = ys
    else:
        df['x'] = np.nan
        df['y'] = np.nan
    return df
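# Minimal illustration of feature_engineer (hypothetical values, not real
# outage data):
#   df = pd.DataFrame({'OutageDateTime': ['01-01-2024 10:00:00'],
#                      'LastRestoDateTime': ['01-01-2024 11:30:00'],
#                      'OpDeviceXYcoord': ['"672143.5,1518220.1"']})
#   feature_engineer(df)['duration_min']  # -> 90.0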
def build_feature_matrix(df: pd.DataFrame) -> Tuple[np.ndarray, list, pd.DataFrame, StandardScaler]:
    df_fe = feature_engineer(df)
    features = ['duration_min', 'Load(MW)_num', 'AffectedCustomer_num', 'hour', 'device_freq', 'x', 'y']
    X = df_fe[features].copy()
    # Fill NaNs with the column median
    X = X.fillna(X.median())
    scaler = StandardScaler()
    Xs = scaler.fit_transform(X)
    return Xs, features, df_fe, scaler
def run_isolation_forest(X: np.ndarray, contamination: float = 0.05, random_state: int = 42):
    iso = IsolationForest(contamination=contamination, random_state=random_state)
    preds = iso.fit_predict(X)
    # IsolationForest returns -1 for outliers, 1 for inliers
    scores = iso.decision_function(X)
    return preds, scores
def run_lof(X: np.ndarray, contamination: float = 0.05, n_neighbors: int = 20):
    lof = LocalOutlierFactor(n_neighbors=n_neighbors, contamination=contamination)
    preds = lof.fit_predict(X)
    # negative_outlier_factor_: lower values indicate more abnormal points
    scores = lof.negative_outlier_factor_
    return preds, scores
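# Note: both detectors score "lower = more anomalous" (IsolationForest's
# decision_function is negative for outliers; LOF's negative_outlier_factor_
# drops well below -1 for abnormal points), but the scales differ, so the
# raw scores are not directly comparable across the two models.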
def run_autoencoder(X: np.ndarray, contamination: float = 0.05, latent_dim: int = 4, epochs: int = 50, batch_size: int = 32):
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available. Install tensorflow to use the autoencoder.")
    input_dim = X.shape[1]
    # Build a symmetric encoder/decoder pair
    encoder = models.Sequential([
        layers.Input(shape=(input_dim,)),
        layers.Dense(16, activation='relu'),
        layers.Dense(latent_dim, activation='relu')
    ])
    decoder = models.Sequential([
        layers.Input(shape=(latent_dim,)),
        layers.Dense(16, activation='relu'),
        layers.Dense(input_dim, activation='linear')
    ])
    autoencoder = models.Sequential([encoder, decoder])
    autoencoder.compile(optimizer='adam', loss='mse')
    # Train the network to reconstruct its own input
    autoencoder.fit(X, X, epochs=epochs, batch_size=batch_size, verbose=0, validation_split=0.1)
    # Per-row reconstruction error
    reconstructed = autoencoder.predict(X, verbose=0)
    mse = np.mean((X - reconstructed) ** 2, axis=1)
    # Threshold chosen so roughly `contamination` of rows exceed it
    threshold = np.percentile(mse, (1 - contamination) * 100)
    preds = np.where(mse > threshold, -1, 1)  # -1 for outliers, 1 for inliers
    return preds, mse
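# Example: with the default contamination=0.05 the threshold is the 95th
# percentile of the reconstruction error, so roughly the 5% of rows that the
# autoencoder reconstructs worst are labelled -1.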
def explain_anomalies(df_fe: pd.DataFrame, explain_features=None):
    # explain_features: which numeric columns to compute z-scores on
    if explain_features is None:
        explain_features = ['duration_min', 'Load(MW)_num', 'AffectedCustomer_num', 'hour', 'device_freq']
    df_num = df_fe[explain_features].astype(float).fillna(df_fe[explain_features].median())
    means = df_num.mean()
    stds = df_num.std().replace(0, 1.0)
    z = (df_num - means) / stds
    # Build an explanation string per row from the top 3 absolute z-scores
    explanations = []
    for i, row in z.iterrows():
        abs_row = row.abs()
        top = abs_row.sort_values(ascending=False).head(3)
        parts = []
        for feat in top.index:
            val = row[feat]
            sign = 'high' if val > 0 else 'low' if val < 0 else 'normal'
            parts.append(f"{feat} {sign} (z={val:.2f})")
        explanations.append('; '.join(parts))
    return z, explanations
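# Example output (illustrative values): an explanation string might read
# "duration_min high (z=3.12); AffectedCustomer_num high (z=2.40); hour low (z=-1.10)"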
def detect_anomalies(df: pd.DataFrame, contamination: float = 0.05, algorithm: str = 'both') -> pd.DataFrame:
    Xs, features, df_fe, scaler = build_feature_matrix(df)
    if algorithm == 'autoencoder':
        preds, scores = run_autoencoder(Xs, contamination=contamination)
        res = df.copy().reset_index(drop=True)
        res['auto_pred'] = preds
        res['auto_score'] = scores
        res['final_flag'] = res['auto_pred'] == -1
    else:
        preds_iso, scores_iso = run_isolation_forest(Xs, contamination=contamination)
        preds_lof, scores_lof = run_lof(Xs, contamination=contamination)
        res = df.copy().reset_index(drop=True)
        res['iso_pred'] = preds_iso
        res['iso_score'] = scores_iso
        res['lof_pred'] = preds_lof
        res['lof_score'] = scores_lof
        # Ensemble: flag only rows that both detectors mark as outliers (-1)
        res['ensemble_flag'] = ((res['iso_pred'] == -1) & (res['lof_pred'] == -1))
        # Choose final_flag according to the requested algorithm ('iso', 'lof', or 'both')
        if algorithm == 'iso':
            res['final_flag'] = res['iso_pred'] == -1
        elif algorithm == 'lof':
            res['final_flag'] = res['lof_pred'] == -1
        else:
            res['final_flag'] = res['ensemble_flag']
    # Explainability (attached for every algorithm choice)
    z_df, explanations = explain_anomalies(df_fe)
    # Attach z-scores for the explanation features
    for col in z_df.columns:
        res[f'z_{col}'] = z_df[col].values
    res['explanation'] = explanations
    return res
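
if __name__ == '__main__':
    # Minimal usage sketch. 'outages.csv' is a hypothetical file name; any
    # DataFrame with the columns referenced above (OutageDateTime,
    # LastRestoDateTime, Load(MW), AffectedCustomer, ...) will work.
    sample = pd.read_csv('outages.csv')
    results = detect_anomalies(sample, contamination=0.05, algorithm='both')
    flagged = results[results['final_flag']]
    print(f"Flagged {len(flagged)} of {len(results)} records as anomalous")
    print(flagged[['explanation']].head())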