LifeFlow-AI / src /services /openweather_service.py
Marco310's picture
build outside api server layers
a6744e0
raw
history blame
20.5 kB
"""
OpenWeatherMap API Service - Improved Version
"""
import requests
from typing import Dict, Any, Optional, List
from datetime import datetime, timezone, timedelta
from src.infra.logger import get_logger
logger = get_logger(__name__)
class OpenWeatherMapService:
    """
    Client for the OpenWeatherMap REST API.

    Provides unified interface to:
    - Current Weather API
    - 5 Day Forecast API (3-hour step)
    - Air Pollution API (Current)
    - Air Pollution Forecast API (Hourly)

    Free tier limits:
    - 1,000 calls/day
    - 60 calls/minute

    All public ``get_*`` methods log failures and re-raise the original
    exception. The API key is masked in every message this class logs,
    because ``requests`` exception text contains the full request URL
    (including the ``appid`` query parameter). The re-raised exception itself
    is left untouched, so callers that log it should mask the key as well.
    """

    # Seconds to wait for each HTTP request before giving up.
    REQUEST_TIMEOUT_SECONDS = 10

    # AQI levels: 1 = Good, 2 = Fair, 3 = Moderate, 4 = Poor, 5 = Very Poor
    _AQI_LABELS = {
        1: "Good",
        2: "Fair",
        3: "Moderate",
        4: "Poor",
        5: "Very Poor",
    }

    # Keys copied from the "components" object of an air pollution entry.
    _POLLUTANT_KEYS = ("co", "no", "no2", "o3", "so2", "pm2_5", "pm10", "nh3")

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize OpenWeatherMap service.

        The key is resolved in this order: the ``api_key`` argument, the
        ``OPENWEATHER_API_KEY`` environment variable, then
        ``get_settings().openweather_api_key`` from ``src.infra.config``.

        Args:
            api_key: OpenWeatherMap API key

        Raises:
            ValueError: If no API key could be resolved
        """
        if api_key:
            self.api_key = api_key
        else:
            import os

            self.api_key = os.getenv("OPENWEATHER_API_KEY")
            # Environment variable missing: fall back to the project config.
            if not self.api_key:
                try:
                    from src.infra.config import get_settings

                    self.api_key = get_settings().openweather_api_key
                except (ImportError, AttributeError):
                    # Best effort only: a missing config module/attribute is
                    # reported by the ValueError below.
                    pass

        # Validate once here so the request methods never need to re-check.
        if not self.api_key:
            raise ValueError(
                "OpenWeatherMap API key is required. "
                "Set via parameter, config, or OPENWEATHER_API_KEY env var"
            )

        # API endpoints
        self.current_weather_url = "https://api.openweathermap.org/data/2.5/weather"
        self.forecast_weather_url = "https://api.openweathermap.org/data/2.5/forecast"
        self.air_pollution_url = "https://api.openweathermap.org/data/2.5/air_pollution"
        self.forecast_air_pollution_url = "https://api.openweathermap.org/data/2.5/air_pollution/forecast"

        logger.info("✅ OpenWeatherMap service initialized")

    # ========================================================================
    # Helper Functions
    # ========================================================================

    @staticmethod
    def _compute_future_index(
        future_minutes: int,
        time_step_hours: int = 3
    ) -> int:
        """
        Compute the index in forecast array for a future time.

        Args:
            future_minutes: Minutes into the future
            time_step_hours: Time step in hours (3 for weather, 1 for air pollution)

        Returns:
            Index in forecast array, truncated toward zero and never negative
        """
        future_hours = future_minutes / 60.0
        index = int(future_hours / time_step_hours)
        # A negative index would silently select from the END of a list,
        # so times in the past are clamped to the first slot.
        return max(index, 0)

    @staticmethod
    def _find_closest_forecast(
        forecast_list: List[Dict],
        target_time: datetime
    ) -> Optional[Dict]:
        """
        Find the forecast entry closest to target time.

        Args:
            forecast_list: List of forecast entries with 'timestamp' key
            target_time: Target datetime

        Returns:
            Closest forecast entry or None when the list is empty
        """
        if not forecast_list:
            return None
        return min(
            forecast_list,
            key=lambda entry: abs((entry['timestamp'] - target_time).total_seconds())
        )

    @staticmethod
    def _is_success_status(data: Dict[str, Any]) -> bool:
        """
        Tell whether a response body reports success.

        A body without a ``cod`` field is treated as successful. The value is
        compared as a string so that both ``200`` and ``"200"`` are accepted.
        """
        if "cod" not in data:
            return True
        return str(data["cod"]) == "200"

    def _redact(self, message: object) -> str:
        """Return ``str(message)`` with every occurrence of the API key masked."""
        text = str(message)
        key = getattr(self, "api_key", None)
        if key:
            text = text.replace(str(key), "***")
        return text

    def _get_json(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Perform a GET request and return the decoded JSON body.

        Raises:
            requests.RequestException: On timeout, connection failure or a
                4xx/5xx status (via ``raise_for_status``)
        """
        response = requests.get(
            url,
            params=params,
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        return response.json()

    def _select_forecast_entry(
        self,
        hourly: List[Dict[str, Any]],
        future_minutes: int
    ) -> Dict[str, Any]:
        """
        Pick one entry from a parsed forecast list.

        Args:
            hourly: Parsed forecast entries, each with a 'timestamp' key
            future_minutes: 0 selects the first entry; any other value selects
                the entry whose timestamp is closest to now + future_minutes

        Returns:
            The selected entry, or an empty dict when ``hourly`` is empty
        """
        if not hourly:
            return {}
        if future_minutes == 0:
            return hourly[0]
        target_time = datetime.now(timezone.utc) + timedelta(minutes=future_minutes)
        closest = self._find_closest_forecast(hourly, target_time)
        return closest if closest else {}

    @staticmethod
    def _air_quality_fields(item: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract AQI, its label and the pollutant values from one API entry.

        Missing values default to 0; an AQI outside 1-5 is labelled "Unknown".
        """
        aqi = item.get("main", {}).get("aqi", 0)
        components = item.get("components", {})
        fields = {
            "aqi": aqi,
            "aqi_label": OpenWeatherMapService._AQI_LABELS.get(aqi, "Unknown"),
        }
        for key in OpenWeatherMapService._POLLUTANT_KEYS:
            fields[key] = components.get(key, 0)
        return fields

    # ========================================================================
    # Current Weather API
    # ========================================================================

    def get_current_weather(
        self,
        location: Dict[str, float],
        units: str = "metric",
        lang: str = "zh_tw"
    ) -> Dict[str, Any]:
        """
        Get current weather at location.

        Args:
            location: Location dict with 'lat' and 'lng' keys
            units: Units (metric/imperial/standard)
            lang: Language code (zh_tw, en, etc.)

        Returns:
            Weather data dict with keys:
            - condition: str (e.g., "Clear", "Rain")
            - description: str (e.g., "clear sky")
            - temperature: float (°C if metric)
            - feels_like: float
            - temp_min: float
            - temp_max: float
            - pressure: int (hPa)
            - humidity: int (%)
            - wind_speed: float (m/s if metric)
            - wind_direction: int (degrees)
            - cloudiness: int (%)
            - rain_1h: float (mm)
            - timestamp: datetime

        Raises:
            requests.RequestException: If API request fails
            ValueError: If the response body reports a non-200 ``cod``

        Reference:
            https://openweathermap.org/current
        """
        try:
            params = {
                "lat": location["lat"],
                "lon": location["lng"],
                "appid": self.api_key,
                "units": units,
                "lang": lang
            }
            logger.debug(
                f"🌤️ Getting current weather for "
                f"({location['lat']:.4f}, {location['lng']:.4f})"
            )
            data = self._get_json(self.current_weather_url, params)

            # The body can carry its own status code; check it as well.
            if not self._is_success_status(data):
                raise ValueError(f"API Error: {data.get('message', 'Unknown error')}")

            weather = self._parse_current_weather(data)
            logger.info(
                f"✅ Current weather: {weather['condition']}, "
                f"{weather['temperature']}°C"
            )
            return weather
        except requests.Timeout:
            logger.error("⏰ Weather API request timeout")
            raise
        except requests.RequestException as e:
            # Exception text embeds the request URL, hence the redaction.
            logger.error(f"❌ Weather API request error: {self._redact(e)}")
            raise
        except Exception as e:
            logger.error(f"❌ Error getting current weather: {self._redact(e)}")
            raise

    @staticmethod
    def _parse_current_weather(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Parse current weather API response.

        Missing sections or values fall back to "Unknown", "" or 0; a missing
        ``dt`` yields the Unix epoch (UTC).
        """
        weather_list = data.get("weather", [])
        main_weather = weather_list[0] if weather_list else {}
        main_data = data.get("main", {})
        wind_data = data.get("wind", {})
        clouds_data = data.get("clouds", {})
        rain_data = data.get("rain", {})
        return {
            "condition": main_weather.get("main", "Unknown"),
            "description": main_weather.get("description", ""),
            "temperature": main_data.get("temp", 0),
            "feels_like": main_data.get("feels_like", 0),
            "temp_min": main_data.get("temp_min", 0),
            "temp_max": main_data.get("temp_max", 0),
            "pressure": main_data.get("pressure", 0),
            "humidity": main_data.get("humidity", 0),
            "wind_speed": wind_data.get("speed", 0),
            "wind_direction": wind_data.get("deg", 0),
            "cloudiness": clouds_data.get("all", 0),
            "rain_1h": rain_data.get("1h", 0),
            "timestamp": datetime.fromtimestamp(
                data.get("dt", 0),
                tz=timezone.utc
            )
        }

    # ========================================================================
    # 5 Day Forecast API (3-hour step)
    # ========================================================================

    def get_forecast_weather(
        self,
        location: Dict[str, float],
        future_minutes: int = 0,
        units: str = "metric",
        lang: str = "zh_tw"
    ) -> Dict[str, Any]:
        """
        Get weather forecast for a specific time in the future.

        Use this to check conditions for planned activities
        (e.g., "Will it rain at 3 PM?").

        Args:
            location: Target location {"lat": float, "lng": float}.
            future_minutes: Minutes from NOW. 0 returns the first forecast
                entry; any other value returns the entry closest to
                now + future_minutes (e.g., 180=in 3 hours).
            units: Units (metric/imperial/standard)
            lang: Language code (zh_tw, en, etc.)

        Returns:
            Dict: One forecast entry (empty dict if the API returned none) with:
            - timestamp: datetime
            - condition: str
            - description: str
            - temperature: float
            - feels_like: float
            - humidity: int
            - wind_speed: float
            - rain_3h: float
            - pop: float (Probability of Precipitation 0-1)

        Raises:
            requests.RequestException: If API request fails
            ValueError: If the response body reports a non-200 ``cod``
        """
        try:
            params = {
                "lat": location["lat"],
                "lon": location["lng"],
                "appid": self.api_key,
                "units": units,
                "lang": lang
            }
            data = self._get_json(self.forecast_weather_url, params)
            if not self._is_success_status(data):
                raise ValueError(f"API Error: {data.get('message', 'Unknown error')}")
            forecast = self._parse_forecast_weather(data)
            return self._select_forecast_entry(forecast['hourly'], future_minutes)
        except Exception as e:
            logger.error(f"❌ Error getting forecast: {self._redact(e)}")
            raise

    @staticmethod
    def _parse_forecast_weather(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Parse 5-day forecast API response.

        Returns:
            Dict with 'hourly' (list of parsed entries, in response order)
            and 'city' (the raw "city" object, or {} when absent)
        """
        hourly = []
        for item in data.get("list", []):
            weather_list = item.get("weather", [])
            main_weather = weather_list[0] if weather_list else {}
            main_data = item.get("main", {})
            wind_data = item.get("wind", {})
            rain_data = item.get("rain", {})
            hourly.append({
                "timestamp": datetime.fromtimestamp(
                    item.get("dt", 0),
                    tz=timezone.utc
                ),
                "condition": main_weather.get("main", "Unknown"),
                "description": main_weather.get("description", ""),
                "temperature": main_data.get("temp", 0),
                "feels_like": main_data.get("feels_like", 0),
                "humidity": main_data.get("humidity", 0),
                "wind_speed": wind_data.get("speed", 0),
                "rain_3h": rain_data.get("3h", 0),
                "pop": item.get("pop", 0)  # Probability of precipitation
            })
        return {
            "hourly": hourly,
            "city": data.get("city", {})
        }

    # ========================================================================
    # Current Air Pollution API
    # ========================================================================

    def get_air_pollution(
        self,
        location: Dict[str, float]
    ) -> Dict[str, Any]:
        """
        Get current air pollution data.

        Args:
            location: Location dict with 'lat' and 'lng' keys

        Returns:
            Air quality dict with keys:
            - aqi: int (1-5, 1=Good, 5=Very Poor)
            - aqi_label: str ("Good", "Fair", etc.)
            - co: float (Carbon monoxide, μg/m³)
            - no: float (Nitrogen monoxide, μg/m³)
            - no2: float (Nitrogen dioxide, μg/m³)
            - o3: float (Ozone, μg/m³)
            - so2: float (Sulphur dioxide, μg/m³)
            - pm2_5: float (Fine particles, μg/m³)
            - pm10: float (Coarse particles, μg/m³)
            - nh3: float (Ammonia, μg/m³)
            - timestamp: datetime

        Raises:
            requests.RequestException: If API request fails

        Reference:
            https://openweathermap.org/api/air-pollution
        """
        try:
            params = {
                "lat": location["lat"],
                "lon": location["lng"],
                "appid": self.api_key
            }
            logger.debug(
                f"🌫️ Getting air pollution for "
                f"({location['lat']:.4f}, {location['lng']:.4f})"
            )
            data = self._get_json(self.air_pollution_url, params)
            pollution = self._parse_air_pollution(data)
            logger.info(
                f"✅ Air quality: AQI {pollution['aqi']} ({pollution['aqi_label']})"
            )
            return pollution
        except requests.Timeout:
            logger.error("⏰ Air Pollution API request timeout")
            raise
        except requests.RequestException as e:
            # Exception text embeds the request URL, hence the redaction.
            logger.error(f"❌ Air Pollution API request error: {self._redact(e)}")
            raise
        except Exception as e:
            logger.error(f"❌ Error getting air pollution: {self._redact(e)}")
            raise

    def _parse_air_pollution(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Parse air pollution API response.

        Only the first entry of "list" is used. When the list is missing or
        empty, an all-zero record labelled "Unknown" and stamped with the
        current UTC time is returned.
        """
        list_data = data.get("list", [])
        if not list_data:
            return {
                **self._air_quality_fields({}),
                "timestamp": datetime.now(timezone.utc)
            }
        item = list_data[0]
        return {
            **self._air_quality_fields(item),
            "timestamp": datetime.fromtimestamp(
                item.get("dt", 0),
                tz=timezone.utc
            )
        }

    # ========================================================================
    # Air Pollution Forecast API (Hourly)
    # ========================================================================

    @staticmethod
    def _parse_forecast_air_pollution(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Parse air pollution forecast API response.

        Returns:
            Dict with 'hourly': one parsed record per entry of "list",
            in response order
        """
        hourly = []
        for item in data.get("list", []):
            hourly.append({
                "timestamp": datetime.fromtimestamp(
                    item.get("dt", 0),
                    tz=timezone.utc
                ),
                **OpenWeatherMapService._air_quality_fields(item),
            })
        return {
            "hourly": hourly
        }

    def get_forecast_air_pollution(
        self,
        location: Dict[str, float],
        future_minutes: int = 0
    ) -> Dict[str, Any]:
        """
        Get air quality (AQI) forecast for a future time.

        Args:
            location: Target location {"lat": float, "lng": float}.
            future_minutes: Minutes from NOW. 0 returns the first forecast
                entry; any other value returns the entry closest to
                now + future_minutes.

        Returns:
            Dict: One air quality entry (empty dict if the API returned none)
            including:
            - aqi: int (1=Good, 5=Very Poor)
            - aqi_label: str
            - pm2_5: float (plus the other pollutant values)
            - timestamp: datetime

        Raises:
            requests.RequestException: If API request fails
        """
        try:
            params = {
                "lat": location["lat"],
                "lon": location["lng"],
                "appid": self.api_key
            }
            data = self._get_json(self.forecast_air_pollution_url, params)
            forecast = self._parse_forecast_air_pollution(data)
            return self._select_forecast_entry(forecast['hourly'], future_minutes)
        except Exception as e:
            logger.error(f"❌ Error getting air pollution forecast: {self._redact(e)}")
            raise
# ============================================================================
# Usage Examples
# ============================================================================
if __name__ == "__main__":
    # Manual smoke test: calls each public endpoint once for a fixed location
    # and prints a short summary. Each example is isolated so one failure does
    # not prevent the remaining examples from running.
    from src.infra.config import get_settings

    def _print_header(title: str) -> None:
        """Print the banner that separates one example from the next."""
        print("\n" + "="*60)
        print(title)
        print("="*60)

    # Initialize service
    settings = get_settings()
    api_key = settings.openweather_api_key
    if not api_key:
        print("❌ Please set OPENWEATHER_API_KEY environment variable")
        print(" Get your free API key at: https://openweathermap.org/api")
        # SystemExit is raised directly because the exit() helper is injected
        # by the site module and is not guaranteed to exist.
        raise SystemExit(1)
    service = OpenWeatherMapService(api_key=api_key)

    # Test location (Taipei 101)
    taipei_101 = {"lat": 25.0330, "lng": 121.5654}

    # Example 1: Current weather
    try:
        _print_header("Example 1: Current Weather")
        weather = service.get_current_weather(taipei_101)
        print(f"🌡️ Temperature: {weather['temperature']}°C")
        print(f"💧 Humidity: {weather['humidity']}%")
        print(f"🌤️ Condition: {weather['condition']} ({weather['description']})")
        print(f"💨 Wind: {weather['wind_speed']} m/s")
    except Exception as e:
        print(f"❌ Failed: {e}")

    # Example 2: Weather forecast (3 hours from now)
    try:
        _print_header("Example 2: Weather Forecast (3 hours)")
        forecast = service.get_forecast_weather(taipei_101, future_minutes=180)
        if not forecast:
            # The service returns {} when the API sent no forecast entries.
            print("⚠️ No forecast data available")
        else:
            print(f"⏰ Time: {forecast['timestamp']}")
            print(f"🌡️ Temperature: {forecast['temperature']}°C")
            print(f"🌧️ Precipitation probability: {forecast['pop']*100:.0f}%")
            print(f"🌤️ Condition: {forecast['condition']}")
    except Exception as e:
        print(f"❌ Failed: {e}")

    # Example 3: Current air quality
    try:
        _print_header("Example 3: Current Air Quality")
        air = service.get_air_pollution(taipei_101)
        print(f"🌫️ AQI: {air['aqi']} ({air['aqi_label']})")
        print(f" PM2.5: {air['pm2_5']:.1f} μg/m³")
        print(f" PM10: {air['pm10']:.1f} μg/m³")
        print(f" O3: {air['o3']:.1f} μg/m³")
    except Exception as e:
        print(f"❌ Failed: {e}")

    # Example 4: Air quality forecast (6 hours from now)
    try:
        _print_header("Example 4: Air Quality Forecast (6 hours)")
        forecast_air = service.get_forecast_air_pollution(taipei_101, future_minutes=360)
        if not forecast_air:
            # The service returns {} when the API sent no forecast entries.
            print("⚠️ No air quality forecast data available")
        else:
            print(f"⏰ Time: {forecast_air['timestamp']}")
            print(f"🌫️ AQI: {forecast_air['aqi']} ({forecast_air['aqi_label']})")
            print(f" PM2.5: {forecast_air['pm2_5']:.1f} μg/m³")
            print(f" PM10: {forecast_air['pm10']:.1f} μg/m³")
    except Exception as e:
        print(f"❌ Failed: {e}")

    print("\n✅ All examples completed!")