Spaces:
Running
Running
| from datetime import datetime, timedelta, timezone | |
| import json | |
| from agno.tools import Toolkit | |
| from src.services.googlemap_api_service import GoogleMapAPIService | |
| from src.infra.poi_repository import poi_repo | |
| from src.infra.context import get_session_id | |
| from src.infra.logger import get_logger | |
| logger = get_logger(__name__) | |
| class NavigationToolkit(Toolkit): | |
| def __init__(self, google_maps_api_key: str = None): | |
| super().__init__(name="navigation_toolkit") | |
| self.gmaps = GoogleMapAPIService(api_key=google_maps_api_key) | |
| self.register(self.calculate_traffic_and_timing) | |
| def calculate_traffic_and_timing(self, optimization_ref_id: str) -> str: | |
| """ | |
| Calculates precise travel times, traffic delays, and arrival timestamps for the optimized route. | |
| This tool acts as a 'Reality Check' by applying Google Routes data to the sequence generated by the Optimizer. | |
| It ensures the schedule is physically possible under current traffic conditions and generates the final timeline. | |
| Args: | |
| optimization_ref_id (str): The unique reference ID returned by the Optimizer Agent (e.g., "optimization_result_xyz"). | |
| This ID links to the logically sorted task list. | |
| Returns: | |
| str: A JSON string containing the 'nav_ref_id' (e.g., '{"nav_ref_id": "navigation_result_abc"}'). | |
| """ | |
| logger.info(f"🚗 Navigator: Loading Ref {optimization_ref_id}...") | |
| data = poi_repo.load(optimization_ref_id) | |
| if not data: | |
| logger.warning(f"⚠️ Warning: Ref ID '{optimization_ref_id}' not found.") | |
| session_id = get_session_id() | |
| if session_id: | |
| latest_id = poi_repo.get_last_id_by_session(session_id) | |
| logger.warning(f"⚠️ Warning: Checking latest Session ID: {latest_id}") | |
| if latest_id and latest_id.startswith("optimization"): | |
| logger.warning(f"🔄 Auto-Correcting: Switching to latest Session ID: {latest_id}") | |
| data = poi_repo.load(latest_id) | |
| if not data: | |
| return "❌ Error: Data not found." | |
| route = data.get("route", []) | |
| tasks_detail = data.get("tasks_detail", []) | |
| global_info = data.get("global_info", {}) | |
| if not route: | |
| return "❌ Error: No route found in data." | |
| # 1. 建立座標查照表 (Lookup Map) | |
| poi_lookup = {} | |
| for task in tasks_detail: | |
| t_id = str(task.get("task_id")) | |
| chosen = task.get("chosen_poi", {}) | |
| if chosen is None: | |
| continue | |
| if "lat" in chosen and "lng" in chosen: | |
| poi_lookup[t_id] = {"lat": chosen["lat"], "lng": chosen["lng"]} | |
| # 2. 決定起點 (Start Location) | |
| user_start = global_info.get("start_location") | |
| if user_start and "lat" in user_start and "lng" in user_start: | |
| start_coord = user_start | |
| logger.info(f"📍 Using User Start Location: {start_coord}") | |
| # 3. 依據 Route 的順序組裝 Waypoints | |
| waypoints = [] | |
| stop_times = [] # 單位:秒 | |
| for i, step in enumerate(route): | |
| lat, lng = None, None | |
| step_type = step.get("type") | |
| task_id = str(step.get("task_id")) if step.get("task_id") is not None else None | |
| # [FIX] 從 Route Step 讀取正確的服務時間 (分鐘 -> 秒) | |
| # 優化器 (Optimizer) 已經算好每個點要停多久,這裡必須沿用 | |
| service_min = step.get("service_duration_min", 0) | |
| service_sec = service_min * 60 | |
| # Case A: 起點/終點 (Depot) | |
| if step_type in ["depot", "start"] or i == 0: | |
| lat = start_coord["lat"] | |
| lng = start_coord["lng"] | |
| # Depot 停留時間通常為 0,除非有特別設定 | |
| stop_times.append(service_sec) | |
| # Case B: 任務點 (Task POI) | |
| elif task_id and task_id in poi_lookup: | |
| coords = poi_lookup[task_id] | |
| lat = coords["lat"] | |
| lng = coords["lng"] | |
| stop_times.append(service_sec) | |
| if lat is not None and lng is not None: | |
| waypoints.append({"lat": lat, "lng": lng}) | |
| else: | |
| pass | |
| logger.warning(f"⚠️ Skipped step {i}: No coordinates found. Type: {step_type}, ID: {task_id}") | |
| # 4. 驗證與呼叫 API | |
| if len(waypoints) < 2: | |
| return f"❌ Error: Not enough valid waypoints ({len(waypoints)})." | |
| try: | |
| dep_str = global_info.get("departure_time") | |
| # 確保時間帶有時區,避免 API 錯誤 | |
| try: | |
| start_time = datetime.fromisoformat(dep_str) | |
| if start_time.tzinfo is None: | |
| start_time = start_time.replace(tzinfo=timezone.utc) | |
| except: | |
| start_time = datetime.now(timezone.utc) | |
| logger.info(f"🚗 Navigator: Calling Google Routes for {len(waypoints)} stops...") | |
| traffic_result = self.gmaps.compute_routes( | |
| place_points=waypoints, | |
| start_time=start_time, | |
| stop_times=stop_times, # [FIX] 傳入正確的秒數列表 | |
| travel_mode="DRIVE", | |
| routing_preference="TRAFFIC_AWARE" | |
| ) | |
| except Exception as e: | |
| return f"❌ Traffic API Failed: {e}" | |
| data["traffic_summary"] = { | |
| "total_distance_km": traffic_result.get('total_distance_meters', 0) / 1000, | |
| "total_duration_min": traffic_result.get('total_duration_seconds', 0) // 60 | |
| } | |
| data["precise_traffic_result"] = traffic_result | |
| data["solved_waypoints"] = waypoints | |
| if "global_info" not in data: | |
| data["global_info"] = global_info | |
| logger.info(f"✅ Traffic and timing calculated successfully.\n {data}") | |
| nav_ref_id = poi_repo.save(data, data_type="navigation_result") | |
| return json.dumps({ | |
| #"status": "SUCCESS", | |
| "nav_ref_id": nav_ref_id, | |
| #"traffic_summary": data["traffic_summary"], | |
| #"note": "Please pass this nav_ref_id to the Weatherman immediately." | |
| }) |