File size: 9,163 Bytes
b7d08cf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
"""
時間窗口處理器 - 完整改進版

✅ 改進 1: 完整的時區對齊(所有方法)
✅ 改進 2: 支持部分時間窗口 (None, datetime) 或 (datetime, None)
✅ 改進 3: 與 final_internal_models.py 配套使用
✅ 改進 4: 支持多種輸入格式(Dict, Tuple, 字符串)
"""
from typing import Tuple, Dict, Any, List, Optional, Union
from datetime import datetime

from ..models.internal_models import _Task


def _parse_datetime(dt: Union[None, str, datetime]) -> Optional[datetime]:
    """
    解析各種格式的時間

    支持:
    - None → None
    - datetime → datetime
    - str (ISO format) → datetime

    Args:
        dt: 時間輸入

    Returns:
        解析後的 datetime 或 None
    """
    if dt is None:
        return None

    if isinstance(dt, datetime):
        return dt

    if isinstance(dt, str):
        # 支持 ISO 格式字符串
        try:
            return datetime.fromisoformat(dt)
        except ValueError as e:
            raise ValueError(f"Invalid datetime string: {dt}") from e

    raise TypeError(f"Unsupported datetime type: {type(dt)}")


def _normalize_time_window(
    tw: Union[None, Dict[str, Any], Tuple[Optional[datetime], Optional[datetime]]]
) -> Optional[Tuple[Optional[datetime], Optional[datetime]]]:
    """
    標準化時間窗口為統一格式

    ✅ 支持多種輸入格式:
    1. None → None
    2. Dict {'earliest_time': ..., 'latest_time': ...} → Tuple
    3. Tuple (datetime, datetime) → Tuple
    4. Tuple (str, str) → Tuple (datetime, datetime)

    Args:
        tw: 時間窗口(多種格式)

    Returns:
        標準化的 (earliest, latest) 或 None
    """
    if tw is None:
        return None

    # 字典格式
    if isinstance(tw, dict):
        earliest = _parse_datetime(tw.get('earliest_time'))
        latest = _parse_datetime(tw.get('latest_time'))

        # (None, None) → None
        if earliest is None and latest is None:
            return None

        return (earliest, latest)

    # Tuple 格式
    if isinstance(tw, (tuple, list)) and len(tw) == 2:
        earliest = _parse_datetime(tw[0])
        latest = _parse_datetime(tw[1])

        # (None, None) → None
        if earliest is None and latest is None:
            return None

        return (earliest, latest)

    raise TypeError(f"Unsupported time window format: {type(tw)}")


def _align_dt(dt: Optional[datetime], base: datetime) -> Optional[datetime]:
    """
    把 dt 調整成跟 base 一樣的「有沒有時區」& 「時區」

    ✅ 改進: 支持 None 輸入
    ✅ 改進: 添加類型檢查

    規則:
    - dt 是 None → 返回 None
    - base 沒 tzinfo → 全部變成 naive(直接丟掉 tzinfo)
    - base 有 tzinfo → 全部變成同一個 timezone 的 aware dt

    Args:
        dt: 要對齊的時間(可能為 None)
        base: 基準時間

    Returns:
        對齊後的時間(可能為 None)
    """
    if dt is None:
        return None

    # ✅ 添加類型檢查
    if not isinstance(dt, datetime):
        raise TypeError(f"Expected datetime, got {type(dt)}")

    if base.tzinfo is None:
        # base 是 naive → 我們也回傳 naive
        if dt.tzinfo is None:
            return dt
        return dt.replace(tzinfo=None)
    else:
        # base 是 aware → 全部轉成同一個 tz 的 aware
        if dt.tzinfo is None:
            # 視為跟 base 同一個時區的本地時間
            return dt.replace(tzinfo=base.tzinfo)
        return dt.astimezone(base.tzinfo)


def _align_tw(
    tw: Optional[Tuple[Optional[datetime], Optional[datetime]]],
    base: datetime
) -> Optional[Tuple[Optional[datetime], Optional[datetime]]]:
    """
    對齊整個時間窗口的時區

    ✅ 改進: 支持部分時間窗口

    支持的輸入:
    - None → None
    - (None, None) → None
    - (None, datetime) → (None, aligned_datetime)
    - (datetime, None) → (aligned_datetime, None)
    - (datetime, datetime) → (aligned_datetime, aligned_datetime)

    Args:
        tw: 時間窗口(可能為 None 或包含 None 元素)
        base: 基準時間

    Returns:
        對齊後的時間窗口(可能為 None)
    """
    if tw is None:
        return None

    start, end = tw

    # (None, None) → None
    if start is None and end is None:
        return None

    # 對齊兩個時間點(保留 None)
    return (_align_dt(start, base), _align_dt(end, base))


class TimeWindowHandler:
    """
    時間窗口處理器

    ✅ 完整改進版

    職責:
    - 合併 Task-level & POI-level 時間窗口
    - 轉換時間為秒(OR-Tools 需要)
    - 處理部分時間窗口
    - 處理時區差異
    - 支持多種輸入格式
    """

    @staticmethod
    def get_node_time_window_sec(
        meta: Dict[str, Any],
        tasks: List[_Task],
        start_time: datetime,
    ) -> Tuple[int, int]:
        """
        回傳某個 node 的「有效時間窗」(秒),= 全域 [0,∞) ∩ task TW ∩ poi TW
        給備選 POI 用來檢查「在某個時間點去這個 node 合不合理」

        ✅ 改進 1: 添加時區對齊
        ✅ 改進 2: 支持部分時間窗口
        ✅ 改進 3: 支持多種輸入格式

        Args:
            meta: 節點元數據
            tasks: 任務列表
            start_time: 開始時間(基準)

        Returns:
            (start_sec, end_sec): 有效時間窗口(秒)
        """
        start_sec = 0
        end_sec = 10 ** 9

        task = tasks[meta["task_idx"]]

        # ✅ 標準化時間窗口格式
        task_tw = _normalize_time_window(task.time_window)
        poi_tw = _normalize_time_window(meta.get("poi_time_window"))

        # ✅ 對齊時區
        task_tw = _align_tw(task_tw, start_time)
        poi_tw = _align_tw(poi_tw, start_time)

        # 處理 task 時間窗口(支持部分窗口)
        if task_tw is not None:
            tw_start, tw_end = task_tw
            if tw_start is not None:
                t_start = int((tw_start - start_time).total_seconds())
                start_sec = max(start_sec, t_start)
            if tw_end is not None:
                t_end = int((tw_end - start_time).total_seconds())
                end_sec = min(end_sec, t_end)

        # 處理 POI 時間窗口(支持部分窗口)
        if poi_tw is not None:
            tw_start, tw_end = poi_tw
            if tw_start is not None:
                p_start = int((tw_start - start_time).total_seconds())
                start_sec = max(start_sec, p_start)
            if tw_end is not None:
                p_end = int((tw_end - start_time).total_seconds())
                end_sec = min(end_sec, p_end)

        return start_sec, end_sec

    @staticmethod
    def compute_effective_time_window(
        task_tw: Union[None, Dict[str, Any], Tuple[Optional[datetime], Optional[datetime]]],
        poi_tw: Union[None, Dict[str, Any], Tuple[Optional[datetime], Optional[datetime]]],
        start_time: datetime,
        horizon_sec: int,
    ) -> Tuple[int, int]:
        """
        計算有效時間窗口(秒)

        ✅ 改進 1: 時區對齊
        ✅ 改進 2: 支持部分時間窗口
        ✅ 改進 3: 支持多種輸入格式(Dict, Tuple)

        處理邏輯:
        1. 標準化輸入格式
        2. 對齊時區
        3. 初始化為 [0, horizon_sec]
        4. 應用 task 時間窗口(如果有)
        5. 應用 POI 時間窗口(如果有)
        6. Clamp 到 [0, horizon_sec]

        Args:
            task_tw: Task-level 時間窗口(Dict 或 Tuple)
            poi_tw: POI-level 時間窗口(Dict 或 Tuple)
            start_time: 開始時間(基準)
            horizon_sec: 截止時間(秒)

        Returns:
            (start_sec, end_sec): 有效時間窗口(秒)

        """
        # ✅ 標準化格式
        task_tw = _normalize_time_window(task_tw)
        poi_tw = _normalize_time_window(poi_tw)

        # ✅ 對齊時區
        task_tw = _align_tw(task_tw, start_time)
        poi_tw = _align_tw(poi_tw, start_time)

        # 初始化為全 horizon
        start_sec = 0
        end_sec = horizon_sec

        # 處理 task 時間窗口(支持部分窗口)
        if task_tw is not None:
            tw_start, tw_end = task_tw
            if tw_start is not None:
                start_sec = max(start_sec, int((tw_start - start_time).total_seconds()))
            if tw_end is not None:
                end_sec = min(end_sec, int((tw_end - start_time).total_seconds()))

        # 處理 POI 時間窗口(支持部分窗口)
        if poi_tw is not None:
            tw_start, tw_end = poi_tw
            if tw_start is not None:
                start_sec = max(start_sec, int((tw_start - start_time).total_seconds()))
            if tw_end is not None:
                end_sec = min(end_sec, int((tw_end - start_time).total_seconds()))

        # Clamp 到 [0, horizon_sec]
        start_sec = max(0, start_sec)
        end_sec = min(horizon_sec, end_sec)

        return start_sec, end_sec