LifeFlow-AI / src /tools /weather_toolkit.py
Marco310's picture
feat: 🚀 Evolve to Stateful MCP Architecture with Context Injection Middleware
529a8bd
from datetime import datetime, timedelta, timezone
import json
from agno.tools import Toolkit
from src.services.openweather_service import OpenWeatherMapService
from src.infra.poi_repository import poi_repo
from src.infra.context import get_session_id
from src.infra.logger import get_logger
logger = get_logger(__name__)
class WeatherToolkit(Toolkit):
def __init__(self, openweather_api_key: str):
super().__init__(name="weather_toolkit")
self.weather_service = OpenWeatherMapService(api_key=openweather_api_key)
self.register(self.check_weather_for_timeline)
def check_weather_for_timeline(self, nav_ref_id: str) -> str:
"""
Enriches the solved navigation route with weather forecasts and Air Quality Index (AQI) to create the final timeline.
This tool is the final post-processing step. It loads the solved route data, calculates precise local arrival times for each stop
(dynamically adjusting for the destination's timezone), and fetches specific weather conditions and AQI for those times.
It also resolves final location names and saves the complete itinerary for presentation.
Args:
nav_ref_id (str): The unique reference ID returned by the Route Solver (or Navigation) step.
Returns:
str: A JSON string containing the reference ID for the finalized data.
Structure:
{
"final_ref_id": str
}
"""
logger.debug(f"🌤️ Weatherman: Loading Ref {nav_ref_id}...")
data = poi_repo.load(nav_ref_id)
if not data:
logger.warning(f"⚠️ Warning: Ref ID '{nav_ref_id}' not found.")
session_id = get_session_id()
logger.warning(f"⚠️ Warning: Checking latest Session ID: {session_id}")
if session_id:
latest_id = poi_repo.get_last_id_by_session(session_id)
if latest_id and latest_id != nav_ref_id and latest_id.startswith("navigation"):
logger.warning(f"🔄 Auto-Correcting: Switching to latest Session ID: {latest_id}")
data = poi_repo.load(latest_id)
if not data:
return "❌ Error: Data not found."
# [CRITICAL] 複製原始數據,確保 global_info, tasks 等不會遺失
final_data = data.copy()
traffic_res = data.get("precise_traffic_result", {})
legs = traffic_res.get("legs", [])
waypoints = data.get("solved_waypoints", []) or traffic_res.get("stops", [])
# 準備查表 Map (用於解析地點名稱)
tasks = data.get("tasks", [])
task_map = {str(t.get('id') or t.get('task_id')): t for t in tasks}
route_structure = data.get("route", [])
# ============================================================
# ✅ [FIX] 動態提取目標時區 (Dynamic Timezone Extraction)
# 我們從 global_info.departure_time 提取時區,而不是寫死 +8
# ============================================================
global_info = data.get("global_info", {})
departure_str = global_info.get("departure_time")
# 預設 fallback 為 UTC (萬一真的沒資料)
target_tz = timezone.utc
if departure_str:
try:
# 解析出發時間字串 (e.g. "2025-11-24T09:00:00+08:00")
start_dt_ref = datetime.fromisoformat(departure_str)
# 如果這個時間有帶時區 (tzinfo),我們就用它作為整個行程的標準時區
if start_dt_ref.tzinfo:
target_tz = start_dt_ref.tzinfo
logger.debug(f" 🌍 Detected Trip Timezone: {target_tz}")
except ValueError:
logger.warning(" ⚠️ Could not parse departure_time timezone, defaulting to UTC")
current_now = datetime.now(timezone.utc)
final_timeline = []
logger.debug(f"🌤️ Weatherman: Checking Weather & AQI for {len(waypoints)} stops...")
for i, point in enumerate(waypoints):
target_time = None
travel_time = 0
if i == 0:
start_str = traffic_res.get("start_time")
target_time = datetime.fromisoformat(start_str) if start_str else current_now
elif i - 1 < len(legs):
leg = legs[i - 1]
duration_sec = leg.get("duration_seconds", 0)
travel_time = duration_sec // 60
dep_str = leg.get("departure_time")
if dep_str:
target_time = datetime.fromisoformat(dep_str) + timedelta(seconds=duration_sec)
# 防呆:如果沒算出時間,用現在
if not target_time: target_time = current_now
if target_time.tzinfo is None: target_time = target_time.replace(tzinfo=timezone.utc)
# ============================================================
# ✅ [FIX] 將 UTC 時間轉為「該行程的時區」 (Local Time Conversion)
# ============================================================
local_time = target_time.astimezone(target_tz)
logger.debug(
f" ⏱️ Stop {i + 1}: UTC {target_time.strftime('%H:%M')} -> Local {local_time.strftime('%H:%M')}")
# --- 天氣 & AQI 查詢 ---
# 天氣查詢使用 UTC 時間比較準確 (API 通常吃 UTC)
diff_min = int((target_time - current_now).total_seconds() / 60)
weather_desc = "N/A"
temp_str = ""
aqi_info = {"aqi": -1, "label": "N/A"}
if diff_min >= 0:
try:
# 1. 查天氣
forecast = self.weather_service.get_forecast_weather(
location=point, future_minutes=diff_min
)
cond = forecast.get('condition', 'Unknown')
temp = forecast.get('temperature', 'N/A')
weather_desc = f"{cond}"
temp_str = f"{temp}°C"
# 2. 查空氣品質 (AQI) & 產生 Emoji
air = self.weather_service.get_forecast_air_pollution(
location=point, future_minutes=diff_min
)
aqi_val = air.get("aqi", 0) # 1=Good, 5=Very Poor
# AQI 映射表
aqi_map = {
1: "🟢", # Good
2: "🟡", # Fair
3: "🟠", # Moderate
4: "🔴", # Poor
5: "🟣" # Very Poor
}
emoji = aqi_map.get(aqi_val, "⚪")
aqi_info = {
"aqi": aqi_val,
"label": f"AQI {aqi_val} {emoji}"
}
except Exception as e:
logger.error(f"Weather/AQI Error: {e}")
weather_desc = "API Error"
# --- 地點名稱解析 ---
location_name = f"Stop {i + 1}"
address = ""
if i < len(route_structure):
step = route_structure[i]
step_type = step.get("type")
if step_type in ["depot", "start"]:
location_name = "Start Location"
elif step.get("task_id"):
tid = str(step.get("task_id") or step.get("id"))
if tid in task_map:
t_info = task_map[tid]
location_name = t_info.get("description", location_name)
if t_info.get("candidates"):
cand = t_info["candidates"][0]
location_name = cand.get("name", location_name)
address = cand.get("address", "")
# --- 寫入 Timeline ---
timeline_entry = {
"stop_index": i,
"time": local_time.strftime("%H:%M"),
"location": location_name,
"address": address,
"weather": f"{weather_desc}, {temp_str}",
"aqi": aqi_info,
"travel_time_from_prev": f"{travel_time} mins",
"coordinates": point
}
final_timeline.append(timeline_entry)
# 寫入最終結果
final_data["timeline"] = final_timeline
# 再次確認 global_info 是否存在 (雖然 copy() 應該要有,但保險起見)
if "global_info" not in final_data:
final_data["global_info"] = global_info
final_ref_id = poi_repo.save(final_data, data_type="final_itinerary")
return json.dumps({
#"status": "SUCCESS",
"final_ref_id": final_ref_id,
#"note": "Pass this final_ref_id to the Presenter immediately."
})