from datetime import datetime, timedelta, timezone
import json

from agno.tools import Toolkit

from src.services.googlemap_api_service import GoogleMapAPIService
from src.infra.poi_repository import poi_repo
from src.infra.context import get_session_id
from src.infra.logger import get_logger

logger = get_logger(__name__)


def _extract_coord(candidate):
    """Return ``{"lat": ..., "lng": ...}`` from *candidate*, or ``None`` if unusable.

    A candidate is usable only when it is a dict whose ``lat`` and ``lng``
    values are both present and not ``None``.
    """
    if not isinstance(candidate, dict):
        return None
    lat = candidate.get("lat")
    lng = candidate.get("lng")
    if lat is None or lng is None:
        return None
    return {"lat": lat, "lng": lng}


def _build_poi_lookup(tasks_detail):
    """Map ``str(task_id)`` to the coordinates of each task's chosen POI.

    Tasks without a chosen POI, or whose POI lacks usable coordinates, are
    left out so that callers can treat "not in lookup" as "cannot be routed".
    """
    lookup = {}
    for task in tasks_detail:
        coord = _extract_coord(task.get("chosen_poi"))
        if coord is not None:
            lookup[str(task.get("task_id"))] = coord
    return lookup


def _parse_departure_time(dep_str):
    """Parse an ISO-8601 departure time into a timezone-aware ``datetime``.

    Naive timestamps are interpreted as UTC. When *dep_str* is missing or
    cannot be parsed, the current UTC time is used instead.
    """
    # ``datetime.fromisoformat`` only accepts a trailing "Z" from Python 3.11
    # on; normalise it so older interpreters do not silently fall back to now.
    if isinstance(dep_str, str) and dep_str.endswith("Z"):
        dep_str = dep_str[:-1] + "+00:00"
    try:
        start_time = datetime.fromisoformat(dep_str)
    except (TypeError, ValueError):
        return datetime.now(timezone.utc)
    # Make sure the time carries a timezone to avoid API errors.
    if start_time.tzinfo is None:
        start_time = start_time.replace(tzinfo=timezone.utc)
    return start_time


class NavigationToolkit(Toolkit):
    """Toolkit exposing a single tool that adds traffic-aware timing to an optimized route."""

    def __init__(self, google_maps_api_key: str = None):
        super().__init__(name="navigation_toolkit")
        self.gmaps = GoogleMapAPIService(api_key=google_maps_api_key)
        self.register(self.calculate_traffic_and_timing)

    def calculate_traffic_and_timing(self, optimization_ref_id: str) -> str:
        """
        Calculates precise travel times, traffic delays, and arrival timestamps for the optimized route.

        This tool acts as a 'Reality Check' by applying Google Routes data to the sequence generated by the Optimizer.
        It ensures the schedule is physically possible under current traffic conditions and generates the final timeline.

        Args:
            optimization_ref_id (str): The unique reference ID returned by the Optimizer Agent (e.g., "optimization_result_xyz").
                This ID links to the logically sorted task list.

        Returns:
            str: A JSON string containing the 'nav_ref_id' (e.g., '{"nav_ref_id": "navigation_result_abc"}').
        """
        logger.info(f"🚗 Navigator: Loading Ref {optimization_ref_id}...")
        data = poi_repo.load(optimization_ref_id)

        if not data:
            # The given ref was not found: fall back to the most recent record
            # of this session, but only if it is an optimization result.
            logger.warning(f"⚠️ Warning: Ref ID '{optimization_ref_id}' not found.")
            session_id = get_session_id()
            if session_id:
                latest_id = poi_repo.get_last_id_by_session(session_id)
                logger.warning(f"⚠️ Warning: Checking latest Session ID: {latest_id}")
                if latest_id and latest_id.startswith("optimization"):
                    logger.warning(f"🔄 Auto-Correcting: Switching to latest Session ID: {latest_id}")
                    data = poi_repo.load(latest_id)

        if not data:
            return "❌ Error: Data not found."

        # ``or`` guards against keys that are present but explicitly None.
        route = data.get("route") or []
        tasks_detail = data.get("tasks_detail") or []
        global_info = data.get("global_info") or {}

        if not route:
            return "❌ Error: No route found in data."

        # 1. Build the coordinate lookup table (task_id -> chosen POI coords).
        poi_lookup = _build_poi_lookup(tasks_detail)

        # 2. Determine the start location. The first route step is always
        #    routed from it, so without one no route can be computed; report
        #    that explicitly instead of failing later on an unbound variable.
        user_start = global_info.get("start_location")
        if _extract_coord(user_start) is None:
            return "❌ Error: No valid start location found in data."
        start_coord = user_start
        logger.info(f"📍 Using User Start Location: {start_coord}")

        # 3. Assemble waypoints following the order of the route.
        #    ``waypoints`` and ``stop_times`` are parallel lists and are always
        #    appended together so they cannot drift out of alignment.
        waypoints = []
        stop_times = []  # unit: seconds

        for i, step in enumerate(route):
            step_type = step.get("type")
            raw_task_id = step.get("task_id")
            task_id = str(raw_task_id) if raw_task_id is not None else None

            # Reuse the service time stored on the route step
            # (minutes -> seconds); a missing or None value counts as 0.
            service_sec = (step.get("service_duration_min") or 0) * 60

            if step_type in ("depot", "start") or i == 0:
                # Case A: start/end point (depot).
                coords = start_coord
            elif task_id and task_id in poi_lookup:
                # Case B: task point (task POI).
                coords = poi_lookup[task_id]
            else:
                logger.warning(f"⚠️ Skipped step {i}: No coordinates found. Type: {step_type}, ID: {task_id}")
                continue

            waypoints.append({"lat": coords["lat"], "lng": coords["lng"]})
            stop_times.append(service_sec)

        # 4. Validate and call the API.
        if len(waypoints) < 2:
            return f"❌ Error: Not enough valid waypoints ({len(waypoints)})."

        start_time = _parse_departure_time(global_info.get("departure_time"))

        logger.info(f"🚗 Navigator: Calling Google Routes for {len(waypoints)} stops...")
        try:
            traffic_result = self.gmaps.compute_routes(
                place_points=waypoints,
                start_time=start_time,
                stop_times=stop_times,  # per-waypoint stop durations in seconds
                travel_mode="DRIVE",
                routing_preference="TRAFFIC_AWARE",
            )
        except Exception as e:
            # Tool boundary: surface the failure to the caller as a string
            # rather than raising.
            logger.warning(f"❌ Traffic API Failed: {e}")
            return f"❌ Traffic API Failed: {e}"

        # Totals reported as None are treated as 0 so the unit conversions
        # below cannot raise.
        total_distance_m = traffic_result.get('total_distance_meters') or 0
        total_duration_s = traffic_result.get('total_duration_seconds') or 0
        data["traffic_summary"] = {
            "total_distance_km": total_distance_m / 1000,
            "total_duration_min": total_duration_s // 60,
        }
        data["precise_traffic_result"] = traffic_result
        data["solved_waypoints"] = waypoints

        if "global_info" not in data:
            data["global_info"] = global_info

        logger.info(f"✅ Traffic and timing calculated successfully.\n {data}")

        nav_ref_id = poi_repo.save(data, data_type="navigation_result")

        # Only the reference ID is returned; the enriched data is read back
        # from the repository by whoever consumes ``nav_ref_id``.
        return json.dumps({
            "nav_ref_id": nav_ref_id,
        })