Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| from pathlib import Path | |
# Value of KnowUnknowCause that marks an event whose cause was not found.
_UNKNOWN_CAUSE_LABEL = 'ไม่พบสาเหตุ'

# Columns compared by exact match / by distance to the group median.
_CAT_FEATURES = ('OpDeviceType', 'FaultDeviceType', 'FaultDeviceCondition',
                 'Weather', 'OpDeviceSysType', 'OpDevicePhase')
_NUM_FEATURES = ('Capacity(kVA)', 'Load(MW)')

# Columns that are created (filled with None) when the input lacks them.
_REQUIRED_COLUMNS = ('KnowUnknowCause', 'CauseType', 'SubCauseType') \
    + _CAT_FEATURES + _NUM_FEATURES

# Schema of the returned frame; also used for the empty result.
_SUGGESTION_COLUMNS = ['Number', 'EventNumber', 'OutageDateTime',
                       'KnowUnknowCause', 'Orig_CauseType',
                       'Orig_SubCauseType', 'SuggestedCause',
                       'SuggestedSubCauseType', 'Scores']

# Relative weight of the categorical and numeric parts of the combined score.
_CAT_WEIGHT = 0.7
_NUM_WEIGHT = 0.3


def _build_cause_profiles(known):
    """Summarise every CauseType group of *known* once, ahead of row scoring.

    Returns a dict keyed by cause. Each value holds, per categorical feature,
    the fraction of the group's rows carrying each value; per numeric feature,
    the group median; and the most frequent non-null SubCauseType (or None).
    """
    profiles = {}
    for cause, group in known.groupby('CauseType'):
        size = max(1, len(group))
        cat_fracs = {
            feat: (group[feat].value_counts(dropna=True) / size).to_dict()
            for feat in _CAT_FEATURES
        }
        medians = {feat: group[feat].median() for feat in _NUM_FEATURES}

        sub_causes = group['SubCauseType'].dropna()
        top_sub_cause = None
        if not sub_causes.empty:
            try:
                # mode() may return several values; keep the first one.
                top_sub_cause = sub_causes.mode().iloc[0]
            except (TypeError, ValueError):
                # Best effort: values that cannot be counted give no sub-cause.
                top_sub_cause = None

        profiles[cause] = {
            'cat_fracs': cat_fracs,
            'medians': medians,
            'sub_cause': top_sub_cause,
        }
    return profiles


def _score_row(row, profile):
    """Return the combined similarity score of *row* against one cause profile."""
    cat_total = 0.0
    cat_used = 0
    for feat in _CAT_FEATURES:
        value = row.get(feat)
        if pd.isna(value):
            continue
        cat_total += profile['cat_fracs'][feat].get(value, 0.0)
        cat_used += 1
    cat_score = cat_total / cat_used if cat_used else 0.0

    num_total = 0.0
    num_used = 0
    for feat in _NUM_FEATURES:
        value = row.get(feat)
        median = profile['medians'][feat]
        if pd.isna(value) or pd.isna(median):
            continue
        # Normalise the distance by the median's magnitude (1.0 if it is zero).
        scale = abs(median) or 1.0
        num_total += 1.0 / (1.0 + abs(value - median) / scale)
        num_used += 1
    num_score = num_total / num_used if num_used else 0.0

    return _CAT_WEIGHT * cat_score + _NUM_WEIGHT * num_score


def suggest_labels(df: pd.DataFrame, top_k: int = 1):
    """Suggest a CauseType for events whose cause is unknown.

    Rows are "unknown" when KnowUnknowCause (stripped) equals 'ไม่พบสาเหตุ'
    or is missing; all other rows are "known" and are grouped by CauseType.
    Each unknown row is scored against every group: the mean share of group
    rows matching the row's categorical values (weight 0.7) plus the mean
    closeness of its numeric values to the group medians (weight 0.3).

    Args:
        df: Event data. It is copied, never modified. Missing feature
            columns are treated as entirely null.
        top_k: Number of best-scoring causes listed in the ``Scores`` text.
            Values below 1 produce no suggestion (``SuggestedCause`` is None).

    Returns:
        A DataFrame with one row per unknown event (rows carrying the
        unknown label first, then rows with a missing label) and the columns
        Number, EventNumber, OutageDateTime, KnowUnknowCause, Orig_CauseType,
        Orig_SubCauseType, SuggestedCause, SuggestedSubCauseType and Scores.
        The frame is empty, with the same columns, when there are no known
        or no unknown rows.
    """
    df = df.copy()
    for col in _REQUIRED_COLUMNS:
        if col not in df.columns:
            df[col] = None

    label = df['KnowUnknowCause']
    is_missing = label.isna()
    # astype(str) keeps this working when the column is not string-typed
    # (e.g. an all-NaN float column); missing rows are masked separately.
    has_unknown_label = (
        ~is_missing & (label.astype(str).str.strip() == _UNKNOWN_CAUSE_LABEL)
    )

    # Explicit copies so the numeric coercion below never writes into a view.
    known = df[~is_missing & ~has_unknown_label].copy()
    unknown = pd.concat(
        [df[has_unknown_label], df[is_missing]], ignore_index=False
    )
    if known.empty or unknown.empty:
        return pd.DataFrame(columns=_SUGGESTION_COLUMNS)

    for col in _NUM_FEATURES:
        known[col] = pd.to_numeric(known[col], errors='coerce')
        unknown[col] = pd.to_numeric(unknown[col], errors='coerce')

    profiles = _build_cause_profiles(known)
    # A negative value would otherwise slice from the tail of the ranking.
    limit = max(int(top_k), 0)

    suggestions = []
    for _, row in unknown.iterrows():
        scores = {
            cause: _score_row(row, profile)
            for cause, profile in profiles.items()
        }
        ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
        top = ranked[:limit]
        best_cause = top[0][0] if top else None
        best_sub_cause = (
            profiles[best_cause]['sub_cause'] if best_cause is not None else None
        )
        suggestions.append({
            'Number': row.get('Number'),
            'EventNumber': row.get('EventNumber'),
            'OutageDateTime': row.get('OutageDateTime'),
            'KnowUnknowCause': row.get('KnowUnknowCause'),
            'Orig_CauseType': row.get('CauseType'),
            'Orig_SubCauseType': row.get('SubCauseType'),
            'SuggestedCause': best_cause,
            'SuggestedSubCauseType': best_sub_cause,
            'Scores': '; '.join(f"{c}: {s:.3f}" for c, s in top),
        })
    return pd.DataFrame(suggestions, columns=_SUGGESTION_COLUMNS)
def suggest_labels_to_file(df: pd.DataFrame, out_path: str = None, top_k: int = 1):
    """Run ``suggest_labels`` and optionally save the result as CSV.

    Args:
        df: Event data, passed unchanged to ``suggest_labels``.
        out_path: Destination CSV path. When empty or None nothing is
            written. Missing parent directories are created.
        top_k: Forwarded to ``suggest_labels``.

    Returns:
        The DataFrame produced by ``suggest_labels``, whether or not a file
        was written.
    """
    result = suggest_labels(df, top_k=top_k)
    if not out_path:
        return result

    target = Path(out_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # 'utf-8-sig' writes a UTF-8 byte-order mark at the start of the file.
    result.to_csv(target, index=False, encoding='utf-8-sig')
    return result