File size: 4,373 Bytes
692e2cd
80dc3a4
d3ac497
80dc3a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
692e2cd
d3ac497
80dc3a4
 
d3ac497
 
80dc3a4
 
 
 
 
 
 
d3ac497
 
80dc3a4
 
 
 
 
 
d3ac497
80dc3a4
 
 
 
 
 
 
d3ac497
80dc3a4
d3ac497
 
80dc3a4
 
 
 
 
 
 
 
 
 
 
d3ac497
 
80dc3a4
 
d3ac497
 
80dc3a4
d3ac497
80dc3a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3ac497
 
80dc3a4
d3ac497
80dc3a4
 
 
 
d3ac497
 
80dc3a4
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from mcp.server.fastmcp import FastMCP
from typing import Dict, Any, Optional
from contextvars import ContextVar
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from fastmcp.server.middleware import Middleware, MiddlewareContext
import requests

mcp = FastMCP(name="OpenWeatherServer", stateless_http=True)

OPENWEATHER_BASE_URL = "https://api.openweathermap.org/data/2.5"

# Context variable to hold the token for the current request
_current_token: ContextVar[Optional[str]] = ContextVar("openweather_token", default=None)


class AuthorizationHeaderMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        auth_header = request.headers.get("Authorization", "").strip()
        token: Optional[str] = None
        if auth_header:
            lower = auth_header.lower()
            if lower.startswith("bearer "):
                token = auth_header[7:].strip()
            else:
                token = auth_header
        _current_token.set(token)
        response = await call_next(request)
        return response


class RequireAuthMiddleware(Middleware):
    async def on_request(self, context: MiddlewareContext, call_next):
        # Enforce a token is present for any MCP request (tool calls, listings, etc.)
        token = _current_token.get()
        if not token:
            raise ValueError(
                "Missing OpenWeather API key. Provide it as a Bearer token in the Authorization header."
            )
        return await call_next(context)


# Register FastMCP middleware
mcp.add_middleware(RequireAuthMiddleware())


def _require_api_key_from_header() -> str:
    token = _current_token.get()
    if not token:
        raise ValueError(
            "Missing OpenWeather API key. Provide it as a Bearer token in the Authorization header."
        )
    return token


def _request(path: str, params: Dict[str, Any]) -> Dict[str, Any]:
    url = f"{OPENWEATHER_BASE_URL}/{path}"
    try:
        response = requests.get(url, params=params, timeout=15)
    except requests.RequestException as exc:
        raise RuntimeError(f"Failed to call OpenWeather: {exc}") from exc

    if response.status_code != 200:
        try:
            payload = response.json()
        except Exception:
            payload = {"message": response.text}
        message = payload.get("message") or payload
        raise ValueError(f"OpenWeather error {response.status_code}: {message}")

    return response.json()


@mcp.tool(description="Get current weather by city name. Token must be in Authorization: Bearer <token> header.")
def get_current_weather_city(
    city: str,
    country_code: str = "",
    units: str = "metric",
    lang: str = "en",
) -> Dict[str, Any]:
    key = _require_api_key_from_header()
    q = city if not country_code else f"{city},{country_code}"
    params = {"q": q, "appid": key, "units": units, "lang": lang}
    return _request("weather", params)


@mcp.tool(description="Get current weather by geographic coordinates. Token must be in Authorization header.")
def get_current_weather_coords(
    lat: float,
    lon: float,
    units: str = "metric",
    lang: str = "en",
) -> Dict[str, Any]:
    key = _require_api_key_from_header()
    params = {"lat": lat, "lon": lon, "appid": key, "units": units, "lang": lang}
    return _request("weather", params)


@mcp.tool(description="Get 5 day / 3 hour forecast by city. Token must be in Authorization header.")
def get_forecast_city(
    city: str,
    country_code: str = "",
    units: str = "metric",
    lang: str = "en",
) -> Dict[str, Any]:
    key = _require_api_key_from_header()
    q = city if not country_code else f"{city},{country_code}"
    params = {"q": q, "appid": key, "units": units, "lang": lang}
    return _request("forecast", params)


@mcp.tool(description="Get 5 day / 3 hour forecast by coordinates. Token must be in Authorization header.")
def get_forecast_coords(
    lat: float,
    lon: float,
    units: str = "metric",
    lang: str = "en",
) -> Dict[str, Any]:
    key = _require_api_key_from_header()
    params = {"lat": lat, "lon": lon, "appid": key, "units": units, "lang": lang}
    return _request("forecast", params)


def openweather_http_app():
    app = mcp.streamable_http_app()
    app.add_middleware(AuthorizationHeaderMiddleware)
    return app