File size: 4,553 Bytes
b9a4f82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python3
"""Integration checks for JWT + HTTP MCP in HF Spaces."""

import os
import sys
from datetime import timedelta
from pathlib import Path

import pytest
import requests
from dotenv import load_dotenv
from mcp.server.streamable_http import MCP_SESSION_ID_HEADER

REPO_ROOT = Path(__file__).resolve().parents[3]
ENV_PATH = REPO_ROOT / "backend" / ".env"

# Load environment variables even when running from a different cwd
load_dotenv(dotenv_path=ENV_PATH)

# Make backend importable
BACKEND_ROOT = REPO_ROOT / "backend"
if str(BACKEND_ROOT) not in sys.path:
    sys.path.insert(0, str(BACKEND_ROOT))

from backend.src.services.auth import AuthService  # noqa: E402
from backend.src.services.config import get_config  # noqa: E402

BASE_URL = os.getenv("MCP_BASE_URL", "http://localhost:8001/mcp")
HTTP_TIMEOUT = float(os.getenv("MCP_TEST_TIMEOUT", "8.0"))


@pytest.fixture(scope="module")
def auth_service() -> AuthService:
    return AuthService(config=get_config())


@pytest.fixture(scope="module")
def tokens(auth_service: AuthService) -> dict[str, str]:
    users = [
        {"id": "hf_user_alice_123", "name": "Alice"},
        {"id": "hf_user_bob_456", "name": "Bob"},
        {"id": "hf_user_charlie_789", "name": "Charlie"},
    ]

    # If no JWT secret is configured, skip integration JWT issuance tests.
    if not auth_service.config.jwt_secret_key:
        pytest.skip("JWT_SECRET_KEY not configured; skipping integration JWT issuance tests")

    return {user["id"]: auth_service.create_jwt(user["id"]) for user in users}


@pytest.mark.integration
def test_jwt_generation_and_validation(auth_service: AuthService, tokens: dict[str, str]) -> None:
    for user_id, token in tokens.items():
        payload = auth_service.validate_jwt(token)
        assert payload.sub == user_id


def _post_or_skip(payload: dict, headers: dict) -> requests.Response:
    try:
        return requests.post(BASE_URL, json=payload, headers=headers, timeout=HTTP_TIMEOUT)
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
        pytest.skip(f"MCP server not reachable at {BASE_URL}")


@pytest.mark.integration
def test_http_initialize_and_list_notes(tokens: dict[str, str]) -> None:
    init_request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test-client", "version": "1.0.0"},
        },
    }

    tool_request = {
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/call",
        "params": {"name": "list_notes", "arguments": {}},
    }

    for user_id, token in tokens.items():
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
        }

        init_resp = _post_or_skip(init_request, headers)
        assert init_resp.status_code == 200, init_resp.text[:200]

        session_id = init_resp.headers.get(MCP_SESSION_ID_HEADER)
        assert session_id, "Missing mcp-session-id header"

        tool_headers = {**headers, MCP_SESSION_ID_HEADER: session_id}
        tool_resp = _post_or_skip(tool_request, tool_headers)
        assert tool_resp.status_code == 200, tool_resp.text[:200]


@pytest.mark.integration
def test_http_rejects_invalid_token() -> None:
    init_request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test-client", "version": "1.0.0"},
        },
    }
    headers = {
        "Authorization": "Bearer not-a-valid-token",
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
    }

    resp = _post_or_skip(init_request, headers)

    if resp.status_code == 200:
        pytest.skip("Server accepted invalid token (likely running in permissive/local mode)")

    assert resp.status_code == 401


@pytest.mark.integration
def test_expired_token_rejected_by_service(auth_service: AuthService) -> None:
    if not auth_service.config.jwt_secret_key:
        pytest.skip("JWT_SECRET_KEY not configured; skipping expired token check")

    expired = auth_service.create_jwt("expired-user", expires_in=timedelta(seconds=-1))
    from backend.src.services.auth import AuthError

    with pytest.raises(AuthError):
        auth_service.validate_jwt(expired)