supply-roster-optimization / src /config /optimization_config.py
haileyhalimj@gmail.com
Refactor optimization configuration and constants integration
fa2c20f
import pandas as pd
import src.preprocess.transform as transformed_data
import datetime
from datetime import timedelta
import src.preprocess.extract as extract
from src.config.constants import ShiftType, LineType, KitLevel, DefaultConfig
# Re-import all the packages
import importlib
# Reload modules to get latest changes - REMOVED to prevent infinite loops
# importlib.reload(extract)
# importlib.reload(transformed_data) # Uncomment if needed
def get_date_span():
    """Return the planning horizon as ``(date_span, start_date, end_date)``.

    When Streamlit is importable and its session state contains both
    ``start_date`` and ``planning_days``, the horizon is derived from them.
    Otherwise the built-in default is returned: five days, from
    2025-07-07 to 2025-07-11.

    Returns:
        tuple: ``date_span`` is the list ``[1, ..., planning_days]``;
        ``start_date`` and ``end_date`` are ``datetime`` objects, with
        ``end_date`` being the last day of the span (inclusive).
    """
    from datetime import datetime, timedelta

    try:
        import streamlit as st

        if hasattr(st, 'session_state'):
            # Read from session state without printing (avoid spam).
            if 'start_date' in st.session_state and 'planning_days' in st.session_state:
                start_date = datetime.combine(st.session_state.start_date, datetime.min.time())
                planning_days = st.session_state.planning_days
                end_date = start_date + timedelta(days=planning_days - 1)
                date_span = list(range(1, planning_days + 1))
                return date_span, start_date, end_date
    except Exception:
        # Best effort: any failure to read the session falls through to the
        # defaults. ``Exception`` (not a bare ``except``) keeps
        # KeyboardInterrupt / SystemExit propagating.
        pass

    # Default values - no printing to avoid spam.
    return list(range(1, 6)), datetime(2025, 7, 7), datetime(2025, 7, 11)
# Only call get_date_span() when explicitly needed - avoid module-level execution
# DATE_SPAN, start_date, end_date = get_date_span() # REMOVED - called dynamically instead
# Placeholders only: nothing is computed at import time. Callers obtain the
# real values by calling get_date_span().
DATE_SPAN = start_date = end_date = None
def get_product_list():
    """Return the product list to plan for, without printing.

    First tries ``DemandFilter`` (reloading its data) and returns its
    filtered product list. If that fails for any reason, falls back to the
    released product list for the start date given by ``get_date_span()``.
    """
    try:
        from src.demand_filtering import DemandFilter

        filter_instance = DemandFilter()
        filter_instance.load_data(force_reload=True)
        return filter_instance.get_filtered_product_list()
    except Exception:
        # Deliberate best-effort fallback. ``Exception`` (not a bare
        # ``except``) keeps KeyboardInterrupt / SystemExit propagating.
        _, fallback_start_date, _ = get_date_span()
        return transformed_data.get_released_product_list(fallback_start_date)
def get_employee_type_list():
    """Return the employee types to plan for.

    Uses ``selected_employee_types`` from the Streamlit session state when
    available; otherwise returns the unique values of the
    ``employment_type`` column of the employee data.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state') and 'selected_employee_types' in st.session_state:
            return st.session_state.selected_employee_types
    except Exception:
        # Best effort: fall through to the data files. ``Exception`` (not a
        # bare ``except``) keeps KeyboardInterrupt / SystemExit propagating.
        pass

    # Default: load from data files.
    employee_type_list = extract.read_employee_data()
    return employee_type_list["employment_type"].unique().tolist()
def get_shift_list():
    """Return all shift IDs.

    Uses ``selected_shifts`` from the Streamlit session state when
    available; otherwise returns the unique values of the ``id`` column of
    the shift info data.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state') and 'selected_shifts' in st.session_state:
            return st.session_state.selected_shifts
    except Exception:
        # Best effort: fall through to the data files. ``Exception`` (not a
        # bare ``except``) keeps KeyboardInterrupt / SystemExit propagating.
        pass

    # Default: load from data files.
    shift_list = extract.get_shift_info()
    return shift_list["id"].unique().tolist()
# Evening shift activation mode - defined early to avoid a circular dependency.
# Read by get_active_shift_list(). Options:
#   "normal"           - Only regular shift (1) and overtime shift (3); NO evening shift.
#   "activate_evening" - Allow evening shift (2) when demand is too high or cost-effective.
#   "always_available" - Evening shift always available as an option.
# get_active_shift_list() currently returns all shifts for both
# "activate_evening" and "always_available", and treats any other value as "normal".
EVENING_SHIFT_MODE = "normal" # Default: only regular + overtime
# Evening shift activation threshold.
# If demand cannot be met with regular + overtime, suggest evening shift activation.
# NOTE(review): get_evening_shift_demand_threshold() reads
# DefaultConfig.EVENING_SHIFT_DEMAND_THRESHOLD, not this module-level constant -
# confirm the two values are meant to agree.
EVENING_SHIFT_DEMAND_THRESHOLD = 0.9 # Activate if regular+overtime capacity < 90% of demand
#Where?
def get_active_shift_list():
    """Return the shifts that are usable under the current EVENING_SHIFT_MODE.

    "activate_evening" and "always_available" keep every shift returned by
    get_shift_list(). "normal" - and any unrecognised mode - keeps only the
    shifts contained in ShiftType.REGULAR_AND_OVERTIME. The chosen list is
    printed before it is returned.
    """
    available = get_shift_list()

    # Modes that keep every shift, evening included.
    if EVENING_SHIFT_MODE == "activate_evening":
        chosen = list(available)
        print(f"[SHIFT MODE] Evening activated: Using all shifts {chosen}")
        return chosen
    if EVENING_SHIFT_MODE == "always_available":
        chosen = list(available)
        print(f"[SHIFT MODE] Always available: Using all shifts {chosen}")
        return chosen

    # Everything else is restricted to regular + overtime.
    chosen = [shift for shift in available if shift in ShiftType.REGULAR_AND_OVERTIME]
    if EVENING_SHIFT_MODE == "normal":
        print(f"[SHIFT MODE] Normal mode: Using shifts {chosen} (Regular + Overtime only, NO evening)")
    else:
        print(f"[SHIFT MODE] Unknown mode '{EVENING_SHIFT_MODE}', defaulting to normal: {chosen}")
    return chosen
# DO NOT load at import time - always call get_active_shift_list() dynamically
# SHIFT_LIST = get_active_shift_list() # REMOVED - was causing stale data!
#where?
def get_line_list():
    """Return the packaging line IDs to plan for.

    Prefers ``selected_lines`` from the Streamlit session state (set on the
    Dataset Metadata page). If that is unavailable, returns the unique values
    of the ``id`` column of the packaging line data.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state'):
            if 'selected_lines' in st.session_state:
                print(f"Using lines from Dataset Metadata page: {st.session_state.selected_lines}")
                return st.session_state.selected_lines
    except Exception as e:
        print(f"Could not get lines from streamlit session: {e}")

    # No session value: read the lines from the data files instead.
    print(f"Loading line list from data files")
    packaging_lines = extract.read_packaging_line_data()
    return packaging_lines["id"].unique().tolist()
# DO NOT load at import time - always call get_line_list() dynamically
# LINE_LIST = get_line_list() # REMOVED - was causing stale data!
#where?
def get_kit_line_match():
    """Build a ``{kit_name: line type ID}`` mapping from the kit/line match data.

    The table comes from ``extract.read_kit_line_match_data()``; its
    ``kit_name`` and ``line_type`` columns are used.

    Handling per ``line_type`` value:
      * Non-empty string: matched against the known line names ignoring
        case, spaces and underscores, so "long line", "Long_line",
        "Long Line", "mini load", "miniload" and "Mini_load" all resolve.
        An unrecognised name prints a warning and defaults to
        ``LineType.LONG_LINE``.
      * Non-NaN int/float: treated as an ID already and cast to ``int``.
      * Anything else (missing or empty): the kit is left out of the result.

    Returns:
        dict: kit name -> line type ID, only for kits with a line assignment.
    """
    kit_line_match = extract.read_kit_line_match_data()
    kit_line_match_dict = kit_line_match.set_index("kit_name")["line_type"].to_dict()

    # Keys are the normalised spellings (lower-case, no spaces/underscores).
    # Normalising avoids silently sending e.g. "Mini Load" to the long line
    # just because that exact spelling was not listed.
    line_name_to_id = {
        "longline": LineType.LONG_LINE,
        "miniload": LineType.MINI_LOAD,
    }

    converted_dict = {}
    for kit, line_name in kit_line_match_dict.items():
        if isinstance(line_name, str):
            if not line_name.strip():
                # Empty line type - skip (no line assignment for this kit).
                continue
            lookup_key = "".join(line_name.lower().replace("_", " ").split())
            line_id = line_name_to_id.get(lookup_key)
            if line_id is None:
                print(f"Warning: Unknown line type '{line_name}' for kit {kit}")
                # Default to long line if unknown.
                line_id = LineType.LONG_LINE
            converted_dict[kit] = line_id
        elif isinstance(line_name, (int, float)) and not pd.isna(line_name):
            # Already numeric.
            converted_dict[kit] = int(line_name)
        # Otherwise: missing line type - the kit gets no line assignment.
    return converted_dict
# NOTE: evaluated at import time - importing this module reads the kit/line
# match data once, and this constant is not refreshed afterwards.
KIT_LINE_MATCH_DICT = get_kit_line_match()
def get_line_cnt_per_type():
    """Return the number of lines per line ID.

    Prefers ``line_counts`` from the Streamlit session state (config page).
    Otherwise builds ``{id: line_count}`` from the packaging line data.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state'):
            if 'line_counts' in st.session_state:
                print(f"Using line counts from config page: {st.session_state.line_counts}")
                return st.session_state.line_counts
    except Exception as e:
        print(f"Could not get line counts from streamlit session: {e}")

    # No session value: derive the counts from the data files.
    print(f"Loading default line count values from data files")
    packaging_lines = extract.read_packaging_line_data()
    counts_by_line = packaging_lines.set_index("id")["line_count"].to_dict()
    print("line cnt per type", counts_by_line)
    return counts_by_line
# DO NOT load at import time - always call get_line_cnt_per_type() dynamically
# LINE_CNT_PER_TYPE = get_line_cnt_per_type() # REMOVED - was causing stale data!
#where?
def get_demand_dictionary(force_reload=False):
    """Return the filtered demand dictionary, reloaded on every call.

    The data is always loaded through a new ``DemandFilter`` with
    ``load_data(force_reload=True)`` so the result reflects the current
    Streamlit configs/dates.

    Args:
        force_reload: Accepted for backward compatibility. It is currently
            not consulted: the data is force-reloaded regardless.

    Returns:
        The value of ``DemandFilter.get_filtered_demand_dictionary()``
        (a mapping whose values are summed for the log line).

    Raises:
        Exception: If importing, loading or filtering fails. The original
            error is attached as ``__cause__``.
    """
    try:
        # Always get fresh filtered demand to reflect current configs.
        from src.demand_filtering import DemandFilter

        filter_instance = DemandFilter()
        # Force reload data to pick up new dates/configs.
        filter_instance.load_data(force_reload=True)
        demand_dictionary = filter_instance.get_filtered_demand_dictionary()
        print(f"📈 FRESH FILTERED DEMAND: {len(demand_dictionary)} products with total demand {sum(demand_dictionary.values())}")
        print(f"🔄 LOADED DYNAMICALLY: Reflects current Streamlit configs")
        return demand_dictionary
    except Exception as e:
        print(f"Error loading dynamic demand dictionary: {e}")
        # Chain explicitly so the root cause stays attached to the new error.
        raise Exception("Demand dictionary not found with error:"+str(e)) from e
# DO NOT load at import time - always call get_demand_dictionary() dynamically
# DEMAND_DICTIONARY = get_demand_dictionary() # REMOVED - was causing stale data!
#delete as already using default cost rates
def get_cost_list_per_emp_shift():
    """Return the cost rates per employee type and shift.

    Prefers ``cost_list_per_emp_shift`` from the Streamlit session state
    (config page); otherwise returns ``DefaultConfig.DEFAULT_COST_RATES``.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state'):
            if 'cost_list_per_emp_shift' in st.session_state:
                print(f"Using cost list from config page: {st.session_state.cost_list_per_emp_shift}")
                return st.session_state.cost_list_per_emp_shift
    except Exception as e:
        print(f"Could not get cost list from streamlit session: {e}")

    # Default hourly rates - multiple employment types with different costs.
    print(f"Loading default cost values")
    default_rates = DefaultConfig.DEFAULT_COST_RATES
    return default_rates
def shift_code_to_name():
    """Return ``ShiftType.get_all_names()``.

    Presumably a mapping from shift ID to a readable name - confirm against
    ``ShiftType`` in the constants module.
    """
    shift_names = ShiftType.get_all_names()
    return shift_names
def line_code_to_name():
    """Return ``LineType.get_all_names()``, used to turn line type IDs into readable names."""
    line_names = LineType.get_all_names()
    return line_names
# DO NOT load at import time - always call get_cost_list_per_emp_shift() dynamically
# COST_LIST_PER_EMP_SHIFT = get_cost_list_per_emp_shift() # REMOVED - was causing stale data!
# COST_LIST_PER_EMP_SHIFT = { # WH_Workforce_Hourly_Pay_Scale
# "Fixed": {1: 0, 2: 22, 3: 18},
# "Humanizer": {1: 10, 2: 10, 3: 10},
# }
#where to put?
def get_team_requirements(product_list=None):
    """Return per-product staffing requirements for each employee type.

    Requirements are read from the personnel requirement data
    (``extract.read_personnel_requirement_data()``), matching products on its
    ``Kit`` column and taking the first matching row.

    Args:
        product_list: Products to look up. When ``None``, a fresh list is
            obtained from ``get_product_list()``.

    Returns:
        dict: ``{"UNICEF Fixed term": {product: int}, "Humanizer": {product: int}}``.
        Every requested product has an entry for both employee types;
        products missing from the data get a requirement of 0.
    """
    if product_list is None:
        product_list = get_product_list()  # Get fresh product list

    kits_df = extract.read_personnel_requirement_data()
    team_req_dict = {
        "UNICEF Fixed term": {},
        "Humanizer": {},
    }

    for product in product_list:
        print(f"Processing team requirements for product: {product}")
        product_data = kits_df[kits_df['Kit'] == product]

        if product_data.empty:
            print(f"Warning: Product {product} not found in Kits Calculation data, setting requirements to 0")
            # Actually store the zeros the warning announces, so consumers
            # can index every requested product without a KeyError.
            team_req_dict["Humanizer"][product] = 0
            team_req_dict["UNICEF Fixed term"][product] = 0
            continue

        # Extract Humanizer and UNICEF staff requirements from the first match.
        humanizer_req = product_data["Humanizer"].iloc[0]
        unicef_req = product_data["UNICEF staff"].iloc[0]
        # Convert to int (data is already cleaned in the extract function).
        team_req_dict["Humanizer"][product] = int(humanizer_req)
        team_req_dict["UNICEF Fixed term"][product] = int(unicef_req)

    return team_req_dict
def get_max_employee_per_type_on_day():
    """Return the maximum head count per employee type for each planning day.

    Prefers ``max_employee_per_type_on_day`` from the Streamlit session state
    (config page). Otherwise builds the default: 8 "UNICEF Fixed term" and
    10 "Humanizer" for every day in the date span.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state'):
            if 'max_employee_per_type_on_day' in st.session_state:
                print(f"Using max employee counts from config page: {st.session_state.max_employee_per_type_on_day}")
                return st.session_state.max_employee_per_type_on_day
    except Exception as e:
        print(f"Could not get max employee counts from streamlit session: {e}")

    print(f"Loading default max employee values")
    # Use the module-level span when set, otherwise compute it on demand.
    days = get_date_span()[0] if DATE_SPAN is None else DATE_SPAN
    return {
        "UNICEF Fixed term": dict.fromkeys(days, 8),
        "Humanizer": dict.fromkeys(days, 10),
    }
# Kept as a module-level constant for backward compatibility.
# NOTE(review): the original note says to "use function instead", but no
# getter for this value is visible in this part of the file - confirm which
# function is meant.
MAX_HOUR_PER_PERSON_PER_DAY = 14 # legal standard
def get_max_hour_per_shift_per_person():
    """Return the maximum hours one person may work in each shift.

    With a Streamlit session state available, each shift's value comes from
    its ``max_hours_shift_<n>`` entry, falling back per shift to
    ``DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON``. Without one, the
    ``DefaultConfig`` mapping is returned as is.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state'):
            # (shift ID, session key) pairs, one per configurable shift.
            session_keys = (
                (ShiftType.REGULAR, 'max_hours_shift_1'),
                (ShiftType.EVENING, 'max_hours_shift_2'),
                (ShiftType.OVERTIME, 'max_hours_shift_3'),
            )
            return {
                shift: st.session_state.get(key, DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON[shift])
                for shift, key in session_keys
            }
    except Exception as e:
        print(f"Could not get max hours per shift from session: {e}")

    # Nothing usable in the session: use the configured defaults.
    return DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON
# Keep these complex getters that access DefaultConfig or have complex logic:
def get_evening_shift_demand_threshold():
    """Return the evening shift demand threshold.

    Reads ``evening_shift_threshold`` from the Streamlit session state when
    one exists; in every other case returns
    ``DefaultConfig.EVENING_SHIFT_DEMAND_THRESHOLD``.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state'):
            session_value = st.session_state.get(
                'evening_shift_threshold',
                DefaultConfig.EVENING_SHIFT_DEMAND_THRESHOLD,
            )
            return session_value
    except Exception as e:
        print(f"Could not get evening shift threshold from session: {e}")

    # Nothing usable in the session: use the configured default.
    return DefaultConfig.EVENING_SHIFT_DEMAND_THRESHOLD
# ---- Kit Hierarchy for Production Ordering ----
def get_kit_hierarchy_data():
    """Return ``(kit_levels, dependencies, priority_order)`` from ``extract.get_production_order_data()``."""
    levels, deps, order = extract.get_production_order_data()
    return levels, deps, order
# NOTE: executed at import time - the production order data is read once when
# this module is imported, and these constants are not refreshed afterwards.
KIT_LEVELS, KIT_DEPENDENCIES, PRODUCTION_PRIORITY_ORDER = get_kit_hierarchy_data()
# Import-time log line reporting how many entries were loaded.
print(f"Kit Hierarchy loaded: {len(KIT_LEVELS)} kits, Priority order: {len(PRODUCTION_PRIORITY_ORDER)} items")
def get_kit_levels():
    """Return the kit levels, re-read through get_kit_hierarchy_data() on each call.

    Per the original note the shape is {kit_id: level}, where 0=prepack,
    1=subkit, 2=master.
    """
    return get_kit_hierarchy_data()[0]
def get_kit_dependencies():
    """Return the kit dependencies, re-read through get_kit_hierarchy_data() on each call.

    Per the original note the shape is {kit_id: [dependency_list]}.
    """
    return get_kit_hierarchy_data()[1]
def get_max_parallel_workers():
    """Return the maximum number of parallel workers per line type.

    With a Streamlit session state available, each line type's value comes
    from its ``max_parallel_workers_*`` entry, falling back per line type to
    the matching ``DefaultConfig`` value. Without one, both values come from
    ``DefaultConfig``.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state'):
            long_line_workers = st.session_state.get(
                'max_parallel_workers_long_line',
                DefaultConfig.MAX_PARALLEL_WORKERS_LONG_LINE,
            )
            mini_load_workers = st.session_state.get(
                'max_parallel_workers_mini_load',
                DefaultConfig.MAX_PARALLEL_WORKERS_MINI_LOAD,
            )
            return {
                LineType.LONG_LINE: long_line_workers,
                LineType.MINI_LOAD: mini_load_workers,
            }
    except Exception as e:
        print(f"Could not get max parallel workers from session: {e}")

    # Nothing usable in the session: use the configured defaults.
    default_workers = {
        LineType.LONG_LINE: DefaultConfig.MAX_PARALLEL_WORKERS_LONG_LINE,
        LineType.MINI_LOAD: DefaultConfig.MAX_PARALLEL_WORKERS_MINI_LOAD,
    }
    return default_workers
def get_fixed_min_unicef_per_day():
    """Return the fixed minimum number of UNICEF employees per day.

    This is the minimum number of UNICEF fixed-term staff that must be
    present every working day. Prefers ``fixed_min_unicef_per_day`` from the
    Streamlit session state (config page); otherwise returns
    ``DefaultConfig.FIXED_MIN_UNICEF_PER_DAY``.
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state') and 'fixed_min_unicef_per_day' in st.session_state:
            print(f"Using fixed minimum UNICEF per day from config page: {st.session_state.fixed_min_unicef_per_day}")
            return st.session_state.fixed_min_unicef_per_day
    except Exception as e:
        # Same policy as the other session-backed getters in this module: any
        # failure to read the session (not only a missing streamlit install)
        # is reported and falls back to the default.
        print(f"Could not get fixed minimum UNICEF per day from streamlit session: {e}")

    # Fallback to default configuration.
    return DefaultConfig.FIXED_MIN_UNICEF_PER_DAY
def get_payment_mode_config():
    """Return the payment mode configuration.

    Prefers ``payment_mode_config`` from the Streamlit session state (Dataset
    Metadata page); otherwise returns ``DefaultConfig.PAYMENT_MODE_CONFIG``.

    Payment modes:
    - "bulk": If an employee works any hours in a shift, pay for the full shift hours
    - "partial": Pay only for the actual hours worked
    """
    try:
        import streamlit as st

        if hasattr(st, 'session_state'):
            if 'payment_mode_config' in st.session_state:
                print(f"Using payment mode config from streamlit session: {st.session_state.payment_mode_config}")
                return st.session_state.payment_mode_config
    except Exception as e:
        print(f"Could not get payment mode config from streamlit session: {e}")

    # No session value: use the default payment mode configuration.
    print(f"Loading default payment mode configuration")
    return DefaultConfig.PAYMENT_MODE_CONFIG
# Import-time marker, printed once when the module finishes loading.
# NOTE(review): KIT_LINE_MATCH_DICT and the KIT_LEVELS / KIT_DEPENDENCIES /
# PRODUCTION_PRIORITY_ORDER constants are still loaded at import time, so not
# every module-level value is initialised dynamically.
print("✅ Module-level configuration functions defined (variables initialized dynamically)")