""" OpenWeatherMap API Service - Improved Version """ import requests from typing import Dict, Any, Optional, List from datetime import datetime, timezone, timedelta from src.infra.logger import get_logger logger = get_logger(__name__) class OpenWeatherMapService: """ OpenWeatherMap API Service Provides unified interface to: - Current Weather API - 5 Day Forecast API (3-hour step) - Air Pollution API (Current) - Air Pollution Forecast API (Hourly) Free tier limits: - 1,000 calls/day - 60 calls/minute """ def __init__(self, api_key: Optional[str] = None): """ Initialize OpenWeatherMap service Args: api_key: OpenWeatherMap API key Raises: ValueError: If API key is not provided """ # ✅ 改進: 簡化 API key 獲取 if api_key: self.api_key = api_key else: import os self.api_key = os.getenv("OPENWEATHER_API_KEY") # 如果環境變量沒有,嘗試從配置讀取 if not self.api_key: try: from src.infra.config import get_settings settings = get_settings() self.api_key = settings.openweather_api_key except (ImportError, AttributeError): pass # ✅ 改進: 在 __init__ 時就檢查,後面不用重複檢查 if not self.api_key: raise ValueError( "OpenWeatherMap API key is required. " # ✅ 修復錯誤訊息 "Set via parameter, config, or OPENWEATHER_API_KEY env var" ) # API endpoints self.current_weather_url = "https://api.openweathermap.org/data/2.5/weather" self.forecast_weather_url = "https://api.openweathermap.org/data/2.5/forecast" self.air_pollution_url = "https://api.openweathermap.org/data/2.5/air_pollution" self.forecast_air_pollution_url = "https://api.openweathermap.org/data/2.5/air_pollution/forecast" logger.info("✅ OpenWeatherMap service initialized") # ======================================================================== # Helper Functions # ======================================================================== @staticmethod def _compute_future_index( future_minutes: int, time_step_hours: int = 3 ) -> int: """ Compute the index in forecast array for a future time Args: future_minutes: Minutes into the future time_step_hours: Time step in hours (3 for weather, 1 for air pollution) Returns: Index in forecast array """ future_hours = future_minutes / 60.0 index = int(future_hours / time_step_hours) return index @staticmethod def _find_closest_forecast( forecast_list: List[Dict], target_time: datetime ) -> Optional[Dict]: """ Find the forecast entry closest to target time Args: forecast_list: List of forecast entries with 'timestamp' key target_time: Target datetime Returns: Closest forecast entry or None """ if not forecast_list: return None closest = min( forecast_list, key=lambda x: abs((x['timestamp'] - target_time).total_seconds()) ) return closest # ======================================================================== # Current Weather API # ======================================================================== def get_current_weather( self, location: Dict[str, float], units: str = "metric", lang: str = "zh_tw" ) -> Dict[str, Any]: """ Get current weather at location Args: location: Location dict with 'lat' and 'lng' keys units: Units (metric/imperial/standard) lang: Language code (zh_tw, en, etc.) Returns: Weather data dict with keys: - condition: str (e.g., "Clear", "Rain") - description: str (e.g., "clear sky") - temperature: float (°C if metric) - feels_like: float - temp_min: float - temp_max: float - pressure: int (hPa) - humidity: int (%) - wind_speed: float (m/s if metric) - wind_direction: int (degrees) - cloudiness: int (%) - rain_1h: float (mm) - timestamp: datetime Raises: requests.RequestException: If API request fails Reference: https://openweathermap.org/current """ try: params = { "lat": location["lat"], "lon": location["lng"], "appid": self.api_key, "units": units, "lang": lang } logger.debug( f"🌤️ Getting current weather for " f"({location['lat']:.4f}, {location['lng']:.4f})" ) response = requests.get( self.current_weather_url, params=params, timeout=10 ) response.raise_for_status() data = response.json() # ✅ 檢查錯誤 if "cod" in data and data["cod"] != 200: raise ValueError(f"API Error: {data.get('message', 'Unknown error')}") # Parse response weather = self._parse_current_weather(data) logger.info( f"✅ Current weather: {weather['condition']}, " f"{weather['temperature']}°C" ) return weather except requests.Timeout: logger.error("⏰ Weather API request timeout") raise except requests.RequestException as e: logger.error(f"❌ Weather API request error: {e}") raise except Exception as e: logger.error(f"❌ Error getting current weather: {e}") raise @staticmethod def _parse_current_weather(data: Dict[str, Any]) -> Dict[str, Any]: """Parse current weather API response""" weather_list = data.get("weather", []) main_weather = weather_list[0] if weather_list else {} main_data = data.get("main", {}) wind_data = data.get("wind", {}) clouds_data = data.get("clouds", {}) rain_data = data.get("rain", {}) return { "condition": main_weather.get("main", "Unknown"), "description": main_weather.get("description", ""), "temperature": main_data.get("temp", 0), "feels_like": main_data.get("feels_like", 0), "temp_min": main_data.get("temp_min", 0), "temp_max": main_data.get("temp_max", 0), "pressure": main_data.get("pressure", 0), "humidity": main_data.get("humidity", 0), "wind_speed": wind_data.get("speed", 0), "wind_direction": wind_data.get("deg", 0), "cloudiness": clouds_data.get("all", 0), "rain_1h": rain_data.get("1h", 0), "timestamp": datetime.fromtimestamp( data.get("dt", 0), tz=timezone.utc ) } # ======================================================================== # 5 Day Forecast API (3-hour step) # ======================================================================== @staticmethod def _parse_forecast_weather(data: Dict[str, Any]) -> Dict[str, Any]: """Parse 5-day forecast API response""" forecast_list = data.get("list", []) hourly = [] for item in forecast_list: weather_list = item.get("weather", []) main_weather = weather_list[0] if weather_list else {} main_data = item.get("main", {}) wind_data = item.get("wind", {}) rain_data = item.get("rain", {}) hourly.append({ "timestamp": datetime.fromtimestamp( item.get("dt", 0), tz=timezone.utc ), "condition": main_weather.get("main", "Unknown"), "description": main_weather.get("description", ""), "temperature": main_data.get("temp", 0), "feels_like": main_data.get("feels_like", 0), "humidity": main_data.get("humidity", 0), "wind_speed": wind_data.get("speed", 0), "rain_3h": rain_data.get("3h", 0), "pop": item.get("pop", 0) # Probability of precipitation }) return { "hourly": hourly, "city": data.get("city", {}) } # ======================================================================== # Current Air Pollution API # ======================================================================== def get_air_pollution( self, location: Dict[str, float] ) -> Dict[str, Any]: """ Get current air pollution data Args: location: Location dict with 'lat' and 'lng' keys Returns: Air quality dict with keys: - aqi: int (1-5, 1=Good, 5=Very Poor) - aqi_label: str ("Good", "Fair", etc.) - co: float (Carbon monoxide, μg/m³) - no: float (Nitrogen monoxide, μg/m³) - no2: float (Nitrogen dioxide, μg/m³) - o3: float (Ozone, μg/m³) - so2: float (Sulphur dioxide, μg/m³) - pm2_5: float (Fine particles, μg/m³) - pm10: float (Coarse particles, μg/m³) - nh3: float (Ammonia, μg/m³) - timestamp: datetime Raises: requests.RequestException: If API request fails Reference: https://openweathermap.org/api/air-pollution """ try: params = { "lat": location["lat"], "lon": location["lng"], "appid": self.api_key } logger.debug( f"🌫️ Getting air pollution for " f"({location['lat']:.4f}, {location['lng']:.4f})" ) response = requests.get( self.air_pollution_url, params=params, timeout=10 ) response.raise_for_status() data = response.json() # Parse response pollution = self._parse_air_pollution(data) logger.info( f"✅ Air quality: AQI {pollution['aqi']} ({pollution['aqi_label']})" ) return pollution except requests.Timeout: logger.error("⏰ Air Pollution API request timeout") raise except requests.RequestException as e: logger.error(f"❌ Air Pollution API request error: {e}") raise except Exception as e: logger.error(f"❌ Error getting air pollution: {e}") raise def _parse_air_pollution(self, data: Dict[str, Any]) -> Dict[str, Any]: """Parse air pollution API response""" list_data = data.get("list", []) if not list_data: return { "aqi": 0, "aqi_label": "Unknown", "co": 0, "no": 0, "no2": 0, "o3": 0, "so2": 0, "pm2_5": 0, "pm10": 0, "nh3": 0, "timestamp": datetime.now(timezone.utc) } item = list_data[0] main = item.get("main", {}) components = item.get("components", {}) # AQI levels: 1 = Good, 2 = Fair, 3 = Moderate, 4 = Poor, 5 = Very Poor aqi = main.get("aqi", 0) aqi_labels = { 1: "Good", 2: "Fair", 3: "Moderate", 4: "Poor", 5: "Very Poor" } return { "aqi": aqi, "aqi_label": aqi_labels.get(aqi, "Unknown"), "co": components.get("co", 0), "no": components.get("no", 0), "no2": components.get("no2", 0), "o3": components.get("o3", 0), "so2": components.get("so2", 0), "pm2_5": components.get("pm2_5", 0), "pm10": components.get("pm10", 0), "nh3": components.get("nh3", 0), "timestamp": datetime.fromtimestamp( item.get("dt", 0), tz=timezone.utc ) } # ======================================================================== # ✅ 新增: Air Pollution Forecast API (Hourly) # ======================================================================== def get_forecast_weather( self, location: Dict[str, float], future_minutes: int = 0, ) -> Dict[str, Any]: """ Get weather forecast for a specific time in the future. Use this to check conditions for planned activities (e.g., "Will it rain at 3 PM?"). Args: location: Target location {"lat": float, "lng": float}. future_minutes: Minutes from NOW. (e.g., 0=now, 180=in 3 hours). Returns: Dict: Forecast data including: - timestamp: datetime - condition: str - temperature: float - pop: float (Probability of Precipitation 0-1) """ try: params = { "lat": location["lat"], "lon": location["lng"], "appid": self.api_key, "units": "metric", "lang": "zh_tw" } response = requests.get(self.forecast_weather_url, params=params, timeout=10) response.raise_for_status() data = response.json() if "cod" in data and data["cod"] != "200": raise ValueError(f"API Error: {data.get('message', 'Unknown error')}") forecast = self._parse_forecast_weather(data) if future_minutes == 0: return forecast['hourly'][0] if forecast['hourly'] else {} else: target_time = datetime.now(timezone.utc) + timedelta(minutes=future_minutes) closest = self._find_closest_forecast(forecast['hourly'], target_time) return closest if closest else {} except Exception as e: logger.error(f"❌ Error getting forecast: {e}") raise @staticmethod def _parse_forecast_air_pollution(data: Dict[str, Any]) -> Dict[str, Any]: """ ✅ 新增方法: Parse air pollution forecast API response """ list_data = data.get("list", []) hourly = [] aqi_labels = { 1: "Good", 2: "Fair", 3: "Moderate", 4: "Poor", 5: "Very Poor" } for item in list_data: main = item.get("main", {}) components = item.get("components", {}) aqi = main.get("aqi", 0) hourly.append({ "timestamp": datetime.fromtimestamp( item.get("dt", 0), tz=timezone.utc ), "aqi": aqi, "aqi_label": aqi_labels.get(aqi, "Unknown"), "co": components.get("co", 0), "no": components.get("no", 0), "no2": components.get("no2", 0), "o3": components.get("o3", 0), "so2": components.get("so2", 0), "pm2_5": components.get("pm2_5", 0), "pm10": components.get("pm10", 0), "nh3": components.get("nh3", 0), }) return { "hourly": hourly } def get_forecast_air_pollution( self, location: Dict[str, float], future_minutes: int = 0 ) -> Dict[str, Any]: """ Get air quality (AQI) forecast for a future time. Args: location: Target location {"lat": float, "lng": float}. future_minutes: Minutes from NOW. Returns: Dict: Air quality data including: - aqi: int (1=Good, 5=Very Poor) - pm2_5: float - timestamp: datetime """ try: params = { "lat": location["lat"], "lon": location["lng"], "appid": self.api_key } response = requests.get(self.forecast_air_pollution_url, params=params, timeout=10) response.raise_for_status() data = response.json() forecast = self._parse_forecast_air_pollution(data) if future_minutes == 0: return forecast['hourly'][0] if forecast['hourly'] else {} else: target_time = datetime.now(timezone.utc) + timedelta(minutes=future_minutes) closest = self._find_closest_forecast(forecast['hourly'], target_time) return closest if closest else {} except Exception as e: logger.error(f"❌ Error getting air pollution forecast: {e}") raise # ============================================================================ # Usage Examples # ============================================================================ if __name__ == "__main__": import os from src.infra.config import get_settings settings = get_settings() # Initialize service api_key = settings.openweather_api_key if not api_key: print("❌ Please set OPENWEATHER_API_KEY environment variable") print(" Get your free API key at: https://openweathermap.org/api") exit(1) service = OpenWeatherMapService(api_key=api_key) # Test location (Taipei 101) taipei_101 = {"lat": 25.0330, "lng": 121.5654} # Example 1: Current weather try: print("\n" + "="*60) print("Example 1: Current Weather") print("="*60) weather = service.get_current_weather(taipei_101) print(f"🌡️ Temperature: {weather['temperature']}°C") print(f"💧 Humidity: {weather['humidity']}%") print(f"🌤️ Condition: {weather['condition']} ({weather['description']})") print(f"💨 Wind: {weather['wind_speed']} m/s") except Exception as e: print(f"❌ Failed: {e}") # Example 2: Weather forecast (3 hours from now) try: print("\n" + "="*60) print("Example 2: Weather Forecast (3 hours)") print("="*60) forecast = service.get_forecast_weather(taipei_101, future_minutes=180) print(f"⏰ Time: {forecast['timestamp']}") print(f"🌡️ Temperature: {forecast['temperature']}°C") print(f"🌧️ Precipitation probability: {forecast['pop']*100:.0f}%") print(f"🌤️ Condition: {forecast['condition']}") except Exception as e: print(f"❌ Failed: {e}") # Example 3: Current air quality try: print("\n" + "="*60) print("Example 3: Current Air Quality") print("="*60) air = service.get_air_pollution(taipei_101) print(f"🌫️ AQI: {air['aqi']} ({air['aqi_label']})") print(f" PM2.5: {air['pm2_5']:.1f} μg/m³") print(f" PM10: {air['pm10']:.1f} μg/m³") print(f" O3: {air['o3']:.1f} μg/m³") except Exception as e: print(f"❌ Failed: {e}") # Example 4: Air quality forecast (6 hours from now) try: print("\n" + "="*60) print("Example 4: Air Quality Forecast (6 hours)") print("="*60) forecast_air = service.get_forecast_air_pollution(taipei_101, future_minutes=360) print(f"⏰ Time: {forecast_air['timestamp']}") print(f"🌫️ AQI: {forecast_air['aqi']} ({forecast_air['aqi_label']})") print(f" PM2.5: {forecast_air['pm2_5']:.1f} μg/m³") print(f" PM10: {forecast_air['pm10']:.1f} μg/m³") except Exception as e: print(f"❌ Failed: {e}") print("\n✅ All examples completed!")