|
|
""" |
|
|
API Key Authentication System for FleetMind MCP Server |
|
|
|
|
|
Simple API key management for multi-tenant authentication without OAuth complexity. |
|
|
Works with Claude Desktop and mcp-remote today! |
|
|
|
|
|
Usage: |
|
|
1. User generates API key via web interface or CLI |
|
|
2. User adds API key to Claude Desktop config |
|
|
3. MCP server validates key and returns user_id |
|
|
4. Multi-tenant isolation works automatically |
|
|
""" |
|
|
|
|
|
import os |
|
|
import secrets |
|
|
import hashlib |
|
|
from datetime import datetime |
|
|
from typing import Optional, Dict |
|
|
from database.connection import get_db_connection |
|
|
|
|
|
def create_api_keys_table(): |
|
|
"""Create api_keys table if it doesn't exist""" |
|
|
conn = get_db_connection() |
|
|
cursor = conn.cursor() |
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS api_keys ( |
|
|
key_id SERIAL PRIMARY KEY, |
|
|
user_id VARCHAR(100) NOT NULL, |
|
|
email VARCHAR(255) NOT NULL, |
|
|
name VARCHAR(255), |
|
|
api_key_hash VARCHAR(64) NOT NULL UNIQUE, |
|
|
api_key_prefix VARCHAR(20) NOT NULL, |
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
|
|
last_used_at TIMESTAMP, |
|
|
is_active BOOLEAN DEFAULT true, |
|
|
UNIQUE(user_id) |
|
|
) |
|
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE INDEX IF NOT EXISTS idx_api_keys_hash |
|
|
ON api_keys(api_key_hash) |
|
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE INDEX IF NOT EXISTS idx_api_keys_user |
|
|
ON api_keys(user_id) |
|
|
""") |
|
|
|
|
|
conn.commit() |
|
|
cursor.close() |
|
|
conn.close() |
|
|
|
|
|
def generate_api_key(email: str, name: str = None) -> Dict[str, str]: |
|
|
""" |
|
|
Generate a new API key for a user |
|
|
|
|
|
Args: |
|
|
email: User's email address (used as identifier) |
|
|
name: User's display name (optional) |
|
|
|
|
|
Returns: |
|
|
dict with api_key (show once!), user_id, email, name |
|
|
""" |
|
|
|
|
|
api_key = f"fm_{secrets.token_urlsafe(32)}" |
|
|
|
|
|
|
|
|
api_key_hash = hashlib.sha256(api_key.encode()).hexdigest() |
|
|
|
|
|
|
|
|
api_key_prefix = api_key[:12] |
|
|
|
|
|
|
|
|
user_id = f"user_{hashlib.md5(email.encode()).hexdigest()[:12]}" |
|
|
|
|
|
conn = get_db_connection() |
|
|
cursor = conn.cursor() |
|
|
|
|
|
try: |
|
|
|
|
|
cursor.execute("SELECT user_id FROM api_keys WHERE email = %s", (email,)) |
|
|
existing = cursor.fetchone() |
|
|
|
|
|
if existing: |
|
|
cursor.close() |
|
|
conn.close() |
|
|
return { |
|
|
"success": False, |
|
|
"error": "User already has an API key. Revoke the old key first." |
|
|
} |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
INSERT INTO api_keys (user_id, email, name, api_key_hash, api_key_prefix) |
|
|
VALUES (%s, %s, %s, %s, %s) |
|
|
RETURNING user_id, email, name, created_at |
|
|
""", (user_id, email, name, api_key_hash, api_key_prefix)) |
|
|
|
|
|
result = cursor.fetchone() |
|
|
conn.commit() |
|
|
|
|
|
if not result: |
|
|
raise Exception("Failed to insert API key") |
|
|
|
|
|
|
|
|
ret_user_id, ret_email, ret_name, ret_created_at = result |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"api_key": api_key, |
|
|
"user_id": ret_user_id, |
|
|
"email": ret_email, |
|
|
"name": ret_name or "FleetMind User", |
|
|
"created_at": str(ret_created_at) if ret_created_at else "", |
|
|
"message": "⚠️ IMPORTANT: Save this API key now! It won't be shown again." |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
conn.rollback() |
|
|
import traceback |
|
|
error_details = traceback.format_exc() |
|
|
print(f"API Key Generation Error: {e}") |
|
|
print(f"Error details: {error_details}") |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Failed to generate API key: {str(e)}" |
|
|
} |
|
|
finally: |
|
|
cursor.close() |
|
|
conn.close() |
|
|
|
|
|
def verify_api_key(api_key: str) -> Optional[Dict[str, str]]: |
|
|
""" |
|
|
Verify API key and return user info |
|
|
|
|
|
Args: |
|
|
api_key: The API key to verify |
|
|
|
|
|
Returns: |
|
|
User info dict if valid, None if invalid |
|
|
""" |
|
|
if not api_key or not api_key.startswith("fm_"): |
|
|
return None |
|
|
|
|
|
|
|
|
api_key_hash = hashlib.sha256(api_key.encode()).hexdigest() |
|
|
|
|
|
conn = get_db_connection() |
|
|
cursor = conn.cursor() |
|
|
|
|
|
try: |
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT user_id, email, name, is_active |
|
|
FROM api_keys |
|
|
WHERE api_key_hash = %s |
|
|
""", (api_key_hash,)) |
|
|
|
|
|
result = cursor.fetchone() |
|
|
|
|
|
if not result: |
|
|
return None |
|
|
|
|
|
|
|
|
user_id = result['user_id'] |
|
|
email = result['email'] |
|
|
name = result['name'] |
|
|
is_active = result['is_active'] |
|
|
|
|
|
if not is_active: |
|
|
return None |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
UPDATE api_keys |
|
|
SET last_used_at = CURRENT_TIMESTAMP |
|
|
WHERE api_key_hash = %s |
|
|
""", (api_key_hash,)) |
|
|
conn.commit() |
|
|
|
|
|
return { |
|
|
'user_id': user_id, |
|
|
'email': email, |
|
|
'name': name or 'FleetMind User', |
|
|
'scopes': ['orders:read', 'orders:write', 'drivers:read', 'drivers:write', 'assignments:manage'] |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
print(f"API key verification error: {e}") |
|
|
return None |
|
|
finally: |
|
|
cursor.close() |
|
|
conn.close() |
|
|
|
|
|
def list_api_keys() -> list: |
|
|
"""List all API keys (without showing actual keys)""" |
|
|
conn = get_db_connection() |
|
|
cursor = conn.cursor() |
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT user_id, email, name, api_key_prefix, created_at, last_used_at, is_active |
|
|
FROM api_keys |
|
|
ORDER BY created_at DESC |
|
|
""") |
|
|
|
|
|
keys = [] |
|
|
for row in cursor.fetchall(): |
|
|
keys.append({ |
|
|
'user_id': row[0], |
|
|
'email': row[1], |
|
|
'name': row[2], |
|
|
'key_preview': f"{row[3]}...", |
|
|
'created_at': row[4].isoformat(), |
|
|
'last_used_at': row[5].isoformat() if row[5] else None, |
|
|
'is_active': row[6] |
|
|
}) |
|
|
|
|
|
cursor.close() |
|
|
conn.close() |
|
|
return keys |
|
|
|
|
|
def revoke_api_key(email: str) -> Dict[str, any]: |
|
|
"""Revoke (deactivate) an API key""" |
|
|
conn = get_db_connection() |
|
|
cursor = conn.cursor() |
|
|
|
|
|
try: |
|
|
cursor.execute(""" |
|
|
UPDATE api_keys |
|
|
SET is_active = false |
|
|
WHERE email = %s |
|
|
RETURNING user_id, email |
|
|
""", (email,)) |
|
|
|
|
|
result = cursor.fetchone() |
|
|
conn.commit() |
|
|
|
|
|
if result: |
|
|
return { |
|
|
"success": True, |
|
|
"message": f"API key revoked for {result[1]}" |
|
|
} |
|
|
else: |
|
|
return { |
|
|
"success": False, |
|
|
"error": "No API key found for this email" |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
conn.rollback() |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Failed to revoke key: {str(e)}" |
|
|
} |
|
|
finally: |
|
|
cursor.close() |
|
|
conn.close() |
|
|
|
|
|
|
|
|
try: |
|
|
create_api_keys_table() |
|
|
except: |
|
|
pass |
|
|
|