File size: 2,517 Bytes
b8db48d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import hmac
import logging
from datetime import datetime
from typing import Optional

from fastapi import HTTPException, Security, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from config import settings

# Module-level logger; root logging is initialised at INFO when this module is imported.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Bearer scheme used by the dependencies that require authentication.
security = HTTPBearer()
# Separate scheme instance with auto_error disabled, used by the optional-auth dependency.
optional_security = HTTPBearer(auto_error=False)

async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)) -> str:
    """
    Verify the Bearer token for regular API operations.

    Args:
        credentials: Bearer credentials resolved by the ``security`` scheme.

    Returns:
        The presented token string when it matches ``settings.api_key``.

    Raises:
        HTTPException: 401 when credentials are absent or when the token does
            not match ``settings.api_key``.
    """
    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authorization header missing",
            headers={"WWW-Authenticate": "Bearer"},
        )

    presented = credentials.credentials
    expected = settings.api_key

    # Compare in constant time so response latency does not reveal how many
    # leading characters of the key were guessed correctly. Bytes are compared
    # because compare_digest rejects non-ASCII str arguments. A configured key
    # that is not a string never matches, as with the previous `!=` check.
    token_ok = isinstance(expected, str) and hmac.compare_digest(
        presented.encode("utf-8"), expected.encode("utf-8")
    )

    if not token_ok:
        # Only a short prefix of the rejected token is written to the log.
        logger.warning(f"Failed API access attempt with token: {presented[:8]}...")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return presented

async def verify_admin_token(credentials: HTTPAuthorizationCredentials = Security(security)) -> str:
    """
    Verify admin token for sensitive operations like key management.

    Args:
        credentials: Bearer credentials resolved by the ``security`` scheme.

    Returns:
        The presented token string when it matches ``settings.admin_key``.

    Raises:
        HTTPException: 401 when credentials are absent; 403 when the token
            does not match ``settings.admin_key``.
    """
    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Admin authorization required",
            headers={"WWW-Authenticate": "Bearer"},
        )

    presented = credentials.credentials
    expected = settings.admin_key

    # Compare in constant time so response latency does not reveal how many
    # leading characters of the admin key were guessed correctly. Bytes are
    # compared because compare_digest rejects non-ASCII str arguments. A
    # configured key that is not a string never matches, as with the previous
    # `!=` check.
    token_ok = isinstance(expected, str) and hmac.compare_digest(
        presented.encode("utf-8"), expected.encode("utf-8")
    )

    if not token_ok:
        # Log failed admin access attempts with timestamp; only a short prefix
        # of the rejected token is written to the log.
        logger.warning(f"🚨 FAILED ADMIN ACCESS: {datetime.now().isoformat()} - Token: {presented[:8]}...")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid admin token. Admin access denied.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Log successful admin access (check-mark emoji restored from mis-encoded text).
    logger.info(f"✅ ADMIN ACCESS GRANTED: {datetime.now().isoformat()}")
    return presented

async def optional_verify_token(credentials: Optional[HTTPAuthorizationCredentials] = Security(optional_security)) -> bool:
    """
    Optional token verification - returns True if valid, False if not.

    Args:
        credentials: Bearer credentials resolved by the ``optional_security``
            scheme, or a falsy value when none were supplied.

    Returns:
        True when a token was supplied and it matches ``settings.api_key``;
        False otherwise. This function never raises for a bad or missing token.
    """
    if not credentials:
        return False

    expected = settings.api_key
    # A configured key that is not a string never matches, as with the
    # previous `==` check.
    if not isinstance(expected, str):
        return False

    # Constant-time comparison on bytes: avoids leaking match length through
    # timing, and compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(
        credentials.credentials.encode("utf-8"), expected.encode("utf-8")
    )