| """Compute reliability indices (SAIFI, SAIDI, CAIDI, MAIFI) from outage event CSV. | |
| Usage (programmatic): | |
| from scripts.compute_reliability import compute_reliability | |
| summary = compute_reliability('data/data_1.csv', total_customers=500000) | |
| Usage (CLI): | |
| python scripts/compute_reliability.py --input data/data_1.csv --total-customers 500000 | |
| Assumptions & mapping (from inspected CSV): | |
| - Outage start: `OutageDateTime` | |
| - Outage end: prefer `CloseEventDateTime`, else `LastRestoDateTime`, else `FirstRestoDateTime` | |
| - Customers affected: prefer `AffectedCustomer` column; else sum `AffectedCustomer1..5`; else `AllStepCusXTime` or `AllStepCusXTime1..5` fallback. | |
| - Planned outages: rows with `EventType` containing 'แผน' (e.g., 'แผนดับไฟ') are considered planned and can be excluded by default. | |
| - Date format is day-first like '10-01-2025 10:28:00'. | |
| Outputs saved to `outputs/reliability_summary.csv` and breakdown CSVs. | |
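
A per-group customer map can also be supplied (keys must match the grouping values;
the feeder names and counts below are purely illustrative):

    summary = compute_reliability('data/data_1.csv',
                                  customers_map={'FDR-01': 12000, 'FDR-02': 8000},
                                  groupby_cols=['Feeder'])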
| """ | |
from __future__ import annotations

import argparse
from pathlib import Path
from typing import Dict, Optional

import numpy as np
import pandas as pd

DATE_COLS = ['OutageDateTime', 'FirstRestoDateTime', 'LastRestoDateTime',
             'CreateEventDateTime', 'CloseEventDateTime']


def parse_dates(df: pd.DataFrame) -> pd.DataFrame:
    """Parse all known timestamp columns in place; unparseable values become NaT."""
    for c in DATE_COLS:
        if c in df.columns:
            # Dates are day-first, 'dd-mm-YYYY HH:MM:SS'.
            df[c] = pd.to_datetime(df[c], dayfirst=True, errors='coerce')
    return df
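
# Note: with dayfirst=True, pd.to_datetime('10-01-2025 10:28:00') parses as
# 2025-01-10 10:28:00 (10 January), not 1 October.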


def coalesce_end_time(row: pd.Series) -> pd.Timestamp | None:
    """Return the first available end/restoration timestamp, or NaT if none is set."""
    for c in ('CloseEventDateTime', 'LastRestoDateTime', 'FirstRestoDateTime', 'CreateEventDateTime'):
        if c in row and pd.notna(row[c]):
            return row[c]
    return pd.NaT
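
# Example: if CloseEventDateTime is NaT but LastRestoDateTime is set, the outage end is
# LastRestoDateTime. CreateEventDateTime is only a last resort; it is typically logged
# near the start of the event, so durations derived from it may be understated.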


def estimate_customers(row: pd.Series) -> float:
    """Estimate how many customers one outage event affected (NaN if unknown)."""

    def to_num(x):
        try:
            if pd.isna(x) or x == '':
                return np.nan
            return float(x)
        except Exception:
            return np.nan

    cols = row.index
    # 1) Prefer the single AffectedCustomer column if present and numeric.
    if 'AffectedCustomer' in cols:
        v = to_num(row['AffectedCustomer'])
        if not np.isnan(v):
            return v
    # 2) Otherwise sum the stepwise AffectedCustomer1..5 columns.
    acs = []
    for i in range(1, 6):
        k = f'AffectedCustomer{i}'
        if k in cols:
            acs.append(to_num(row[k]))
    acs = [x for x in acs if not np.isnan(x)]
    if acs:
        return float(sum(acs))
    # 3) Fall back to AllStepCusXTime, then AllStepCusXTime1..5.
    if 'AllStepCusXTime' in cols:
        v = to_num(row['AllStepCusXTime'])
        if not np.isnan(v):
            return v
    asts = []
    for i in range(1, 6):
        k = f'AllStepCusXTime{i}'
        if k in cols:
            asts.append(to_num(row[k]))
    asts = [x for x in asts if not np.isnan(x)]
    if asts:
        return float(sum(asts))
    # Remaining numeric columns (e.g. Capacity(kVA), Load(MW)) are not customer counts.
    return np.nan
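
# Example: a row with AffectedCustomer='120' yields 120.0 even if AffectedCustomer1..5
# are also populated; the stepwise and AllStepCusXTime columns are fallbacks only.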


def flag_planned(event_type: Optional[str]) -> bool:
    """Return True if the event type marks a planned outage."""
    if pd.isna(event_type):
        return False
    # In this dataset planned outages contain the Thai word 'แผน' ('plan');
    # everything else is treated as unplanned.
    return 'แผน' in str(event_type)
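
# Example: flag_planned('แผนดับไฟ') -> True (planned outage); flag_planned(np.nan) -> False.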


def compute_reliability(
    input_csv: str | Path,
    total_customers: Optional[float] = None,
    customers_map: Optional[Dict[str, float]] = None,
    exclude_planned: bool = True,
    momentary_threshold_min: float = 1.0,
    groupby_cols: list[str] | None = None,
    out_dir: str | Path | None = 'outputs',
) -> Dict[str, pd.DataFrame]:
    """Read the outage CSV and compute reliability indices.

    Returns a dict of DataFrames keyed 'overall', 'by_group' and 'raw'
    (the cleaned event table). Pass out_dir=None to skip writing CSVs.
    """
    input_csv = Path(input_csv)
    out_path = Path(out_dir) if out_dir is not None else None
    if out_path is not None:
        out_path.mkdir(parents=True, exist_ok=True)

    df = pd.read_csv(input_csv, dtype=str)
    df = parse_dates(df)

    # Derive start/end timestamps and the outage duration in minutes.
    df['OutageStart'] = df.get('OutageDateTime')
    df['OutageEnd'] = df.apply(coalesce_end_time, axis=1)
    df['DurationMin'] = (pd.to_datetime(df['OutageEnd'])
                         - pd.to_datetime(df['OutageStart'])).dt.total_seconds() / 60.0

    df['CustomersAffected'] = df.apply(estimate_customers, axis=1)
    df['IsPlanned'] = df['EventType'].apply(flag_planned) if 'EventType' in df.columns else False

    df_work = df[~df['IsPlanned']].copy() if exclude_planned else df.copy()

    # Clamp missing or negative durations to zero and coerce customer counts to numeric.
    df_work['DurationMin'] = df_work['DurationMin'].fillna(0)
    df_work.loc[df_work['DurationMin'] < 0, 'DurationMin'] = 0
    df_work['CustomersAffected'] = pd.to_numeric(df_work['CustomersAffected'], errors='coerce').fillna(0)

    if groupby_cols is None:
        groupby_cols = []

    # Helper: compute the indices for a set of events given the customers served.
    def compute_from_df(dfall: pd.DataFrame, cust_total: float) -> Dict[str, float]:
        total_interruptions = dfall['CustomersAffected'].sum()
        total_customer_minutes = (dfall['CustomersAffected'] * dfall['DurationMin']).sum()
        # Momentary interruptions: events shorter than the threshold (default 1 min here;
        # IEEE 1366 uses 5 min). Momentary events are also counted in SAIFI/SAIDI below,
        # a simplification relative to IEEE 1366, which uses sustained events only.
        momentary_interruptions = dfall.loc[dfall['DurationMin'] < momentary_threshold_min, 'CustomersAffected'].sum()
        saifi = total_interruptions / cust_total if cust_total and cust_total > 0 else np.nan
        saidi = total_customer_minutes / cust_total if cust_total and cust_total > 0 else np.nan
        caidi = (saidi / saifi) if (saifi and saifi > 0) else np.nan
        maifi = momentary_interruptions / cust_total if cust_total and cust_total > 0 else np.nan
        return {
            'TotalInterruptions': total_interruptions,
            'TotalCustomerMinutes': total_customer_minutes,
            'MomentaryInterruptions': momentary_interruptions,
            'SAIFI': saifi,
            'SAIDI': saidi,
            'CAIDI': caidi,
            'MAIFI': maifi,
        }
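
    # Index definitions used above, with N = customers served. Worked example with
    # purely illustrative numbers: three events affecting 1000, 500 and 2000 customers
    # for 30, 120 and 0.5 minutes, N = 100000, threshold 1 min:
    #   SAIFI = sum(customers interrupted) / N        = 3500 / 100000  = 0.035
    #   SAIDI = sum(customer-minutes) / N             = 91000 / 100000 = 0.91 min
    #   CAIDI = SAIDI / SAIFI (avg restoration time)  = 0.91 / 0.035   = 26.0 min
    #   MAIFI = sum(momentarily interrupted) / N      = 2000 / 100000  = 0.02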

    results: Dict[str, pd.DataFrame] = {}
    if customers_map is not None:
        # customers_map keys must match the grouping values (e.g. Feeder or AffectedAreaID).
        # Overall indices use total_customers when given, else the sum of the map values.
        total_customers_map_sum = sum(customers_map.values())
        overall = compute_from_df(df_work, total_customers_map_sum if total_customers is None else total_customers)
        results['overall'] = pd.DataFrame([overall])
        # Per-group indices: default to Feeder, then AffectedAreaID, if no columns were given.
        if not groupby_cols:
            if 'Feeder' in df_work.columns:
                groupby_cols = ['Feeder']
            elif 'AffectedAreaID' in df_work.columns:
                groupby_cols = ['AffectedAreaID']
        if groupby_cols:
            rows = []
            for key, sub in df_work.groupby(groupby_cols):
                # key is a scalar for a single grouping column, a tuple for several
                keyname = key if isinstance(key, str) else '_'.join(map(str, key))
                cust = customers_map.get(keyname, np.nan)
                metrics = compute_from_df(sub, cust)
                metrics.update({'Group': keyname})
                rows.append(metrics)
            results['by_group'] = pd.DataFrame(rows)
        else:
            results['by_group'] = pd.DataFrame()
    else:
        # Without a customers map, a system-wide total is required.
        if total_customers is None:
            raise ValueError('Either total_customers or customers_map must be provided '
                             'to compute per-customer indices')
        overall = compute_from_df(df_work, float(total_customers))
        results['overall'] = pd.DataFrame([overall])
        # Per-group breakdown: without per-group customer counts we can only report raw
        # interruption counts and customer-minutes, not normalized per-customer indices.
        if groupby_cols:
            rows = []
            for key, sub in df_work.groupby(groupby_cols):
                keyname = key if isinstance(key, str) else '_'.join(map(str, key))
                rows.append({
                    'Group': keyname,
                    'TotalInterruptions': sub['CustomersAffected'].sum(),
                    'TotalCustomerMinutes': (sub['CustomersAffected'] * sub['DurationMin']).sum(),
                    'Events': len(sub),
                })
            results['by_group'] = pd.DataFrame(rows)
        else:
            results['by_group'] = pd.DataFrame()

    # Save the cleaned events and summaries unless out_dir is None.
    results['raw'] = df_work
    if out_path is not None:
        results['raw'].to_csv(out_path / 'events_cleaned.csv', index=False)
        results['overall'].to_csv(out_path / 'reliability_overall.csv', index=False)
        results['by_group'].to_csv(out_path / 'reliability_by_group.csv', index=False)
    return results


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Compute reliability indices from an outage event CSV.')
    parser.add_argument('--input', '-i', required=True, help='Input CSV file')
    parser.add_argument('--total-customers', type=float, help='Total customers served (required if no customers map)')
    # A plain store_true flag would default to False and silently include planned outages;
    # BooleanOptionalAction keeps the documented default of True.
    parser.add_argument('--exclude-planned', action=argparse.BooleanOptionalAction, default=True,
                        help='Exclude planned outages (default True; pass --no-exclude-planned to include them)')
    parser.add_argument('--momentary-threshold-min', type=float, default=1.0, help='Threshold in minutes for momentary interruptions')
    parser.add_argument('--groupby', nargs='*', default=['Feeder'], help='Columns to group by for breakdown (default: Feeder)')
    args = parser.parse_args()
    res = compute_reliability(args.input, total_customers=args.total_customers, exclude_planned=args.exclude_planned,
                              momentary_threshold_min=args.momentary_threshold_min, groupby_cols=args.groupby)
    print('Wrote outputs to outputs/ (events_cleaned.csv, reliability_overall.csv, reliability_by_group.csv)')