# mashrur950's picture
# Enhance API key handling and user authentication
# 79f305b
"""
API Key Authentication System for FleetMind MCP Server
Simple API key management for multi-tenant authentication without OAuth complexity.
Works with Claude Desktop and mcp-remote today!
Usage:
1. User generates API key via web interface or CLI
2. User adds API key to Claude Desktop config
3. MCP server validates key and returns user_id
4. Multi-tenant isolation works automatically
"""
import os
import secrets
import hashlib
from datetime import datetime
from typing import Optional, Dict
from database.connection import get_db_connection
def create_api_keys_table():
    """Create the ``api_keys`` table and its lookup indexes if they are missing.

    Executes three idempotent DDL statements (``CREATE ... IF NOT EXISTS``)
    on a connection obtained from ``get_db_connection()`` and commits them
    together.

    Raises:
        Exception: Whatever the database driver raises. The transaction is
            rolled back and the cursor/connection are closed before the
            error propagates, so a failed setup does not leak a connection.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS api_keys (
                    key_id SERIAL PRIMARY KEY,
                    user_id VARCHAR(100) NOT NULL,
                    email VARCHAR(255) NOT NULL,
                    name VARCHAR(255),
                    api_key_hash VARCHAR(64) NOT NULL UNIQUE,
                    api_key_prefix VARCHAR(20) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_used_at TIMESTAMP,
                    is_active BOOLEAN DEFAULT true,
                    UNIQUE(user_id)
                )
            """)
            # Index used by key verification (lookup by hash).
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_api_keys_hash
                ON api_keys(api_key_hash)
            """)
            # Index for lookups by user.
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_api_keys_user
                ON api_keys(user_id)
            """)
            conn.commit()
        except Exception:
            # Leave no half-applied DDL behind, then let the caller decide.
            conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        conn.close()
def generate_api_key(email: str, name: Optional[str] = None) -> Dict[str, object]:
    """
    Generate a new API key for a user.

    The plain-text key is returned exactly once; only its SHA-256 hash and a
    12-character display prefix are stored.

    Args:
        email: User's email address (used as identifier). Must be a
            non-blank string.
        name: User's display name (optional).

    Returns:
        On success: dict with ``success=True``, ``api_key`` (show once!),
        ``user_id``, ``email``, ``name``, ``created_at`` and ``message``.
        On failure: dict with ``success=False`` and an ``error`` string.
        Database errors are reported through the failure dict, not raised.
    """
    # Reject missing/blank emails up front instead of crashing on
    # ``email.encode()`` or storing a key nobody can look up.
    if not isinstance(email, str) or not email.strip():
        return {
            "success": False,
            "error": "A valid email address is required to generate an API key."
        }

    # Generate secure random API key
    api_key = f"fm_{secrets.token_urlsafe(32)}"  # fm_ prefix for FleetMind
    # Hash the API key for storage (never store plain text!)
    api_key_hash = hashlib.sha256(api_key.encode()).hexdigest()
    # Store prefix for display (first 12 chars)
    api_key_prefix = api_key[:12]
    # Derive a stable user_id from the email (identifier only, not a secret)
    user_id = f"user_{hashlib.md5(email.encode()).hexdigest()[:12]}"

    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        # Check if user already has a key.
        # NOTE(review): a revoked key's row still matches this lookup
        # (revocation only sets is_active = false), so the user cannot
        # regenerate after revoking -- confirm whether that is intended.
        cursor.execute("SELECT user_id FROM api_keys WHERE email = %s", (email,))
        if cursor.fetchone():
            # Cleanup happens once, in ``finally``.
            return {
                "success": False,
                "error": "User already has an API key. Revoke the old key first."
            }

        # Insert new API key
        cursor.execute("""
            INSERT INTO api_keys (user_id, email, name, api_key_hash, api_key_prefix)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING user_id, email, name, created_at
        """, (user_id, email, name, api_key_hash, api_key_prefix))
        row = cursor.fetchone()
        conn.commit()
        if not row:
            raise Exception("Failed to insert API key")

        # Rows may be mappings (e.g. RealDictRow) or plain tuples depending
        # on the cursor factory; unpacking a mapping would yield the column
        # NAMES, so read mapping rows by key.
        if hasattr(row, "keys"):
            ret_user_id = row["user_id"]
            ret_email = row["email"]
            ret_name = row["name"]
            ret_created_at = row["created_at"]
        else:
            ret_user_id, ret_email, ret_name, ret_created_at = row

        return {
            "success": True,
            "api_key": api_key,  # SHOW THIS ONCE! Never displayed again
            "user_id": ret_user_id,
            "email": ret_email,
            "name": ret_name or "FleetMind User",
            "created_at": str(ret_created_at) if ret_created_at else "",
            "message": "⚠️ IMPORTANT: Save this API key now! It won't be shown again."
        }
    except Exception as e:
        conn.rollback()
        import traceback
        error_details = traceback.format_exc()
        print(f"API Key Generation Error: {e}")
        print(f"Error details: {error_details}")
        return {
            "success": False,
            "error": f"Failed to generate API key: {str(e)}"
        }
    finally:
        cursor.close()
        conn.close()
def verify_api_key(api_key: str) -> Optional[Dict[str, object]]:
    """
    Verify API key and return user info.

    The key is hashed with SHA-256 and looked up by hash; on a successful
    match for an active key, ``last_used_at`` is updated and committed.

    Args:
        api_key: The API key to verify. Anything that is not a string
            starting with ``fm_`` is rejected without touching the database.

    Returns:
        Dict with ``user_id``, ``email``, ``name`` and a fixed ``scopes``
        list if the key is valid and active; None if the key is malformed,
        unknown, inactive, or if any database error occurs (fail closed).
    """
    # Fail closed on missing or non-string input (None, bytes, numbers...)
    # rather than raising from ``startswith``.
    if not isinstance(api_key, str) or not api_key.startswith("fm_"):
        return None

    # Hash the provided key; only hashes are stored.
    api_key_hash = hashlib.sha256(api_key.encode()).hexdigest()

    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        # Look up the key
        cursor.execute("""
            SELECT user_id, email, name, is_active
            FROM api_keys
            WHERE api_key_hash = %s
        """, (api_key_hash,))
        row = cursor.fetchone()
        if not row:
            return None

        # Access RealDictRow fields by key (not tuple unpacking!)
        if not row['is_active']:
            return None  # revoked key

        # Update last_used_at
        cursor.execute("""
            UPDATE api_keys
            SET last_used_at = CURRENT_TIMESTAMP
            WHERE api_key_hash = %s
        """, (api_key_hash,))
        conn.commit()

        return {
            'user_id': row['user_id'],
            'email': row['email'],
            'name': row['name'] or 'FleetMind User',
            'scopes': ['orders:read', 'orders:write', 'drivers:read', 'drivers:write', 'assignments:manage']
        }
    except Exception as e:
        # Any failure is treated as "not authenticated".
        print(f"API key verification error: {e}")
        return None
    finally:
        cursor.close()
        conn.close()
def list_api_keys() -> list:
    """List all API keys (without showing actual keys).

    Returns:
        A list of dicts, newest first, each with ``user_id``, ``email``,
        ``name``, ``key_preview`` (stored prefix followed by ``...``),
        ``created_at`` / ``last_used_at`` as ISO-8601 strings (None when the
        column is NULL) and ``is_active``.

    Raises:
        Exception: Database errors propagate to the caller; the cursor and
            connection are closed first.
    """
    # Column order of the SELECT below; used to name the fields of
    # tuple-style rows.
    columns = (
        "user_id", "email", "name", "api_key_prefix",
        "created_at", "last_used_at", "is_active",
    )

    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT user_id, email, name, api_key_prefix, created_at, last_used_at, is_active
            FROM api_keys
            ORDER BY created_at DESC
        """)
        keys = []
        for row in cursor.fetchall():
            # Rows may be mappings (e.g. RealDictRow, where row[0] would be
            # a KeyError) or plain tuples; normalise to key-based access.
            record = row if hasattr(row, "keys") else dict(zip(columns, row))
            created_at = record["created_at"]
            last_used_at = record["last_used_at"]
            keys.append({
                'user_id': record["user_id"],
                'email': record["email"],
                'name': record["name"],
                'key_preview': f"{record['api_key_prefix']}...",
                # created_at is nullable in the schema, so guard it the same
                # way as last_used_at.
                'created_at': created_at.isoformat() if created_at else None,
                'last_used_at': last_used_at.isoformat() if last_used_at else None,
                'is_active': record["is_active"]
            })
        return keys
    finally:
        cursor.close()
        conn.close()
def revoke_api_key(email: str) -> Dict[str, object]:
    """Revoke (deactivate) an API key.

    Sets ``is_active = false`` on the row(s) matching ``email``; the row is
    kept, not deleted.

    Args:
        email: Email address the key was issued to.

    Returns:
        ``{"success": True, "message": ...}`` when a matching row was
        updated, otherwise ``{"success": False, "error": ...}`` (no row for
        this email, or a database error, which is rolled back).
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            UPDATE api_keys
            SET is_active = false
            WHERE email = %s
            RETURNING user_id, email
        """, (email,))
        row = cursor.fetchone()
        conn.commit()

        if not row:
            return {
                "success": False,
                "error": "No API key found for this email"
            }

        # Rows may be mappings (e.g. RealDictRow, where row[1] would raise
        # KeyError after the revoke was already committed) or plain tuples.
        revoked_email = row["email"] if hasattr(row, "keys") else row[1]
        return {
            "success": True,
            "message": f"API key revoked for {revoked_email}"
        }
    except Exception as e:
        conn.rollback()
        return {
            "success": False,
            "error": f"Failed to revoke key: {str(e)}"
        }
    finally:
        cursor.close()
        conn.close()
# Initialize table on import (best effort: importing this module must not
# fail just because the database is unavailable).
try:
    create_api_keys_table()
except Exception as exc:
    # The DDL uses CREATE ... IF NOT EXISTS, so an already-existing table
    # does not raise; reaching this branch means a real problem. Report it
    # instead of hiding it, and let KeyboardInterrupt/SystemExit propagate.
    print(f"API keys table initialization skipped: {exc}")