AI-OMS-Analyze / scripts /classify.py
kawaiipeace's picture
Update Function
cc2e1db
raw
history blame
7.77 kB
import sys
import os
from pathlib import Path
import pandas as pd
import numpy as np
from typing import Optional
# sklearn imports
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.metrics import classification_report, confusion_matrix
import joblib
# Optional Hugging Face weak labeling: the access token is read from the
# environment once, at import time. It is None when the variable is unset.
HF_TOKEN = os.environ.get('HF_TOKEN')

# Optional boosters: record whether xgboost can be imported at all.
try:
    import xgboost as xgb
except Exception:
    # Any failure while importing xgboost simply marks it as unavailable.
    _has_xgb = False
else:
    _has_xgb = True
def parse_and_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with parsed timestamps and engineered features.

    The input frame is not modified. The returned copy gains:

    * ``<col>_dt`` for each of ``OutageDateTime``, ``FirstRestoDateTime`` and
      ``LastRestoDateTime`` that is present, parsed with the
      ``%d-%m-%Y %H:%M:%S`` format (unparseable values become ``NaT``).
    * ``duration_min``: minutes between the outage and the last restoration
      timestamps; all-NaN when either parsed column is unavailable.
    * ``<col>_num`` for ``Load(MW)``, ``Capacity(kVA)``, ``FirstStepDuration``,
      ``LastStepDuration`` and ``AffectedCustomer``: numeric coercion of the
      column, or NaN when the column is absent.
    * ``hour`` / ``weekday`` taken from the parsed outage timestamp (NaN when
      it is unavailable).
    * ``device_freq``: how many rows share the row's ``OpDeviceType``, with
      missing device types counted together under ``'NA'`` (0 when the
      column is absent).

    Finally, the categorical columns ``OpDeviceType``, ``Owner``, ``Weather``
    and ``EventType`` have missing values replaced by ``'NA'``; absent columns
    are created and filled with ``'NA'``.
    """
    df = df.copy()

    # Parse datetimes; bad values are coerced to NaT rather than raising.
    for col in ('OutageDateTime', 'FirstRestoDateTime', 'LastRestoDateTime'):
        if col in df.columns:
            df[col + '_dt'] = pd.to_datetime(df[col], format='%d-%m-%Y %H:%M:%S', errors='coerce')

    # Outage duration in minutes.
    if 'OutageDateTime_dt' in df.columns and 'LastRestoDateTime_dt' in df.columns:
        elapsed = df['LastRestoDateTime_dt'] - df['OutageDateTime_dt']
        df['duration_min'] = elapsed.dt.total_seconds() / 60.0
    else:
        df['duration_min'] = np.nan

    # Numeric columns; non-numeric entries become NaN.
    for col in ('Load(MW)', 'Capacity(kVA)', 'FirstStepDuration', 'LastStepDuration', 'AffectedCustomer'):
        if col in df.columns:
            df[col + '_num'] = pd.to_numeric(df[col], errors='coerce')
        else:
            df[col + '_num'] = np.nan

    # Time-of-day / day-of-week features.
    if 'OutageDateTime_dt' in df.columns:
        df['hour'] = df['OutageDateTime_dt'].dt.hour
        df['weekday'] = df['OutageDateTime_dt'].dt.weekday
    else:
        df['hour'] = np.nan
        df['weekday'] = np.nan

    # Device frequency. Count and look up on the SAME filled series so that
    # rows with a missing device type receive the count of the 'NA' bucket
    # instead of 0.
    if 'OpDeviceType' in df.columns:
        device = df['OpDeviceType'].fillna('NA')
        df['device_freq'] = device.map(device.value_counts()).fillna(0).astype(int)
    else:
        df['device_freq'] = 0

    # Small cleanup for categorical columns.
    for col in ('OpDeviceType', 'Owner', 'Weather', 'EventType'):
        if col in df.columns:
            df[col] = df[col].fillna('NA')
        else:
            df[col] = 'NA'

    return df
def weak_label_with_hf(text: str) -> Optional[str]:
    """Ask a hosted chat model for a short cause label for *text*.

    The request goes through the Hugging Face router using the
    OpenAI-compatible client, authenticated with the module-level
    ``HF_TOKEN``. The (Thai) prompt asks for a one-word or short-phrase
    cause category for the given free text.

    Args:
        text: Free-text description of the event.

    Returns:
        The stripped label suggested by the model, or ``None`` when no token
        is configured, *text* is not a non-blank string, the response carries
        no usable text, or the request fails for any reason (this helper is
        best-effort and never raises).
    """
    import logging

    if not HF_TOKEN or not isinstance(text, str) or not text.strip():
        return None
    try:
        # Imported lazily so the module still loads without the openai package.
        from openai import OpenAI

        client = OpenAI(base_url='https://router.huggingface.co/v1', api_key=HF_TOKEN)
        prompt = f"ให้จัดหมวดสาเหตุของเหตุการณ์ไฟฟ้า ในคำสั้นๆ (ไทย) จากข้อความนี้:\n\n{text}\n\nตอบเป็นคำเดียวหรือวลีสั้นๆ เช่น 'สายขาด' หรือ 'บำรุงรักษา'"
        completion = client.chat.completions.create(
            model='meta-llama/Llama-4-Scout-17B-16E-Instruct:novita',
            messages=[{"role": "user", "content": [{"type": "text", "text": prompt}]}],
            max_tokens=40,
        )
        choice = completion.choices[0]
        # The choice may be an SDK object or a plain dict; support both.
        msg = getattr(choice, 'message', None) or (choice.get('message') if isinstance(choice, dict) else None)
        content = None
        if msg:
            content = msg.get('content') if isinstance(msg, dict) else getattr(msg, 'content', None)

        # Plain-string content: previously this shape fell through to the
        # dict-only fallback below and the label was silently dropped.
        if isinstance(content, str):
            return content.strip() or None

        # Content delivered as a list of typed parts: take the first text part.
        if isinstance(content, list) and content:
            for part in content:
                if isinstance(part, dict) and part.get('type') in ('output_text', 'text'):
                    part_text = part.get('text')
                    return part_text.strip() if isinstance(part_text, str) else None
            return str(content[0]).strip()

        # Fallback for legacy dict-shaped choices carrying a top-level 'text'.
        text_out = choice.get('text') if isinstance(choice, dict) else None
        return text_out.strip() if text_out else None
    except Exception:
        # Deliberately best-effort: a failed suggestion just means "no label".
        logging.getLogger(__name__).debug("weak_label_with_hf failed", exc_info=True)
        return None
def train_classifier(df: pd.DataFrame, label_col: str = 'CauseType', test_size: float = 0.2, random_state: int = 42, min_count_to_keep: int = 2, model_type: str = 'rf', hyperparams: Optional[dict] = None):
    """Train, evaluate and persist a classifier that predicts *label_col*.

    Steps: derive features with ``parse_and_features``; when ``HF_TOKEN`` is
    set, try to fill missing labels via ``weak_label_with_hf`` using the first
    non-null of the ``Detail`` / ``FaultDetail`` / ``SiteDetail`` columns;
    merge labels seen fewer than *min_count_to_keep* times into ``'Other'``;
    drop rows that still have no label; fit a preprocessing + classifier
    pipeline on a train split and report metrics on the held-out split.

    Args:
        df: Raw event records.
        label_col: Target column; created empty when absent.
        test_size: Fraction (or count) passed to ``train_test_split``.
        random_state: Seed for the split and the classifier.
        min_count_to_keep: Labels rarer than this are merged into ``'Other'``.
        model_type: ``'rf'`` (random forest), ``'gb'`` (gradient boosting) or
            ``'mlp'`` (multi-layer perceptron).
        hyperparams: Extra keyword arguments for the classifier constructor.
            ``None`` (the default) means no extras; the dict is not mutated.

    Returns:
        A dict with ``'report'`` (classification report text for the test
        split), ``'model_file'`` (joblib dump holding the pipeline and label
        encoder) and ``'predictions_file'`` (CSV of predictions for every
        labeled row, i.e. train and test rows together).

    Raises:
        ValueError: If *model_type* is unknown, or if no labeled rows remain
            after weak labeling and cleanup.
    """
    # Fail fast, before any network-bound weak labeling or model fitting.
    if model_type not in ('rf', 'gb', 'mlp'):
        raise ValueError(f"Unknown model_type: {model_type}")
    params = dict(hyperparams) if hyperparams else {}

    df = parse_and_features(df)

    # Optionally weak-label rows whose label is missing.
    if label_col not in df.columns:
        df[label_col] = None
    missing = df[label_col].isna()
    text_fields = [f for f in ('Detail', 'FaultDetail', 'SiteDetail') if f in df.columns]
    if HF_TOKEN and text_fields and missing.any():
        for idx, row in df.loc[missing, text_fields].iterrows():
            text = next((row[f] for f in text_fields if pd.notna(row[f])), None)
            if not text:
                continue
            try:
                lbl = weak_label_with_hf(text)
                if lbl:
                    df.at[idx, label_col] = lbl
            except Exception:
                # Best-effort: a failed suggestion leaves the row unlabeled.
                continue

    # Merge rare classes into 'Other', then drop rows that have no label.
    counts = df[label_col].value_counts()
    rare = set(counts[counts < min_count_to_keep].index)
    if rare:
        df[label_col] = df[label_col].apply(lambda v: 'Other' if v in rare else v)
    df = df.dropna(subset=[label_col])
    if df.empty:
        raise ValueError(f"No labeled rows available in column '{label_col}'; nothing to train on.")

    # Features and encoded target.
    numeric_cols = ['duration_min', 'Load(MW)_num', 'Capacity(kVA)_num', 'AffectedCustomer_num', 'hour', 'weekday', 'device_freq']
    categorical_cols = ['OpDeviceType', 'Owner', 'Weather', 'EventType']
    X = df[numeric_cols + categorical_cols]
    y = df[label_col].astype(str)
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)

    # Stratified splitting needs at least two members per class (e.g. the
    # 'Other' bucket can end up with one row); otherwise split unstratified.
    stratify = y_encoded if np.bincount(y_encoded).min() >= 2 else None
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_encoded, test_size=test_size, random_state=random_state, stratify=stratify
    )

    # Model.
    model_classes = {
        'rf': RandomForestClassifier,
        'gb': GradientBoostingClassifier,
        'mlp': MLPClassifier,
    }
    clf = model_classes[model_type](random_state=random_state, **params)

    # Preprocessing: median-impute + scale numerics, mode-impute + one-hot
    # encode categoricals (unseen categories are ignored at predict time).
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', Pipeline([('imputer', SimpleImputer(strategy='median')), ('scaler', StandardScaler())]), numeric_cols),
            ('cat', Pipeline([('imputer', SimpleImputer(strategy='most_frequent')), ('encoder', OneHotEncoder(handle_unknown='ignore'))]), categorical_cols),
        ]
    )
    pipeline = Pipeline([('preprocessor', preprocessor), ('classifier', clf)])
    pipeline.fit(X_train, y_train)

    # Evaluate on the held-out split using the original label names.
    y_pred = pipeline.predict(X_test)
    report = classification_report(
        le.inverse_transform(y_test), le.inverse_transform(y_pred), zero_division=0
    )

    # Save the fitted pipeline together with its label encoder.
    out_dir = Path('outputs')
    out_dir.mkdir(parents=True, exist_ok=True)
    model_file = out_dir / f'classifier_{model_type}_{label_col}.joblib'
    joblib.dump({'pipeline': pipeline, 'label_encoder': le}, model_file)

    # Predictions for every labeled row (train and test together), for download.
    pred_df = df.copy()
    pred_df[f'Predicted_{label_col}'] = le.inverse_transform(pipeline.predict(X))
    preds_file = out_dir / f'predictions_{model_type}_{label_col}.csv'
    pred_df.to_csv(preds_file, index=False)

    return {
        'report': report,
        'model_file': str(model_file),
        'predictions_file': str(preds_file)
    }