Spaces:
Sleeping
Sleeping
| from mcp.server.fastmcp import FastMCP | |
| from typing import Dict, Any, Optional | |
| from contextvars import ContextVar | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| from starlette.requests import Request | |
| from fastmcp.server.middleware import Middleware, MiddlewareContext | |
| import requests | |
# Shared MCP server instance; the stateless HTTP flag is passed at construction.
mcp = FastMCP(
    name="OpenWeatherServer",
    stateless_http=True,
)

# Base URL that _request() prefixes onto every endpoint path.
OPENWEATHER_BASE_URL = "https://api.openweathermap.org/data/2.5"

# Context variable to hold the token for the current request.
# Written by AuthorizationHeaderMiddleware; read by RequireAuthMiddleware and
# _require_api_key_from_header().
_current_token: ContextVar[Optional[str]] = ContextVar(
    "openweather_token",
    default=None,
)
class AuthorizationHeaderMiddleware(BaseHTTPMiddleware):
    """HTTP middleware that publishes the caller's API key via ``_current_token``.

    The key is taken from the ``Authorization`` header. A value using the
    ``Bearer`` scheme (matched case-insensitively) has the scheme prefix
    removed; any other non-empty value is used verbatim as the key.
    """

    async def dispatch(self, request: Request, call_next):
        """Store the request's token for downstream code, then run the request.

        Args:
            request: Incoming HTTP request whose headers are inspected.
            call_next: Callable that forwards the request to the next handler.

        Returns:
            The response produced by ``call_next``.
        """
        auth_header = request.headers.get("Authorization", "").strip()
        token: Optional[str] = None
        if auth_header:
            scheme, _, credentials = auth_header.partition(" ")
            if scheme.lower() == "bearer":
                # A bare "Bearer" with no credentials counts as "no token";
                # otherwise the scheme word itself would be sent as the key.
                token = credentials.strip() or None
            else:
                token = auth_header

        reset_handle = _current_token.set(token)
        try:
            return await call_next(request)
        finally:
            # Restore the previous value so the token does not outlive the
            # request in the context that ran this middleware.
            _current_token.reset(reset_handle)
class RequireAuthMiddleware(Middleware):
    """FastMCP middleware that rejects requests arriving without a token."""

    async def on_request(self, context: MiddlewareContext, call_next):
        """Forward the request only when ``_current_token`` holds a non-empty value.

        Args:
            context: Middleware context passed through to ``call_next``.
            call_next: Callable that continues processing the request.

        Returns:
            Whatever ``call_next(context)`` returns.

        Raises:
            ValueError: If no token is available for the current request.
        """
        # Enforce a token is present for any MCP request (tool calls, listings, etc.)
        if _current_token.get():
            return await call_next(context)
        raise ValueError(
            "Missing OpenWeather API key. Provide it as a Bearer token in the Authorization header."
        )


# Register FastMCP middleware
mcp.add_middleware(RequireAuthMiddleware())
def _require_api_key_from_header() -> str:
    """Return the API key stored in ``_current_token`` for the current request.

    Returns:
        The non-empty token string.

    Raises:
        ValueError: If the context variable is unset or holds an empty value.
    """
    api_key = _current_token.get()
    if api_key:
        return api_key
    raise ValueError(
        "Missing OpenWeather API key. Provide it as a Bearer token in the Authorization header."
    )
def _request(path: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Issue a GET against ``OPENWEATHER_BASE_URL/<path>`` and return the JSON body.

    Args:
        path: Endpoint path appended to the base URL (e.g. ``"weather"``).
        params: Query parameters sent with the request. When an ``appid``
            entry is present, its value is masked in transport-error messages.

    Returns:
        The decoded JSON body of a 200 response.

    Raises:
        RuntimeError: If the HTTP call itself fails (connection error,
            timeout, ...).
        ValueError: If the response status is not 200, or a 200 response
            body is not valid JSON.
    """
    from urllib.parse import quote_plus

    url = f"{OPENWEATHER_BASE_URL}/{path}"
    try:
        response = requests.get(url, params=params, timeout=15)
    except requests.RequestException as exc:
        # The exception text commonly embeds the request URL, query string
        # included, so mask the API key (raw and URL-encoded forms) before it
        # is surfaced. The chained cause still carries the original text.
        detail = str(exc)
        secret = params.get("appid")
        if secret:
            for form in (str(secret), quote_plus(str(secret))):
                detail = detail.replace(form, "***")
        raise RuntimeError(f"Failed to call OpenWeather: {detail}") from exc

    if response.status_code != 200:
        try:
            payload = response.json()
        except ValueError:
            # Body is not JSON; fall back to the raw text.
            payload = {"message": response.text}
        # An error body may be valid JSON without being an object (list,
        # string, ...); only dicts support the "message" lookup.
        if isinstance(payload, dict):
            message = payload.get("message") or payload
        else:
            message = payload
        raise ValueError(f"OpenWeather error {response.status_code}: {message}")

    try:
        return response.json()
    except ValueError as exc:
        raise ValueError("OpenWeather returned a non-JSON response") from exc
def get_current_weather_city(
    city: str,
    country_code: str = "",
    units: str = "metric",
    lang: str = "en",
) -> Dict[str, Any]:
    """Query the ``weather`` endpoint by city name.

    Args:
        city: City name used for the ``q`` query parameter.
        country_code: Optional country code; when non-empty it is appended
            to ``city`` after a comma.
        units: Forwarded unchanged as the ``units`` query parameter.
        lang: Forwarded unchanged as the ``lang`` query parameter.

    Returns:
        The JSON body returned by ``_request``.

    Raises:
        ValueError: If no API key is available for the current request.
    """
    api_key = _require_api_key_from_header()
    location = f"{city},{country_code}" if country_code else city
    query: Dict[str, Any] = {
        "q": location,
        "appid": api_key,
        "units": units,
        "lang": lang,
    }
    return _request("weather", query)
def get_current_weather_coords(
    lat: float,
    lon: float,
    units: str = "metric",
    lang: str = "en",
) -> Dict[str, Any]:
    """Query the ``weather`` endpoint by coordinates.

    Args:
        lat: Forwarded as the ``lat`` query parameter.
        lon: Forwarded as the ``lon`` query parameter.
        units: Forwarded unchanged as the ``units`` query parameter.
        lang: Forwarded unchanged as the ``lang`` query parameter.

    Returns:
        The JSON body returned by ``_request``.

    Raises:
        ValueError: If no API key is available for the current request.
    """
    api_key = _require_api_key_from_header()
    query: Dict[str, Any] = dict(
        lat=lat,
        lon=lon,
        appid=api_key,
        units=units,
        lang=lang,
    )
    return _request("weather", query)
def get_forecast_city(
    city: str,
    country_code: str = "",
    units: str = "metric",
    lang: str = "en",
) -> Dict[str, Any]:
    """Query the ``forecast`` endpoint by city name.

    Args:
        city: City name used for the ``q`` query parameter.
        country_code: Optional country code; when non-empty it is appended
            to ``city`` after a comma.
        units: Forwarded unchanged as the ``units`` query parameter.
        lang: Forwarded unchanged as the ``lang`` query parameter.

    Returns:
        The JSON body returned by ``_request``.

    Raises:
        ValueError: If no API key is available for the current request.
    """
    api_key = _require_api_key_from_header()
    if country_code:
        location = f"{city},{country_code}"
    else:
        location = city
    query: Dict[str, Any] = {
        "q": location,
        "appid": api_key,
        "units": units,
        "lang": lang,
    }
    return _request("forecast", query)
def get_forecast_coords(
    lat: float,
    lon: float,
    units: str = "metric",
    lang: str = "en",
) -> Dict[str, Any]:
    """Query the ``forecast`` endpoint by coordinates.

    Args:
        lat: Forwarded as the ``lat`` query parameter.
        lon: Forwarded as the ``lon`` query parameter.
        units: Forwarded unchanged as the ``units`` query parameter.
        lang: Forwarded unchanged as the ``lang`` query parameter.

    Returns:
        The JSON body returned by ``_request``.

    Raises:
        ValueError: If no API key is available for the current request.
    """
    api_key = _require_api_key_from_header()
    query: Dict[str, Any] = dict(
        lat=lat,
        lon=lon,
        appid=api_key,
        units=units,
        lang=lang,
    )
    return _request("forecast", query)
def openweather_http_app():
    """Build the streamable-HTTP app for ``mcp`` with header-based auth attached.

    Returns:
        The app returned by ``mcp.streamable_http_app()`` after
        ``AuthorizationHeaderMiddleware`` has been added to it.
    """
    http_app = mcp.streamable_http_app()
    # Installs the middleware that copies the Authorization header into
    # _current_token for each request.
    http_app.add_middleware(AuthorizationHeaderMiddleware)
    return http_app