File size: 7,433 Bytes
b338c26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79f305b
 
 
 
 
b338c26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
"""
API Key Authentication System for FleetMind MCP Server

Simple API key management for multi-tenant authentication without OAuth complexity.
Works with Claude Desktop and mcp-remote today!

Usage:
    1. User generates API key via web interface or CLI
    2. User adds API key to Claude Desktop config
    3. MCP server validates key and returns user_id
    4. Multi-tenant isolation works automatically
"""

import os
import secrets
import hashlib
from datetime import datetime
from typing import Optional, Dict
from database.connection import get_db_connection

def create_api_keys_table():
    """Create the api_keys table and its lookup indexes if they don't exist.

    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is
    safe. The cursor and connection are always released, even when a
    statement fails; in that case the commit is skipped and the error
    propagates to the caller.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("""
        CREATE TABLE IF NOT EXISTS api_keys (
            key_id SERIAL PRIMARY KEY,
            user_id VARCHAR(100) NOT NULL,
            email VARCHAR(255) NOT NULL,
            name VARCHAR(255),
            api_key_hash VARCHAR(64) NOT NULL UNIQUE,
            api_key_prefix VARCHAR(20) NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_used_at TIMESTAMP,
            is_active BOOLEAN DEFAULT true,
            UNIQUE(user_id)
        )
    """)

            # NOTE(review): on PostgreSQL the UNIQUE constraints above already
            # create indexes on these columns, so the two explicit indexes
            # below are probably redundant; kept to leave the schema unchanged.
            cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_api_keys_hash
        ON api_keys(api_key_hash)
    """)

            cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_api_keys_user
        ON api_keys(user_id)
    """)

            conn.commit()
        finally:
            cursor.close()
    finally:
        # Always hand the connection back, even if a statement raised.
        conn.close()

def generate_api_key(email: str, name: Optional[str] = None) -> Dict[str, object]:
    """
    Generate a new API key for a user

    Only the SHA-256 hash and a short display prefix of the key are stored;
    the plain-text key is returned once in the result and cannot be
    recovered afterwards.

    A user (identified by email) holds at most one key. If the user still has
    an active key the request is rejected. If the previous key was revoked,
    the revoked row is replaced so that a fresh key can be issued.

    Args:
        email: User's email address (used as identifier)
        name: User's display name (optional)

    Returns:
        On success: dict with success=True, api_key (show once!), user_id,
        email, name, created_at and message.
        On failure: dict with success=False and an error description.
    """
    # Reject missing/blank emails before touching the database: the email is
    # the only identifier for the key, so an empty one is never valid.
    if not isinstance(email, str) or not email.strip():
        return {
            "success": False,
            "error": "A valid email address is required to generate an API key."
        }

    # Generate secure random API key
    api_key = f"fm_{secrets.token_urlsafe(32)}"  # fm_ prefix for FleetMind

    # Hash the API key for storage (never store plain text!)
    api_key_hash = hashlib.sha256(api_key.encode()).hexdigest()

    # Store prefix for display (first 12 chars)
    api_key_prefix = api_key[:12]

    # Derive a stable user_id from the email. MD5 is only a fingerprint here,
    # not a security boundary; changing it would change existing user IDs.
    user_id = f"user_{hashlib.md5(email.encode()).hexdigest()[:12]}"

    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        # Check if user already has a key, and whether it is still active
        cursor.execute("SELECT is_active FROM api_keys WHERE email = %s", (email,))
        existing = cursor.fetchone()

        if existing is not None:
            # Rows may be dict-like (verify_api_key in this module reads them
            # by column name) or plain sequences, depending on the cursor.
            is_active = existing["is_active"] if isinstance(existing, dict) else existing[0]

            if is_active:
                # Cleanup happens in the finally block below.
                return {
                    "success": False,
                    "error": "User already has an API key. Revoke the old key first."
                }

            # The old key was revoked. Revocation only deactivates the row,
            # and UNIQUE(user_id) would block a new insert, so drop the
            # stale row in the same transaction as the insert.
            cursor.execute(
                "DELETE FROM api_keys WHERE email = %s AND is_active IS NOT TRUE",
                (email,),
            )

        # Insert new API key
        cursor.execute("""
            INSERT INTO api_keys (user_id, email, name, api_key_hash, api_key_prefix)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING user_id, email, name, created_at
        """, (user_id, email, name, api_key_hash, api_key_prefix))

        result = cursor.fetchone()
        conn.commit()

        if not result:
            raise Exception("Failed to insert API key")

        # Read the returned row by name when it is dict-like; unpacking a
        # dict would yield the column names rather than the values.
        if isinstance(result, dict):
            ret_user_id = result["user_id"]
            ret_email = result["email"]
            ret_name = result["name"]
            ret_created_at = result["created_at"]
        else:
            ret_user_id, ret_email, ret_name, ret_created_at = result

        return {
            "success": True,
            "api_key": api_key,  # SHOW THIS ONCE! Never displayed again
            "user_id": ret_user_id,
            "email": ret_email,
            "name": ret_name or "FleetMind User",
            "created_at": str(ret_created_at) if ret_created_at else "",
            "message": "⚠️ IMPORTANT: Save this API key now! It won't be shown again."
        }

    except Exception as e:
        conn.rollback()
        import traceback
        error_details = traceback.format_exc()
        print(f"API Key Generation Error: {e}")
        print(f"Error details: {error_details}")
        return {
            "success": False,
            "error": f"Failed to generate API key: {str(e)}"
        }
    finally:
        # Single cleanup point for every exit path above.
        cursor.close()
        conn.close()

def verify_api_key(api_key: str) -> Optional[Dict[str, str]]:
    """
    Check an API key against the stored hashes and describe its owner

    Keys that are empty or lack the "fm_" prefix are rejected without a
    database lookup. A successful verification also stamps last_used_at.

    Args:
        api_key: The API key to verify

    Returns:
        Dict with user_id, email, name and scopes when the key is known and
        active; None for unknown, inactive or malformed keys, and also when
        the lookup itself fails
    """
    has_expected_prefix = bool(api_key) and api_key.startswith("fm_")
    if not has_expected_prefix:
        return None

    # Only the SHA-256 digest is stored, so compare digests
    digest = hashlib.sha256(api_key.encode()).hexdigest()
    hash_param = (digest,)

    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        # Fetch the owner of this key hash
        cursor.execute("""
            SELECT user_id, email, name, is_active
            FROM api_keys
            WHERE api_key_hash = %s
        """, hash_param)

        row = cursor.fetchone()
        if not row:
            return None

        # Rows are read by column name (RealDictRow), not by position
        wanted = ('user_id', 'email', 'name', 'is_active')
        user_id, email, name, is_active = (row[column] for column in wanted)

        if not is_active:
            return None

        # Record that the key was just used
        cursor.execute("""
            UPDATE api_keys
            SET last_used_at = CURRENT_TIMESTAMP
            WHERE api_key_hash = %s
        """, hash_param)
        conn.commit()

        display_name = name or 'FleetMind User'
        granted_scopes = [
            'orders:read',
            'orders:write',
            'drivers:read',
            'drivers:write',
            'assignments:manage',
        ]
        return {
            'user_id': user_id,
            'email': email,
            'name': display_name,
            'scopes': granted_scopes
        }

    except Exception as e:
        # Any failure is reported as "not verified"
        print(f"API key verification error: {e}")
        return None
    finally:
        cursor.close()
        conn.close()

def list_api_keys() -> list:
    """List all API keys (without showing actual keys)

    Returns:
        A list of dicts ordered by creation time, newest first. Each dict
        has user_id, email, name, key_preview (stored prefix followed by
        "..."), created_at and last_used_at (ISO-8601 strings, or None when
        the column is NULL) and is_active.
    """
    # Column order of the SELECT below; used to label positional rows.
    columns = (
        'user_id', 'email', 'name', 'api_key_prefix',
        'created_at', 'last_used_at', 'is_active',
    )

    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        cursor.execute("""
        SELECT user_id, email, name, api_key_prefix, created_at, last_used_at, is_active
        FROM api_keys
        ORDER BY created_at DESC
    """)

        keys = []
        for row in cursor.fetchall():
            # Rows may be dict-like (verify_api_key in this module reads them
            # by column name) or plain sequences, depending on the cursor.
            record = row if isinstance(row, dict) else dict(zip(columns, row))

            created_at = record['created_at']
            last_used_at = record['last_used_at']
            keys.append({
                'user_id': record['user_id'],
                'email': record['email'],
                'name': record['name'],
                'key_preview': f"{record['api_key_prefix']}...",
                # created_at has a default but is nullable, so guard it too
                'created_at': created_at.isoformat() if created_at else None,
                'last_used_at': last_used_at.isoformat() if last_used_at else None,
                'is_active': record['is_active']
            })

        return keys
    finally:
        # Release DB resources even if the query or row formatting fails.
        cursor.close()
        conn.close()

def revoke_api_key(email: str) -> Dict[str, object]:
    """Revoke (deactivate) an API key

    Marks the key row for the given email as inactive; the row itself is
    kept in the table.

    Args:
        email: Email address the key was issued to

    Returns:
        dict with success=True and a message when a key row was found,
        otherwise success=False and an error description
    """
    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        cursor.execute("""
            UPDATE api_keys
            SET is_active = false
            WHERE email = %s
            RETURNING user_id, email
        """, (email,))

        result = cursor.fetchone()
        conn.commit()

        if not result:
            return {
                "success": False,
                "error": "No API key found for this email"
            }

        # Rows may be dict-like (verify_api_key in this module reads them by
        # column name) or plain sequences. Positional access on a dict row
        # would raise after the commit and misreport a successful revocation.
        revoked_email = result['email'] if isinstance(result, dict) else result[1]
        return {
            "success": True,
            "message": f"API key revoked for {revoked_email}"
        }

    except Exception as e:
        conn.rollback()
        return {
            "success": False,
            "error": f"Failed to revoke key: {str(e)}"
        }
    finally:
        cursor.close()
        conn.close()

# Initialize table on import.
# This is best-effort: a database that is unreachable at import time must not
# prevent the module from loading. "Table already exists" is not an error here
# (the DDL uses IF NOT EXISTS), so anything caught below is a real problem and
# is reported rather than silently discarded. Only Exception is caught so that
# KeyboardInterrupt/SystemExit still propagate.
try:
    create_api_keys_table()
except Exception as e:
    import sys
    print(f"Warning: could not initialize api_keys table: {e}", file=sys.stderr)