Spaces:
Running
Running
| from datetime import datetime, timedelta, timezone | |
| import json | |
| from agno.tools import Toolkit | |
| from src.services.openweather_service import OpenWeatherMapService | |
| from src.infra.poi_repository import poi_repo | |
| from src.infra.context import get_session_id | |
| from src.infra.logger import get_logger | |
| logger = get_logger(__name__) | |
class WeatherToolkit(Toolkit):
    """Toolkit exposing one tool that annotates a solved route with weather and AQI.

    The only registered tool is ``check_weather_for_timeline``; every method
    whose name starts with an underscore is an internal helper.
    """

    # Marker appended to the AQI label for each index value
    # (1 = Good ... 5 = Very Poor); any other value is rendered as "⚪".
    _AQI_EMOJI = {
        1: "🟢",  # Good
        2: "🟡",  # Fair
        3: "🟠",  # Moderate
        4: "🔴",  # Poor
        5: "🟣",  # Very Poor
    }

    def __init__(self, openweather_api_key: str):
        """Create the weather service client and register the timeline tool.

        Args:
            openweather_api_key (str): API key handed to ``OpenWeatherMapService``.
        """
        super().__init__(name="weather_toolkit")
        self.weather_service = OpenWeatherMapService(api_key=openweather_api_key)
        self.register(self.check_weather_for_timeline)

    def check_weather_for_timeline(self, nav_ref_id: str) -> str:
        """
        Enriches the solved navigation route with weather forecasts and Air Quality Index (AQI) to create the final timeline.

        This tool is the final post-processing step. It loads the solved route data, calculates precise local arrival times for each stop
        (dynamically adjusting for the destination's timezone), and fetches specific weather conditions and AQI for those times.
        It also resolves final location names and saves the complete itinerary for presentation.

        Args:
            nav_ref_id (str): The unique reference ID returned by the Route Solver (or Navigation) step.

        Returns:
            str: A JSON string containing the reference ID for the finalized data.
                Structure:
                {
                    "final_ref_id": str
                }
                If no route data can be loaded, a plain-text error message is returned instead.
        """
        logger.debug(f"🌤️ Weatherman: Loading Ref {nav_ref_id}...")
        data = poi_repo.load(nav_ref_id)
        if not data:
            # The given ID may be stale or wrong; fall back to the newest
            # navigation record saved for the current session, if there is one.
            logger.warning(f"⚠️ Warning: Ref ID '{nav_ref_id}' not found.")
            session_id = get_session_id()
            logger.warning(f"⚠️ Warning: Checking latest Session ID: {session_id}")
            if session_id:
                latest_id = poi_repo.get_last_id_by_session(session_id)
                if latest_id and latest_id != nav_ref_id and latest_id.startswith("navigation"):
                    logger.warning(f"🔄 Auto-Correcting: Switching to latest Session ID: {latest_id}")
                    data = poi_repo.load(latest_id)
        if not data:
            return "❌ Error: Data not found."

        # Start from a (shallow) copy of the loaded record so that global_info,
        # tasks and every other existing key are carried into the final result.
        final_data = data.copy()

        # `or {}` / `or []` also covers keys that are present but set to None.
        traffic_res = data.get("precise_traffic_result") or {}
        legs = traffic_res.get("legs") or []
        waypoints = data.get("solved_waypoints", []) or traffic_res.get("stops", [])

        # Lookup table used to resolve stop names: task id (as str) -> task dict.
        tasks = data.get("tasks") or []
        task_map = {str(t.get('id') or t.get('task_id')): t for t in tasks}
        route_structure = data.get("route") or []

        # The trip's display timezone is taken from global_info.departure_time
        # (e.g. "2025-11-24T09:00:00+08:00") instead of a hard-coded offset.
        global_info = data.get("global_info") or {}
        target_tz = self._trip_timezone(global_info.get("departure_time"))

        current_now = datetime.now(timezone.utc)
        final_timeline = []
        logger.debug(f"🌤️ Weatherman: Checking Weather & AQI for {len(waypoints)} stops...")

        for i, point in enumerate(waypoints):
            target_time, travel_time = self._arrival_time(i, traffic_res, legs)

            # Safety net: with no usable arrival time, use "now"; timestamps
            # without an offset are treated as UTC.
            if target_time is None:
                target_time = current_now
            if target_time.tzinfo is None:
                target_time = target_time.replace(tzinfo=timezone.utc)

            # Express the arrival instant in the trip's timezone for display.
            local_time = target_time.astimezone(target_tz)
            logger.debug(
                f" ⏱️ Stop {i + 1}: UTC {target_time.strftime('%H:%M')} -> Local {local_time.strftime('%H:%M')}")

            # Both datetimes are timezone-aware, so the difference is the real
            # number of minutes from now regardless of their offsets.
            diff_min = int((target_time - current_now).total_seconds() / 60)
            if diff_min >= 0:
                weather_desc, temp_str, aqi_info = self._fetch_conditions(point, diff_min)
            else:
                # Arrival lies in the past: no forecast is requested.
                weather_desc, temp_str, aqi_info = "N/A", "", {"aqi": -1, "label": "N/A"}

            location_name, address = self._resolve_location(i, route_structure, task_map)

            final_timeline.append({
                "stop_index": i,
                "time": local_time.strftime("%H:%M"),
                "location": location_name,
                "address": address,
                # Format intentionally unchanged ("<condition>, <temp>", temp
                # may be empty): consumers of the stored timeline are not
                # visible from this file.
                "weather": f"{weather_desc}, {temp_str}",
                "aqi": aqi_info,
                "travel_time_from_prev": f"{travel_time} mins",
                "coordinates": point,
            })

        final_data["timeline"] = final_timeline
        # copy() should already have carried global_info over; re-add it only
        # if the key is genuinely missing.
        if "global_info" not in final_data:
            final_data["global_info"] = global_info

        final_ref_id = poi_repo.save(final_data, data_type="final_itinerary")
        return json.dumps({
            "final_ref_id": final_ref_id,
        })

    @staticmethod
    def _parse_iso(value, field=None):
        """Parse an ISO-8601 string into a datetime, returning None when unusable.

        A trailing "Z" is rewritten to "+00:00" first, because
        ``datetime.fromisoformat`` only accepts it from Python 3.11 on.

        Args:
            value: The raw timestamp; falsy values yield None without logging.
            field: Optional field name; when given, a parse failure is logged
                as a warning that mentions it.

        Returns:
            The parsed datetime (naive or aware, as written), or None.
        """
        if not value:
            return None
        try:
            text = value
            if isinstance(text, str) and text.endswith(("Z", "z")):
                text = text[:-1] + "+00:00"
            return datetime.fromisoformat(text)
        except (TypeError, ValueError):
            if field is not None:
                logger.warning(f"⚠️ Could not parse {field}: {value!r}")
            return None

    @classmethod
    def _trip_timezone(cls, departure_str):
        """Return the tzinfo carried by the departure time, or UTC as fallback."""
        if not departure_str:
            return timezone.utc
        start_dt_ref = cls._parse_iso(departure_str)
        if start_dt_ref is None:
            logger.warning(" ⚠️ Could not parse departure_time timezone, defaulting to UTC")
            return timezone.utc
        if start_dt_ref.tzinfo:
            # The offset written in the departure time becomes the timezone
            # used for every stop of the trip.
            logger.debug(f" 🌍 Detected Trip Timezone: {start_dt_ref.tzinfo}")
            return start_dt_ref.tzinfo
        return timezone.utc

    @classmethod
    def _arrival_time(cls, index: int, traffic_res: dict, legs: list) -> tuple:
        """Compute when stop ``index`` is reached.

        Stop 0 uses the route's ``start_time``; every later stop uses the
        departure time of the leg leading to it plus that leg's duration.

        Returns:
            tuple: ``(arrival, travel_minutes)`` where ``arrival`` is a datetime
            or None when it cannot be determined, and ``travel_minutes`` is the
            whole-minute duration of the incoming leg (0 for the first stop or
            when no leg exists).
        """
        if index == 0:
            return cls._parse_iso(traffic_res.get("start_time"), "start_time"), 0
        if index - 1 >= len(legs):
            return None, 0

        leg = legs[index - 1]
        duration_sec = leg.get("duration_seconds") or 0
        # int() keeps the label free of a ".0" when the duration is a float.
        travel_minutes = int(duration_sec // 60)
        departure = cls._parse_iso(leg.get("departure_time"), "leg departure_time")
        if departure is None:
            return None, travel_minutes
        return departure + timedelta(seconds=duration_sec), travel_minutes

    def _fetch_conditions(self, point, future_minutes: int) -> tuple:
        """Query the forecast and the AQI for one stop, each on a best-effort basis.

        The two service calls are guarded separately so that a failure of one
        does not discard the result of the other.

        Returns:
            tuple: ``(weather_desc, temp_str, aqi_info)``. On a forecast failure
            ``weather_desc`` is "API Error" and ``temp_str`` is empty; on an AQI
            failure ``aqi_info`` stays ``{"aqi": -1, "label": "N/A"}``.
        """
        weather_desc = "N/A"
        temp_str = ""
        aqi_info = {"aqi": -1, "label": "N/A"}

        # 1. Weather forecast.
        try:
            forecast = self.weather_service.get_forecast_weather(
                location=point, future_minutes=future_minutes
            )
            cond = forecast.get('condition', 'Unknown')
            temp = forecast.get('temperature', 'N/A')
            weather_desc = f"{cond}"
            temp_str = f"{temp}°C"
        except Exception as e:  # best effort: never abort the whole timeline
            logger.error(f"Weather/AQI Error: {e}")
            weather_desc = "API Error"
            temp_str = ""

        # 2. Air quality (AQI) plus its emoji marker.
        try:
            air = self.weather_service.get_forecast_air_pollution(
                location=point, future_minutes=future_minutes
            )
            aqi_val = air.get("aqi", 0)  # 1=Good, 5=Very Poor
            emoji = self._AQI_EMOJI.get(aqi_val, "⚪")
            aqi_info = {
                "aqi": aqi_val,
                "label": f"AQI {aqi_val} {emoji}",
            }
        except Exception as e:  # best effort: keep the weather already fetched
            logger.error(f"Weather/AQI Error: {e}")

        return weather_desc, temp_str, aqi_info

    @staticmethod
    def _resolve_location(index: int, route_structure: list, task_map: dict) -> tuple:
        """Resolve the display name and address of stop ``index``.

        Falls back to ``"Stop <n>"`` with an empty address. Steps typed "depot"
        or "start" are named "Start Location". Steps carrying a ``task_id`` use
        the matching task's description, overridden by the name and address of
        its first candidate when candidates exist.

        Returns:
            tuple: ``(location_name, address)``.
        """
        location_name = f"Stop {index + 1}"
        address = ""
        if index >= len(route_structure):
            return location_name, address

        step = route_structure[index]
        if step.get("type") in ("depot", "start"):
            return "Start Location", address

        task_id = step.get("task_id")
        if not task_id:
            return location_name, address
        t_info = task_map.get(str(task_id))
        if t_info is None:
            return location_name, address

        location_name = t_info.get("description", location_name)
        candidates = t_info.get("candidates")
        if candidates:
            cand = candidates[0]
            location_name = cand.get("name", location_name)
            address = cand.get("address", "")
        return location_name, address