LifeFlow-AI / src /optimization /graph /time_window_handler.py
Marco310's picture
buildup agent system
b7d08cf
raw
history blame
9.16 kB
"""
時間窗口處理器 - 完整改進版
✅ 改進 1: 完整的時區對齊(所有方法)
✅ 改進 2: 支持部分時間窗口 (None, datetime) 或 (datetime, None)
✅ 改進 3: 與 final_internal_models.py 配套使用
✅ 改進 4: 支持多種輸入格式(Dict, Tuple, 字符串)
"""
from typing import Tuple, Dict, Any, List, Optional, Union
from datetime import datetime
from ..models.internal_models import _Task
def _parse_datetime(dt: Union[None, str, datetime]) -> Optional[datetime]:
"""
解析各種格式的時間
支持:
- None → None
- datetime → datetime
- str (ISO format) → datetime
Args:
dt: 時間輸入
Returns:
解析後的 datetime 或 None
"""
if dt is None:
return None
if isinstance(dt, datetime):
return dt
if isinstance(dt, str):
# 支持 ISO 格式字符串
try:
return datetime.fromisoformat(dt)
except ValueError as e:
raise ValueError(f"Invalid datetime string: {dt}") from e
raise TypeError(f"Unsupported datetime type: {type(dt)}")
def _normalize_time_window(
tw: Union[None, Dict[str, Any], Tuple[Optional[datetime], Optional[datetime]]]
) -> Optional[Tuple[Optional[datetime], Optional[datetime]]]:
"""
標準化時間窗口為統一格式
✅ 支持多種輸入格式:
1. None → None
2. Dict {'earliest_time': ..., 'latest_time': ...} → Tuple
3. Tuple (datetime, datetime) → Tuple
4. Tuple (str, str) → Tuple (datetime, datetime)
Args:
tw: 時間窗口(多種格式)
Returns:
標準化的 (earliest, latest) 或 None
"""
if tw is None:
return None
# 字典格式
if isinstance(tw, dict):
earliest = _parse_datetime(tw.get('earliest_time'))
latest = _parse_datetime(tw.get('latest_time'))
# (None, None) → None
if earliest is None and latest is None:
return None
return (earliest, latest)
# Tuple 格式
if isinstance(tw, (tuple, list)) and len(tw) == 2:
earliest = _parse_datetime(tw[0])
latest = _parse_datetime(tw[1])
# (None, None) → None
if earliest is None and latest is None:
return None
return (earliest, latest)
raise TypeError(f"Unsupported time window format: {type(tw)}")
def _align_dt(dt: Optional[datetime], base: datetime) -> Optional[datetime]:
"""
把 dt 調整成跟 base 一樣的「有沒有時區」& 「時區」
✅ 改進: 支持 None 輸入
✅ 改進: 添加類型檢查
規則:
- dt 是 None → 返回 None
- base 沒 tzinfo → 全部變成 naive(直接丟掉 tzinfo)
- base 有 tzinfo → 全部變成同一個 timezone 的 aware dt
Args:
dt: 要對齊的時間(可能為 None)
base: 基準時間
Returns:
對齊後的時間(可能為 None)
"""
if dt is None:
return None
# ✅ 添加類型檢查
if not isinstance(dt, datetime):
raise TypeError(f"Expected datetime, got {type(dt)}")
if base.tzinfo is None:
# base 是 naive → 我們也回傳 naive
if dt.tzinfo is None:
return dt
return dt.replace(tzinfo=None)
else:
# base 是 aware → 全部轉成同一個 tz 的 aware
if dt.tzinfo is None:
# 視為跟 base 同一個時區的本地時間
return dt.replace(tzinfo=base.tzinfo)
return dt.astimezone(base.tzinfo)
def _align_tw(
tw: Optional[Tuple[Optional[datetime], Optional[datetime]]],
base: datetime
) -> Optional[Tuple[Optional[datetime], Optional[datetime]]]:
"""
對齊整個時間窗口的時區
✅ 改進: 支持部分時間窗口
支持的輸入:
- None → None
- (None, None) → None
- (None, datetime) → (None, aligned_datetime)
- (datetime, None) → (aligned_datetime, None)
- (datetime, datetime) → (aligned_datetime, aligned_datetime)
Args:
tw: 時間窗口(可能為 None 或包含 None 元素)
base: 基準時間
Returns:
對齊後的時間窗口(可能為 None)
"""
if tw is None:
return None
start, end = tw
# (None, None) → None
if start is None and end is None:
return None
# 對齊兩個時間點(保留 None)
return (_align_dt(start, base), _align_dt(end, base))
class TimeWindowHandler:
"""
時間窗口處理器
✅ 完整改進版
職責:
- 合併 Task-level & POI-level 時間窗口
- 轉換時間為秒(OR-Tools 需要)
- 處理部分時間窗口
- 處理時區差異
- 支持多種輸入格式
"""
@staticmethod
def get_node_time_window_sec(
meta: Dict[str, Any],
tasks: List[_Task],
start_time: datetime,
) -> Tuple[int, int]:
"""
回傳某個 node 的「有效時間窗」(秒),= 全域 [0,∞) ∩ task TW ∩ poi TW
給備選 POI 用來檢查「在某個時間點去這個 node 合不合理」
✅ 改進 1: 添加時區對齊
✅ 改進 2: 支持部分時間窗口
✅ 改進 3: 支持多種輸入格式
Args:
meta: 節點元數據
tasks: 任務列表
start_time: 開始時間(基準)
Returns:
(start_sec, end_sec): 有效時間窗口(秒)
"""
start_sec = 0
end_sec = 10 ** 9
task = tasks[meta["task_idx"]]
# ✅ 標準化時間窗口格式
task_tw = _normalize_time_window(task.time_window)
poi_tw = _normalize_time_window(meta.get("poi_time_window"))
# ✅ 對齊時區
task_tw = _align_tw(task_tw, start_time)
poi_tw = _align_tw(poi_tw, start_time)
# 處理 task 時間窗口(支持部分窗口)
if task_tw is not None:
tw_start, tw_end = task_tw
if tw_start is not None:
t_start = int((tw_start - start_time).total_seconds())
start_sec = max(start_sec, t_start)
if tw_end is not None:
t_end = int((tw_end - start_time).total_seconds())
end_sec = min(end_sec, t_end)
# 處理 POI 時間窗口(支持部分窗口)
if poi_tw is not None:
tw_start, tw_end = poi_tw
if tw_start is not None:
p_start = int((tw_start - start_time).total_seconds())
start_sec = max(start_sec, p_start)
if tw_end is not None:
p_end = int((tw_end - start_time).total_seconds())
end_sec = min(end_sec, p_end)
return start_sec, end_sec
@staticmethod
def compute_effective_time_window(
task_tw: Union[None, Dict[str, Any], Tuple[Optional[datetime], Optional[datetime]]],
poi_tw: Union[None, Dict[str, Any], Tuple[Optional[datetime], Optional[datetime]]],
start_time: datetime,
horizon_sec: int,
) -> Tuple[int, int]:
"""
計算有效時間窗口(秒)
✅ 改進 1: 時區對齊
✅ 改進 2: 支持部分時間窗口
✅ 改進 3: 支持多種輸入格式(Dict, Tuple)
處理邏輯:
1. 標準化輸入格式
2. 對齊時區
3. 初始化為 [0, horizon_sec]
4. 應用 task 時間窗口(如果有)
5. 應用 POI 時間窗口(如果有)
6. Clamp 到 [0, horizon_sec]
Args:
task_tw: Task-level 時間窗口(Dict 或 Tuple)
poi_tw: POI-level 時間窗口(Dict 或 Tuple)
start_time: 開始時間(基準)
horizon_sec: 截止時間(秒)
Returns:
(start_sec, end_sec): 有效時間窗口(秒)
"""
# ✅ 標準化格式
task_tw = _normalize_time_window(task_tw)
poi_tw = _normalize_time_window(poi_tw)
# ✅ 對齊時區
task_tw = _align_tw(task_tw, start_time)
poi_tw = _align_tw(poi_tw, start_time)
# 初始化為全 horizon
start_sec = 0
end_sec = horizon_sec
# 處理 task 時間窗口(支持部分窗口)
if task_tw is not None:
tw_start, tw_end = task_tw
if tw_start is not None:
start_sec = max(start_sec, int((tw_start - start_time).total_seconds()))
if tw_end is not None:
end_sec = min(end_sec, int((tw_end - start_time).total_seconds()))
# 處理 POI 時間窗口(支持部分窗口)
if poi_tw is not None:
tw_start, tw_end = poi_tw
if tw_start is not None:
start_sec = max(start_sec, int((tw_start - start_time).total_seconds()))
if tw_end is not None:
end_sec = min(end_sec, int((tw_end - start_time).total_seconds()))
# Clamp 到 [0, horizon_sec]
start_sec = max(0, start_sec)
end_sec = min(horizon_sec, end_sec)
return start_sec, end_sec