Spaces:
Sleeping
Sleeping
| from fastapi import HTTPException, Security, status, Depends | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from config import settings | |
| import logging | |
| from datetime import datetime | |
| from typing import Optional | |
# Configure logging.
# NOTE: basicConfig runs at import time and targets the root logger; per the
# stdlib docs it is a no-op when the root logger already has handlers.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Bearer scheme used as the default dependency of verify_token and
# verify_admin_token.
security = HTTPBearer()
# Separate instance for optional auth. With auto_error=False the dependency can
# receive no credentials (optional_verify_token checks for that case) instead
# of the scheme rejecting the request itself -- NOTE(review): confirm against
# the FastAPI version in use.
optional_security = HTTPBearer(auto_error=False)
async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)) -> str:
    """
    Verify the Bearer token for regular API operations.

    Args:
        credentials: Parsed bearer credentials, injected through the
            module-level ``security`` scheme.

    Returns:
        The presented token string, once it matches ``settings.api_key``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            credentials are missing or the token does not match.
    """
    # Function-scope imports keep this block self-contained; the module's
    # import section does not provide these names.
    import hashlib
    import secrets

    # Defensive guard: covers direct calls and any configuration in which the
    # scheme hands over no credentials.
    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authorization header missing",
            headers={"WWW-Authenticate": "Bearer"},
        )

    supplied = credentials.credentials
    expected = settings.api_key

    # Constant-time comparison so response timing does not reveal how many
    # leading characters of the key were guessed correctly. Both sides are
    # encoded because compare_digest only accepts ASCII-only str objects.
    # A missing, empty, or non-string configured key fails closed.
    token_ok = (
        isinstance(expected, str)
        and expected != ""
        and secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))
    )

    if not token_ok:
        # Never log raw token characters: a near-miss token would leak a prefix
        # of the real key. A short SHA-256 fingerprint still lets operators
        # correlate repeated attempts.
        fingerprint = hashlib.sha256(supplied.encode("utf-8")).hexdigest()[:8]
        logger.warning("Failed API access attempt (token sha256 prefix: %s)", fingerprint)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return supplied
async def verify_admin_token(credentials: HTTPAuthorizationCredentials = Security(security)) -> str:
    """
    Verify admin token for sensitive operations like key management.

    Args:
        credentials: Parsed bearer credentials, injected through the
            module-level ``security`` scheme.

    Returns:
        The presented token string, once it matches ``settings.admin_key``.

    Raises:
        HTTPException: 401 when credentials are missing; 403 when the token
            does not match the admin key. Both carry a
            ``WWW-Authenticate: Bearer`` header.
    """
    # Function-scope imports keep this block self-contained; the module's
    # import section does not provide these names.
    import hashlib
    import secrets

    # Defensive guard: covers direct calls and any configuration in which the
    # scheme hands over no credentials.
    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Admin authorization required",
            headers={"WWW-Authenticate": "Bearer"},
        )

    supplied = credentials.credentials
    expected = settings.admin_key

    # Constant-time comparison so response timing does not reveal how many
    # leading characters of the admin key were guessed correctly. Both sides
    # are encoded because compare_digest only accepts ASCII-only str objects.
    # A missing, empty, or non-string configured key fails closed.
    token_ok = (
        isinstance(expected, str)
        and expected != ""
        and secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))
    )

    # Local time with an explicit UTC offset, so audit entries are unambiguous.
    timestamp = datetime.now().astimezone().isoformat()

    if not token_ok:
        # Log failed admin access attempts with a timestamp. Never log raw
        # token characters: a near-miss token would leak a prefix of the real
        # admin key, so only a short SHA-256 fingerprint is recorded.
        fingerprint = hashlib.sha256(supplied.encode("utf-8")).hexdigest()[:8]
        logger.warning(
            "FAILED ADMIN ACCESS: %s - token sha256 prefix: %s",
            timestamp,
            fingerprint,
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid admin token. Admin access denied.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Log successful admin access (ASCII-only so the entry survives any log
    # encoding).
    logger.info("ADMIN ACCESS GRANTED: %s", timestamp)
    return supplied
async def optional_verify_token(credentials: Optional[HTTPAuthorizationCredentials] = Security(optional_security)) -> bool:
    """
    Optional token verification - returns True if valid, False if not.

    Args:
        credentials: Parsed bearer credentials from the module-level
            ``optional_security`` scheme, or ``None`` when none were supplied.

    Returns:
        ``True`` when a token was supplied and matches ``settings.api_key``;
        ``False`` otherwise. This function never raises for a bad token.
    """
    # Function-scope import keeps this block self-contained; the module's
    # import section does not provide this name.
    import secrets

    if not credentials:
        return False

    expected = settings.api_key
    # Constant-time comparison so response timing does not reveal how many
    # leading characters of the key were guessed correctly. Both sides are
    # encoded because compare_digest only accepts ASCII-only str objects.
    # A missing, empty, or non-string configured key fails closed.
    return (
        isinstance(expected, str)
        and expected != ""
        and secrets.compare_digest(
            credentials.credentials.encode("utf-8"),
            expected.encode("utf-8"),
        )
    )