Spaces:
Running
Running
| import json | |
| from datetime import datetime, timedelta | |
| from agno.tools import Toolkit | |
| from src.optimization.tsptw_solver import TSPTWSolver | |
| from src.infra.poi_repository import poi_repo | |
| from src.infra.logger import get_logger | |
| logger = get_logger(__name__) | |
def _error_response(reason: str) -> str:
    """Serialize a failure into the JSON envelope returned by the toolkit.

    The payload carries the same keys as a successful response so callers can
    parse every outcome uniformly. ``reason`` is kept alongside ``message``
    for consumers that already read the older key.
    """
    return json.dumps({
        "status": "ERROR",
        "reason": reason,
        "message": reason,
        "opt_ref_id": None,
        "dropped_tasks_count": 0,
        "skipped_tasks_id": [],
    })


def _parse_departure_time(raw) -> datetime:
    """Convert ``global_info['departure_time']`` into a ``datetime``.

    Accepts a full ISO-8601 datetime string, or a time-only ISO string which
    is combined with tomorrow's date. A missing/empty value falls back to the
    current local time.

    Raises:
        ValueError: If ``raw`` is present but cannot be parsed either way.
    """
    if not raw:
        logger.warning("⚠️ Warning: departure_time not found in global_info. Using current local time.")
        return datetime.now().astimezone()

    try:
        # Attempt 1: standard ISO parsing (YYYY-MM-DDTHH:MM:SS+TZ).
        return datetime.fromisoformat(raw)
    except (TypeError, ValueError):
        pass

    # Attempt 2: time-only input (e.g. "09:30"); anchor it to tomorrow's date.
    from datetime import time as dt_time
    try:
        parsed_time = dt_time.fromisoformat(raw)
    except (TypeError, ValueError):
        raise ValueError(f"Invalid departure_time format: {raw}") from None

    default_date = datetime.now().date() + timedelta(days=1)
    start_time = datetime.combine(default_date, parsed_time)
    logger.warning(f"⚠️ Warning: Received time-only '{raw}'. Auto-fixed to: {start_time}")
    return start_time


def _parse_deadline(raw, start_time: datetime) -> datetime:
    """Convert ``global_info['deadline']`` into a ``datetime``.

    A missing/empty value defaults to one day after ``start_time``.

    Raises:
        ValueError: If ``raw`` is present but is not a valid ISO datetime.
    """
    if not raw:
        return start_time + timedelta(days=1)
    try:
        return datetime.fromisoformat(raw)
    except (TypeError, ValueError):
        raise ValueError(f"Invalid deadline format: {raw}") from None


class OptimizationToolkit(Toolkit):
    """Toolkit exposing the TSPTW route optimizer as a single registered tool."""

    def __init__(self):
        super().__init__(name="optimization_toolkit")
        self.solver = TSPTWSolver()
        self.register(self.optimize_from_ref)

    def optimize_from_ref(self, ref_id: str, max_wait_time_min: int = 0, return_to_start: bool = None) -> str:
        """
        Executes the mathematical route solver (TSPTW) to find the most efficient task sequence.
        This tool loads the POI data, calculates travel times, and reorders tasks to respect time windows.
        It returns detailed status information, allowing the caller to decide if a retry (with relaxed constraints) is needed.

        Args:
            ref_id (str): The unique reference ID returned by the Scout Agent.
            max_wait_time_min (int, optional): Max waiting time allowed at a location. Defaults to 0.
            return_to_start (bool, optional): Whether to force a round trip. Defaults to None. If None, it defaults to the value in 'global_info.return_to_start' (True when that key is absent).

        Returns:
            str: A JSON string containing the result status and reference ID.
            Structure:
            {
                "status": "OPTIMAL" | "INFEASIBLE" | "ERROR" | <raw solver status>,
                "opt_ref_id": str | null,
                "dropped_tasks_count": int,
                "skipped_tasks_id": list,
                "message": str
            }
            "OPTIMAL" means the solver reported "OK" with no dropped tasks;
            "INFEASIBLE" means it reported "OK" but dropped at least one task.
            Any other solver status is passed through unchanged. Error
            responses additionally carry a "reason" key equal to "message".
        """
        logger.info(f"🧮 Optimizer: Loading Ref {ref_id}...")
        data = poi_repo.load(ref_id)
        if not data:
            return _error_response(f"Data not found for ref_id '{ref_id}'.")

        # `or` guards against keys that are present but explicitly null.
        tasks = data.get("tasks") or []
        global_info = data.get("global_info") or {}

        if return_to_start is None:
            return_to_start = global_info.get("return_to_start", True)

        # Resolve the time window of the trip.
        try:
            start_time = _parse_departure_time(global_info.get("departure_time"))
            deadline = _parse_deadline(global_info.get("deadline"), start_time)
        except ValueError as exc:
            return _error_response(str(exc))

        # The start point must come from global_info; there is no fallback.
        start_location = global_info.get("start_location") or {}
        if not isinstance(start_location, dict) or "lat" not in start_location:
            reason = f"Start location not found in global_info. {global_info}"
            logger.warning(f"Solver failed: {reason}")
            return _error_response(reason)

        # Run the optimization.
        logger.info(
            f"🧮 Optimizer: Running TSPTW solver - start_time:{start_time}, deadline:{deadline}, "
            f"max_wait_time_min:{max_wait_time_min}, return_to_start:{return_to_start}"
        )
        try:
            result = self.solver.solve(
                start_location=start_location,
                start_time=start_time,
                deadline=deadline,
                tasks=tasks,
                max_wait_time_min=max_wait_time_min,
                return_to_start=return_to_start,
            )
        except Exception as e:
            # Tool boundary: report any solver failure to the calling agent
            # as an ERROR payload instead of letting it propagate.
            logger.warning(f"Solver failed: {e}")
            return _error_response(str(e))

        # Carry the inputs forward with the result. Per the original author's
        # note, the downstream Navigator fails when departure_time (held in
        # global_info) is missing from the stored result.
        result["tasks"] = tasks
        result["global_info"] = global_info
        logger.info(f"🧾 Optimizer: Inherited global_info to result.\n {result}")

        # Persist the result so later steps can load it by reference.
        result_ref_id = poi_repo.save(result, data_type="optimization_result")

        raw_status = result.get("status", "UNKNOWN")
        skipped_tasks = result.get("unassigned") or []
        dropped_count = len(skipped_tasks)

        if raw_status == "OK" and dropped_count > 0:
            # The solver succeeded only by dropping tasks; surface that as
            # INFEASIBLE so the agent can retry with relaxed constraints.
            agent_visible_status = "INFEASIBLE"
            message = f"Constraints too tight. Dropped {dropped_count} tasks."
        elif raw_status == "OK":
            agent_visible_status = "OPTIMAL"
            message = "Optimization complete. All tasks included."
        else:
            agent_visible_status = raw_status  # ERROR or UNKNOWN
            message = "Solver error."

        # Build an output the calling LLM agent can base its decision on.
        output = {
            "status": agent_visible_status,
            "opt_ref_id": result_ref_id,
            "dropped_tasks_count": dropped_count,
            "skipped_tasks_id": skipped_tasks,
            "message": message,
        }
        return json.dumps(output)