mashrur950's picture
deleted unnecessary files
63ff505
"""
FleetMind Dispatch Coordinator - MCP Server
Industry-standard Model Context Protocol server for delivery dispatch management
Provides 18 AI tools for order and driver management via standardized MCP protocol.
Compatible with Claude Desktop, Continue, Cline, and all MCP clients.
"""
import os
import sys
import json
import logging
from pathlib import Path
from typing import Literal
from datetime import datetime
from contextvars import ContextVar
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))
# Ensure logs directory exists
logs_dir = Path(__file__).parent / 'logs'
logs_dir.mkdir(exist_ok=True)
from fastmcp import FastMCP
# Import existing services (unchanged)
from chat.geocoding import GeocodingService
from database.connection import execute_query, execute_write, test_connection
# Import permission checking
from database.user_context import check_permission, get_required_scope
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(logs_dir / 'fleetmind_mcp.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# ============================================================================
# API KEY EXTRACTION FROM REQUEST
# ============================================================================
# Store API key per request using context variable
_current_api_key: ContextVar[str] = ContextVar('api_key', default=None)
# Session-to-API-key store: maps session_id -> api_key
# This allows tool calls to authenticate using the API key from the SSE connection
_session_auth_store: dict[str, str] = {}
def store_session_api_key(session_id: str, api_key: str):
"""Store API key for a session (called when SSE connection is made)"""
_session_auth_store[session_id] = api_key
logger.debug(f"Stored API key for session {session_id[:16]}...")
def get_api_key_from_session(session_id: str) -> str:
"""Get API key from session store"""
return _session_auth_store.get(session_id)
def cleanup_session(session_id: str):
"""Remove session from store when disconnected"""
if session_id in _session_auth_store:
del _session_auth_store[session_id]
logger.debug(f"Cleaned up session {session_id[:16]}...")
def get_api_key_from_context() -> str:
"""
Get API key from the current request context.
This is set by our custom dependencies.
"""
return _current_api_key.get(None)
def extract_api_key_from_request():
"""
Extract API key from HTTP request and store in context variable.
Called at the start of each tool execution.
Checks multiple sources:
1. api_key query parameter in current request
2. session_id query parameter -> lookup in session store
"""
try:
from fastmcp.server.dependencies import get_http_request
request = get_http_request()
# METHOD 1: Direct api_key in query params
api_key = request.query_params.get('api_key')
if api_key:
_current_api_key.set(api_key)
logger.debug(f"API key extracted from request: {api_key[:10]}...")
return api_key
# METHOD 2: Look up by session_id (for MCP tool calls)
session_id = request.query_params.get('session_id')
if session_id:
api_key = get_api_key_from_session(session_id)
if api_key:
_current_api_key.set(api_key)
logger.debug(f"API key found via session {session_id[:16]}...")
return api_key
else:
logger.debug(f"No API key found for session {session_id[:16]}...")
except RuntimeError:
# No HTTP request available (e.g., stdio transport)
logger.debug("No HTTP request available for API key extraction")
return None
# ============================================================================
# MCP SERVER INITIALIZATION
# ============================================================================
mcp = FastMCP(
name="FleetMind Dispatch Coordinator",
version="1.0.0"
# Note: request_timeout not supported in FastMCP 2.13.1
# Timeout handling is done in proxy.py instead
)
# ============================================================================
# FASTMCP AUTHENTICATION MIDDLEWARE
# Uses FastMCP's native middleware system to properly pass user context to tools
# Reference: https://gelembjuk.com/blog/post/authentication-remote-mcp-server-python/
# ============================================================================
try:
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.exceptions import ToolError
class ApiKeyAuthMiddleware(Middleware):
"""
FastMCP middleware for API key authentication.
Validates API key from request query params and injects user info into context.
"""
async def on_call_tool(self, context: MiddlewareContext, call_next):
"""Intercept tool calls to validate authentication"""
try:
from fastmcp.server.dependencies import get_http_request
request = get_http_request()
# Get API key from query params
api_key = request.query_params.get('api_key')
# Also check session store if no direct api_key
if not api_key:
session_id = request.query_params.get('session_id')
if session_id:
api_key = get_api_key_from_session(session_id)
if api_key:
# Validate API key and get user info
from database.api_keys import verify_api_key
user_info = verify_api_key(api_key)
if user_info:
# Store user info in context for tools to access
context.fastmcp_context.set_state("user_id", user_info['user_id'])
context.fastmcp_context.set_state("user_email", user_info['email'])
context.fastmcp_context.set_state("user_scopes", user_info.get('scopes', []))
context.fastmcp_context.set_state("user_name", user_info.get('name', ''))
logger.info(f"✅ Auth middleware: User {user_info['email']} authenticated")
return await call_next(context)
else:
logger.warning(f"❌ Auth middleware: Invalid API key {api_key[:10]}...")
# Check SKIP_AUTH for development
env = os.getenv("ENV") or os.getenv("ENVIRONMENT", "production")
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env.lower() != "production":
# Development mode - use dev user
context.fastmcp_context.set_state("user_id", "dev-user")
context.fastmcp_context.set_state("user_email", "dev@fleetmind.local")
context.fastmcp_context.set_state("user_scopes", ["admin"])
context.fastmcp_context.set_state("user_name", "Development User")
logger.warning(f"⚠️ Auth middleware: SKIP_AUTH enabled, using dev user")
return await call_next(context)
# No valid authentication
raise ToolError("Authentication required. Please provide a valid API key.")
except ImportError:
# get_http_request not available (stdio transport)
logger.debug("Auth middleware: No HTTP request available")
return await call_next(context)
# Register the middleware with FastMCP
mcp.add_middleware(ApiKeyAuthMiddleware())
logger.info("[Auth] FastMCP ApiKeyAuthMiddleware registered")
except ImportError as e:
logger.warning(f"[Auth] Could not import FastMCP middleware: {e}")
logger.warning("[Auth] Falling back to per-tool authentication")
# Initialize shared services
logger.info("Initializing FleetMind MCP Server...")
geocoding_service = GeocodingService()
logger.info(f"Geocoding Service: {geocoding_service.get_status()}")
# Test database connection
try:
test_connection()
logger.info("Database: Connected to PostgreSQL")
except Exception as e:
logger.error(f"Database: Connection failed - {e}")
# ============================================================================
# AUTHENTICATION - API KEY SYSTEM
# ============================================================================
def get_authenticated_user(ctx=None):
"""
Get authenticated user from multiple sources:
1. FastMCP Context state (set by middleware) - PREFERRED
2. HTTP request API key extraction (fallback)
3. Environment variable (fallback for testing)
4. SKIP_AUTH bypass (development only)
Args:
ctx: Optional FastMCP Context object from tool function
Returns:
User info dict with user_id, email, scopes, name or None if not authenticated
"""
try:
# METHOD 1: Get user from FastMCP Context state (set by middleware)
# This is the preferred method when using FastMCP middleware
if ctx is not None:
try:
user_id = ctx.get_state("user_id")
if user_id:
return {
'user_id': user_id,
'email': ctx.get_state("user_email") or "",
'scopes': ctx.get_state("user_scopes") or [],
'name': ctx.get_state("user_name") or ""
}
except Exception:
pass # Context state not available, try other methods
# METHOD 2: Extract API key from current HTTP request
api_key = extract_api_key_from_request()
# METHOD 3: Fallback to environment variable for testing
if not api_key:
api_key = os.getenv("FLEETMIND_API_KEY")
if api_key:
logger.debug("Using API key from environment")
# Validate the API key
if api_key:
from database.api_keys import verify_api_key
user_info = verify_api_key(api_key)
if user_info:
logger.info(f"✅ Authenticated: {user_info['email']} (user_id: {user_info['user_id']})")
return user_info
else:
logger.warning(f"❌ Invalid API key: {api_key[:10]}...")
return None
# METHOD 4: Development bypass mode (local testing only)
# SECURITY: Only allow SKIP_AUTH in development environments
# Check both ENV and ENVIRONMENT variables for compatibility
env = os.getenv("ENV") or os.getenv("ENVIRONMENT", "production")
env = env.lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth:
if env == "production":
logger.error("⚠️ SKIP_AUTH is enabled but ENV=production - DENYING access for security")
return None
else:
logger.warning(f"⚠️ SKIP_AUTH enabled in {env} environment - using development user")
return {
'user_id': 'dev-user',
'email': 'dev@fleetmind.local',
'scopes': ['admin'],
'name': 'Development User'
}
return None
except Exception as e:
logger.error(f"Authentication error: {e}")
return None
# ============================================================================
# MCP RESOURCES
# ============================================================================
@mcp.resource("orders://all")
def get_orders_resource() -> str:
"""
Real-time orders dataset for AI context.
Returns authenticated user's orders from the last 30 days.
Returns:
JSON string containing orders array with key fields:
- order_id, customer_name, delivery_address
- status, priority, created_at, assigned_driver_id
"""
try:
# Authenticate user
user = get_authenticated_user()
if not user:
logger.warning("Resource orders://all - Authentication required")
return json.dumps({
"error": "Authentication required",
"message": "Please provide a valid API key to access orders"
})
# Query only this user's orders
query = """
SELECT order_id, customer_name, delivery_address,
status, priority, created_at, assigned_driver_id
FROM orders
WHERE user_id = %s
AND created_at > NOW() - INTERVAL '30 days'
ORDER BY created_at DESC
LIMIT 1000
"""
orders = execute_query(query, (user['user_id'],))
logger.info(f"Resource orders://all - User {user['user_id']} retrieved {len(orders) if orders else 0} orders")
return json.dumps(orders, default=str, indent=2)
except Exception as e:
logger.error(f"Resource orders://all failed: {e}")
return json.dumps({"error": str(e)})
@mcp.resource("drivers://all")
def get_drivers_resource() -> str:
"""
Real-time drivers dataset for AI context.
Returns authenticated user's drivers with current locations and status.
Returns:
JSON string containing drivers array with key fields:
- driver_id, name, status, vehicle_type, vehicle_plate
- current_lat, current_lng, last_location_update
"""
try:
# Authenticate user
user = get_authenticated_user()
if not user:
logger.warning("Resource drivers://all - Authentication required")
return json.dumps({
"error": "Authentication required",
"message": "Please provide a valid API key to access drivers"
})
# Query only this user's drivers
query = """
SELECT driver_id, name, status, vehicle_type, vehicle_plate,
current_lat, current_lng, last_location_update
FROM drivers
WHERE user_id = %s
ORDER BY name ASC
"""
drivers = execute_query(query, (user['user_id'],))
logger.info(f"Resource drivers://all - User {user['user_id']} retrieved {len(drivers) if drivers else 0} drivers")
return json.dumps(drivers, default=str, indent=2)
except Exception as e:
logger.error(f"Resource drivers://all failed: {e}")
return json.dumps({"error": str(e)})
# ============================================================================
# MCP PROMPTS (Workflows)
# ============================================================================
# TODO: Add prompts once FastMCP prompt API is confirmed
# Prompts will provide guided workflows for:
# - create_order_workflow: Interactive order creation with validation
# - assign_driver_workflow: Smart driver assignment with route optimization
# - order_status_check: Quick order status queries
# ============================================================================
# MCP TOOLS - ORDER CREATION & VALIDATION
# ============================================================================
@mcp.tool()
def geocode_address(address: str) -> dict:
"""
Convert a delivery address to GPS coordinates and validate the address format.
Use this before creating an order to ensure the address is valid.
Args:
address: The full delivery address to geocode (e.g., '123 Main St, San Francisco, CA')
Returns:
dict: {
success: bool,
latitude: float,
longitude: float,
formatted_address: str,
confidence: str (high/medium/low),
message: str
}
"""
from chat.tools import handle_geocode_address
logger.info(f"Tool: geocode_address('{address}')")
return handle_geocode_address({"address": address})
@mcp.tool()
def calculate_route(
origin: str,
destination: str,
mode: Literal["driving", "walking", "bicycling", "transit"] = "driving",
vehicle_type: Literal["car", "van", "truck", "motorcycle", "bicycle"] = "car",
alternatives: bool = False,
include_steps: bool = False,
avoid_tolls: bool = False,
avoid_highways: bool = False,
avoid_ferries: bool = False,
emission_type: Literal["GASOLINE", "ELECTRIC", "HYBRID", "DIESEL"] = "GASOLINE",
request_fuel_efficient: bool = False
) -> dict:
"""
Calculate the shortest route between two locations with vehicle-specific optimization.
Uses Google Routes API for accurate real-time traffic, toll info, and fuel consumption.
Args:
origin: Starting location - either full address or coordinates as 'lat,lng'
destination: Destination location - either full address or coordinates as 'lat,lng'
mode: Travel mode for route calculation (default: driving)
vehicle_type: Type of vehicle for route optimization (default: car)
- motorcycle: Uses TWO_WHEELER mode for motorcycle-specific routing
- bicycle: Uses bike lanes and paths
- car/van/truck: Uses DRIVE mode (no truck-specific routing available)
alternatives: Return multiple route options if available (default: false)
include_steps: Include turn-by-turn navigation steps in response (default: false)
avoid_tolls: Avoid toll roads (for cars and motorcycles) (default: false)
avoid_highways: Avoid highways (for cars and motorcycles) (default: false)
avoid_ferries: Avoid ferry routes (for cars and motorcycles) (default: false)
emission_type: Vehicle emission type for eco-routing (cars/vans/trucks only) (default: GASOLINE)
request_fuel_efficient: Request eco-friendly route alternative (cars/vans/trucks only) (default: false)
Returns:
dict: {
success: bool,
origin: str,
destination: str,
distance: {meters: int, text: str},
duration: {seconds: int, text: str} (without traffic),
duration_in_traffic: {seconds: int, text: str} (with traffic),
traffic_delay: {seconds: int, text: str},
mode: str,
vehicle_type: str,
route_summary: str,
route_labels: list,
confidence: str,
toll_info: {has_tolls: bool, details: str} (if applicable),
fuel_consumption: {liters: float, text: str} (if DRIVE mode),
traffic_data_available: bool,
warning: str (if TWO_WHEELER or BICYCLE mode),
steps: list (if include_steps=True)
}
"""
from chat.tools import handle_calculate_route
logger.info(f"Tool: calculate_route('{origin}' -> '{destination}', vehicle={vehicle_type}, mode={mode})")
return handle_calculate_route({
"origin": origin,
"destination": destination,
"mode": mode,
"vehicle_type": vehicle_type,
"alternatives": alternatives,
"include_steps": include_steps,
"avoid_tolls": avoid_tolls,
"avoid_highways": avoid_highways,
"avoid_ferries": avoid_ferries,
"emission_type": emission_type,
"request_fuel_efficient": request_fuel_efficient
})
@mcp.tool()
def calculate_intelligent_route(
origin: str,
destination: str,
vehicle_type: Literal["car", "van", "truck", "motorcycle"] = "car",
consider_weather: bool = True,
consider_traffic: bool = True
) -> dict:
"""
Calculate the optimal route considering real-time traffic, weather conditions, and vehicle type.
This is an intelligent routing tool that factors in:
- Real-time traffic delays
- Weather conditions (rain, visibility, wind)
- Vehicle-specific capabilities (motorcycle vs car vs truck)
- Safety warnings and recommendations
Use this when you need smart routing that accounts for current conditions.
Args:
origin: Starting location - either full address or coordinates as 'lat,lng'
destination: Destination location - either full address or coordinates as 'lat,lng'
vehicle_type: Type of vehicle for route optimization (default: car)
consider_weather: Factor in weather conditions (default: true)
consider_traffic: Factor in real-time traffic (default: true)
Returns:
dict: {
success: bool,
route: {
origin: str,
destination: str,
distance: {meters: int, text: str},
vehicle_type: str,
route_summary: str
},
timing: {
base_duration: {seconds: int, text: str},
with_traffic: {seconds: int, text: str},
adjusted_duration: {seconds: int, text: str},
traffic_delay_percent: int,
weather_delay_percent: int,
total_delay_percent: int
},
conditions: {
traffic_status: str (clear|light|moderate|heavy|severe),
weather_considered: bool
},
weather: {
conditions: str,
temperature_c: float,
precipitation_mm: float,
visibility_m: int,
impact_severity: str (none|minor|moderate|severe)
},
recommendations: list[str],
warnings: list[str],
alternatives: list (if available)
}
Examples:
- "Find the best route from SF to Oakland for a motorcycle considering weather"
- "What's the fastest route from downtown to airport with current traffic?"
- "Calculate delivery route for a truck from warehouse to customer address"
"""
from chat.route_optimizer import calculate_intelligent_route as calc_route
logger.info(f"Tool: calculate_intelligent_route('{origin}' -> '{destination}', vehicle={vehicle_type})")
return calc_route(origin, destination, vehicle_type, consider_weather, consider_traffic)
@mcp.tool()
def create_order(
customer_name: str,
delivery_address: str,
delivery_lat: float,
delivery_lng: float,
expected_delivery_time: str,
customer_phone: str | None = None,
customer_email: str | None = None,
priority: Literal["standard", "express", "urgent"] = "standard",
weight_kg: float = 5.0,
special_instructions: str | None = None,
sla_grace_period_minutes: int = 15,
time_window_end: str | None = None
) -> dict:
"""
Create a new delivery order in the database with MANDATORY delivery deadline.
IMPORTANT: expected_delivery_time is REQUIRED. This is the promised delivery time to the customer.
Only call this after geocoding the address successfully.
Args:
customer_name: Full name of the customer
delivery_address: Complete delivery address
delivery_lat: Latitude from geocoding
delivery_lng: Longitude from geocoding
expected_delivery_time: REQUIRED - Promised delivery deadline in ISO 8601 format
Must be future timestamp. Examples:
- '2025-11-15T18:00:00' (6 PM today)
- '2025-11-16T12:00:00' (noon tomorrow)
customer_phone: Customer phone number (optional)
customer_email: Customer email address (optional)
priority: Delivery priority level (default: standard)
weight_kg: Package weight in kilograms (default: 5.0)
special_instructions: Special delivery instructions (optional)
sla_grace_period_minutes: Grace period after deadline (default: 15 mins)
Deliveries within grace period marked as 'late' but acceptable
time_window_end: Legacy field, defaults to expected_delivery_time if not provided
Returns:
dict: {
success: bool,
order_id: str,
status: str,
customer: str,
address: str,
expected_delivery: str (new),
sla_grace_period_minutes: int (new),
priority: str,
message: str
}
"""
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
# STEP 2: Check permissions
required_scope = get_required_scope('create_order')
if not check_permission(user.get('scopes', []), required_scope):
return {
"success": False,
"error": f"Permission denied. Required scope: {required_scope}"
}
# STEP 3: Execute tool with user_id
from chat.tools import handle_create_order
logger.info(f"Tool: create_order by user {user.get('email')} (customer='{customer_name}')")
return handle_create_order(
tool_input={
"customer_name": customer_name,
"delivery_address": delivery_address,
"delivery_lat": delivery_lat,
"delivery_lng": delivery_lng,
"expected_delivery_time": expected_delivery_time,
"customer_phone": customer_phone,
"customer_email": customer_email,
"priority": priority,
"weight_kg": weight_kg,
"special_instructions": special_instructions,
"sla_grace_period_minutes": sla_grace_period_minutes,
"time_window_end": time_window_end
},
user_id=user['user_id'] # Pass user_id for data isolation
)
# ============================================================================
# MCP TOOLS - ORDER QUERYING
# ============================================================================
@mcp.tool()
def count_orders(
status: Literal["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] | None = None,
priority: Literal["standard", "express", "urgent"] | None = None,
payment_status: Literal["pending", "paid", "cod"] | None = None,
assigned_driver_id: str | None = None,
is_fragile: bool | None = None,
requires_signature: bool | None = None,
requires_cold_storage: bool | None = None
) -> dict:
"""
Count total orders in the database with optional filters.
Use this when user asks 'how many orders', 'fetch orders', or wants to know order statistics.
Args:
status: Filter by order status (optional)
priority: Filter by priority level (optional)
payment_status: Filter by payment status (optional)
assigned_driver_id: Filter by assigned driver ID (optional)
is_fragile: Filter fragile packages only (optional)
requires_signature: Filter orders requiring signature (optional)
requires_cold_storage: Filter orders requiring cold storage (optional)
Returns:
dict: {
success: bool,
total: int,
status_breakdown: dict,
priority_breakdown: dict,
message: str
}
"""
from chat.tools import handle_count_orders
logger.info(f"Tool: count_orders(status={status}, priority={priority})")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('count_orders')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
tool_input = {}
if status is not None:
tool_input["status"] = status
if priority is not None:
tool_input["priority"] = priority
if payment_status is not None:
tool_input["payment_status"] = payment_status
if assigned_driver_id is not None:
tool_input["assigned_driver_id"] = assigned_driver_id
if is_fragile is not None:
tool_input["is_fragile"] = is_fragile
if requires_signature is not None:
tool_input["requires_signature"] = requires_signature
if requires_cold_storage is not None:
tool_input["requires_cold_storage"] = requires_cold_storage
return handle_count_orders(tool_input, user_id=user['user_id'])
@mcp.tool()
def fetch_orders(
limit: int = 10,
offset: int = 0,
status: Literal["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] | None = None,
priority: Literal["standard", "express", "urgent"] | None = None,
payment_status: Literal["pending", "paid", "cod"] | None = None,
assigned_driver_id: str | None = None,
is_fragile: bool | None = None,
requires_signature: bool | None = None,
requires_cold_storage: bool | None = None,
sort_by: Literal["created_at", "priority", "time_window_start"] = "created_at",
sort_order: Literal["ASC", "DESC"] = "DESC"
) -> dict:
"""
Fetch orders from the database with optional filters, pagination, and sorting.
Use after counting to show specific number of orders.
Args:
limit: Number of orders to fetch (default: 10, max: 100)
offset: Number of orders to skip for pagination (default: 0)
status: Filter by order status (optional)
priority: Filter by priority level (optional)
payment_status: Filter by payment status (optional)
assigned_driver_id: Filter by assigned driver ID (optional)
is_fragile: Filter fragile packages only (optional)
requires_signature: Filter orders requiring signature (optional)
requires_cold_storage: Filter orders requiring cold storage (optional)
sort_by: Field to sort by (default: created_at)
sort_order: Sort order (default: DESC for newest first)
Returns:
dict: {
success: bool,
orders: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_fetch_orders
logger.info(f"Tool: fetch_orders(limit={limit}, offset={offset}, status={status})")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('fetch_orders')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
tool_input = {
"limit": limit,
"offset": offset,
"sort_by": sort_by,
"sort_order": sort_order
}
if status is not None:
tool_input["status"] = status
if priority is not None:
tool_input["priority"] = priority
if payment_status is not None:
tool_input["payment_status"] = payment_status
if assigned_driver_id is not None:
tool_input["assigned_driver_id"] = assigned_driver_id
if is_fragile is not None:
tool_input["is_fragile"] = is_fragile
if requires_signature is not None:
tool_input["requires_signature"] = requires_signature
if requires_cold_storage is not None:
tool_input["requires_cold_storage"] = requires_cold_storage
return handle_fetch_orders(tool_input, user_id=user['user_id'])
@mcp.tool()
def get_order_details(order_id: str) -> dict:
"""
Get complete details of a specific order by order ID.
Use when user asks 'tell me about order X' or wants detailed information about a specific order.
Args:
order_id: The order ID to fetch details for (e.g., 'ORD-20251114163800')
Returns:
dict: {
success: bool,
order: dict (with all 26 fields),
message: str
}
"""
from chat.tools import handle_get_order_details
logger.info(f"Tool: get_order_details(order_id='{order_id}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('get_order_details')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_get_order_details({"order_id": order_id}, user_id=user['user_id'])
@mcp.tool()
def search_orders(search_term: str) -> dict:
"""
Search for orders by customer name, email, phone, or order ID pattern.
Use when user provides partial information to find orders.
Args:
search_term: Search term to match against customer_name, customer_email, customer_phone, or order_id
Returns:
dict: {
success: bool,
orders: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_search_orders
logger.info(f"Tool: search_orders(search_term='{search_term}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('search_orders')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_search_orders({"search_term": search_term}, user_id=user['user_id'])
@mcp.tool()
def get_incomplete_orders(limit: int = 20) -> dict:
"""
Get all orders that are not yet completed (excludes delivered and cancelled orders).
Shortcut for finding orders in progress (pending, assigned, in_transit).
Args:
limit: Number of orders to fetch (default: 20)
Returns:
dict: {
success: bool,
orders: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_get_incomplete_orders
logger.info(f"Tool: get_incomplete_orders(limit={limit})")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('get_incomplete_orders')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_get_incomplete_orders({"limit": limit}, user_id=user['user_id'])
# ============================================================================
# MCP TOOLS - ORDER MANAGEMENT
# ============================================================================
@mcp.tool()
def update_order(
order_id: str,
customer_name: str | None = None,
customer_phone: str | None = None,
customer_email: str | None = None,
delivery_address: str | None = None,
delivery_lat: float | None = None,
delivery_lng: float | None = None,
status: Literal["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] | None = None,
priority: Literal["standard", "express", "urgent"] | None = None,
special_instructions: str | None = None,
time_window_end: str | None = None,
payment_status: Literal["pending", "paid", "cod"] | None = None,
weight_kg: float | None = None,
order_value: float | None = None
) -> dict:
"""
Update an existing order's details. You can update any combination of fields.
Only provide the fields you want to change. Auto-geocodes if delivery_address updated without coordinates.
Args:
order_id: Order ID to update (e.g., 'ORD-20250114123456')
customer_name: Updated customer name (optional)
customer_phone: Updated customer phone number (optional)
customer_email: Updated customer email address (optional)
delivery_address: Updated delivery address (optional)
delivery_lat: Updated delivery latitude (required if updating address) (optional)
delivery_lng: Updated delivery longitude (required if updating address) (optional)
status: Updated order status (optional)
priority: Updated priority level (optional)
special_instructions: Updated special delivery instructions (optional)
time_window_end: Updated delivery deadline (ISO format datetime) (optional)
payment_status: Updated payment status (optional)
weight_kg: Updated package weight in kilograms (optional)
order_value: Updated order value in currency (optional)
Returns:
dict: {
success: bool,
order_id: str,
updated_fields: list[str],
message: str
}
"""
from chat.tools import handle_update_order
logger.info(f"Tool: update_order(order_id='{order_id}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('update_order')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
tool_input = {"order_id": order_id}
if customer_name is not None:
tool_input["customer_name"] = customer_name
if customer_phone is not None:
tool_input["customer_phone"] = customer_phone
if customer_email is not None:
tool_input["customer_email"] = customer_email
if delivery_address is not None:
tool_input["delivery_address"] = delivery_address
if delivery_lat is not None:
tool_input["delivery_lat"] = delivery_lat
if delivery_lng is not None:
tool_input["delivery_lng"] = delivery_lng
if status is not None:
tool_input["status"] = status
if priority is not None:
tool_input["priority"] = priority
if special_instructions is not None:
tool_input["special_instructions"] = special_instructions
if time_window_end is not None:
tool_input["time_window_end"] = time_window_end
if payment_status is not None:
tool_input["payment_status"] = payment_status
if weight_kg is not None:
tool_input["weight_kg"] = weight_kg
if order_value is not None:
tool_input["order_value"] = order_value
return handle_update_order(tool_input, user_id=user['user_id'])
@mcp.tool()
def delete_order(order_id: str, confirm: bool) -> dict:
"""
Permanently delete an order from the database. This action cannot be undone. Use with caution.
Args:
order_id: Order ID to delete (e.g., 'ORD-20250114123456')
confirm: Must be set to true to confirm deletion
Returns:
dict: {
success: bool,
order_id: str,
message: str
}
"""
from chat.tools import handle_delete_order
logger.info(f"Tool: delete_order(order_id='{order_id}', confirm={confirm})")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('delete_order')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_delete_order({"order_id": order_id, "confirm": confirm}, user_id=user['user_id'])
# ============================================================================
# MCP TOOLS - DRIVER CREATION
# ============================================================================
@mcp.tool()
def create_driver(
name: str,
vehicle_type: str,
current_address: str,
current_lat: float,
current_lng: float,
phone: str | None = None,
email: str | None = None,
vehicle_plate: str | None = None,
capacity_kg: float = 1000.0,
capacity_m3: float = 12.0,
skills: list[str] | None = None,
status: Literal["active", "busy", "offline", "unavailable"] = "active"
) -> dict:
"""
Create a new delivery driver in the database. Use this to onboard new drivers to the fleet.
Args:
name: Full name of the driver (REQUIRED)
vehicle_type: Type of vehicle: van, truck, car, motorcycle (REQUIRED)
current_address: Driver's current location address, e.g. '123 Main St, New York, NY' (REQUIRED)
current_lat: Driver's current latitude location (REQUIRED, -90 to 90)
current_lng: Driver's current longitude location (REQUIRED, -180 to 180)
phone: Driver phone number (optional)
email: Driver email address (optional)
vehicle_plate: Vehicle license plate number (optional)
capacity_kg: Vehicle cargo capacity in kilograms (default: 1000.0)
capacity_m3: Vehicle cargo volume in cubic meters (default: 12.0)
skills: List of driver skills/certifications: refrigerated, medical_certified, fragile_handler, overnight, express_delivery (optional)
status: Driver status (default: active)
Returns:
dict: {
success: bool,
driver_id: str,
name: str,
status: str,
vehicle_type: str,
vehicle_plate: str,
capacity_kg: float,
skills: list[str],
location: {latitude, longitude, address},
message: str
}
"""
from chat.tools import handle_create_driver
logger.info(f"Tool: create_driver(name='{name}', vehicle_type='{vehicle_type}', address='{current_address}', location=({current_lat}, {current_lng}))")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('create_driver')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_create_driver({
"name": name,
"phone": phone,
"email": email,
"vehicle_type": vehicle_type,
"vehicle_plate": vehicle_plate,
"capacity_kg": capacity_kg,
"capacity_m3": capacity_m3,
"skills": skills or [],
"status": status,
"current_address": current_address,
"current_lat": current_lat,
"current_lng": current_lng
}, user_id=user['user_id'])
# ============================================================================
# MCP TOOLS - DRIVER QUERYING
# ============================================================================
@mcp.tool()
def count_drivers(
status: Literal["active", "busy", "offline", "unavailable"] | None = None,
vehicle_type: str | None = None
) -> dict:
"""
Count total drivers in the database with optional filters.
Use this when user asks 'how many drivers', 'show drivers', or wants driver statistics.
Args:
status: Filter by driver status (optional)
vehicle_type: Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)
Returns:
dict: {
success: bool,
total: int,
status_breakdown: dict,
vehicle_breakdown: dict,
message: str
}
"""
from chat.tools import handle_count_drivers
logger.info(f"Tool: count_drivers(status={status}, vehicle_type={vehicle_type})")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('count_drivers')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
tool_input = {}
if status is not None:
tool_input["status"] = status
if vehicle_type is not None:
tool_input["vehicle_type"] = vehicle_type
return handle_count_drivers(tool_input, user_id=user['user_id'])
@mcp.tool()
def fetch_drivers(
limit: int = 10,
offset: int = 0,
status: Literal["active", "busy", "offline", "unavailable"] | None = None,
vehicle_type: str | None = None,
sort_by: Literal["name", "status", "created_at", "last_location_update"] = "name",
sort_order: Literal["ASC", "DESC"] = "ASC"
) -> dict:
"""
Fetch drivers from the database with optional filters, pagination, and sorting.
Use after counting to show specific number of drivers.
Args:
limit: Number of drivers to fetch (default: 10, max: 100)
offset: Number of drivers to skip for pagination (default: 0)
status: Filter by driver status (optional)
vehicle_type: Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)
sort_by: Field to sort by (default: name)
sort_order: Sort order (default: ASC for alphabetical)
Returns:
dict: {
success: bool,
drivers: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_fetch_drivers
logger.info(f"Tool: fetch_drivers(limit={limit}, offset={offset}, status={status})")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('fetch_drivers')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
tool_input = {
"limit": limit,
"offset": offset,
"sort_by": sort_by,
"sort_order": sort_order
}
if status is not None:
tool_input["status"] = status
if vehicle_type is not None:
tool_input["vehicle_type"] = vehicle_type
return handle_fetch_drivers(tool_input, user_id=user['user_id'])
@mcp.tool()
def get_driver_details(driver_id: str) -> dict:
"""
Get complete details of a specific driver by driver ID, including current location
(latitude, longitude, and human-readable address via reverse geocoding), contact info,
vehicle details, status, and skills. Use when user asks about a driver's location,
coordinates, position, or any other driver information.
Args:
driver_id: The driver ID to fetch details for (e.g., 'DRV-20251114163800')
Returns:
dict: {
success: bool,
driver: dict (with all fields including reverse-geocoded location address),
message: str
}
"""
from chat.tools import handle_get_driver_details
logger.info(f"Tool: get_driver_details(driver_id='{driver_id}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('get_driver_details')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_get_driver_details({"driver_id": driver_id}, user_id=user['user_id'])
@mcp.tool()
def search_drivers(search_term: str) -> dict:
"""
Search for drivers by name, email, phone, vehicle plate, or driver ID pattern.
Use when user provides partial information to find drivers.
Args:
search_term: Search term to match against name, email, phone, vehicle_plate, or driver_id
Returns:
dict: {
success: bool,
drivers: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_search_drivers
logger.info(f"Tool: search_drivers(search_term='{search_term}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('search_drivers')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_search_drivers({"search_term": search_term}, user_id=user['user_id'])
@mcp.tool()
def get_available_drivers(limit: int = 20) -> dict:
"""
Get all drivers that are available for assignment (active or offline status, excludes busy and unavailable).
Shortcut for finding drivers ready for dispatch.
Args:
limit: Number of drivers to fetch (default: 20)
Returns:
dict: {
success: bool,
drivers: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_get_available_drivers
logger.info(f"Tool: get_available_drivers(limit={limit})")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('get_available_drivers')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_get_available_drivers({"limit": limit}, user_id=user['user_id'])
# ============================================================================
# MCP TOOLS - DRIVER MANAGEMENT
# ============================================================================
@mcp.tool()
def update_driver(
driver_id: str,
name: str | None = None,
phone: str | None = None,
email: str | None = None,
status: Literal["active", "busy", "offline", "unavailable"] | None = None,
vehicle_type: str | None = None,
vehicle_plate: str | None = None,
capacity_kg: float | None = None,
capacity_m3: float | None = None,
skills: list[str] | None = None,
current_address: str | None = None,
current_lat: float | None = None,
current_lng: float | None = None
) -> dict:
"""
Update an existing driver's details. You can update any combination of fields.
Only provide the fields you want to change. Auto-updates last_location_update if coordinates changed.
Args:
driver_id: Driver ID to update (e.g., 'DRV-20250114123456')
name: Updated driver name (optional)
phone: Updated phone number (optional)
email: Updated email address (optional)
status: Updated driver status (optional)
vehicle_type: Updated vehicle type (optional)
vehicle_plate: Updated vehicle license plate (optional)
capacity_kg: Updated cargo capacity in kilograms (optional)
capacity_m3: Updated cargo capacity in cubic meters (optional)
skills: Updated list of driver skills/certifications (optional)
current_address: Updated current location address (optional)
current_lat: Updated current latitude (optional)
current_lng: Updated current longitude (optional)
Returns:
dict: {
success: bool,
driver_id: str,
updated_fields: list[str],
message: str
}
"""
from chat.tools import handle_update_driver
logger.info(f"Tool: update_driver(driver_id='{driver_id}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('update_driver')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
tool_input = {"driver_id": driver_id}
if name is not None:
tool_input["name"] = name
if phone is not None:
tool_input["phone"] = phone
if email is not None:
tool_input["email"] = email
if status is not None:
tool_input["status"] = status
if vehicle_type is not None:
tool_input["vehicle_type"] = vehicle_type
if vehicle_plate is not None:
tool_input["vehicle_plate"] = vehicle_plate
if capacity_kg is not None:
tool_input["capacity_kg"] = capacity_kg
if capacity_m3 is not None:
tool_input["capacity_m3"] = capacity_m3
if skills is not None:
tool_input["skills"] = skills
if current_address is not None:
tool_input["current_address"] = current_address
if current_lat is not None:
tool_input["current_lat"] = current_lat
if current_lng is not None:
tool_input["current_lng"] = current_lng
return handle_update_driver(tool_input, user_id=user['user_id'])
@mcp.tool()
def delete_driver(driver_id: str, confirm: bool) -> dict:
"""
Permanently delete a driver from the database. This action cannot be undone. Use with caution.
Args:
driver_id: Driver ID to delete (e.g., 'DRV-20250114123456')
confirm: Must be set to true to confirm deletion
Returns:
dict: {
success: bool,
driver_id: str,
message: str
}
"""
from chat.tools import handle_delete_driver
logger.info(f"Tool: delete_driver(driver_id='{driver_id}', confirm={confirm})")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('delete_driver')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_delete_driver({"driver_id": driver_id, "confirm": confirm}, user_id=user['user_id'])
@mcp.tool()
def delete_all_orders(confirm: bool, status: str = None) -> dict:
"""
Bulk delete all orders (or orders with specific status). DANGEROUS - Use with extreme caution!
Safety checks:
- Requires confirm=true
- Blocks deletion if any active assignments exist
- Optional status filter to delete only specific statuses
Args:
confirm: Must be set to true to confirm bulk deletion
status: Optional status filter (pending/assigned/in_transit/delivered/failed/cancelled)
Returns:
dict: {
success: bool,
deleted_count: int,
message: str
}
"""
from chat.tools import handle_delete_all_orders
logger.info(f"Tool: delete_all_orders(confirm={confirm}, status='{status}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('delete_all_orders')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_delete_all_orders({"confirm": confirm, "status": status}, user_id=user['user_id'])
@mcp.tool()
def delete_all_drivers(confirm: bool, status: str = None) -> dict:
"""
Bulk delete all drivers (or drivers with specific status). DANGEROUS - Use with extreme caution!
Safety checks:
- Requires confirm=true
- Blocks deletion if ANY assignments exist (due to RESTRICT constraint)
- Optional status filter to delete only specific statuses
Args:
confirm: Must be set to true to confirm bulk deletion
status: Optional status filter (active/busy/offline/unavailable)
Returns:
dict: {
success: bool,
deleted_count: int,
message: str
}
"""
from chat.tools import handle_delete_all_drivers
logger.info(f"Tool: delete_all_drivers(confirm={confirm}, status='{status}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('delete_all_drivers')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_delete_all_drivers({"confirm": confirm, "status": status}, user_id=user['user_id'])
# ============================================================================
# ASSIGNMENT TOOLS
# ============================================================================
@mcp.tool()
def create_assignment(order_id: str, driver_id: str) -> dict:
"""
Assign an order to a driver. Creates an assignment record with route data from driver location to delivery location.
Requirements:
- Order must be in 'pending' status
- Driver must be in 'active' or 'available' status
- Order cannot already have an active assignment
After assignment:
- Order status changes to 'assigned'
- Driver status changes to 'busy'
- Route data (distance, duration, path) is calculated and saved
- Assignment record is created with all route details
Args:
order_id: Order ID to assign (e.g., 'ORD-20250114123456')
driver_id: Driver ID to assign (e.g., 'DRV-20250114123456')
Returns:
dict: {
success: bool,
assignment_id: str,
order_id: str,
driver_id: str,
route: {
distance: {meters: int, text: str},
duration: {seconds: int, text: str},
route_summary: str,
driver_start: {lat: float, lng: float},
delivery_location: {lat: float, lng: float, address: str}
}
}
"""
from chat.tools import handle_create_assignment
logger.info(f"Tool: create_assignment(order_id='{order_id}', driver_id='{driver_id}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('create_assignment')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_create_assignment({"order_id": order_id, "driver_id": driver_id}, user_id=user['user_id'])
@mcp.tool()
def auto_assign_order(order_id: str) -> dict:
"""
Automatically assign order to the nearest available driver (distance + validation based).
Selection Criteria (Auto Algorithm):
1. Driver must be 'active' with valid location
2. Driver vehicle capacity must meet package weight/volume requirements
3. Driver must have required skills (fragile handling, cold storage, etc.)
4. Selects nearest driver by real-time route distance
This is a fixed-rule algorithm that prioritizes proximity while ensuring
the driver has the necessary capacity and skills for the delivery.
After assignment:
- Order status changes to 'assigned'
- Driver status changes to 'busy'
- Route data (distance, duration, path) is calculated and saved
- Assignment record is created with all route details
Args:
order_id: Order ID to auto-assign (e.g., 'ORD-20250114123456')
Returns:
dict: {
success: bool,
assignment_id: str,
method: 'auto_assignment',
order_id: str,
driver_id: str,
driver_name: str,
driver_phone: str,
driver_vehicle_type: str,
selection_reason: str,
distance_km: float,
distance_meters: int,
estimated_duration_minutes: float,
candidates_evaluated: int,
suitable_candidates: int,
route_summary: str,
estimated_arrival: str
}
"""
from chat.tools import handle_auto_assign_order
logger.info(f"Tool: auto_assign_order(order_id='{order_id}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('auto_assign_order')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_auto_assign_order({"order_id": order_id}, user_id=user['user_id'])
@mcp.tool()
def intelligent_assign_order(order_id: str) -> dict:
"""
Intelligently assign order using Google Gemini 2.0 Flash AI to analyze all parameters and select the best driver.
Uses Gemini 2.0 Flash (latest model) to evaluate:
- Order characteristics (priority, weight, fragility, time constraints, value)
- Driver capabilities (location, capacity, skills, vehicle type)
- Real-time routing data (distance, traffic delays, tolls)
- Weather conditions and impact on delivery
- Complex tradeoffs and optimal matching
The AI considers multiple factors holistically:
- Distance efficiency vs skill requirements
- Capacity utilization vs delivery urgency
- Traffic conditions vs time constraints
- Weather safety vs speed requirements
- Cost efficiency (tolls, fuel) vs customer satisfaction
Returns assignment with detailed AI reasoning explaining why the
selected driver is the best match for this specific delivery.
Requirements:
- GOOGLE_API_KEY environment variable must be set
- Order must be in 'pending' status
- At least one active driver with valid location
After assignment:
- Order status changes to 'assigned'
- Driver status changes to 'busy'
- Route data (distance, duration, path) is calculated and saved
- Assignment record is created with all route details
- AI reasoning is returned for transparency
Args:
order_id: Order ID to intelligently assign (e.g., 'ORD-20250114123456')
Returns:
dict: {
success: bool,
assignment_id: str,
method: 'intelligent_assignment',
ai_provider: 'Google Gemini 2.0 Flash',
order_id: str,
driver_id: str,
driver_name: str,
driver_phone: str,
driver_vehicle_type: str,
distance_km: float,
estimated_duration_minutes: float,
ai_reasoning: {
primary_factors: [str],
trade_offs_considered: [str],
risk_assessment: str,
decision_summary: str
},
confidence_score: float,
alternatives_considered: [{driver_id: str, reason_not_selected: str}],
candidates_evaluated: int,
route_summary: str,
estimated_arrival: str
}
"""
from chat.tools import handle_intelligent_assign_order
logger.info(f"Tool: intelligent_assign_order(order_id='{order_id}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('intelligent_assign_order')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_intelligent_assign_order({"order_id": order_id}, user_id=user['user_id'])
@mcp.tool()
def get_assignment_details(
assignment_id: str = None,
order_id: str = None,
driver_id: str = None
) -> dict:
"""
Get assignment details by assignment ID, order ID, or driver ID.
Provide at least one parameter to search.
Args:
assignment_id: Assignment ID (e.g., 'ASN-20250114123456')
order_id: Order ID to find assignments for (e.g., 'ORD-20250114123456')
driver_id: Driver ID to find assignments for (e.g., 'DRV-20250114123456')
Returns:
dict: {
success: bool,
assignments: [
{
assignment_id: str,
order_id: str,
driver_id: str,
customer_name: str,
driver_name: str,
status: str,
route_distance_meters: int,
route_duration_seconds: int,
route_summary: str,
driver_start_location: {lat: float, lng: float},
delivery_location: {lat: float, lng: float, address: str},
estimated_arrival: str,
assigned_at: str,
updated_at: str
}
]
}
"""
from chat.tools import handle_get_assignment_details
logger.info(f"Tool: get_assignment_details(assignment_id='{assignment_id}', order_id='{order_id}', driver_id='{driver_id}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('get_assignment_details')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_get_assignment_details({
"assignment_id": assignment_id,
"order_id": order_id,
"driver_id": driver_id
}, user_id=user['user_id'])
@mcp.tool()
def update_assignment(
assignment_id: str,
status: str = None,
actual_arrival: str = None,
actual_distance_meters: int = None,
notes: str = None
) -> dict:
"""
Update assignment status or details.
Valid status transitions:
- active → in_progress (driver starts delivery)
- in_progress → completed (delivery successful)
- in_progress → failed (delivery failed)
- active/in_progress → cancelled (assignment cancelled)
Cascading updates:
- completed: order status → 'delivered', driver checks for other assignments
- failed: order status → 'failed', driver checks for other assignments
- cancelled: order status → 'cancelled', order.assigned_driver_id → NULL, driver → 'active' if no other assignments
Args:
assignment_id: Assignment ID to update (e.g., 'ASN-20250114123456')
status: New status (active, in_progress, completed, failed, cancelled)
actual_arrival: Actual arrival timestamp (ISO format)
actual_distance_meters: Actual distance traveled in meters
notes: Additional notes about the assignment
Returns:
dict: {
success: bool,
assignment_id: str,
updated_fields: list,
cascading_actions: list,
message: str
}
"""
from chat.tools import handle_update_assignment
logger.info(f"Tool: update_assignment(assignment_id='{assignment_id}', status='{status}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('update_assignment')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_update_assignment({
"assignment_id": assignment_id,
"status": status,
"actual_arrival": actual_arrival,
"actual_distance_meters": actual_distance_meters,
"notes": notes
}, user_id=user['user_id'])
@mcp.tool()
def unassign_order(assignment_id: str, confirm: bool = False) -> dict:
"""
Unassign an order from a driver by deleting the assignment.
Requirements:
- Assignment cannot be in 'in_progress' status (must cancel first using update_assignment)
- Requires confirm=true to proceed
Effects:
- Assignment is deleted
- Order status changes back to 'pending'
- order.assigned_driver_id is set to NULL
- Driver status changes to 'active' (if no other assignments)
Args:
assignment_id: Assignment ID to unassign (e.g., 'ASN-20250114123456')
confirm: Must be set to true to confirm unassignment
Returns:
dict: {
success: bool,
assignment_id: str,
order_id: str,
driver_id: str,
message: str
}
"""
from chat.tools import handle_unassign_order
logger.info(f"Tool: unassign_order(assignment_id='{assignment_id}', confirm={confirm})")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('unassign_order')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_unassign_order({"assignment_id": assignment_id, "confirm": confirm}, user_id=user['user_id'])
@mcp.tool()
def complete_delivery(
assignment_id: str,
confirm: bool,
actual_distance_meters: int = None,
notes: str = None
) -> dict:
"""
Mark a delivery as successfully completed and automatically update driver location to delivery address.
This is the primary tool for completing deliveries. It handles all necessary updates:
- Marks assignment as 'completed' with timestamp
- Updates order status to 'delivered'
- **Automatically moves driver location to the delivery address**
- Updates driver status to 'active' (if no other assignments)
- Records actual distance and notes (optional)
Requirements:
- Assignment must be in 'active' or 'in_progress' status
- Delivery location coordinates must exist
- Requires confirm=true
For failed deliveries: Use fail_delivery tool instead.
Args:
assignment_id: Assignment ID to complete (e.g., 'ASN-20250114123456')
confirm: Must be set to true to confirm completion
actual_distance_meters: Optional actual distance traveled in meters
notes: Optional completion notes
Returns:
dict: {
success: bool,
assignment_id: str,
order_id: str,
driver_id: str,
customer_name: str,
driver_name: str,
completed_at: str (ISO timestamp),
delivery_location: {lat, lng, address},
driver_updated: {new_location, location_updated_at},
cascading_actions: list[str],
message: str
}
"""
from chat.tools import handle_complete_delivery
logger.info(f"Tool: complete_delivery(assignment_id='{assignment_id}', confirm={confirm})")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('complete_delivery')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_complete_delivery({
"assignment_id": assignment_id,
"confirm": confirm,
"actual_distance_meters": actual_distance_meters,
"notes": notes
}, user_id=user['user_id'])
@mcp.tool()
def fail_delivery(
assignment_id: str,
current_address: str,
current_lat: float,
current_lng: float,
failure_reason: str,
confirm: bool,
notes: str = None
) -> dict:
"""
Mark a delivery as failed with mandatory driver location and failure reason.
IMPORTANT: Driver MUST provide their current location (address + GPS coordinates) and a valid failure reason.
This ensures accurate location tracking and proper failure documentation.
Handles all necessary updates:
- Marks assignment as 'failed' with timestamp
- Updates order status to 'failed'
- **Updates driver location to the reported current position**
- Updates driver status to 'active' (if no other assignments)
- Records structured failure reason and optional notes
Valid failure reasons:
- customer_not_available: Customer not present or not reachable
- wrong_address: Incorrect or invalid delivery address
- refused_delivery: Customer refused to accept delivery
- damaged_goods: Package damaged during transit
- payment_issue: Payment problems (for COD orders)
- vehicle_breakdown: Driver's vehicle broke down
- access_restricted: Cannot access delivery location
- weather_conditions: Severe weather preventing delivery
- other: Other reasons (provide details in notes)
Requirements:
- Assignment must be in 'active' or 'in_progress' status
- Driver must provide current address and GPS coordinates
- Must provide a valid failure_reason from the list above
- Requires confirm=true
Args:
assignment_id: Assignment ID to mark as failed (e.g., 'ASN-20250114123456')
current_address: Driver's current location address (e.g., '123 Main St, New York, NY')
current_lat: Driver's current latitude (-90 to 90)
current_lng: Driver's current longitude (-180 to 180)
failure_reason: Reason for failure (must be from valid list)
confirm: Must be set to true to confirm failure
notes: Optional additional details about the failure
Returns:
dict: {
success: bool,
assignment_id: str,
order_id: str,
driver_id: str,
customer_name: str,
driver_name: str,
failed_at: str (ISO timestamp),
failure_reason: str,
failure_reason_display: str (human-readable),
delivery_address: str,
driver_location: {lat, lng, address, updated_at},
cascading_actions: list[str],
message: str
}
"""
from chat.tools import handle_fail_delivery
logger.info(f"Tool: fail_delivery(assignment_id='{assignment_id}', reason='{failure_reason}')")
# STEP 1: Authenticate user
user = get_authenticated_user()
if not user:
return {"success": False, "error": "Authentication required"}
# STEP 2: Check permissions
required_scope = get_required_scope('fail_delivery')
if not check_permission(user.get('scopes', []), required_scope):
return {"success": False, "error": "Permission denied"}
# STEP 3: Execute with user_id
return handle_fail_delivery({
"assignment_id": assignment_id,
"current_address": current_address,
"current_lat": current_lat,
"current_lng": current_lng,
"failure_reason": failure_reason,
"confirm": confirm,
"notes": notes
}, user_id=user['user_id'])
# ============================================================================
# MAIN ENTRY POINT
# ============================================================================
if __name__ == "__main__":
logger.info("=" * 60)
logger.info("FleetMind MCP Server v1.0.0")
logger.info("=" * 60)
logger.info(f"Geocoding: {geocoding_service.get_status()}")
logger.info("Tools: 27 tools registered (19 core + 6 assignment + 2 bulk delete)")
logger.info("Resources: 2 resources available")
logger.info("Prompts: 3 workflow templates")
logger.info("Authentication: Multi-tenant API key via URL query params")
logger.info("Usage: Connect with ?api_key=YOUR_KEY in the SSE URL")
logger.info("Starting MCP server...")
mcp.run()