Spaces:
Running
Running
| import os | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime | |
| from src.infra.logger import get_logger | |
| from src.services.googlemap_api_service import GoogleMapAPIService | |
| from src.optimization.models import ( | |
| convert_tasks_to_internal, | |
| convert_location_to_internal, | |
| convert_result_to_dict, | |
| ) | |
| from src.optimization.graph import GraphBuilder | |
| from src.optimization.solver import ORToolsSolver, SolutionExtractor | |
| logger = get_logger(__name__) | |
| class TSPTWSolver: | |
| """ | |
| TSPTW (Traveling Salesman Problem with Time Windows) 求解器 | |
| ✅ 完全保留原始功能: | |
| - 外部 API 使用 Dict (向後兼容) | |
| - 內部使用 Pydantic (類型檢查 + 驗證) | |
| - 時間單位使用分鐘(service_duration_min) | |
| - 時間窗同時支援 Task-level & POI-level (含多段 time_windows) | |
| - 備選 POI 會考慮 time window 且不再推薦同一個 poi_id | |
| ✨ 重構改進: | |
| - 模塊化架構(易於測試和維護) | |
| - 清晰的職責分離 | |
| - 保持對外 API 完全不變 | |
| """ | |
| def __init__( | |
| self, | |
| time_limit_seconds: Optional[int] = None, | |
| verbose: bool = False, | |
| ): | |
| """ | |
| 初始化求解器 | |
| Args: | |
| api_key: Google Maps API Key | |
| time_limit_seconds: 求解時間限制(秒) | |
| verbose: 是否顯示詳細日誌 | |
| """ | |
| env_limit = ( | |
| os.getenv("SOLVER_TIME_LIMIT") | |
| or os.getenv("solver_time_limit") | |
| or "1" | |
| ) | |
| self.time_limit_seconds = ( | |
| time_limit_seconds if time_limit_seconds is not None else int(env_limit) | |
| ) | |
| self.verbose = verbose | |
| # 初始化各模塊 | |
| self.graph_builder = GraphBuilder() | |
| self.ortools_solver = ORToolsSolver( | |
| time_limit_seconds=self.time_limit_seconds, | |
| verbose=verbose, | |
| ) | |
| self.solution_extractor = SolutionExtractor() | |
| # ------------------------------------------------------------------ # | |
| # Public API - 完全保留原始接口 # | |
| # ------------------------------------------------------------------ # | |
| def solve( | |
| self, | |
| start_location: Dict[str, float], | |
| start_time: datetime, | |
| deadline: datetime, | |
| tasks: List[Dict[str, Any]] = None, | |
| travel_mode="DRIVE", | |
| max_wait_time_min: int = 10, | |
| alt_k: int = 3, | |
| return_to_start: bool = True, | |
| ) -> Dict[str, Any]: | |
| """ | |
| 求解 TSPTW | |
| ✅ 完全保留原始 API 和功能 | |
| Args: | |
| tasks: 任務列表,每個任務格式: | |
| { | |
| "task_id": str, | |
| "priority": "HIGH" | "MEDIUM" | "LOW", | |
| "time_window": (datetime, datetime) | None, | |
| "service_duration_min": int, | |
| "candidates": [ | |
| { | |
| "poi_id": str, | |
| "lat": float, | |
| "lng": float, | |
| "time_window": (datetime, datetime) | None, | |
| "time_windows": [(datetime, datetime), ...] | None | |
| } | |
| ] | |
| } | |
| start_location: {"lat": float, "lng": float} | |
| start_time: 開始時間 | |
| deadline: 截止時間 | |
| max_wait_time_min: 最大等待時間(分鐘) | |
| travel_mode: 矩陣計算的交通模式 | |
| alt_k: 回傳 Top-K 備選 POI | |
| return_to_start: 是否回到出發點 | |
| Returns: Dict(由 _TSPTWResult 轉出) | |
| { | |
| "status": "OK" | "NO_SOLUTION" | "NO_TASKS", | |
| "total_travel_time_min": int, | |
| "total_travel_distance_m": int, | |
| "route": [...], | |
| "visited_tasks": [...], | |
| "skipped_tasks": [...], | |
| "tasks_detail": [...] | |
| } | |
| """ | |
| logger.info("TSPTWSolver.solve() start, tasks=%d", len(tasks)) | |
| # 1. 驗證和轉換輸入 | |
| try: | |
| internal_tasks = convert_tasks_to_internal(tasks) | |
| internal_start_location = convert_location_to_internal(start_location) | |
| except Exception as e: | |
| logger.error(f"Failed to validate input: {e}") | |
| return { | |
| "status": "INVALID_INPUT", | |
| "error": str(e), | |
| "total_travel_time_min": 0, | |
| "total_travel_distance_m": 0, | |
| "route": [], | |
| "visited_tasks": [], | |
| "skipped_tasks": [t.get("task_id", "") for t in tasks], | |
| "tasks_detail": [], | |
| } | |
| # 2. 構建圖 | |
| graph = self.graph_builder.build_graph( | |
| start_location=internal_start_location, | |
| tasks=internal_tasks, | |
| travel_mode=travel_mode, | |
| ) | |
| num_nodes = len(graph.node_meta) | |
| if num_nodes <= 1: | |
| logger.warning("No POIs to visit, only depot.") | |
| return { | |
| "status": "NO_TASKS", | |
| "total_travel_time_min": 0, | |
| "total_travel_distance_m": 0, | |
| "route": [], | |
| "visited_tasks": [], | |
| "skipped_tasks": [t.task_id for t in internal_tasks], | |
| "tasks_detail": [], | |
| } | |
| # 3. 求解 | |
| max_wait_time_sec = max_wait_time_min * 60 | |
| try: | |
| routing, manager, solution = self.ortools_solver.solve( | |
| graph=graph, | |
| tasks=internal_tasks, | |
| start_time=start_time, | |
| deadline=deadline, | |
| max_wait_time_sec=max_wait_time_sec, | |
| ) | |
| except Exception as e: | |
| logger.error(f"OR-Tools solver failed: {e}") | |
| return { | |
| "status": "SOLVER_ERROR", | |
| "error": str(e), | |
| "total_travel_time_min": 0, | |
| "total_travel_distance_m": 0, | |
| "route": [], | |
| "visited_tasks": [], | |
| "skipped_tasks": [t.task_id for t in internal_tasks], | |
| "tasks_detail": [], | |
| } | |
| # 4. 檢查是否有解 | |
| if solution is None: | |
| logger.warning("No solution found") | |
| return { | |
| "status": "NO_SOLUTION", | |
| "total_travel_time_min": 0, | |
| "total_travel_distance_m": 0, | |
| "route": [], | |
| "visited_tasks": [], | |
| "skipped_tasks": [t.task_id for t in internal_tasks], | |
| "tasks_detail": [], | |
| } | |
| # 5. 提取結果 | |
| time_dimension = routing.GetDimensionOrDie("Time") | |
| result = self.solution_extractor.extract( | |
| routing=routing, | |
| manager=manager, | |
| solution=solution, | |
| time_dimension=time_dimension, | |
| start_time=start_time, | |
| graph=graph, | |
| tasks=internal_tasks, | |
| alt_k=alt_k, | |
| return_to_start=return_to_start, | |
| ) | |
| logger.info("TSPTWSolver.solve() done, status=%s", result.status) | |
| # 6. 轉換為外部 Dict | |
| return convert_result_to_dict(result) | |
| def test_time_window_handler(): | |
| from datetime import datetime, timezone, timedelta | |
| from src.optimization.graph.time_window_handler import TimeWindowHandler | |
| handler = TimeWindowHandler() | |
| tz = timezone(timedelta(hours=8)) # UTC+8 | |
| start_time = datetime(2025, 11, 22, 10, 0, 0, tzinfo=tz) | |
| horizon_sec = 8 * 3600 # 8 hours | |
| print("=== Test Case 1: 都沒有時間窗口 ===") | |
| start, end = handler.compute_effective_time_window(None, None, start_time, horizon_sec) | |
| assert start == 0 and end == horizon_sec | |
| print(f"✅ Pass: [{start}, {end}]") | |
| print("\n=== Test Case 2: Dict 格式 - 只有 task 有時間窗口 ===") | |
| task_tw = { | |
| 'earliest_time': datetime(2025, 11, 22, 11, 0, 0, tzinfo=tz), | |
| 'latest_time': datetime(2025, 11, 22, 15, 0, 0, tzinfo=tz) | |
| } | |
| start, end = handler.compute_effective_time_window(task_tw, None, start_time, horizon_sec) | |
| assert start == 3600 # 1 hour after start | |
| assert end == 18000 # 5 hours after start | |
| print(f"✅ Pass: [{start}, {end}]") | |
| print("\n=== Test Case 3: Tuple 格式 - 只有 POI 有時間窗口 ===") | |
| poi_tw = ( | |
| datetime(2025, 11, 22, 9, 0, 0, tzinfo=tz), # 開放時間 | |
| datetime(2025, 11, 22, 17, 0, 0, tzinfo=tz) # 關門時間 | |
| ) | |
| start, end = handler.compute_effective_time_window(None, poi_tw, start_time, horizon_sec) | |
| assert start == 0 # POI 已經開門 | |
| assert end == 25200 # 7 hours after start | |
| print(f"✅ Pass: [{start}, {end}]") | |
| print("\n=== Test Case 4: 字符串格式 ===") | |
| task_tw_str = { | |
| 'earliest_time': '2025-11-22T11:00:00+08:00', | |
| 'latest_time': '2025-11-22T15:00:00+08:00' | |
| } | |
| start, end = handler.compute_effective_time_window(task_tw_str, None, start_time, horizon_sec) | |
| assert start == 3600 | |
| assert end == 18000 | |
| print(f"✅ Pass: [{start}, {end}]") | |
| print("\n=== Test Case 5: 部分時間窗口 (只有 earliest) ===") | |
| partial_tw = { | |
| 'earliest_time': datetime(2025, 11, 22, 12, 0, 0, tzinfo=tz), | |
| 'latest_time': None | |
| } | |
| start, end = handler.compute_effective_time_window(partial_tw, None, start_time, horizon_sec) | |
| assert start == 7200 # 2 hours after start | |
| assert end == horizon_sec | |
| print(f"✅ Pass: [{start}, {end}]") | |
| print("\n=== Test Case 6: 部分時間窗口 (只有 latest) ===") | |
| partial_tw = { | |
| 'earliest_time': None, | |
| 'latest_time': datetime(2025, 11, 22, 16, 0, 0, tzinfo=tz) | |
| } | |
| start, end = handler.compute_effective_time_window(partial_tw, None, start_time, horizon_sec) | |
| assert start == 0 | |
| assert end == 21600 # 6 hours after start | |
| print(f"✅ Pass: [{start}, {end}]") | |
| print("\n=== Test Case 7: 實際場景 - Scout 返回的 POI time_window = None ===") | |
| poi_data = { | |
| 'place_id': 'ChIJ...', | |
| 'name': 'Rainbow Village', | |
| 'time_window': None # 你的實際情況 | |
| } | |
| start, end = handler.compute_effective_time_window(task_tw, poi_data.get('time_window'), start_time, horizon_sec) | |
| assert start == 3600 and end == 18000 | |
| print(f"✅ Pass: [{start}, {end}]") | |
| print("\n🎉 All tests passed!") | |
| if __name__ == "__main__": | |
| test_time_window_handler() |