Spaces:
Running
Running
| """ | |
| OpenWeatherMap API Service - Improved Version | |
| """ | |
| import requests | |
| from typing import Dict, Any, Optional, List | |
| from datetime import datetime, timezone, timedelta | |
| from src.infra.logger import get_logger | |
| logger = get_logger(__name__) | |
class OpenWeatherMapService:
    """
    Client for the OpenWeatherMap REST APIs.

    Provides a unified interface to:
        - Current Weather API
        - 5 Day Forecast API (3-hour step)
        - Air Pollution API (current)
        - Air Pollution Forecast API (hourly)

    Free tier limits:
        - 1,000 calls/day
        - 60 calls/minute
    """

    # Seconds to wait for any single HTTP request before giving up.
    REQUEST_TIMEOUT_SECONDS = 10

    # OpenWeatherMap AQI scale; any other value is reported as "Unknown".
    _AQI_LABELS = {
        1: "Good",
        2: "Fair",
        3: "Moderate",
        4: "Poor",
        5: "Very Poor",
    }

    # Pollutant concentration keys copied from the API "components" object.
    _POLLUTANT_KEYS = ("co", "no", "no2", "o3", "so2", "pm2_5", "pm10", "nh3")

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize the OpenWeatherMap service.

        The key is resolved in this order: the ``api_key`` argument, the
        ``OPENWEATHER_API_KEY`` environment variable, then
        ``get_settings().openweather_api_key`` from ``src.infra.config``.

        Args:
            api_key: OpenWeatherMap API key

        Raises:
            ValueError: If no API key can be resolved
        """
        if api_key:
            self.api_key = api_key
        else:
            import os

            self.api_key = os.getenv("OPENWEATHER_API_KEY")

            # Environment variable missing: fall back to project settings.
            if not self.api_key:
                try:
                    from src.infra.config import get_settings

                    self.api_key = get_settings().openweather_api_key
                except (ImportError, AttributeError):
                    # Best effort only; the check below reports a missing key.
                    logger.debug(
                        "Project settings unavailable; skipping config lookup"
                    )

        # Validate once here so request methods never need to re-check.
        if not self.api_key:
            raise ValueError(
                "OpenWeatherMap API key is required. "
                "Set via parameter, config, or OPENWEATHER_API_KEY env var"
            )

        # API endpoints
        self.current_weather_url = "https://api.openweathermap.org/data/2.5/weather"
        self.forecast_weather_url = "https://api.openweathermap.org/data/2.5/forecast"
        self.air_pollution_url = "https://api.openweathermap.org/data/2.5/air_pollution"
        self.forecast_air_pollution_url = "https://api.openweathermap.org/data/2.5/air_pollution/forecast"

        logger.info("✅ OpenWeatherMap service initialized")

    # ========================================================================
    # Helper Functions
    # ========================================================================

    @staticmethod
    def _compute_future_index(
        future_minutes: int,
        time_step_hours: int = 3
    ) -> int:
        """
        Compute the index in a forecast array for a future time.

        Args:
            future_minutes: Minutes into the future
            time_step_hours: Time step in hours (3 for weather, 1 for air pollution)

        Returns:
            Index in forecast array (fractional steps are truncated)
        """
        future_hours = future_minutes / 60.0
        return int(future_hours / time_step_hours)

    @staticmethod
    def _find_closest_forecast(
        forecast_list: List[Dict],
        target_time: datetime
    ) -> Optional[Dict]:
        """
        Find the forecast entry closest to the target time.

        Args:
            forecast_list: List of forecast entries with a 'timestamp' key
            target_time: Target datetime

        Returns:
            Closest forecast entry, or None when the list is empty
        """
        if not forecast_list:
            return None
        return min(
            forecast_list,
            key=lambda entry: abs((entry["timestamp"] - target_time).total_seconds())
        )

    @classmethod
    def _select_forecast_entry(
        cls,
        hourly: List[Dict],
        future_minutes: int
    ) -> Dict[str, Any]:
        """
        Pick the forecast entry for ``future_minutes`` from now.

        Args:
            hourly: Parsed forecast entries, each with a 'timestamp' key
            future_minutes: Minutes from now; 0 selects the first entry

        Returns:
            The selected entry, or an empty dict when nothing is available
        """
        if not hourly:
            return {}
        if future_minutes == 0:
            return hourly[0]
        target_time = datetime.now(timezone.utc) + timedelta(minutes=future_minutes)
        return cls._find_closest_forecast(hourly, target_time) or {}

    @staticmethod
    def _raise_on_api_error(data: Dict[str, Any]) -> None:
        """
        Raise ValueError when the payload carries a non-success ``cod``.

        The status is compared as a string so that both ``200`` and
        ``"200"`` count as success, whichever type an endpoint returns.
        """
        if "cod" in data and str(data["cod"]) != "200":
            raise ValueError(f"API Error: {data.get('message', 'Unknown error')}")

    def _location_params(
        self,
        location: Dict[str, float],
        **extra: Any
    ) -> Dict[str, Any]:
        """Build the query parameters shared by every endpoint."""
        return {
            "lat": location["lat"],
            "lon": location["lng"],
            "appid": self.api_key,
            **extra,
        }

    def _request_json(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        GET ``url`` and return the decoded JSON body.

        Raises:
            requests.RequestException: On timeout, connection or HTTP error
        """
        response = requests.get(
            url,
            params=params,
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        return response.json()

    def _redact(self, error: BaseException) -> str:
        """
        Return ``str(error)`` with the API key masked.

        The key is sent as the ``appid`` query parameter, so an exception
        message that quotes the request URL would otherwise leak it.
        """
        text = str(error)
        key = getattr(self, "api_key", None)
        if not key:
            return text
        return text.replace(str(key), "***")

    # ========================================================================
    # Current Weather API
    # ========================================================================

    def get_current_weather(
        self,
        location: Dict[str, float],
        units: str = "metric",
        lang: str = "zh_tw"
    ) -> Dict[str, Any]:
        """
        Get current weather at location.

        Args:
            location: Location dict with 'lat' and 'lng' keys
            units: Units (metric/imperial/standard)
            lang: Language code (zh_tw, en, etc.)

        Returns:
            Weather data dict with keys:
                - condition: str (e.g., "Clear", "Rain")
                - description: str (e.g., "clear sky")
                - temperature: float (°C if metric)
                - feels_like: float
                - temp_min: float
                - temp_max: float
                - pressure: int (hPa)
                - humidity: int (%)
                - wind_speed: float (m/s if metric)
                - wind_direction: int (degrees)
                - cloudiness: int (%)
                - rain_1h: float (mm)
                - timestamp: datetime

        Raises:
            requests.RequestException: If API request fails
            ValueError: If the response body reports a non-200 ``cod``

        Reference:
            https://openweathermap.org/current
        """
        try:
            params = self._location_params(location, units=units, lang=lang)
            logger.debug(
                f"🌤️ Getting current weather for "
                f"({location['lat']:.4f}, {location['lng']:.4f})"
            )
            data = self._request_json(self.current_weather_url, params)
            self._raise_on_api_error(data)

            weather = self._parse_current_weather(data)
            logger.info(
                f"✅ Current weather: {weather['condition']}, "
                f"{weather['temperature']}°C"
            )
            return weather
        except requests.Timeout:
            logger.error("⏰ Weather API request timeout")
            raise
        except requests.RequestException as e:
            logger.error(f"❌ Weather API request error: {self._redact(e)}")
            raise
        except Exception as e:
            logger.error(f"❌ Error getting current weather: {self._redact(e)}")
            raise

    @staticmethod
    def _parse_current_weather(data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse a current weather API response; missing fields default to 0/""."""
        weather_list = data.get("weather", [])
        main_weather = weather_list[0] if weather_list else {}
        main_data = data.get("main", {})
        wind_data = data.get("wind", {})
        clouds_data = data.get("clouds", {})
        rain_data = data.get("rain", {})
        return {
            "condition": main_weather.get("main", "Unknown"),
            "description": main_weather.get("description", ""),
            "temperature": main_data.get("temp", 0),
            "feels_like": main_data.get("feels_like", 0),
            "temp_min": main_data.get("temp_min", 0),
            "temp_max": main_data.get("temp_max", 0),
            "pressure": main_data.get("pressure", 0),
            "humidity": main_data.get("humidity", 0),
            "wind_speed": wind_data.get("speed", 0),
            "wind_direction": wind_data.get("deg", 0),
            "cloudiness": clouds_data.get("all", 0),
            "rain_1h": rain_data.get("1h", 0),
            "timestamp": datetime.fromtimestamp(
                data.get("dt", 0),
                tz=timezone.utc
            ),
        }

    # ========================================================================
    # 5 Day Forecast API (3-hour step)
    # ========================================================================

    def get_forecast_weather(
        self,
        location: Dict[str, float],
        future_minutes: int = 0,
    ) -> Dict[str, Any]:
        """
        Get weather forecast for a specific time in the future.
        Use this to check conditions for planned activities (e.g., "Will it rain at 3 PM?").

        Args:
            location: Target location {"lat": float, "lng": float}.
            future_minutes: Minutes from NOW. (e.g., 0=now, 180=in 3 hours).

        Returns:
            Dict: Forecast data (empty dict if the API returned no entries) including:
                - timestamp: datetime
                - condition: str
                - temperature: float
                - pop: float (Probability of Precipitation 0-1)

        Raises:
            requests.RequestException: If API request fails
            ValueError: If the response body reports a non-200 ``cod``
        """
        try:
            params = self._location_params(location, units="metric", lang="zh_tw")
            data = self._request_json(self.forecast_weather_url, params)
            self._raise_on_api_error(data)
            forecast = self._parse_forecast_weather(data)
            return self._select_forecast_entry(forecast["hourly"], future_minutes)
        except Exception as e:
            logger.error(f"❌ Error getting forecast: {self._redact(e)}")
            raise

    @staticmethod
    def _parse_forecast_weather(data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse a 5-day forecast API response into {'hourly': [...], 'city': {...}}."""
        hourly = []
        for item in data.get("list", []):
            weather_list = item.get("weather", [])
            main_weather = weather_list[0] if weather_list else {}
            main_data = item.get("main", {})
            wind_data = item.get("wind", {})
            rain_data = item.get("rain", {})
            hourly.append({
                "timestamp": datetime.fromtimestamp(
                    item.get("dt", 0),
                    tz=timezone.utc
                ),
                "condition": main_weather.get("main", "Unknown"),
                "description": main_weather.get("description", ""),
                "temperature": main_data.get("temp", 0),
                "feels_like": main_data.get("feels_like", 0),
                "humidity": main_data.get("humidity", 0),
                "wind_speed": wind_data.get("speed", 0),
                "rain_3h": rain_data.get("3h", 0),
                "pop": item.get("pop", 0),  # Probability of precipitation
            })
        return {
            "hourly": hourly,
            "city": data.get("city", {}),
        }

    # ========================================================================
    # Current Air Pollution API
    # ========================================================================

    def get_air_pollution(
        self,
        location: Dict[str, float]
    ) -> Dict[str, Any]:
        """
        Get current air pollution data.

        Args:
            location: Location dict with 'lat' and 'lng' keys

        Returns:
            Air quality dict with keys:
                - aqi: int (1-5, 1=Good, 5=Very Poor)
                - aqi_label: str ("Good", "Fair", etc.)
                - co: float (Carbon monoxide, μg/m³)
                - no: float (Nitrogen monoxide, μg/m³)
                - no2: float (Nitrogen dioxide, μg/m³)
                - o3: float (Ozone, μg/m³)
                - so2: float (Sulphur dioxide, μg/m³)
                - pm2_5: float (Fine particles, μg/m³)
                - pm10: float (Coarse particles, μg/m³)
                - nh3: float (Ammonia, μg/m³)
                - timestamp: datetime

        Raises:
            requests.RequestException: If API request fails

        Reference:
            https://openweathermap.org/api/air-pollution
        """
        try:
            params = self._location_params(location)
            logger.debug(
                f"🌫️ Getting air pollution for "
                f"({location['lat']:.4f}, {location['lng']:.4f})"
            )
            data = self._request_json(self.air_pollution_url, params)

            pollution = self._parse_air_pollution(data)
            logger.info(
                f"✅ Air quality: AQI {pollution['aqi']} ({pollution['aqi_label']})"
            )
            return pollution
        except requests.Timeout:
            logger.error("⏰ Air Pollution API request timeout")
            raise
        except requests.RequestException as e:
            logger.error(f"❌ Air Pollution API request error: {self._redact(e)}")
            raise
        except Exception as e:
            logger.error(f"❌ Error getting air pollution: {self._redact(e)}")
            raise

    @classmethod
    def _air_quality_fields(cls, item: Dict[str, Any]) -> Dict[str, Any]:
        """Extract AQI, its label and pollutant concentrations from one list item."""
        aqi = item.get("main", {}).get("aqi", 0)
        components = item.get("components", {})
        fields = {
            "aqi": aqi,
            "aqi_label": cls._AQI_LABELS.get(aqi, "Unknown"),
        }
        for key in cls._POLLUTANT_KEYS:
            fields[key] = components.get(key, 0)
        return fields

    def _parse_air_pollution(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Parse a current air pollution API response.

        Only the first list entry is used. An empty list yields zeroed
        values, the label "Unknown" and the current UTC time as timestamp.
        """
        list_data = data.get("list", [])
        if not list_data:
            result = self._air_quality_fields({})
            result["timestamp"] = datetime.now(timezone.utc)
            return result

        item = list_data[0]
        result = self._air_quality_fields(item)
        result["timestamp"] = datetime.fromtimestamp(
            item.get("dt", 0),
            tz=timezone.utc
        )
        return result

    # ========================================================================
    # Air Pollution Forecast API (Hourly)
    # ========================================================================

    @classmethod
    def _parse_forecast_air_pollution(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse an air pollution forecast API response into {'hourly': [...]}."""
        hourly = []
        for item in data.get("list", []):
            entry = {
                "timestamp": datetime.fromtimestamp(
                    item.get("dt", 0),
                    tz=timezone.utc
                ),
            }
            entry.update(cls._air_quality_fields(item))
            hourly.append(entry)
        return {
            "hourly": hourly
        }

    def get_forecast_air_pollution(
        self,
        location: Dict[str, float],
        future_minutes: int = 0
    ) -> Dict[str, Any]:
        """
        Get air quality (AQI) forecast for a future time.

        Args:
            location: Target location {"lat": float, "lng": float}.
            future_minutes: Minutes from NOW.

        Returns:
            Dict: Air quality data (empty dict if the API returned no entries) including:
                - aqi: int (1=Good, 5=Very Poor)
                - pm2_5: float
                - timestamp: datetime

        Raises:
            requests.RequestException: If API request fails
        """
        try:
            params = self._location_params(location)
            data = self._request_json(self.forecast_air_pollution_url, params)
            forecast = self._parse_forecast_air_pollution(data)
            return self._select_forecast_entry(forecast["hourly"], future_minutes)
        except Exception as e:
            logger.error(f"❌ Error getting air pollution forecast: {self._redact(e)}")
            raise
| # ============================================================================ | |
| # Usage Examples | |
| # ============================================================================ | |
def main() -> None:
    """
    Run a live smoke test of every public endpoint against Taipei 101.

    The API key is resolved by OpenWeatherMapService itself (environment
    variable first, then project settings), so the script also works when
    only OPENWEATHER_API_KEY is set. Exits with status 1 if no key is found.
    """
    try:
        service = OpenWeatherMapService()
    except ValueError:
        print("❌ Please set OPENWEATHER_API_KEY environment variable")
        print(" Get your free API key at: https://openweathermap.org/api")
        raise SystemExit(1) from None

    # Test location (Taipei 101)
    taipei_101 = {"lat": 25.0330, "lng": 121.5654}

    def run_example(title, example):
        """Print a banner, run one example, and report (not raise) any failure."""
        print("\n" + "=" * 60)
        print(title)
        print("=" * 60)
        try:
            example()
        except Exception as e:
            # Keep going so one failing endpoint does not hide the others.
            print(f"❌ Failed: {e}")

    def show_current_weather():
        weather = service.get_current_weather(taipei_101)
        print(f"🌡️ Temperature: {weather['temperature']}°C")
        print(f"💧 Humidity: {weather['humidity']}%")
        print(f"🌤️ Condition: {weather['condition']} ({weather['description']})")
        print(f"💨 Wind: {weather['wind_speed']} m/s")

    def show_weather_forecast():
        forecast = service.get_forecast_weather(taipei_101, future_minutes=180)
        print(f"⏰ Time: {forecast['timestamp']}")
        print(f"🌡️ Temperature: {forecast['temperature']}°C")
        print(f"🌧️ Precipitation probability: {forecast['pop']*100:.0f}%")
        print(f"🌤️ Condition: {forecast['condition']}")

    def show_air_quality():
        air = service.get_air_pollution(taipei_101)
        print(f"🌫️ AQI: {air['aqi']} ({air['aqi_label']})")
        print(f" PM2.5: {air['pm2_5']:.1f} μg/m³")
        print(f" PM10: {air['pm10']:.1f} μg/m³")
        print(f" O3: {air['o3']:.1f} μg/m³")

    def show_air_quality_forecast():
        forecast_air = service.get_forecast_air_pollution(taipei_101, future_minutes=360)
        print(f"⏰ Time: {forecast_air['timestamp']}")
        print(f"🌫️ AQI: {forecast_air['aqi']} ({forecast_air['aqi_label']})")
        print(f" PM2.5: {forecast_air['pm2_5']:.1f} μg/m³")
        print(f" PM10: {forecast_air['pm10']:.1f} μg/m³")

    run_example("Example 1: Current Weather", show_current_weather)
    run_example("Example 2: Weather Forecast (3 hours)", show_weather_forecast)
    run_example("Example 3: Current Air Quality", show_air_quality)
    run_example("Example 4: Air Quality Forecast (6 hours)", show_air_quality_forecast)

    print("\n✅ All examples completed!")


if __name__ == "__main__":
    main()