Spaces:
Running
Running
File size: 6,579 Bytes
b7d08cf 25c25cb b7d08cf 25c25cb b7d08cf 25c25cb b7d08cf 9066e49 b7d08cf 9066e49 b7d08cf 529a8bd 9066e49 b7d08cf fcc5d97 b7d08cf 9066e49 b7d08cf 9066e49 b7d08cf 529a8bd b7d08cf 9066e49 b7d08cf 9066e49 b7d08cf 9066e49 b7d08cf 9066e49 b7d08cf 25c25cb 9066e49 b7d08cf 529a8bd b7d08cf 9066e49 b7d08cf 9066e49 b7d08cf 9066e49 b7d08cf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
import json
from datetime import datetime, timedelta
from agno.tools import Toolkit
from src.optimization.tsptw_solver import TSPTWSolver
from src.infra.poi_repository import poi_repo
from src.infra.logger import get_logger
# Module-level logger obtained from the project's logging helper, keyed by this module's name.
logger = get_logger(__name__)
class OptimizationToolkit(Toolkit):
    """Toolkit that registers `optimize_from_ref`, a wrapper around `TSPTWSolver`."""

    def __init__(self):
        super().__init__(name="optimization_toolkit")
        self.solver = TSPTWSolver()
        self.register(self.optimize_from_ref)

    @staticmethod
    def _error_response(reason: str) -> str:
        """Serialize a failure into the JSON envelope returned to the caller."""
        return json.dumps({
            "status": "ERROR",
            "reason": reason,
            # Mirror the reason under "message", the key used by the success payload,
            # so callers can read one key regardless of the outcome.
            "message": reason,
            "opt_ref_id": None,
        })

    @staticmethod
    def _parse_departure_time(raw: str) -> datetime:
        """Parse a departure time given as a full ISO datetime or as a time-only string.

        Raises:
            ValueError: If `raw` is neither an ISO datetime nor an ISO time.
            TypeError: If `raw` is not a string.
        """
        try:
            # Attempt 1: standard ISO parsing (YYYY-MM-DDTHH:MM:SS+TZ).
            return datetime.fromisoformat(raw)
        except ValueError:
            from datetime import time as dt_time

            # Attempt 2: time-only input (HH:MM[:SS]); anchor it to tomorrow's date.
            parsed_time = dt_time.fromisoformat(raw)
            default_date = datetime.now().date() + timedelta(days=1)
            # datetime.combine keeps the time's tzinfo, so the result is naive
            # unless the string itself carried a UTC offset.
            start_time = datetime.combine(default_date, parsed_time)
            logger.warning(f"⚠️ Warning: Received time-only '{raw}'. Auto-fixed to: {start_time}")
            return start_time

    def optimize_from_ref(self, ref_id: str, max_wait_time_min: int = 0, return_to_start: bool = None) -> str:
        """
        Executes the mathematical route solver (TSPTW) to find the most efficient task sequence.

        This tool loads the POI data, calculates travel times, and reorders tasks to respect time windows.
        It returns detailed status information, allowing the caller to decide if a retry (with relaxed constraints) is needed.

        Args:
            ref_id (str): The unique reference ID returned by the Scout Agent.
            max_wait_time_min (int, optional): Max waiting time allowed at a location. Defaults to 0.
            return_to_start (bool, optional): Whether to force a round trip. Defaults to None. If None, it defaults to the value in 'global_info.return_to_start' (True when that is absent).

        Returns:
            str: A JSON string containing the result status and reference ID.
            Structure on success:
            {
                "status": "OPTIMAL" | "INFEASIBLE" | <any other raw solver status>,
                "opt_ref_id": str,
                "dropped_tasks_count": int,
                "skipped_tasks_id": list,
                "message": str
            }
            "OPTIMAL" means the solver reported "OK" with no dropped tasks;
            "INFEASIBLE" means it reported "OK" but left tasks unassigned.
            Structure on failure (missing data, bad time format, no start location, solver exception):
            {
                "status": "ERROR",
                "reason": str,
                "message": str,
                "opt_ref_id": null
            }
        """
        logger.info(f"🧮 Optimizer: Loading Ref {ref_id}...")
        data = poi_repo.load(ref_id)
        if not data:
            # Report missing data in the same JSON envelope as every other failure.
            return self._error_response("Data not found.")

        tasks = data.get("tasks", [])

        # [Critical] Read global_info; tolerate an explicit null stored under the key.
        global_info = data.get("global_info") or {}
        if return_to_start is None:
            return_to_start = global_info.get("return_to_start", True)

        # Resolve the departure time.
        start_time_str = global_info.get("departure_time")
        if start_time_str:
            try:
                start_time = self._parse_departure_time(start_time_str)
            except (TypeError, ValueError):
                return self._error_response(f"Invalid departure_time format: {start_time_str}")
        else:
            logger.warning("⚠️ Warning: departure_time not found in global_info. Using the current local time.")
            start_time = datetime.now().astimezone()

        # Resolve the deadline; without one, allow a full day from departure.
        deadline_str = global_info.get("deadline")
        if deadline_str:
            try:
                deadline = datetime.fromisoformat(deadline_str)
            except (TypeError, ValueError):
                return self._error_response(f"Invalid deadline format: {deadline_str}")
        else:
            deadline = start_time + timedelta(days=1)
        # NOTE(review): start_time and deadline can end up with mixed naive/aware
        # timezone info depending on the input strings — confirm the solver tolerates that.

        # Resolve the start point; it must come from global_info and carry a "lat" key.
        start_location = global_info.get("start_location") or {}
        if not (isinstance(start_location, dict) and "lat" in start_location):
            reason = f"Start location not found in global_info. {global_info}"
            logger.warning(f"Solver failed: {reason}")
            return self._error_response(reason)

        # Run the optimization.
        logger.info(
            f"🧮 Optimizer: Running TSPTW solver - start_time:{start_time}, deadline:{deadline}, "
            f"max_wait_time_min:{max_wait_time_min}, return_to_start:{return_to_start}"
        )
        try:
            result = self.solver.solve(
                start_location=start_location,
                start_time=start_time,
                deadline=deadline,
                tasks=tasks,
                max_wait_time_min=max_wait_time_min,
                return_to_start=return_to_start,
            )
        except Exception as e:
            # Tool boundary: surface any solver failure to the caller as an ERROR payload.
            logger.warning(f"Solver failed: {e}")
            return self._error_response(str(e))

        # [Critical] Carry tasks and global_info forward with the result. Per the
        # original author's note, the downstream Navigator fails without
        # departure_time if global_info is not inherited.
        result["tasks"] = tasks
        result["global_info"] = global_info
        logger.info(f"🧾 Optimizer: Inherited global_info to result.\n {result}")

        # Persist the result.
        result_ref_id = poi_repo.save(result, data_type="optimization_result")

        raw_status = result.get("status", "UNKNOWN")
        skipped_tasks = result.get("unassigned") or []
        dropped_count = len(skipped_tasks)

        if raw_status != "OK":
            agent_visible_status = raw_status  # ERROR or UNKNOWN
            message = "Solver error."
        elif dropped_count > 0:
            agent_visible_status = "INFEASIBLE"
            message = f"Constraints too tight. Dropped {dropped_count} tasks."
        else:
            agent_visible_status = "OPTIMAL"
            message = "Optimization complete. All tasks included."

        # Build an output the calling LLM agent can base a decision on.
        output = {
            "status": agent_visible_status,  # lets the agent see "INFEASIBLE"
            "opt_ref_id": result_ref_id,
            "dropped_tasks_count": dropped_count,  # tells the agent whether tasks were sacrificed
            "skipped_tasks_id": skipped_tasks,
            "message": message,
        }
        return json.dumps(output)