Spaces:
Running
Running
File size: 9,027 Bytes
b7d08cf fcc5d97 b7d08cf 4317c58 b7d08cf 4317c58 b7d08cf 529a8bd b7d08cf fcc5d97 cec5bde fcc5d97 b7d08cf 9066e49 b7d08cf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
from datetime import datetime, timedelta, timezone
import json
from agno.tools import Toolkit
from src.services.openweather_service import OpenWeatherMapService
from src.infra.poi_repository import poi_repo
from src.infra.context import get_session_id
from src.infra.logger import get_logger
# Module-level logger for this file, created through the project's get_logger helper.
logger = get_logger(__name__)
class WeatherToolkit(Toolkit):
    """Toolkit that enriches a solved navigation route with weather and AQI data."""

    def __init__(self, openweather_api_key: str):
        """Create the weather service client and register the timeline tool.

        Args:
            openweather_api_key: API key handed to ``OpenWeatherMapService``.
        """
        super().__init__(name="weather_toolkit")
        self.weather_service = OpenWeatherMapService(api_key=openweather_api_key)
        self.register(self.check_weather_for_timeline)

    def check_weather_for_timeline(self, nav_ref_id: str) -> str:
        """
        Enriches the solved navigation route with weather forecasts and Air Quality Index (AQI) to create the final timeline.

        This tool is the final post-processing step. It loads the solved route data, calculates precise local arrival times for each stop
        (dynamically adjusting for the destination's timezone), and fetches specific weather conditions and AQI for those times.
        It also resolves final location names and saves the complete itinerary for presentation.

        Args:
            nav_ref_id (str): The unique reference ID returned by the Route Solver (or Navigation) step.

        Returns:
            str: A JSON string containing the reference ID for the finalized data.
                Structure:
                {
                    "final_ref_id": str
                }
                If no data can be loaded for the reference ID, a plain-text error message is returned instead.
        """
        logger.debug(f"🌤️ Weatherman: Loading Ref {nav_ref_id}...")
        data = poi_repo.load(nav_ref_id)
        if not data:
            data = self._load_latest_navigation(nav_ref_id)
        if not data:
            return "❌ Error: Data not found."

        # Shallow-copy the loaded payload so global_info, tasks, etc. are carried
        # over into the final itinerary.
        final_data = data.copy()

        # ``or {}`` / ``or []`` also covers keys that are present but null.
        traffic_res = data.get("precise_traffic_result") or {}
        legs = traffic_res.get("legs") or []
        waypoints = data.get("solved_waypoints") or traffic_res.get("stops") or []

        # Lookup table used to resolve location names; tasks may be keyed by
        # either "id" or "task_id".
        tasks = data.get("tasks") or []
        task_map = {str(t.get('id') or t.get('task_id')): t for t in tasks}
        route_structure = data.get("route") or []

        global_info = data.get("global_info") or {}
        target_tz = self._trip_timezone(global_info.get("departure_time"))

        current_now = datetime.now(timezone.utc)
        final_timeline = []
        logger.debug(f"🌤️ Weatherman: Checking Weather & AQI for {len(waypoints)} stops...")

        for index, point in enumerate(waypoints):
            arrival_utc, travel_minutes = self._arrival_time(index, traffic_res, legs, current_now)

            # Present the arrival in the trip's own timezone.
            local_time = arrival_utc.astimezone(target_tz)
            logger.debug(
                f" ⏱️ Stop {index + 1}: UTC {arrival_utc.strftime('%H:%M')} -> Local {local_time.strftime('%H:%M')}")

            # The forecast services are queried by "minutes from now".
            diff_min = int((arrival_utc - current_now).total_seconds() / 60)
            weather_desc, temp_str, aqi_info = self._lookup_conditions(point, diff_min)

            location_name, address = self._resolve_location(index, route_structure, task_map)

            final_timeline.append({
                "stop_index": index,
                "time": local_time.strftime("%H:%M"),
                "location": location_name,
                "address": address,
                # Format kept as "<condition>, <temperature>" (temperature may be
                # empty) so existing consumers of the itinerary are unaffected.
                "weather": f"{weather_desc}, {temp_str}",
                "aqi": aqi_info,
                "travel_time_from_prev": f"{travel_minutes} mins",
                "coordinates": point,
            })

        final_data["timeline"] = final_timeline
        # The copy should already carry global_info; add it only if the key is absent.
        final_data.setdefault("global_info", global_info)

        final_ref_id = poi_repo.save(final_data, data_type="final_itinerary")
        return json.dumps({"final_ref_id": final_ref_id})

    @staticmethod
    def _load_latest_navigation(nav_ref_id):
        """Fall back to the session's most recent navigation record, or return None."""
        logger.warning(f"⚠️ Warning: Ref ID '{nav_ref_id}' not found.")
        session_id = get_session_id()
        logger.warning(f"⚠️ Warning: Checking latest Session ID: {session_id}")
        if not session_id:
            return None
        latest_id = poi_repo.get_last_id_by_session(session_id)
        if latest_id and latest_id != nav_ref_id and latest_id.startswith("navigation"):
            logger.warning(f"🔄 Auto-Correcting: Switching to latest Session ID: {latest_id}")
            return poi_repo.load(latest_id)
        return None

    @staticmethod
    def _parse_iso(value):
        """Parse an ISO-8601 string into a datetime; return None if missing or malformed."""
        if not value or not isinstance(value, str):
            return None
        # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on,
        # so spell the UTC offset out explicitly.
        text = value[:-1] + "+00:00" if value.endswith("Z") else value
        try:
            return datetime.fromisoformat(text)
        except ValueError:
            return None

    def _trip_timezone(self, departure_str):
        """Return the tzinfo carried by the trip's departure time, defaulting to UTC."""
        if not departure_str:
            return timezone.utc
        # e.g. "2025-11-24T09:00:00+08:00" -> UTC+08:00 for the whole trip.
        departure = self._parse_iso(departure_str)
        if departure is None:
            logger.warning(" ⚠️ Could not parse departure_time timezone, defaulting to UTC")
            return timezone.utc
        if departure.tzinfo:
            logger.debug(f" 🌍 Detected Trip Timezone: {departure.tzinfo}")
            return departure.tzinfo
        return timezone.utc

    def _arrival_time(self, index, traffic_res, legs, current_now):
        """Return (arrival time in UTC, travel minutes from the previous stop) for a stop."""
        arrival = None
        travel_minutes = 0
        raw_timestamp = None

        if index == 0:
            raw_timestamp = traffic_res.get("start_time")
            arrival = self._parse_iso(raw_timestamp)
        elif index - 1 < len(legs):
            leg = legs[index - 1]
            duration_sec = leg.get("duration_seconds") or 0
            travel_minutes = duration_sec // 60
            raw_timestamp = leg.get("departure_time")
            departure = self._parse_iso(raw_timestamp)
            if departure is not None:
                arrival = departure + timedelta(seconds=duration_sec)

        if arrival is None:
            if raw_timestamp:
                logger.warning(
                    f"⚠️ Stop {index + 1}: could not parse timestamp {raw_timestamp!r}, using current time")
            # Safety net: without a usable timestamp, assume "now".
            arrival = current_now
        if arrival.tzinfo is None:
            # Naive timestamps are treated as UTC.
            arrival = arrival.replace(tzinfo=timezone.utc)
        return arrival.astimezone(timezone.utc), travel_minutes

    def _lookup_conditions(self, point, diff_min):
        """Return (weather description, temperature text, AQI info) for a stop.

        Past arrival times (negative ``diff_min``) are not queried. The two
        lookups are best-effort and independent: a failure of one is logged and
        does not discard the result of the other.
        """
        weather_desc = "N/A"
        temp_str = ""
        aqi_info = {"aqi": -1, "label": "N/A"}
        if diff_min < 0:
            return weather_desc, temp_str, aqi_info

        # Marker per AQI level; the scale runs from 1 (Good) to 5 (Very Poor).
        aqi_emoji = {1: "🟢", 2: "🟡", 3: "🟠", 4: "🔴", 5: "🟣"}

        try:
            forecast = self.weather_service.get_forecast_weather(
                location=point, future_minutes=diff_min
            )
            cond = forecast.get('condition', 'Unknown')
            temp = forecast.get('temperature', 'N/A')
        except Exception as e:  # service boundary: exception types are not known here
            logger.error(f"Weather/AQI Error: {e}")
            weather_desc = "API Error"
        else:
            weather_desc = f"{cond}"
            temp_str = f"{temp}°C"

        try:
            air = self.weather_service.get_forecast_air_pollution(
                location=point, future_minutes=diff_min
            )
            aqi_val = air.get("aqi", 0)
        except Exception as e:  # service boundary: exception types are not known here
            logger.error(f"Weather/AQI Error: {e}")
        else:
            emoji = aqi_emoji.get(aqi_val, "⚪")
            aqi_info = {"aqi": aqi_val, "label": f"AQI {aqi_val} {emoji}"}

        return weather_desc, temp_str, aqi_info

    @staticmethod
    def _resolve_location(index, route_structure, task_map):
        """Return (display name, address) for the stop at ``index`` of the route."""
        name = f"Stop {index + 1}"
        address = ""
        if index >= len(route_structure):
            return name, address

        step = route_structure[index]
        if step.get("type") in ("depot", "start"):
            return "Start Location", address

        # Accept either key, mirroring how task_map is built.
        raw_id = step.get("task_id") or step.get("id")
        if not raw_id:
            return name, address
        task = task_map.get(str(raw_id))
        if task is None:
            return name, address

        name = task.get("description", name)
        candidates = task.get("candidates")
        if candidates:
            # The first candidate is used as the chosen place for the task.
            chosen = candidates[0]
            name = chosen.get("name", name)
            address = chosen.get("address", "")
        return name, address
|