LifeFlow-AI / src /tools /navigation_toolkit.py
Marco310's picture
feat: 🚀 Evolve to Stateful MCP Architecture with Context Injection Middleware
529a8bd
from datetime import datetime, timedelta, timezone
import json
from agno.tools import Toolkit
from src.services.googlemap_api_service import GoogleMapAPIService
from src.infra.poi_repository import poi_repo
from src.infra.context import get_session_id
from src.infra.logger import get_logger
logger = get_logger(__name__)
class NavigationToolkit(Toolkit):
def __init__(self, google_maps_api_key: str = None):
super().__init__(name="navigation_toolkit")
self.gmaps = GoogleMapAPIService(api_key=google_maps_api_key)
self.register(self.calculate_traffic_and_timing)
def calculate_traffic_and_timing(self, optimization_ref_id: str) -> str:
"""
Calculates precise travel times, traffic delays, and arrival timestamps for the optimized route.
This tool acts as a 'Reality Check' by applying Google Routes data to the sequence generated by the Optimizer.
It ensures the schedule is physically possible under current traffic conditions and generates the final timeline.
Args:
optimization_ref_id (str): The unique reference ID returned by the Optimizer Agent (e.g., "optimization_result_xyz").
This ID links to the logically sorted task list.
Returns:
str: A JSON string containing the 'nav_ref_id' (e.g., '{"nav_ref_id": "navigation_result_abc"}').
"""
logger.info(f"🚗 Navigator: Loading Ref {optimization_ref_id}...")
data = poi_repo.load(optimization_ref_id)
if not data:
logger.warning(f"⚠️ Warning: Ref ID '{optimization_ref_id}' not found.")
session_id = get_session_id()
if session_id:
latest_id = poi_repo.get_last_id_by_session(session_id)
logger.warning(f"⚠️ Warning: Checking latest Session ID: {latest_id}")
if latest_id and latest_id.startswith("optimization"):
logger.warning(f"🔄 Auto-Correcting: Switching to latest Session ID: {latest_id}")
data = poi_repo.load(latest_id)
if not data:
return "❌ Error: Data not found."
route = data.get("route", [])
tasks_detail = data.get("tasks_detail", [])
global_info = data.get("global_info", {})
if not route:
return "❌ Error: No route found in data."
# 1. 建立座標查照表 (Lookup Map)
poi_lookup = {}
for task in tasks_detail:
t_id = str(task.get("task_id"))
chosen = task.get("chosen_poi", {})
if chosen is None:
continue
if "lat" in chosen and "lng" in chosen:
poi_lookup[t_id] = {"lat": chosen["lat"], "lng": chosen["lng"]}
# 2. 決定起點 (Start Location)
user_start = global_info.get("start_location")
if user_start and "lat" in user_start and "lng" in user_start:
start_coord = user_start
logger.info(f"📍 Using User Start Location: {start_coord}")
# 3. 依據 Route 的順序組裝 Waypoints
waypoints = []
stop_times = [] # 單位:秒
for i, step in enumerate(route):
lat, lng = None, None
step_type = step.get("type")
task_id = str(step.get("task_id")) if step.get("task_id") is not None else None
# [FIX] 從 Route Step 讀取正確的服務時間 (分鐘 -> 秒)
# 優化器 (Optimizer) 已經算好每個點要停多久,這裡必須沿用
service_min = step.get("service_duration_min", 0)
service_sec = service_min * 60
# Case A: 起點/終點 (Depot)
if step_type in ["depot", "start"] or i == 0:
lat = start_coord["lat"]
lng = start_coord["lng"]
# Depot 停留時間通常為 0,除非有特別設定
stop_times.append(service_sec)
# Case B: 任務點 (Task POI)
elif task_id and task_id in poi_lookup:
coords = poi_lookup[task_id]
lat = coords["lat"]
lng = coords["lng"]
stop_times.append(service_sec)
if lat is not None and lng is not None:
waypoints.append({"lat": lat, "lng": lng})
else:
pass
logger.warning(f"⚠️ Skipped step {i}: No coordinates found. Type: {step_type}, ID: {task_id}")
# 4. 驗證與呼叫 API
if len(waypoints) < 2:
return f"❌ Error: Not enough valid waypoints ({len(waypoints)})."
try:
dep_str = global_info.get("departure_time")
# 確保時間帶有時區,避免 API 錯誤
try:
start_time = datetime.fromisoformat(dep_str)
if start_time.tzinfo is None:
start_time = start_time.replace(tzinfo=timezone.utc)
except:
start_time = datetime.now(timezone.utc)
logger.info(f"🚗 Navigator: Calling Google Routes for {len(waypoints)} stops...")
traffic_result = self.gmaps.compute_routes(
place_points=waypoints,
start_time=start_time,
stop_times=stop_times, # [FIX] 傳入正確的秒數列表
travel_mode="DRIVE",
routing_preference="TRAFFIC_AWARE"
)
except Exception as e:
return f"❌ Traffic API Failed: {e}"
data["traffic_summary"] = {
"total_distance_km": traffic_result.get('total_distance_meters', 0) / 1000,
"total_duration_min": traffic_result.get('total_duration_seconds', 0) // 60
}
data["precise_traffic_result"] = traffic_result
data["solved_waypoints"] = waypoints
if "global_info" not in data:
data["global_info"] = global_info
logger.info(f"✅ Traffic and timing calculated successfully.\n {data}")
nav_ref_id = poi_repo.save(data, data_type="navigation_result")
return json.dumps({
#"status": "SUCCESS",
"nav_ref_id": nav_ref_id,
#"traffic_summary": data["traffic_summary"],
#"note": "Please pass this nav_ref_id to the Weatherman immediately."
})