"""Compute reliability indices (SAIFI, SAIDI, CAIDI, MAIFI) from outage event CSV.
Usage (programmatic):
from scripts.compute_reliability import compute_reliability
summary = compute_reliability('data/data_1.csv', total_customers=500000)
Usage (CLI):
python scripts/compute_reliability.py --input data/data_1.csv --total-customers 500000
Assumptions & mapping (from inspected CSV):
- Outage start: `OutageDateTime`
- Outage end: prefer `CloseEventDateTime`, else `LastRestoDateTime`, else `FirstRestoDateTime`
- Customers affected: prefer `AffectedCustomer` column; else sum `AffectedCustomer1..5`; else `AllStepCusXTime` or `AllStepCusXTime1..5` fallback.
- Planned outages: rows with `EventType` containing 'แผน' (e.g., 'แผนดับไฟ') are considered planned and can be excluded by default.
- Date format is day-first like '10-01-2025 10:28:00'.
Outputs saved to `outputs/reliability_summary.csv` and breakdown CSVs.
"""
from __future__ import annotations

import argparse
from pathlib import Path
from typing import Dict, Optional

import numpy as np
import pandas as pd

DATE_COLS = ['OutageDateTime', 'FirstRestoDateTime', 'LastRestoDateTime', 'CreateEventDateTime', 'CloseEventDateTime']

def parse_dates(df: pd.DataFrame) -> pd.DataFrame:
    for c in DATE_COLS:
        if c in df.columns:
            # Most dates are in the day-first format dd-mm-YYYY HH:MM:SS.
            df[c] = pd.to_datetime(df[c], dayfirst=True, errors='coerce')
    return df

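
# Example of the day-first parsing above:
#   pd.to_datetime('10-01-2025 10:28:00', dayfirst=True)
#   -> Timestamp('2025-01-10 10:28:00'), i.e. 10 January, not 1 October.
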
def coalesce_end_time(row: pd.Series) -> pd.Timestamp | None:
    # Prefer the close-event time, then last/first restoration, then creation time.
    for c in ('CloseEventDateTime', 'LastRestoDateTime', 'FirstRestoDateTime', 'CreateEventDateTime'):
        if c in row and pd.notna(row[c]):
            return row[c]
    return pd.NaT

def estimate_customers(row: pd.Series) -> float:
    # Best-effort estimate of the number of customers affected by one event.
    def to_num(x):
        try:
            if pd.isna(x) or x == '':
                return np.nan
            return float(x)
        except Exception:
            return np.nan

    cols = row.index
    # 1) Prefer the aggregate AffectedCustomer column when present and numeric.
    if 'AffectedCustomer' in cols:
        v = to_num(row['AffectedCustomer'])
        if not np.isnan(v):
            return v
    # 2) Otherwise sum the per-step columns AffectedCustomer1..5.
    acs = [to_num(row[f'AffectedCustomer{i}']) for i in range(1, 6) if f'AffectedCustomer{i}' in cols]
    acs = [x for x in acs if not np.isnan(x)]
    if acs:
        return float(sum(acs))
    # 3) Fall back to AllStepCusXTime, then AllStepCusXTime1..5.
    if 'AllStepCusXTime' in cols:
        v = to_num(row['AllStepCusXTime'])
        if not np.isnan(v):
            return v
    asts = [to_num(row[f'AllStepCusXTime{i}']) for i in range(1, 6) if f'AllStepCusXTime{i}' in cols]
    asts = [x for x in asts if not np.isnan(x)]
    if asts:
        return float(sum(asts))
    # The remaining numeric columns (Capacity(kVA), Load(MW)) are not customer
    # counts, so give up rather than guess.
    return np.nan

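
# Example of the fallback order above: a row with AffectedCustomer='' (blank),
# AffectedCustomer1='30' and AffectedCustomer2='15' yields 45.0.
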
def flag_planned(event_type: Optional[str]) -> bool:
    if pd.isna(event_type):
        return False
    # In this dataset planned outages carry the Thai word 'แผน' ('plan') in
    # EventType, e.g. 'แผนดับไฟ' (planned power cut); anything else is unplanned.
    return 'แผน' in str(event_type)

def compute_reliability(
    input_csv: str | Path,
    total_customers: Optional[float] = None,
    customers_map: Optional[Dict[str, float]] = None,
    exclude_planned: bool = True,
    momentary_threshold_min: float = 1.0,
    groupby_cols: list[str] | None = None,
    out_dir: str | Path | None = 'outputs',
) -> Dict[str, pd.DataFrame]:
    """Read the outage CSV and compute reliability indices.

    Returns a dict of DataFrames: 'overall', 'by_group', and the cleaned 'raw'
    events. If out_dir is not None, the results are also written there as CSVs.
    """
    input_csv = Path(input_csv)
    df = pd.read_csv(input_csv, dtype=str)
    df = parse_dates(df)

    # Coalesce start/end times and compute the outage duration in minutes.
    df['OutageStart'] = df.get('OutageDateTime')
    df['OutageEnd'] = df.apply(coalesce_end_time, axis=1)
    df['DurationMin'] = (pd.to_datetime(df['OutageEnd']) - pd.to_datetime(df['OutageStart'])).dt.total_seconds() / 60.0

    # Estimate customers affected and flag planned outages.
    df['CustomersAffected'] = df.apply(estimate_customers, axis=1)
    df['IsPlanned'] = df['EventType'].apply(flag_planned) if 'EventType' in df.columns else False

    df_work = df[~df['IsPlanned']].copy() if exclude_planned else df.copy()

    # Treat missing or negative durations as 0 and ensure customer counts are numeric.
    df_work['DurationMin'] = df_work['DurationMin'].fillna(0)
    df_work.loc[df_work['DurationMin'] < 0, 'DurationMin'] = 0
    df_work['CustomersAffected'] = pd.to_numeric(df_work['CustomersAffected'], errors='coerce').fillna(0)

    if groupby_cols is None:
        groupby_cols = []

    # Helper: compute the indices for one slice of events given its customer base.
    def compute_from_df(dfall: pd.DataFrame, cust_total: float) -> Dict[str, float]:
        total_interruptions = dfall['CustomersAffected'].sum()
        total_customer_minutes = (dfall['CustomersAffected'] * dfall['DurationMin']).sum()
        # Momentary interruptions: events shorter than the threshold.
        momentary_interruptions = dfall.loc[dfall['DurationMin'] < momentary_threshold_min, 'CustomersAffected'].sum()
        saifi = total_interruptions / cust_total if cust_total and cust_total > 0 else np.nan
        saidi = total_customer_minutes / cust_total if cust_total and cust_total > 0 else np.nan
        caidi = (saidi / saifi) if (saifi and saifi > 0) else np.nan
        maifi = momentary_interruptions / cust_total if cust_total and cust_total > 0 else np.nan
        return {
            'TotalInterruptions': total_interruptions,
            'TotalCustomerMinutes': total_customer_minutes,
            'MomentaryInterruptions': momentary_interruptions,
            'SAIFI': saifi,
            'SAIDI': saidi,
            'CAIDI': caidi,
            'MAIFI': maifi,
        }
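
    # Worked example for compute_from_df: one event interrupting 1,000 customers
    # for 120 minutes in a 500,000-customer system gives
    #   SAIFI = 1000 / 500000       = 0.002 interruptions per customer
    #   SAIDI = 1000 * 120 / 500000 = 0.24 minutes per customer
    #   CAIDI = SAIDI / SAIFI       = 120 minutes per interrupted customer
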
    results = {}
    if customers_map is not None:
        # customers_map keys should match the grouping values (e.g. Feeder or
        # AffectedAreaID). The overall indices use total_customers when given,
        # otherwise the sum of the map values.
        total_customers_map_sum = sum(customers_map.values())
        overall = compute_from_df(df_work, total_customers if total_customers is not None else total_customers_map_sum)
        results['overall'] = pd.DataFrame([overall])
        # If no grouping columns were given, fall back to Feeder, then AffectedAreaID.
        if not groupby_cols:
            if 'Feeder' in df_work.columns:
                groupby_cols = ['Feeder']
            elif 'AffectedAreaID' in df_work.columns:
                groupby_cols = ['AffectedAreaID']
        if groupby_cols:
            rows = []
            for key, sub in df_work.groupby(groupby_cols):
                # key is a scalar for a single grouping column, a tuple otherwise.
                keyname = '_'.join(map(str, key)) if isinstance(key, tuple) else str(key)
                cust = customers_map.get(keyname, np.nan)
                metrics = compute_from_df(sub, cust)
                metrics.update({'Group': keyname})
                rows.append(metrics)
            results['by_group'] = pd.DataFrame(rows)
        else:
            results['by_group'] = pd.DataFrame()
    else:
        # Without a customers_map, total_customers is required for system-wide indices.
        if total_customers is None:
            raise ValueError('Either total_customers or customers_map must be provided to compute per-customer indices')
        overall = compute_from_df(df_work, float(total_customers))
        results['overall'] = pd.DataFrame([overall])
        if groupby_cols:
            # Without per-group customer counts we can report raw interruption
            # counts and customer-minutes per group, but not per-customer indices.
            rows = []
            for key, sub in df_work.groupby(groupby_cols):
                keyname = '_'.join(map(str, key)) if isinstance(key, tuple) else str(key)
                rows.append({
                    'Group': keyname,
                    'TotalInterruptions': sub['CustomersAffected'].sum(),
                    'TotalCustomerMinutes': (sub['CustomersAffected'] * sub['DurationMin']).sum(),
                    'Events': len(sub),
                })
            results['by_group'] = pd.DataFrame(rows)
        else:
            results['by_group'] = pd.DataFrame()
    # Save CSVs (skipped when out_dir is None).
    results['raw'] = df_work
    if out_dir is not None:
        out_dir = Path(out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        results['raw'].to_csv(out_dir / 'events_cleaned.csv', index=False)
        results['overall'].to_csv(out_dir / 'reliability_overall.csv', index=False)
        results['by_group'].to_csv(out_dir / 'reliability_by_group.csv', index=False)
    return results

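
# Example of programmatic use with a per-feeder customer map. The feeder names
# and counts are purely illustrative placeholders, not values from the dataset:
#
#     summary = compute_reliability(
#         'data/data_1.csv',
#         customers_map={'FDR-001': 12000, 'FDR-002': 8500},
#         groupby_cols=['Feeder'],
#     )
#     print(summary['overall'][['SAIFI', 'SAIDI', 'CAIDI', 'MAIFI']])
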
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Compute reliability indices from an outage event CSV.')
    parser.add_argument('--input', '-i', required=True, help='Input CSV file')
    parser.add_argument('--total-customers', type=float, help='Total customers served in the system (required if no customers map)')
    # BooleanOptionalAction makes the documented default (True) real and also
    # provides --no-exclude-planned to include planned outages.
    parser.add_argument('--exclude-planned', action=argparse.BooleanOptionalAction, default=True, help='Exclude planned outages (default: True)')
    parser.add_argument('--momentary-threshold-min', type=float, default=1.0, help='Duration threshold in minutes below which an interruption counts as momentary')
    parser.add_argument('--groupby', nargs='*', default=['Feeder'], help='Columns to group by for the breakdown (default: Feeder)')
    args = parser.parse_args()
    compute_reliability(
        args.input,
        total_customers=args.total_customers,
        exclude_planned=args.exclude_planned,
        momentary_threshold_min=args.momentary_threshold_min,
        groupby_cols=args.groupby,
    )
    print('Wrote outputs to outputs/ (events_cleaned.csv, reliability_overall.csv, reliability_by_group.csv)')