import json
from datetime import datetime, timedelta
from datetime import time as dt_time

from agno.tools import Toolkit
from src.optimization.tsptw_solver import TSPTWSolver
from src.infra.poi_repository import poi_repo
from src.infra.logger import get_logger

logger = get_logger(__name__)


def _error_response(message: str) -> str:
    """Build the JSON error payload shared by every failure path.

    The payload carries the same keys as a successful response so callers can
    parse it uniformly. ``reason`` duplicates ``message`` for backward
    compatibility with consumers that read the older key.
    """
    return json.dumps({
        "status": "ERROR",
        "opt_ref_id": None,
        "dropped_tasks_count": 0,
        "skipped_tasks_id": [],
        "message": message,
        "reason": message,
    })


def _parse_start_time(raw_value) -> datetime:
    """Resolve ``departure_time`` from global_info into a datetime.

    * Missing/empty value: the current local time (timezone-aware) is used.
    * Full ISO-8601 datetime string: parsed as-is.
    * Time-only ISO string (e.g. ``"09:00"``): combined with tomorrow's date.

    Raises:
        ValueError: if the value is neither a valid ISO datetime nor ISO time.
    """
    if not raw_value:
        logger.warning(
            "⚠️ Warning: departure_time not found in global_info. "
            "Using the current local time."
        )
        return datetime.now().astimezone()

    # Attempt 1: standard ISO parsing (YYYY-MM-DDTHH:MM:SS+TZ).
    # TypeError covers non-string input such as a numeric timestamp.
    try:
        return datetime.fromisoformat(raw_value)
    except (TypeError, ValueError):
        pass

    # Attempt 2: time-only input; assume the trip happens tomorrow.
    try:
        parsed_time = dt_time.fromisoformat(raw_value)
    except (TypeError, ValueError):
        raise ValueError(f"Invalid departure_time format: {raw_value}") from None

    tomorrow = datetime.now().date() + timedelta(days=1)
    start_time = datetime.combine(tomorrow, parsed_time)
    logger.warning(f"⚠️ Warning: Received time-only '{raw_value}'. Auto-fixed to: {start_time}")
    return start_time


def _parse_deadline(raw_value, start_time: datetime) -> datetime:
    """Resolve ``deadline`` from global_info, defaulting to start_time + 1 day.

    Raises:
        ValueError: if a value is present but is not a valid ISO datetime.
    """
    if not raw_value:
        return start_time + timedelta(days=1)
    try:
        return datetime.fromisoformat(raw_value)
    except (TypeError, ValueError):
        raise ValueError(f"Invalid deadline format: {raw_value}") from None


class OptimizationToolkit(Toolkit):
    """Toolkit exposing the TSPTW route optimizer as a registered tool."""

    def __init__(self):
        """Create the solver and register ``optimize_from_ref`` as a tool."""
        super().__init__(name="optimization_toolkit")
        self.solver = TSPTWSolver()
        self.register(self.optimize_from_ref)

    def optimize_from_ref(self, ref_id: str, max_wait_time_min: int = 0, return_to_start: bool = None) -> str:
        """
        Executes the mathematical route solver (TSPTW) to find the most efficient task sequence.

        This tool loads the POI data referenced by ``ref_id``, runs the solver, stores the
        result, and returns detailed status information, allowing the caller to decide
        if a retry (with relaxed constraints) is needed.

        Args:
            ref_id (str): The unique reference ID returned by the Scout Agent.
            max_wait_time_min (int, optional): Max waiting time allowed at a location. Defaults to 0.
            return_to_start (bool, optional): Whether to force a round trip. Defaults to None.
                If None, it defaults to the value in 'global_info.return_to_start'
                (True when that key is absent).

        Returns:
            str: A JSON string containing the result status and reference ID.
            Structure:
            {
                "status": "OPTIMAL" | "INFEASIBLE" | "ERROR" (any other solver status is passed through),
                "opt_ref_id": str | null,
                "dropped_tasks_count": int,
                "skipped_tasks_id": list,
                "message": str
            }
            Error responses additionally carry a "reason" key equal to "message".
        """
        logger.info(f"🧮 Optimizer: Loading Ref {ref_id}...")

        data = poi_repo.load(ref_id)
        if not data:
            # Return structured JSON (not a bare string) so callers can always parse the output.
            return _error_response(f"Data not found for ref_id: {ref_id}")

        # `or` guards against keys that are present but explicitly null.
        tasks = data.get("tasks") or []
        # [Critical] global_info carries departure_time, deadline, start_location, etc.
        global_info = data.get("global_info") or {}

        if return_to_start is None:
            return_to_start = global_info.get("return_to_start", True)

        # Resolve the time window.
        try:
            start_time = _parse_start_time(global_info.get("departure_time"))
            deadline = _parse_deadline(global_info.get("deadline"), start_time)
        except ValueError as exc:
            return _error_response(str(exc))
        # NOTE(review): start_time and deadline may mix naive and timezone-aware
        # datetimes depending on the input strings; confirm the solver tolerates this.

        # Resolve the start location; it must be a mapping that contains "lat".
        start_location = global_info.get("start_location")
        if not isinstance(start_location, dict) or "lat" not in start_location:
            error_text = f"Start location not found in global_info. {global_info}"
            logger.warning(f"Optimizer aborted: {error_text}")
            return _error_response(error_text)

        # Run the optimization. Any solver exception is reported as an ERROR payload.
        try:
            logger.info(
                f"🧮 Optimizer: Running TSPTW solver - start_time:{start_time}, deadline:{deadline}, "
                f"max_wait_time_min:{max_wait_time_min}, return_to_start:{return_to_start}"
            )
            result = self.solver.solve(
                start_location=start_location,
                start_time=start_time,
                deadline=deadline,
                tasks=tasks,
                max_wait_time_min=max_wait_time_min,
                return_to_start=return_to_start,
            )
        except Exception as e:
            logger.warning(f"Solver failed: {e}")
            return _error_response(str(e))

        # [Critical] Carry tasks and global_info forward into the stored result.
        # Without this, the Navigator fails because it cannot find departure_time.
        result["tasks"] = tasks
        result["global_info"] = global_info
        logger.info(f"🧾 Optimizer: Inherited global_info to result.\n {result}")

        # Persist the result and obtain its reference ID.
        result_ref_id = poi_repo.save(result, data_type="optimization_result")

        raw_status = result.get("status", "UNKNOWN")
        skipped_tasks = result.get("unassigned") or []
        dropped_count = len(skipped_tasks)

        if raw_status == "OK" and dropped_count > 0:
            # The solver succeeded only by dropping tasks; surface this as INFEASIBLE
            # so the agent can decide whether to retry with relaxed constraints.
            agent_visible_status = "INFEASIBLE"
            message = f"Constraints too tight. Dropped {dropped_count} tasks."
        elif raw_status == "OK":
            agent_visible_status = "OPTIMAL"
            message = "Optimization complete. All tasks included."
        else:
            agent_visible_status = raw_status  # ERROR or UNKNOWN
            message = "Solver error."

        # Build an output the LLM can base its next decision on.
        output = {
            "status": agent_visible_status,
            "opt_ref_id": result_ref_id,
            "dropped_tasks_count": dropped_count,
            "skipped_tasks_id": skipped_tasks,
            "message": message,
        }
        return json.dumps(output)