File size: 2,431 Bytes
02af15b
 
 
 
bc6b6db
02af15b
 
 
 
bc6b6db
02af15b
085e790
 
02af15b
 
 
 
 
 
 
 
 
 
 
bc6b6db
 
 
 
 
 
 
 
 
 
8aedfb3
bc6b6db
02af15b
 
 
 
 
 
085e790
 
 
 
 
 
 
 
 
 
 
02af15b
 
 
 
 
 
 
 
 
 
 
 
 
 
bc6b6db
 
 
 
 
 
 
 
02af15b
 
bc6b6db
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""Authentication dependency helpers."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Annotated, Optional

from fastapi import Header, HTTPException, status

from ...models.auth import JWTPayload
from ...services.auth import AuthError, AuthService
from ...services.config import get_config
from datetime import datetime, timezone

auth_service = AuthService()


def _unauthorized(message: str, error: str = "unauthorized") -> HTTPException:
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail={"error": error, "message": message},
    )


@dataclass
class AuthContext:
    """Context extracted from a bearer token."""

    user_id: str
    token: str
    payload: JWTPayload


def get_auth_context(
    authorization: Annotated[Optional[str], Header(alias="Authorization")] = None,
) -> AuthContext:
    """
    Extract and validate the user_id from a Bearer token.

    Raises HTTPException if the header is missing/invalid.
    """
    if not authorization:
        # Check for No-Auth mode (Hackathon/Demo)
        config = get_config()
        if config.enable_noauth_mcp:
            # Create a dummy payload for demo user
            payload = JWTPayload(
                sub="demo-user",
                iat=int(datetime.now(timezone.utc).timestamp()),
                exp=int(datetime.now(timezone.utc).timestamp()) + 3600
            )
            return AuthContext(user_id="demo-user", token="no-auth", payload=payload)
            
        raise _unauthorized("Authorization header required")

    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token:
        raise _unauthorized("Authorization header must be in format: Bearer <token>")

    try:
        payload = auth_service.validate_jwt(token)
    except AuthError as exc:
        raise HTTPException(
            status_code=exc.status_code,
            detail={"error": exc.error, "message": exc.message, "detail": exc.detail},
        ) from exc

    return AuthContext(user_id=payload.sub, token=token, payload=payload)


def extract_user_id_from_jwt(
    authorization: Annotated[Optional[str], Header(alias="Authorization")] = None,
) -> str:
    """Compatibility helper that returns only the user_id."""
    return get_auth_context(authorization).user_id


__all__ = ["AuthContext", "extract_user_id_from_jwt", "get_auth_context"]