mashrur950's picture
fix addresssing issue
5d901a6
"""
Tool definitions and execution handlers for FleetMind chat
Simulates MCP tools using Claude's tool calling feature
"""
import os
import sys
from pathlib import Path
from datetime import datetime, timedelta
import logging
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from database.connection import execute_write, execute_query, get_db_connection
from chat.geocoding import GeocodingService, GEOCODING_TIMEOUT
from psycopg2.extras import RealDictCursor
logger = logging.getLogger(__name__)
# Thread pool for running blocking operations (geocoding, external APIs)
_blocking_executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="blocking_ops")
# Initialize geocoding service
geocoding_service = GeocodingService()
def safe_geocode(address: str) -> dict:
"""
Thread-safe geocoding with timeout protection.
Runs geocoding in a thread pool to prevent blocking the main event loop.
Args:
address: Address to geocode
Returns:
Geocoding result dict
"""
try:
future = _blocking_executor.submit(geocoding_service.geocode, address)
result = future.result(timeout=GEOCODING_TIMEOUT + 2)
return result
except FuturesTimeoutError:
logger.error(f"Geocoding timed out for: {address}, using mock fallback")
return geocoding_service._geocode_mock(address)
except Exception as e:
logger.error(f"Geocoding failed for {address}: {e}, using mock fallback")
return geocoding_service._geocode_mock(address)
def safe_reverse_geocode(lat: float, lng: float) -> dict:
"""
Thread-safe reverse geocoding with timeout protection.
Runs reverse geocoding in a thread pool to prevent blocking.
Args:
lat: Latitude
lng: Longitude
Returns:
Reverse geocoding result dict
"""
try:
future = _blocking_executor.submit(geocoding_service.reverse_geocode, lat, lng)
result = future.result(timeout=GEOCODING_TIMEOUT + 2)
return result
except FuturesTimeoutError:
logger.error(f"Reverse geocoding timed out for ({lat}, {lng}), using mock fallback")
return geocoding_service._reverse_geocode_mock(lat, lng)
except Exception as e:
logger.error(f"Reverse geocoding failed for ({lat}, {lng}): {e}, using mock fallback")
return geocoding_service._reverse_geocode_mock(lat, lng)
# Tool schemas for Claude
TOOLS_SCHEMA = [
{
"name": "geocode_address",
"description": "Convert a delivery address to GPS coordinates and validate the address format. Use this before creating an order to ensure the address is valid.",
"input_schema": {
"type": "object",
"properties": {
"address": {
"type": "string",
"description": "The full delivery address to geocode (e.g., '123 Main St, San Francisco, CA')"
}
},
"required": ["address"]
}
},
{
"name": "create_order",
"description": "Create a new delivery order in the database. Only call this after geocoding the address successfully.",
"input_schema": {
"type": "object",
"properties": {
"customer_name": {
"type": "string",
"description": "Full name of the customer"
},
"customer_phone": {
"type": "string",
"description": "Customer phone number (optional)"
},
"customer_email": {
"type": "string",
"description": "Customer email address (optional)"
},
"delivery_address": {
"type": "string",
"description": "Full delivery address"
},
"delivery_lat": {
"type": "number",
"description": "Latitude from geocoding"
},
"delivery_lng": {
"type": "number",
"description": "Longitude from geocoding"
},
"time_window_end": {
"type": "string",
"description": "Delivery deadline in ISO format (e.g., '2025-11-13T17:00:00'). If not specified by user, default to 6 hours from now."
},
"priority": {
"type": "string",
"enum": ["standard", "express", "urgent"],
"description": "Delivery priority. Default to 'standard' unless user specifies urgent/express."
},
"special_instructions": {
"type": "string",
"description": "Any special delivery instructions (optional)"
},
"weight_kg": {
"type": "number",
"description": "Package weight in kilograms (optional, default to 5.0)"
}
},
"required": ["customer_name", "delivery_address", "delivery_lat", "delivery_lng"]
}
},
{
"name": "create_driver",
"description": "Create a new delivery driver in the database. Use this to onboard new drivers to the fleet. Requires driver location (address + coordinates).",
"input_schema": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Full name of the driver"
},
"phone": {
"type": "string",
"description": "Driver phone number (optional)"
},
"email": {
"type": "string",
"description": "Driver email address (optional)"
},
"vehicle_type": {
"type": "string",
"description": "Type of vehicle: van, truck, car, motorcycle"
},
"vehicle_plate": {
"type": "string",
"description": "Vehicle license plate number (optional)"
},
"current_address": {
"type": "string",
"description": "Driver's current location address (e.g., '123 Main St, New York, NY')"
},
"current_lat": {
"type": "number",
"description": "Driver's current latitude coordinate"
},
"current_lng": {
"type": "number",
"description": "Driver's current longitude coordinate"
},
"capacity_kg": {
"type": "number",
"description": "Vehicle cargo capacity in kilograms (default: 1000.0)"
},
"capacity_m3": {
"type": "number",
"description": "Vehicle cargo volume in cubic meters (default: 12.0)"
},
"skills": {
"type": "array",
"description": "List of driver skills/certifications: refrigerated, medical_certified, fragile_handler, overnight, express_delivery",
"items": {
"type": "string"
}
},
"status": {
"type": "string",
"enum": ["active", "busy", "offline", "unavailable"],
"description": "Driver status (default: active)"
}
},
"required": ["name", "vehicle_type", "current_address", "current_lat", "current_lng"]
}
},
{
"name": "count_orders",
"description": "Count total orders in the database with optional filters. Use this when user asks 'how many orders', 'fetch orders', or wants to know order statistics.",
"input_schema": {
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"],
"description": "Filter by order status (optional)"
},
"priority": {
"type": "string",
"enum": ["standard", "express", "urgent"],
"description": "Filter by priority level (optional)"
},
"payment_status": {
"type": "string",
"enum": ["pending", "paid", "cod"],
"description": "Filter by payment status (optional)"
},
"assigned_driver_id": {
"type": "string",
"description": "Filter by assigned driver ID (optional)"
},
"is_fragile": {
"type": "boolean",
"description": "Filter fragile packages only (optional)"
},
"requires_signature": {
"type": "boolean",
"description": "Filter orders requiring signature (optional)"
},
"requires_cold_storage": {
"type": "boolean",
"description": "Filter orders requiring cold storage (optional)"
}
},
"required": []
}
},
{
"name": "fetch_orders",
"description": "Fetch orders from the database with optional filters, pagination, and sorting. Use after counting to show specific number of orders.",
"input_schema": {
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Number of orders to fetch (default: 10, max: 100)"
},
"offset": {
"type": "integer",
"description": "Number of orders to skip for pagination (default: 0)"
},
"status": {
"type": "string",
"enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"],
"description": "Filter by order status (optional)"
},
"priority": {
"type": "string",
"enum": ["standard", "express", "urgent"],
"description": "Filter by priority level (optional)"
},
"payment_status": {
"type": "string",
"enum": ["pending", "paid", "cod"],
"description": "Filter by payment status (optional)"
},
"assigned_driver_id": {
"type": "string",
"description": "Filter by assigned driver ID (optional)"
},
"is_fragile": {
"type": "boolean",
"description": "Filter fragile packages only (optional)"
},
"requires_signature": {
"type": "boolean",
"description": "Filter orders requiring signature (optional)"
},
"requires_cold_storage": {
"type": "boolean",
"description": "Filter orders requiring cold storage (optional)"
},
"sort_by": {
"type": "string",
"enum": ["created_at", "priority", "time_window_start"],
"description": "Field to sort by (default: created_at)"
},
"sort_order": {
"type": "string",
"enum": ["ASC", "DESC"],
"description": "Sort order (default: DESC for newest first)"
}
},
"required": []
}
},
{
"name": "get_order_details",
"description": "Get complete details of a specific order by order ID. Use when user asks 'tell me about order X' or wants detailed information about a specific order.",
"input_schema": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "The order ID to fetch details for (e.g., 'ORD-20251114163800')"
}
},
"required": ["order_id"]
}
},
{
"name": "search_orders",
"description": "Search for orders by customer name, email, phone, or order ID pattern. Use when user provides partial information to find orders.",
"input_schema": {
"type": "object",
"properties": {
"search_term": {
"type": "string",
"description": "Search term to match against customer_name, customer_email, customer_phone, or order_id"
}
},
"required": ["search_term"]
}
},
{
"name": "get_incomplete_orders",
"description": "Get all orders that are not yet completed (excludes delivered and cancelled orders). Shortcut for finding orders in progress (pending, assigned, in_transit).",
"input_schema": {
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Number of orders to fetch (default: 20)"
}
},
"required": []
}
},
{
"name": "count_drivers",
"description": "Count total drivers in the database with optional filters. Use this when user asks 'how many drivers', 'show drivers', or wants driver statistics.",
"input_schema": {
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["active", "busy", "offline", "unavailable"],
"description": "Filter by driver status (optional)"
},
"vehicle_type": {
"type": "string",
"description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)"
}
},
"required": []
}
},
{
"name": "fetch_drivers",
"description": "Fetch drivers from the database with optional filters, pagination, and sorting. Use after counting to show specific number of drivers.",
"input_schema": {
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Number of drivers to fetch (default: 10, max: 100)"
},
"offset": {
"type": "integer",
"description": "Number of drivers to skip for pagination (default: 0)"
},
"status": {
"type": "string",
"enum": ["active", "busy", "offline", "unavailable"],
"description": "Filter by driver status (optional)"
},
"vehicle_type": {
"type": "string",
"description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)"
},
"sort_by": {
"type": "string",
"enum": ["name", "status", "created_at", "last_location_update"],
"description": "Field to sort by (default: name)"
},
"sort_order": {
"type": "string",
"enum": ["ASC", "DESC"],
"description": "Sort order (default: ASC for alphabetical)"
}
},
"required": []
}
},
{
"name": "get_driver_details",
"description": "Get complete details of a specific driver by driver ID, including current location (latitude, longitude, and human-readable address), contact info, vehicle details, status, and skills. Use when user asks about a driver's location, coordinates, position, or any other driver information.",
"input_schema": {
"type": "object",
"properties": {
"driver_id": {
"type": "string",
"description": "The driver ID to fetch details for (e.g., 'DRV-20251114163800')"
}
},
"required": ["driver_id"]
}
},
{
"name": "search_drivers",
"description": "Search for drivers by name, email, phone, vehicle plate, or driver ID pattern. Use when user provides partial information to find drivers.",
"input_schema": {
"type": "object",
"properties": {
"search_term": {
"type": "string",
"description": "Search term to match against name, email, phone, vehicle_plate, or driver_id"
}
},
"required": ["search_term"]
}
},
{
"name": "get_available_drivers",
"description": "Get all drivers that are available for assignment (active or offline status, excludes busy and unavailable). Shortcut for finding drivers ready for dispatch.",
"input_schema": {
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Number of drivers to fetch (default: 20)"
}
},
"required": []
}
},
{
"name": "update_order",
"description": "Update an existing order's details. You can update any combination of fields. Only provide the fields you want to change.",
"input_schema": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "Order ID to update (e.g., 'ORD-20250114123456')"
},
"customer_name": {
"type": "string",
"description": "Updated customer name"
},
"customer_phone": {
"type": "string",
"description": "Updated customer phone number"
},
"customer_email": {
"type": "string",
"description": "Updated customer email address"
},
"delivery_address": {
"type": "string",
"description": "Updated delivery address"
},
"delivery_lat": {
"type": "number",
"description": "Updated delivery latitude (required if updating address)"
},
"delivery_lng": {
"type": "number",
"description": "Updated delivery longitude (required if updating address)"
},
"status": {
"type": "string",
"description": "Updated order status",
"enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"]
},
"priority": {
"type": "string",
"description": "Updated priority level",
"enum": ["standard", "express", "urgent"]
},
"special_instructions": {
"type": "string",
"description": "Updated special delivery instructions"
},
"time_window_end": {
"type": "string",
"description": "Updated delivery deadline (ISO format datetime)"
},
"payment_status": {
"type": "string",
"description": "Updated payment status",
"enum": ["pending", "paid", "cod"]
},
"weight_kg": {
"type": "number",
"description": "Updated package weight in kilograms"
},
"order_value": {
"type": "number",
"description": "Updated order value in currency"
}
},
"required": ["order_id"]
}
},
{
"name": "delete_order",
"description": "Permanently delete an order from the database. This action cannot be undone. Use with caution.",
"input_schema": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "Order ID to delete (e.g., 'ORD-20250114123456')"
},
"confirm": {
"type": "boolean",
"description": "Must be set to true to confirm deletion"
}
},
"required": ["order_id", "confirm"]
}
},
{
"name": "update_driver",
"description": "Update an existing driver's details. You can update any combination of fields. Only provide the fields you want to change.",
"input_schema": {
"type": "object",
"properties": {
"driver_id": {
"type": "string",
"description": "Driver ID to update (e.g., 'DRV-20250114123456')"
},
"name": {
"type": "string",
"description": "Updated driver name"
},
"phone": {
"type": "string",
"description": "Updated phone number"
},
"email": {
"type": "string",
"description": "Updated email address"
},
"status": {
"type": "string",
"description": "Updated driver status",
"enum": ["active", "busy", "offline", "unavailable"]
},
"vehicle_type": {
"type": "string",
"description": "Updated vehicle type"
},
"vehicle_plate": {
"type": "string",
"description": "Updated vehicle license plate"
},
"capacity_kg": {
"type": "number",
"description": "Updated cargo capacity in kilograms"
},
"capacity_m3": {
"type": "number",
"description": "Updated cargo capacity in cubic meters"
},
"skills": {
"type": "array",
"items": {"type": "string"},
"description": "Updated list of driver skills/certifications"
},
"current_address": {
"type": "string",
"description": "Updated current location address (e.g., '123 Main St, New York, NY')"
},
"current_lat": {
"type": "number",
"description": "Updated current latitude"
},
"current_lng": {
"type": "number",
"description": "Updated current longitude"
}
},
"required": ["driver_id"]
}
},
{
"name": "delete_driver",
"description": "Permanently delete a driver from the database. This action cannot be undone. Use with caution.",
"input_schema": {
"type": "object",
"properties": {
"driver_id": {
"type": "string",
"description": "Driver ID to delete (e.g., 'DRV-20250114123456')"
},
"confirm": {
"type": "boolean",
"description": "Must be set to true to confirm deletion"
}
},
"required": ["driver_id", "confirm"]
}
}
]
def execute_tool(tool_name: str, tool_input: dict) -> dict:
"""
Route tool execution to appropriate handler
Args:
tool_name: Name of the tool to execute
tool_input: Tool input parameters
Returns:
Dict with tool execution results
"""
try:
if tool_name == "geocode_address":
return handle_geocode_address(tool_input)
elif tool_name == "create_order":
return handle_create_order(tool_input)
elif tool_name == "create_driver":
return handle_create_driver(tool_input)
elif tool_name == "count_orders":
return handle_count_orders(tool_input)
elif tool_name == "fetch_orders":
return handle_fetch_orders(tool_input)
elif tool_name == "get_order_details":
return handle_get_order_details(tool_input)
elif tool_name == "search_orders":
return handle_search_orders(tool_input)
elif tool_name == "get_incomplete_orders":
return handle_get_incomplete_orders(tool_input)
elif tool_name == "count_drivers":
return handle_count_drivers(tool_input)
elif tool_name == "fetch_drivers":
return handle_fetch_drivers(tool_input)
elif tool_name == "get_driver_details":
return handle_get_driver_details(tool_input)
elif tool_name == "search_drivers":
return handle_search_drivers(tool_input)
elif tool_name == "get_available_drivers":
return handle_get_available_drivers(tool_input)
elif tool_name == "update_order":
return handle_update_order(tool_input)
elif tool_name == "delete_order":
return handle_delete_order(tool_input)
elif tool_name == "update_driver":
return handle_update_driver(tool_input)
elif tool_name == "delete_driver":
return handle_delete_driver(tool_input)
else:
return {
"success": False,
"error": f"Unknown tool: {tool_name}"
}
except Exception as e:
logger.error(f"Tool execution error ({tool_name}): {e}")
return {
"success": False,
"error": str(e)
}
def handle_geocode_address(tool_input: dict) -> dict:
"""
Execute geocoding tool
Args:
tool_input: Dict with 'address' key
Returns:
Geocoding result
"""
address = tool_input.get("address", "")
if not address:
return {
"success": False,
"error": "Address is required"
}
logger.info(f"Geocoding address: {address}")
# Use safe_geocode with timeout protection
result = safe_geocode(address)
return {
"success": True,
"latitude": result["lat"],
"longitude": result["lng"],
"formatted_address": result["formatted_address"],
"confidence": result["confidence"],
"message": f"Address geocoded successfully ({result['confidence']})"
}
def handle_calculate_route(tool_input: dict) -> dict:
"""
Execute route calculation tool
Args:
tool_input: Dict with origin, destination, mode, vehicle_type, alternatives, include_steps
Returns:
Route calculation result with distance, duration, and optional directions
"""
import math
from datetime import datetime
origin = tool_input.get("origin", "")
destination = tool_input.get("destination", "")
mode = tool_input.get("mode", "driving")
vehicle_type = tool_input.get("vehicle_type", "car")
alternatives = tool_input.get("alternatives", False)
include_steps = tool_input.get("include_steps", False)
if not origin or not destination:
return {
"success": False,
"error": "Both origin and destination are required"
}
# Map vehicle type to travel mode
VEHICLE_TYPE_TO_MODE = {
"motorcycle": "TWO_WHEELER", # Use proper TWO_WHEELER mode for motorcycle-specific routing
"bicycle": "bicycling",
"car": "driving",
"van": "driving",
"truck": "driving" # Note: No truck-specific routing available in API
}
# Override mode if vehicle_type is provided
if vehicle_type in VEHICLE_TYPE_TO_MODE:
mode = VEHICLE_TYPE_TO_MODE[vehicle_type]
logger.info(f"Vehicle type '{vehicle_type}' mapped to mode '{mode}'")
logger.info(f"Calculating route: {origin}{destination} (mode: {mode}, vehicle: {vehicle_type})")
# Triple fallback: Routes API → Directions API → Mock
if geocoding_service.use_mock:
logger.info("Using mock route calculation (no API key)")
result = _calculate_route_mock(origin, destination, mode)
else:
try:
# Try Routes API first (recommended, more accurate)
logger.info("Attempting Routes API (recommended)")
result = _calculate_route_routes_api(origin, destination, mode, alternatives, include_steps, vehicle_type, tool_input)
except Exception as e:
logger.warning(f"Routes API failed: {e}")
try:
# Fall back to Directions API (legacy)
logger.info("Falling back to Directions API (legacy)")
result = _calculate_route_google(origin, destination, mode, alternatives, include_steps)
except Exception as e2:
# Fall back to mock calculation
logger.error(f"Directions API also failed: {e2}, falling back to mock")
result = _calculate_route_mock(origin, destination, mode)
# Add vehicle type to result for use in intelligent routing
result["vehicle_type"] = vehicle_type
return result
def _calculate_route_google(origin: str, destination: str, mode: str, alternatives: bool, include_steps: bool) -> dict:
"""Calculate route using Google Maps Directions API"""
try:
# Map our mode to Google Maps mode
mode_mapping = {
"driving": "driving",
"walking": "walking",
"bicycling": "bicycling",
"transit": "transit"
}
gmaps_mode = mode_mapping.get(mode, "driving")
# Call Google Maps Directions API
result = geocoding_service.gmaps_client.directions(
origin=origin,
destination=destination,
mode=gmaps_mode,
alternatives=alternatives,
departure_time="now" # Get real-time traffic data
)
if not result:
logger.warning(f"Google Maps Directions API found no routes for: {origin}{destination}")
return _calculate_route_mock(origin, destination, mode)
# Get first (best) route
route = result[0]
leg = route['legs'][0] # First leg (direct route)
# Extract route information
distance_meters = leg['distance']['value']
distance_text = leg['distance']['text']
duration_seconds = leg['duration']['value']
duration_text = leg['duration']['text']
# Get traffic-aware duration if available
duration_in_traffic = leg.get('duration_in_traffic')
if duration_in_traffic:
traffic_duration_seconds = duration_in_traffic['value']
traffic_duration_text = duration_in_traffic['text']
else:
traffic_duration_seconds = duration_seconds
traffic_duration_text = duration_text
# Get route summary
route_summary = route.get('summary', 'Via main roads')
# Prepare response
response = {
"success": True,
"origin": leg['start_address'],
"destination": leg['end_address'],
"distance": {
"meters": distance_meters,
"text": distance_text
},
"duration": {
"seconds": duration_seconds,
"text": duration_text
},
"duration_in_traffic": {
"seconds": traffic_duration_seconds,
"text": traffic_duration_text
},
"mode": mode,
"route_summary": route_summary,
"confidence": "high (Google Maps API)"
}
# Add turn-by-turn steps if requested
if include_steps and 'steps' in leg:
steps = []
for step in leg['steps']:
steps.append({
"instruction": step.get('html_instructions', '').replace('<b>', '').replace('</b>', ''),
"distance": step['distance']['text'],
"duration": step['duration']['text']
})
response["steps"] = steps
response["total_steps"] = len(steps)
# Add alternative routes if requested
if alternatives and len(result) > 1:
alt_routes = []
for alt_route in result[1:]: # Skip first route (already returned)
alt_leg = alt_route['legs'][0]
alt_routes.append({
"route_summary": alt_route.get('summary', 'Alternative route'),
"distance": alt_leg['distance']['text'],
"duration": alt_leg['duration']['text']
})
response["alternatives"] = alt_routes
response["alternatives_count"] = len(alt_routes)
logger.info(f"Route calculated: {distance_text}, {traffic_duration_text}")
return response
except Exception as e:
logger.error(f"Google Maps Directions API error: {e}")
raise
def _location_to_latlng(location: str) -> dict:
"""
Convert location (address or coordinates) to lat/lng dict for Routes API
Args:
location: Either an address string or "lat,lng" coordinates
Returns:
Dict with {"latitude": float, "longitude": float}
"""
# Check if already in "lat,lng" format
if ',' in location:
parts = location.split(',')
if len(parts) == 2:
try:
lat = float(parts[0].strip())
lng = float(parts[1].strip())
return {"latitude": lat, "longitude": lng}
except ValueError:
pass # Not valid coordinates, treat as address
# Geocode the address using safe version with timeout
geocoded = safe_geocode(location)
return {
"latitude": geocoded["lat"],
"longitude": geocoded["lng"]
}
def _calculate_route_routes_api(origin: str, destination: str, mode: str, alternatives: bool, include_steps: bool, vehicle_type: str = "car", tool_input: dict = None) -> dict:
"""
Calculate route using Google Routes API (new, recommended)
This uses the modern Routes API which provides better accuracy,
real-time traffic data, vehicle-specific routing, and additional features.
Args:
origin: Starting location (address or "lat,lng")
destination: Ending location (address or "lat,lng")
mode: Travel mode (driving, walking, bicycling, transit, TWO_WHEELER)
alternatives: Whether to return alternative routes
include_steps: Whether to include turn-by-turn directions
vehicle_type: Vehicle type (motorcycle, bicycle, car, van, truck)
tool_input: Original tool input dict for route modifiers
Returns:
Route calculation result dict with vehicle-specific data
"""
if tool_input is None:
tool_input = {}
import requests
import re
try:
# Convert locations to lat/lng
origin_latlng = _location_to_latlng(origin)
dest_latlng = _location_to_latlng(destination)
# Map travel modes to Routes API format
mode_mapping = {
"driving": "DRIVE",
"walking": "WALK",
"bicycling": "BICYCLE",
"transit": "TRANSIT",
"TWO_WHEELER": "TWO_WHEELER" # Motorcycle-specific routing
}
routes_mode = mode_mapping.get(mode, "DRIVE")
# Prepare API request
url = "https://routes.googleapis.com/directions/v2:computeRoutes"
# Build enhanced field mask for vehicle-specific data
field_mask_parts = [
"routes.duration",
"routes.staticDuration", # Duration without traffic
"routes.distanceMeters",
"routes.polyline.encodedPolyline",
"routes.legs",
"routes.description",
"routes.localizedValues",
"routes.routeLabels", # Get route type labels (FUEL_EFFICIENT, etc.)
"routes.travelAdvisory.speedReadingIntervals", # Traffic segments
"routes.travelAdvisory.tollInfo" # Toll information
]
# Add fuel consumption for DRIVE mode
if routes_mode == "DRIVE":
field_mask_parts.append("routes.travelAdvisory.fuelConsumptionMicroliters")
headers = {
"Content-Type": "application/json",
"X-Goog-Api-Key": geocoding_service.google_maps_key,
"X-Goog-FieldMask": ",".join(field_mask_parts)
}
# Build request body
body = {
"origin": {
"location": {
"latLng": origin_latlng
}
},
"destination": {
"location": {
"latLng": dest_latlng
}
},
"travelMode": routes_mode,
"computeAlternativeRoutes": alternatives,
"languageCode": "en-US",
"units": "METRIC"
}
# Add routing preference only for DRIVE and TWO_WHEELER (not for WALK/BICYCLE)
if routes_mode in ["DRIVE", "TWO_WHEELER"]:
body["routingPreference"] = "TRAFFIC_AWARE"
# Add route modifiers based on vehicle type
route_modifiers = {}
# Vehicle emission type for DRIVE mode (cars, vans, trucks)
if routes_mode == "DRIVE":
emission_type = tool_input.get("emission_type", "GASOLINE").upper()
if emission_type in ["GASOLINE", "ELECTRIC", "HYBRID", "DIESEL"]:
route_modifiers["vehicleInfo"] = {
"emissionType": emission_type
}
# Avoid options (applicable to DRIVE and TWO_WHEELER)
if routes_mode in ["DRIVE", "TWO_WHEELER"]:
if tool_input.get("avoid_tolls", False):
route_modifiers["avoidTolls"] = True
if tool_input.get("avoid_highways", False):
route_modifiers["avoidHighways"] = True
if tool_input.get("avoid_ferries", False):
route_modifiers["avoidFerries"] = True
if route_modifiers:
body["routeModifiers"] = route_modifiers
# Add extra computations for enhanced data
extra_computations = []
# Traffic data for DRIVE and TWO_WHEELER
if routes_mode in ["DRIVE", "TWO_WHEELER"]:
extra_computations.append("TRAFFIC_ON_POLYLINE")
# Toll information (unless avoiding tolls)
if not tool_input.get("avoid_tolls", False):
extra_computations.append("TOLLS")
# Fuel consumption for DRIVE mode only
if routes_mode == "DRIVE":
extra_computations.append("FUEL_CONSUMPTION")
if extra_computations:
body["extraComputations"] = extra_computations
# Request fuel-efficient alternative for DRIVE mode
if routes_mode == "DRIVE" and tool_input.get("request_fuel_efficient", False):
body["requestedReferenceRoutes"] = ["FUEL_EFFICIENT"]
# Make API request
logger.info(f"Calling Routes API: {origin}{destination} (mode: {routes_mode})")
response = requests.post(url, headers=headers, json=body, timeout=10)
if response.status_code != 200:
logger.error(f"Routes API error: {response.status_code} - {response.text}")
raise Exception(f"Routes API returned {response.status_code}: {response.text[:200]}")
data = response.json()
if not data.get("routes"):
logger.warning(f"Routes API found no routes for: {origin}{destination}")
return _calculate_route_google(origin, destination, mode, alternatives, include_steps)
# Get first (best) route
route = data["routes"][0]
# Extract distance
distance_meters = route.get("distanceMeters", 0)
if distance_meters >= 1000:
distance_text = f"{distance_meters/1000:.1f} km"
else:
distance_text = f"{distance_meters} m"
# Helper function to format duration
def format_duration(seconds):
hours = seconds // 3600
minutes = (seconds % 3600) // 60
if hours > 0:
return f"{hours} hour{'s' if hours > 1 else ''} {minutes} min{'s' if minutes != 1 else ''}"
else:
return f"{minutes} min{'s' if minutes != 1 else ''}"
# Extract duration WITH traffic (format: "123s" or "123.456s")
duration_str = route.get("duration", "0s")
duration_with_traffic_seconds = int(float(re.sub(r'[^\d.]', '', duration_str)))
# Extract static duration (WITHOUT traffic)
static_duration_str = route.get("staticDuration", duration_str)
static_duration_seconds = int(float(re.sub(r'[^\d.]', '', static_duration_str)))
# Calculate traffic delay
traffic_delay_seconds = duration_with_traffic_seconds - static_duration_seconds
# Get route description/summary and labels
route_summary = route.get("description", "Route via Routes API")
route_labels = route.get("routeLabels", [])
# Extract travel advisory information
travel_advisory = route.get("travelAdvisory", {})
# Toll information
toll_info = travel_advisory.get("tollInfo")
has_tolls = toll_info is not None
# Fuel consumption (DRIVE mode only)
fuel_consumption_ml = travel_advisory.get("fuelConsumptionMicroliters")
fuel_consumption_liters = None
if fuel_consumption_ml:
fuel_consumption_liters = float(fuel_consumption_ml) / 1_000_000
# Traffic segments
speed_intervals = travel_advisory.get("speedReadingIntervals", [])
has_traffic_data = len(speed_intervals) > 0
# Get origin and destination addresses (geocode if needed) - use safe version with timeout
origin_geocoded = safe_geocode(origin)
dest_geocoded = safe_geocode(destination)
# Build enhanced response with vehicle-specific data
response_data = {
"success": True,
"origin": origin_geocoded["formatted_address"],
"destination": dest_geocoded["formatted_address"],
"distance": {
"meters": distance_meters,
"text": distance_text
},
"duration": {
"seconds": static_duration_seconds,
"text": format_duration(static_duration_seconds)
},
"duration_in_traffic": {
"seconds": duration_with_traffic_seconds,
"text": format_duration(duration_with_traffic_seconds)
},
"traffic_delay": {
"seconds": traffic_delay_seconds,
"text": format_duration(traffic_delay_seconds) if traffic_delay_seconds > 0 else "No delay"
},
"mode": mode,
"vehicle_type": vehicle_type,
"route_summary": route_summary,
"route_labels": route_labels,
"confidence": "high (Routes API with real-time traffic)"
}
# Add toll information if available
if has_tolls:
response_data["toll_info"] = {
"has_tolls": True,
"details": "Toll roads on route"
}
else:
response_data["toll_info"] = {"has_tolls": False}
# Add fuel consumption if available (DRIVE mode)
if fuel_consumption_liters is not None:
response_data["fuel_consumption"] = {
"liters": round(fuel_consumption_liters, 2),
"text": f"{fuel_consumption_liters:.2f} L"
}
# Add traffic data availability indicator
if has_traffic_data:
response_data["traffic_data_available"] = True
response_data["traffic_segments_count"] = len(speed_intervals)
# Add beta warnings for specific modes
if routes_mode == "TWO_WHEELER":
response_data["warning"] = (
"Motorcycle routing uses TWO_WHEELER mode (beta). "
"May occasionally miss clear paths. Billed at higher rate."
)
elif routes_mode == "BICYCLE":
response_data["warning"] = (
"Bicycle routing is in beta and may occasionally miss clear bike paths."
)
# Add turn-by-turn steps if requested
if include_steps and route.get("legs"):
steps = []
for leg in route["legs"]:
if leg.get("steps"):
for step in leg["steps"]:
# Routes API has different step format, adapt as needed
steps.append({
"instruction": step.get("navigationInstruction", {}).get("instructions", "Continue"),
"distance": step.get("distanceMeters", 0),
"duration": step.get("staticDuration", "0s")
})
if steps:
response_data["steps"] = steps
response_data["steps_count"] = len(steps)
# Add alternative routes if requested and available
if alternatives and len(data["routes"]) > 1:
alt_routes = []
for alt_route in data["routes"][1:]:
alt_distance = alt_route.get("distanceMeters", 0)
alt_duration_str = alt_route.get("duration", "0s")
alt_duration_sec = int(float(re.sub(r'[^\d.]', '', alt_duration_str)))
alt_hours = alt_duration_sec // 3600
alt_minutes = (alt_duration_sec % 3600) // 60
if alt_hours > 0:
alt_duration_text = f"{alt_hours} hour{'s' if alt_hours > 1 else ''} {alt_minutes} min"
else:
alt_duration_text = f"{alt_minutes} min"
alt_routes.append({
"route_summary": alt_route.get("description", "Alternative route"),
"distance": f"{alt_distance/1000:.1f} km" if alt_distance >= 1000 else f"{alt_distance} m",
"duration": alt_duration_text
})
response_data["alternatives"] = alt_routes
response_data["alternatives_count"] = len(alt_routes)
logger.info(f"Routes API: {distance_text}, {format_duration(duration_with_traffic_seconds)}")
return response_data
except Exception as e:
logger.error(f"Routes API error: {e}")
raise
# City-specific traffic profiles for realistic routing
CITY_PROFILES = {
"dhaka": {
"name": "Dhaka, Bangladesh",
"peak_speed_kmh": 8, # 8 km/h during peak hours (7-10 AM, 5-9 PM)
"offpeak_speed_kmh": 18, # 18 km/h during off-peak hours
"night_speed_kmh": 25, # 25 km/h at night (10 PM - 6 AM)
"signals_per_km": 4, # 4 traffic signals per km in urban areas
"signal_delay_sec": 50, # 50 seconds average per signal
"intersection_delay_per_km": 30, # 30 seconds per km for intersections
"congestion_multiplier": 2.5, # Heavy congestion factor
"keywords": ["dhaka", "bangladesh"]
},
"default": {
"name": "Default Urban Area",
"peak_speed_kmh": 20, # 20 km/h during peak hours
"offpeak_speed_kmh": 30, # 30 km/h during off-peak hours
"night_speed_kmh": 40, # 40 km/h at night
"signals_per_km": 2, # 2 traffic signals per km
"signal_delay_sec": 45, # 45 seconds average per signal
"intersection_delay_per_km": 20, # 20 seconds per km
"congestion_multiplier": 1.5, # Moderate congestion
"keywords": []
}
}
def _calculate_route_mock(origin: str, destination: str, mode: str) -> dict:
"""Mock route calculation with realistic urban traffic modeling"""
import math
from datetime import datetime
# Try to geocode both locations to get coordinates (using safe version)
try:
origin_geocoded = safe_geocode(origin)
dest_geocoded = safe_geocode(destination)
origin_lat = origin_geocoded["lat"]
origin_lng = origin_geocoded["lng"]
dest_lat = dest_geocoded["lat"]
dest_lng = dest_geocoded["lng"]
# Detect city from destination address
dest_address_lower = dest_geocoded["formatted_address"].lower()
city_profile = CITY_PROFILES["default"]
for city_key, profile in CITY_PROFILES.items():
if city_key != "default":
for keyword in profile["keywords"]:
if keyword in dest_address_lower:
city_profile = profile
logger.info(f"Detected city: {profile['name']}")
break
if city_profile != CITY_PROFILES["default"]:
break
# Detect time of day
current_hour = datetime.now().hour
if 7 <= current_hour < 10 or 17 <= current_hour < 21:
time_period = "peak"
speed_kmh = city_profile["peak_speed_kmh"]
elif 22 <= current_hour or current_hour < 6:
time_period = "night"
speed_kmh = city_profile["night_speed_kmh"]
else:
time_period = "offpeak"
speed_kmh = city_profile["offpeak_speed_kmh"]
logger.info(f"Time period: {time_period}, base speed: {speed_kmh} km/h")
# Calculate straight-line distance using Haversine formula
R = 6371000 # Earth radius in meters
phi1 = math.radians(origin_lat)
phi2 = math.radians(dest_lat)
delta_phi = math.radians(dest_lat - origin_lat)
delta_lambda = math.radians(dest_lng - origin_lng)
a = math.sin(delta_phi/2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda/2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance_meters = R * c
# Estimate driving distance based on mode
if mode == "driving":
distance_meters *= 1.3 # 30% longer for road network
speed_mps = speed_kmh / 3.6 # Convert km/h to m/s
elif mode == "walking":
distance_meters *= 1.2
speed_mps = 1.4 # ~5 km/h walking speed
elif mode == "bicycling":
distance_meters *= 1.25
speed_mps = 4.5 # ~16 km/h cycling speed
elif mode == "transit":
distance_meters *= 1.4
speed_mps = 8.9 # ~32 km/h transit speed
else:
speed_mps = speed_kmh / 3.6
# Calculate base duration from speed
base_duration_seconds = int(distance_meters / speed_mps)
# Add realistic urban delays for driving mode
traffic_duration_seconds = base_duration_seconds
if mode == "driving":
distance_km = distance_meters / 1000.0
# Add traffic signal delays
num_signals = int(distance_km * city_profile["signals_per_km"])
signal_delay = num_signals * city_profile["signal_delay_sec"]
# Add intersection delays
intersection_delay = int(distance_km * city_profile["intersection_delay_per_km"])
# Apply congestion multiplier for peak hours
if time_period == "peak":
congestion_delay = int(base_duration_seconds * (city_profile["congestion_multiplier"] - 1.0))
else:
congestion_delay = 0
# Calculate total traffic-aware duration
traffic_duration_seconds = base_duration_seconds + signal_delay + intersection_delay + congestion_delay
# Apply minimum travel time (2 minutes)
MIN_TRAVEL_TIME = 120
if traffic_duration_seconds < MIN_TRAVEL_TIME:
traffic_duration_seconds = MIN_TRAVEL_TIME
logger.info(f"Urban delays - Signals: {signal_delay}s, Intersections: {intersection_delay}s, Congestion: {congestion_delay}s")
# Format distance
if distance_meters >= 1000:
distance_text = f"{distance_meters/1000:.1f} km"
else:
distance_text = f"{int(distance_meters)} m"
# Format base duration
hours = base_duration_seconds // 3600
minutes = (base_duration_seconds % 3600) // 60
if hours > 0:
base_duration_text = f"{hours} hour{'s' if hours > 1 else ''} {minutes} min{'s' if minutes != 1 else ''}"
else:
base_duration_text = f"{minutes} min{'s' if minutes != 1 else ''}"
# Format traffic-aware duration
hours = traffic_duration_seconds // 3600
minutes = (traffic_duration_seconds % 3600) // 60
if hours > 0:
traffic_duration_text = f"{hours} hour{'s' if hours > 1 else ''} {minutes} min{'s' if minutes != 1 else ''}"
else:
traffic_duration_text = f"{minutes} min{'s' if minutes != 1 else ''}"
logger.info(f"Mock route calculated: {distance_text}, {traffic_duration_text} (base: {base_duration_text}, city: {city_profile['name']})")
return {
"success": True,
"origin": origin_geocoded["formatted_address"],
"destination": dest_geocoded["formatted_address"],
"distance": {
"meters": int(distance_meters),
"text": distance_text
},
"duration": {
"seconds": base_duration_seconds,
"text": base_duration_text
},
"duration_in_traffic": {
"seconds": traffic_duration_seconds,
"text": traffic_duration_text
},
"mode": mode,
"route_summary": f"Direct route via {city_profile['name']} ({time_period} traffic)",
"confidence": "low (mock calculation with urban traffic modeling)"
}
except Exception as e:
logger.error(f"Mock route calculation failed: {e}")
return {
"success": False,
"error": f"Could not calculate route: {str(e)}"
}
def handle_create_order(tool_input: dict, user_id: str = None) -> dict:
"""
Execute order creation tool
Args:
tool_input: Dict with order fields (expected_delivery_time now REQUIRED)
user_id: ID of authenticated user creating the order
Returns:
Order creation result
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# Development mode - use default user
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
# Extract fields with defaults
customer_name = tool_input.get("customer_name")
customer_phone = tool_input.get("customer_phone")
customer_email = tool_input.get("customer_email")
delivery_address = tool_input.get("delivery_address")
delivery_lat = tool_input.get("delivery_lat")
delivery_lng = tool_input.get("delivery_lng")
expected_delivery_time_str = tool_input.get("expected_delivery_time")
priority = tool_input.get("priority", "standard")
special_instructions = tool_input.get("special_instructions")
weight_kg = tool_input.get("weight_kg", 5.0)
volume_m3 = tool_input.get("volume_m3", 1.0)
is_fragile = tool_input.get("is_fragile", False)
requires_cold_storage = tool_input.get("requires_cold_storage", False)
requires_signature = tool_input.get("requires_signature", False)
sla_grace_period_minutes = tool_input.get("sla_grace_period_minutes", 15)
# Validate required fields (expected_delivery_time is now MANDATORY)
if not all([customer_name, delivery_address, delivery_lat, delivery_lng, expected_delivery_time_str]):
return {
"success": False,
"error": "Missing required fields: customer_name, delivery_address, delivery_lat, delivery_lng, expected_delivery_time"
}
# Generate order ID with microseconds to prevent collisions
now = datetime.now()
order_id = f"ORD-{now.strftime('%Y%m%d%H%M%S%f')[:18]}" # YYYYMMDDHHMMSSμμμμμμ (18 chars)
# Parse and validate expected_delivery_time
try:
expected_delivery_time = datetime.fromisoformat(expected_delivery_time_str.replace('Z', '+00:00'))
# Validate it's in the future
if expected_delivery_time <= now:
return {
"success": False,
"error": f"expected_delivery_time must be in the future. Provided: {expected_delivery_time_str}, Current time: {now.isoformat()}"
}
except (ValueError, AttributeError) as e:
return {
"success": False,
"error": f"Invalid expected_delivery_time format. Must be ISO 8601 format (e.g., '2025-11-15T18:00:00'). Error: {str(e)}"
}
# Handle time window (kept for backward compatibility)
time_window_end_str = tool_input.get("time_window_end")
if time_window_end_str:
try:
time_window_end = datetime.fromisoformat(time_window_end_str.replace('Z', '+00:00'))
except:
time_window_end = expected_delivery_time # Use expected time as fallback
else:
time_window_end = expected_delivery_time # Default to expected delivery time
time_window_start = now + timedelta(hours=2)
# Insert into database
query = """
INSERT INTO orders (
order_id, customer_name, customer_phone, customer_email,
delivery_address, delivery_lat, delivery_lng,
time_window_start, time_window_end, expected_delivery_time,
priority, weight_kg, volume_m3, is_fragile, requires_cold_storage, requires_signature,
status, special_instructions, sla_grace_period_minutes, user_id
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (
order_id,
customer_name,
customer_phone,
customer_email,
delivery_address,
delivery_lat,
delivery_lng,
time_window_start,
time_window_end,
expected_delivery_time,
priority,
weight_kg,
volume_m3,
is_fragile,
requires_cold_storage,
requires_signature,
"pending",
special_instructions,
sla_grace_period_minutes,
user_id # Add user_id to track ownership
)
try:
execute_write(query, params)
logger.info(f"Order created: {order_id}, expected delivery: {expected_delivery_time.strftime('%Y-%m-%d %H:%M')}")
return {
"success": True,
"order_id": order_id,
"status": "pending",
"customer": customer_name,
"address": delivery_address,
"expected_delivery": expected_delivery_time.strftime("%Y-%m-%d %H:%M"),
"sla_grace_period_minutes": sla_grace_period_minutes,
"priority": priority,
"message": f"Order {order_id} created successfully! Expected delivery: {expected_delivery_time.strftime('%Y-%m-%d %H:%M')}"
}
except Exception as e:
logger.error(f"Database error creating order: {e}")
return {
"success": False,
"error": f"Failed to create order: {str(e)}"
}
def handle_create_driver(tool_input: dict, user_id: str = None) -> dict:
"""
Execute driver creation tool
Args:
tool_input: Dict with driver fields
user_id: ID of authenticated user creating the driver
Returns:
Driver creation result
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
# Extract fields with defaults
name = tool_input.get("name")
phone = tool_input.get("phone")
email = tool_input.get("email")
vehicle_type = tool_input.get("vehicle_type") # No default - REQUIRED
vehicle_plate = tool_input.get("vehicle_plate")
capacity_kg = tool_input.get("capacity_kg", 1000.0)
capacity_m3 = tool_input.get("capacity_m3", 12.0)
current_lat = tool_input.get("current_lat") # No default - REQUIRED
current_lng = tool_input.get("current_lng") # No default - REQUIRED
current_address = tool_input.get("current_address") # No default - REQUIRED
# Convert skills to regular list (handles protobuf RepeatedComposite)
skills_raw = tool_input.get("skills", [])
skills = list(skills_raw) if skills_raw else []
# Define valid skills
VALID_SKILLS = [
"refrigerated",
"medical_certified",
"fragile_handler",
"overnight",
"express_delivery"
]
# Validate skills
if skills:
invalid_skills = [s for s in skills if s not in VALID_SKILLS]
if invalid_skills:
return {
"success": False,
"error": f"Invalid skills: {invalid_skills}. Valid skills are: {VALID_SKILLS}"
}
status = tool_input.get("status", "active")
# Validate ALL required fields (name, vehicle_type, current_address, current_lat, current_lng)
if not all([name, vehicle_type, current_address, current_lat is not None, current_lng is not None]):
return {
"success": False,
"error": "Missing required fields: name, vehicle_type, current_address, current_lat, current_lng. All fields are mandatory."
}
# Validate coordinates are valid numbers
try:
current_lat = float(current_lat)
current_lng = float(current_lng)
except (ValueError, TypeError):
return {
"success": False,
"error": "current_lat and current_lng must be valid numbers"
}
# Validate coordinates are within valid ranges
if not (-90 <= current_lat <= 90):
return {
"success": False,
"error": f"Invalid latitude {current_lat}. Must be between -90 and 90"
}
if not (-180 <= current_lng <= 180):
return {
"success": False,
"error": f"Invalid longitude {current_lng}. Must be between -180 and 180"
}
# Generate driver ID with microseconds to prevent collisions
now = datetime.now()
driver_id = f"DRV-{now.strftime('%Y%m%d%H%M%S%f')[:18]}" # YYYYMMDDHHMMSSμμμμμμ (18 chars)
# Insert into database
query = """
INSERT INTO drivers (
driver_id, name, phone, email,
current_lat, current_lng, current_address, last_location_update,
status, vehicle_type, vehicle_plate,
capacity_kg, capacity_m3, skills, user_id
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# Convert skills list to JSON
import json
skills_json = json.dumps(skills) if skills else json.dumps([])
params = (
driver_id,
name,
phone,
email,
current_lat,
current_lng,
current_address,
now,
status,
vehicle_type,
vehicle_plate,
capacity_kg,
capacity_m3,
skills_json,
user_id # Add user_id to track ownership
)
try:
execute_write(query, params)
logger.info(f"Driver created: {driver_id}")
return {
"success": True,
"driver_id": driver_id,
"name": name,
"phone": phone,
"status": status,
"vehicle_type": vehicle_type,
"vehicle_plate": vehicle_plate,
"capacity_kg": capacity_kg,
"skills": skills,
"location": {
"latitude": current_lat,
"longitude": current_lng,
"address": current_address
},
"message": f"Driver {driver_id} ({name}) created successfully!"
}
except Exception as e:
logger.error(f"Database error creating driver: {e}")
return {
"success": False,
"error": f"Failed to create driver: {str(e)}"
}
def handle_update_order(tool_input: dict, user_id: str = None) -> dict:
"""
Execute order update tool with assignment cascading logic
Args:
tool_input: Dict with order_id and fields to update
user_id: Authenticated user ID
Returns:
Update result
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
import json
order_id = tool_input.get("order_id")
# Validate required field
if not order_id:
return {
"success": False,
"error": "Missing required field: order_id"
}
# Check if order exists and belongs to user
check_query = "SELECT order_id, status, assigned_driver_id FROM orders WHERE order_id = %s AND user_id = %s"
existing = execute_query(check_query, (order_id, user_id))
if not existing:
return {
"success": False,
"error": f"Order {order_id} not found"
}
current_status = existing[0].get("status")
current_assigned_driver = existing[0].get("assigned_driver_id")
# Auto-geocode if delivery address is updated without coordinates
# Use safe_geocode with timeout protection
if "delivery_address" in tool_input and ("delivery_lat" not in tool_input or "delivery_lng" not in tool_input):
try:
geocode_result = safe_geocode(tool_input["delivery_address"])
tool_input["delivery_lat"] = geocode_result["lat"]
tool_input["delivery_lng"] = geocode_result["lng"]
logger.info(f"Auto-geocoded delivery address: {geocode_result['formatted_address']}")
except Exception as e:
logger.warning(f"Failed to geocode address, skipping coordinate update: {e}")
# Handle status changes with assignment cascading logic
new_status = tool_input.get("status")
cascading_actions = []
if new_status and new_status != current_status:
# Check if order has active assignment
assignment_check = execute_query("""
SELECT assignment_id, status, driver_id
FROM assignments
WHERE order_id = %s AND status IN ('active', 'in_progress')
LIMIT 1
""", (order_id,))
has_active_assignment = len(assignment_check) > 0
# Validate status transitions based on assignment state
if new_status == "pending" and current_status == "assigned":
if has_active_assignment:
# Changing assigned order back to pending - must cancel assignment
assignment_id = assignment_check[0]["assignment_id"]
driver_id = assignment_check[0]["driver_id"]
# Cancel the assignment
execute_write("""
UPDATE assignments SET status = 'cancelled', updated_at = %s
WHERE assignment_id = %s
""", (datetime.now(), assignment_id))
# Clear assigned_driver_id from order
execute_write("""
UPDATE orders SET assigned_driver_id = NULL
WHERE order_id = %s
""", (order_id,))
# Check if driver has other active assignments
other_assignments = execute_query("""
SELECT COUNT(*) as count FROM assignments
WHERE driver_id = %s AND status IN ('active', 'in_progress')
AND assignment_id != %s
""", (driver_id, assignment_id))
if other_assignments[0]["count"] == 0:
# Set driver back to active if no other assignments
execute_write("""
UPDATE drivers SET status = 'active', updated_at = %s
WHERE driver_id = %s
""", (datetime.now(), driver_id))
cascading_actions.append(f"Driver {driver_id} set to active (no other assignments)")
cascading_actions.append(f"Assignment {assignment_id} cancelled and removed")
elif new_status == "cancelled":
if has_active_assignment:
# Cancel active assignment when order is cancelled
assignment_id = assignment_check[0]["assignment_id"]
driver_id = assignment_check[0]["driver_id"]
execute_write("""
UPDATE assignments SET status = 'cancelled', updated_at = %s
WHERE assignment_id = %s
""", (datetime.now(), assignment_id))
# Clear assigned_driver_id
execute_write("""
UPDATE orders SET assigned_driver_id = NULL
WHERE order_id = %s
""", (order_id,))
# Check if driver has other active assignments
other_assignments = execute_query("""
SELECT COUNT(*) as count FROM assignments
WHERE driver_id = %s AND status IN ('active', 'in_progress')
AND assignment_id != %s
""", (driver_id, assignment_id))
if other_assignments[0]["count"] == 0:
execute_write("""
UPDATE drivers SET status = 'active', updated_at = %s
WHERE driver_id = %s
""", (datetime.now(), driver_id))
cascading_actions.append(f"Driver {driver_id} set to active")
cascading_actions.append(f"Assignment {assignment_id} cancelled")
elif new_status in ["delivered", "failed"] and has_active_assignment:
# Note: This should normally be handled by update_assignment tool
# but we allow it here for flexibility
assignment_id = assignment_check[0]["assignment_id"]
final_status = "completed" if new_status == "delivered" else "failed"
execute_write("""
UPDATE assignments SET status = %s, updated_at = %s
WHERE assignment_id = %s
""", (final_status, datetime.now(), assignment_id))
cascading_actions.append(f"Assignment {assignment_id} marked as {final_status}")
# Build UPDATE query dynamically based on provided fields
update_fields = []
params = []
# Map of field names to their database columns
updateable_fields = {
"customer_name": "customer_name",
"customer_phone": "customer_phone",
"customer_email": "customer_email",
"delivery_address": "delivery_address",
"delivery_lat": "delivery_lat",
"delivery_lng": "delivery_lng",
"status": "status",
"priority": "priority",
"special_instructions": "special_instructions",
"time_window_end": "time_window_end",
"payment_status": "payment_status",
"weight_kg": "weight_kg",
"order_value": "order_value"
}
for field, column in updateable_fields.items():
if field in tool_input:
update_fields.append(f"{column} = %s")
params.append(tool_input[field])
if not update_fields:
return {
"success": False,
"error": "No fields provided to update"
}
# Always update the updated_at timestamp
update_fields.append("updated_at = %s")
params.append(datetime.now())
# Add order_id and user_id for WHERE clause
params.append(order_id)
params.append(user_id)
# Execute update
query = f"""
UPDATE orders
SET {', '.join(update_fields)}
WHERE order_id = %s AND user_id = %s
"""
try:
execute_write(query, tuple(params))
logger.info(f"Order updated: {order_id}")
result = {
"success": True,
"order_id": order_id,
"updated_fields": list(updateable_fields.keys() & tool_input.keys()),
"message": f"Order {order_id} updated successfully!"
}
if cascading_actions:
result["cascading_actions"] = cascading_actions
return result
except Exception as e:
logger.error(f"Database error updating order: {e}")
return {
"success": False,
"error": f"Failed to update order: {str(e)}"
}
def handle_delete_all_orders(tool_input: dict, user_id: str = None) -> dict:
"""
Delete all orders (bulk delete) for the authenticated user
Args:
tool_input: Dict with confirm flag and optional status filter
user_id: Authenticated user ID
Returns:
Deletion result with count
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
confirm = tool_input.get("confirm", False)
status_filter = tool_input.get("status") # Optional: delete only specific status
if not confirm:
return {
"success": False,
"error": "Bulk deletion requires confirm=true for safety"
}
try:
# Check for active assignments first (only for this user's orders)
active_assignments = execute_query("""
SELECT COUNT(*) as count FROM assignments
WHERE user_id = %s AND status IN ('active', 'in_progress')
""", (user_id,))
active_count = active_assignments[0]['count']
if active_count > 0:
return {
"success": False,
"error": f"Cannot delete orders: {active_count} active assignment(s) exist. Cancel or complete them first."
}
# Build delete query based on status filter (always filter by user_id)
if status_filter:
count_query = "SELECT COUNT(*) as count FROM orders WHERE user_id = %s AND status = %s"
delete_query = "DELETE FROM orders WHERE user_id = %s AND status = %s"
params = (user_id, status_filter)
else:
count_query = "SELECT COUNT(*) as count FROM orders WHERE user_id = %s"
delete_query = "DELETE FROM orders WHERE user_id = %s"
params = (user_id,)
# Get count before deletion
count_result = execute_query(count_query, params)
total_count = count_result[0]['count']
if total_count == 0:
return {
"success": True,
"deleted_count": 0,
"message": "No orders to delete"
}
# Execute bulk delete
execute_write(delete_query, params)
logger.info(f"Bulk deleted {total_count} orders")
return {
"success": True,
"deleted_count": total_count,
"message": f"Successfully deleted {total_count} order(s)"
}
except Exception as e:
logger.error(f"Database error bulk deleting orders: {e}")
return {
"success": False,
"error": f"Failed to bulk delete orders: {str(e)}"
}
def handle_delete_order(tool_input: dict, user_id: str = None) -> dict:
"""
Execute order deletion tool with assignment safety checks
Args:
tool_input: Dict with order_id and confirm flag
user_id: Authenticated user ID
Returns:
Deletion result
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
order_id = tool_input.get("order_id")
confirm = tool_input.get("confirm", False)
# Validate required fields
if not order_id:
return {
"success": False,
"error": "Missing required field: order_id"
}
if not confirm:
return {
"success": False,
"error": "Deletion not confirmed. Set confirm=true to proceed."
}
# Check if order exists and belongs to user
check_query = "SELECT order_id, status FROM orders WHERE order_id = %s AND user_id = %s"
existing = execute_query(check_query, (order_id, user_id))
if not existing:
return {
"success": False,
"error": f"Order {order_id} not found"
}
order_status = existing[0].get("status")
# Check for active assignments
assignment_check = execute_query("""
SELECT assignment_id, status, driver_id
FROM assignments
WHERE order_id = %s AND status IN ('active', 'in_progress')
""", (order_id,))
if assignment_check:
# Warn about active assignments that will be cascade deleted
assignment_count = len(assignment_check)
assignment_ids = [a["assignment_id"] for a in assignment_check]
return {
"success": False,
"error": f"Cannot delete order {order_id}: it has {assignment_count} active assignment(s): {', '.join(assignment_ids)}. Please cancel or complete the assignment(s) first using update_assignment or unassign_order.",
"active_assignments": assignment_ids
}
# Check for any completed assignments (these will be cascade deleted)
completed_assignments = execute_query("""
SELECT COUNT(*) as count FROM assignments
WHERE order_id = %s AND status IN ('completed', 'failed', 'cancelled')
""", (order_id,))
cascading_info = []
if completed_assignments[0]["count"] > 0:
cascading_info.append(f"{completed_assignments[0]['count']} completed/failed/cancelled assignment(s) will be cascade deleted")
# Delete the order (will cascade to assignments via FK)
query = "DELETE FROM orders WHERE order_id = %s AND user_id = %s"
try:
execute_write(query, (order_id, user_id))
logger.info(f"Order deleted: {order_id}")
result = {
"success": True,
"order_id": order_id,
"message": f"Order {order_id} has been permanently deleted."
}
if cascading_info:
result["cascading_info"] = cascading_info
return result
except Exception as e:
logger.error(f"Database error deleting order: {e}")
return {
"success": False,
"error": f"Failed to delete order: {str(e)}"
}
def handle_update_driver(tool_input: dict, user_id: str = None) -> dict:
"""
Execute driver update tool with assignment validation
Args:
tool_input: Dict with driver_id and fields to update
user_id: Authenticated user ID
Returns:
Update result
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
import json
driver_id = tool_input.get("driver_id")
# Validate required field
if not driver_id:
return {
"success": False,
"error": "Missing required field: driver_id"
}
# Check if driver exists and belongs to user
check_query = "SELECT driver_id, status FROM drivers WHERE driver_id = %s AND user_id = %s"
existing = execute_query(check_query, (driver_id, user_id))
if not existing:
return {
"success": False,
"error": f"Driver {driver_id} not found"
}
current_status = existing[0].get("status")
# Validate status changes against active assignments
new_status = tool_input.get("status")
if new_status and new_status != current_status:
# Check for active assignments
assignment_check = execute_query("""
SELECT assignment_id, status, order_id
FROM assignments
WHERE driver_id = %s AND status IN ('active', 'in_progress')
""", (driver_id,))
has_active_assignments = len(assignment_check) > 0
# Prevent setting driver to offline/inactive when they have active assignments
if new_status in ["offline", "inactive"] and has_active_assignments:
assignment_count = len(assignment_check)
assignment_ids = [a["assignment_id"] for a in assignment_check]
return {
"success": False,
"error": f"Cannot set driver {driver_id} to '{new_status}': driver has {assignment_count} active assignment(s): {', '.join(assignment_ids)}. Please complete or cancel assignments first.",
"active_assignments": assignment_ids
}
# Note: Setting driver to 'active' when they have assignments is allowed
# The system manages 'busy' status automatically via assignment creation
# But we allow manual override to 'active' for edge cases
# Build UPDATE query dynamically based on provided fields
update_fields = []
params = []
# Map of field names to their database columns
updateable_fields = {
"name": "name",
"phone": "phone",
"email": "email",
"status": "status",
"vehicle_type": "vehicle_type",
"vehicle_plate": "vehicle_plate",
"capacity_kg": "capacity_kg",
"capacity_m3": "capacity_m3",
"current_address": "current_address",
"current_lat": "current_lat",
"current_lng": "current_lng"
}
for field, column in updateable_fields.items():
if field in tool_input:
update_fields.append(f"{column} = %s")
params.append(tool_input[field])
# Handle skills array specially (convert to JSON)
if "skills" in tool_input:
skills = list(tool_input.get("skills", []))
update_fields.append("skills = %s")
params.append(json.dumps(skills))
if not update_fields:
return {
"success": False,
"error": "No fields provided to update"
}
# Always update the updated_at timestamp
update_fields.append("updated_at = %s")
params.append(datetime.now())
# Update location timestamp if lat/lng changed
if "current_lat" in tool_input or "current_lng" in tool_input:
update_fields.append("last_location_update = %s")
params.append(datetime.now())
# Add driver_id and user_id for WHERE clause
params.append(driver_id)
params.append(user_id)
# Execute update
query = f"""
UPDATE drivers
SET {', '.join(update_fields)}
WHERE driver_id = %s AND user_id = %s
"""
try:
execute_write(query, tuple(params))
logger.info(f"Driver updated: {driver_id}")
updated_list = list(updateable_fields.keys() & tool_input.keys())
if "skills" in tool_input:
updated_list.append("skills")
return {
"success": True,
"driver_id": driver_id,
"updated_fields": updated_list,
"message": f"Driver {driver_id} updated successfully!"
}
except Exception as e:
logger.error(f"Database error updating driver: {e}")
return {
"success": False,
"error": f"Failed to update driver: {str(e)}"
}
def handle_delete_all_drivers(tool_input: dict, user_id: str = None) -> dict:
"""
Delete all drivers (bulk delete) for the authenticated user
Args:
tool_input: Dict with confirm flag and optional status filter
user_id: Authenticated user ID
Returns:
Deletion result with count
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
confirm = tool_input.get("confirm", False)
status_filter = tool_input.get("status") # Optional: delete only specific status
if not confirm:
return {
"success": False,
"error": "Bulk deletion requires confirm=true for safety"
}
try:
# Check for ANY assignments for this user (RESTRICT constraint will block if any exist)
assignments = execute_query("""
SELECT COUNT(*) as count FROM assignments WHERE user_id = %s
""", (user_id,))
assignment_count = assignments[0]['count']
if assignment_count > 0:
return {
"success": False,
"error": f"Cannot delete drivers: {assignment_count} assignment(s) exist in database. Database RESTRICT constraint prevents driver deletion when assignments exist."
}
# Build delete query based on status filter (always filter by user_id)
if status_filter:
count_query = "SELECT COUNT(*) as count FROM drivers WHERE user_id = %s AND status = %s"
delete_query = "DELETE FROM drivers WHERE user_id = %s AND status = %s"
params = (user_id, status_filter)
else:
count_query = "SELECT COUNT(*) as count FROM drivers WHERE user_id = %s"
delete_query = "DELETE FROM drivers WHERE user_id = %s"
params = (user_id,)
# Get count before deletion
count_result = execute_query(count_query, params)
total_count = count_result[0]['count']
if total_count == 0:
return {
"success": True,
"deleted_count": 0,
"message": "No drivers to delete"
}
# Execute bulk delete
execute_write(delete_query, params)
logger.info(f"Bulk deleted {total_count} drivers")
return {
"success": True,
"deleted_count": total_count,
"message": f"Successfully deleted {total_count} driver(s)"
}
except Exception as e:
logger.error(f"Database error bulk deleting drivers: {e}")
# Provide more context if it's a FK constraint error
error_message = str(e)
if "foreign key" in error_message.lower() or "violates" in error_message.lower():
error_message = f"Cannot delete drivers due to database constraint (assignments exist). Error: {error_message}"
return {
"success": False,
"error": f"Failed to bulk delete drivers: {error_message}"
}
def handle_delete_driver(tool_input: dict, user_id: str = None) -> dict:
"""
Execute driver deletion tool with assignment safety checks
Args:
tool_input: Dict with driver_id and confirm flag
user_id: Authenticated user ID
Returns:
Deletion result
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
driver_id = tool_input.get("driver_id")
confirm = tool_input.get("confirm", False)
# Validate required fields
if not driver_id:
return {
"success": False,
"error": "Missing required field: driver_id"
}
if not confirm:
return {
"success": False,
"error": "Deletion not confirmed. Set confirm=true to proceed."
}
# Check if driver exists and belongs to user
check_query = "SELECT driver_id, name FROM drivers WHERE driver_id = %s AND user_id = %s"
existing = execute_query(check_query, (driver_id, user_id))
if not existing:
return {
"success": False,
"error": f"Driver {driver_id} not found"
}
driver_name = existing[0]["name"]
# Check for ANY assignments (active or completed)
# FK constraint with ON DELETE RESTRICT will prevent deletion if ANY assignments exist
assignment_check = execute_query("""
SELECT assignment_id, status, order_id
FROM assignments
WHERE driver_id = %s
""", (driver_id,))
if assignment_check:
# Count active vs completed assignments
active_assignments = [a for a in assignment_check if a["status"] in ("active", "in_progress")]
completed_assignments = [a for a in assignment_check if a["status"] in ("completed", "failed", "cancelled")]
total_count = len(assignment_check)
active_count = len(active_assignments)
completed_count = len(completed_assignments)
error_msg = f"Cannot delete driver {driver_id} ({driver_name}): driver has {total_count} assignment(s)"
if active_count > 0:
active_ids = [a["assignment_id"] for a in active_assignments]
error_msg += f" ({active_count} active: {', '.join(active_ids)})"
if completed_count > 0:
error_msg += f" ({completed_count} completed/failed/cancelled)"
error_msg += ". The database has RESTRICT constraint preventing driver deletion when assignments exist. Please cancel/complete active assignments and consider archiving the driver instead of deleting."
return {
"success": False,
"error": error_msg,
"total_assignments": total_count,
"active_assignments": [a["assignment_id"] for a in active_assignments],
"completed_assignments": [a["assignment_id"] for a in completed_assignments]
}
# Check for orders that reference this driver in assigned_driver_id
# FK constraint with ON DELETE SET NULL will set these to NULL
assigned_orders = execute_query("""
SELECT order_id FROM orders WHERE assigned_driver_id = %s
""", (driver_id,))
cascading_info = []
if assigned_orders:
order_count = len(assigned_orders)
cascading_info.append(f"{order_count} order(s) will have assigned_driver_id set to NULL")
# Delete the driver
query = "DELETE FROM drivers WHERE driver_id = %s AND user_id = %s"
try:
execute_write(query, (driver_id, user_id))
logger.info(f"Driver deleted: {driver_id}")
result = {
"success": True,
"driver_id": driver_id,
"message": f"Driver {driver_id} ({driver_name}) has been permanently deleted."
}
if cascading_info:
result["cascading_info"] = cascading_info
return result
except Exception as e:
logger.error(f"Database error deleting driver: {e}")
# Provide more context if it's a FK constraint error
error_message = str(e)
if "foreign key" in error_message.lower() or "violates" in error_message.lower():
error_message = f"Cannot delete driver due to database constraint (likely has related assignments). Error: {error_message}"
return {
"success": False,
"error": f"Failed to delete driver: {error_message}"
}
def handle_count_orders(tool_input: dict, user_id: str = None) -> dict:
"""
Execute count orders tool
Args:
tool_input: Dict with optional filter fields
user_id: ID of authenticated user
Returns:
Order count result with breakdown (only user's orders)
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
# Build WHERE clause based on filters
# IMPORTANT: Always filter by user_id FIRST
where_clauses = ["user_id = %s"]
params = [user_id]
if "status" in tool_input:
where_clauses.append("status = %s")
params.append(tool_input["status"])
if "priority" in tool_input:
where_clauses.append("priority = %s")
params.append(tool_input["priority"])
if "payment_status" in tool_input:
where_clauses.append("payment_status = %s")
params.append(tool_input["payment_status"])
if "assigned_driver_id" in tool_input:
where_clauses.append("assigned_driver_id = %s")
params.append(tool_input["assigned_driver_id"])
if "is_fragile" in tool_input:
where_clauses.append("is_fragile = %s")
params.append(tool_input["is_fragile"])
if "requires_signature" in tool_input:
where_clauses.append("requires_signature = %s")
params.append(tool_input["requires_signature"])
if "requires_cold_storage" in tool_input:
where_clauses.append("requires_cold_storage = %s")
params.append(tool_input["requires_cold_storage"])
where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
# Total count query
count_query = f"SELECT COUNT(*) as total FROM orders{where_sql}"
# Breakdown by status query
breakdown_query = f"""
SELECT status, COUNT(*) as count
FROM orders{where_sql}
GROUP BY status
ORDER BY count DESC
"""
# Breakdown by priority query
priority_query = f"""
SELECT priority, COUNT(*) as count
FROM orders{where_sql}
GROUP BY priority
ORDER BY CASE priority
WHEN 'urgent' THEN 1
WHEN 'express' THEN 2
WHEN 'standard' THEN 3
END
"""
try:
# Execute queries
total_result = execute_query(count_query, tuple(params) if params else None)
total = total_result[0]['total'] if total_result else 0
status_result = execute_query(breakdown_query, tuple(params) if params else None)
priority_result = execute_query(priority_query, tuple(params) if params else None)
# Format breakdown
status_breakdown = {row['status']: row['count'] for row in status_result} if status_result else {}
priority_breakdown = {row['priority']: row['count'] for row in priority_result} if priority_result else {}
logger.info(f"Counted orders: {total} total")
return {
"success": True,
"total": total,
"status_breakdown": status_breakdown,
"priority_breakdown": priority_breakdown,
"message": f"Found {total} order(s)"
}
except Exception as e:
logger.error(f"Database error counting orders: {e}")
return {
"success": False,
"error": f"Failed to count orders: {str(e)}"
}
def handle_fetch_orders(tool_input: dict, user_id: str = None) -> dict:
"""
Execute fetch orders tool
Args:
tool_input: Dict with filter, pagination, and sorting options
user_id: ID of authenticated user (filters to only their orders)
Returns:
List of orders matching criteria (only user's orders)
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# Development mode - use default user
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
# Extract pagination and sorting
limit = min(tool_input.get("limit", 10), 100) # Cap at 100
offset = tool_input.get("offset", 0)
sort_by = tool_input.get("sort_by", "created_at")
sort_order = tool_input.get("sort_order", "DESC")
# Build WHERE clause based on filters
# IMPORTANT: Always filter by user_id FIRST for security
where_clauses = ["user_id = %s"]
params = [user_id]
if "status" in tool_input:
where_clauses.append("status = %s")
params.append(tool_input["status"])
if "priority" in tool_input:
where_clauses.append("priority = %s")
params.append(tool_input["priority"])
if "payment_status" in tool_input:
where_clauses.append("payment_status = %s")
params.append(tool_input["payment_status"])
if "assigned_driver_id" in tool_input:
where_clauses.append("assigned_driver_id = %s")
params.append(tool_input["assigned_driver_id"])
if "is_fragile" in tool_input:
where_clauses.append("is_fragile = %s")
params.append(tool_input["is_fragile"])
if "requires_signature" in tool_input:
where_clauses.append("requires_signature = %s")
params.append(tool_input["requires_signature"])
if "requires_cold_storage" in tool_input:
where_clauses.append("requires_cold_storage = %s")
params.append(tool_input["requires_cold_storage"])
where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
# Build query
query = f"""
SELECT
order_id, customer_name, customer_phone, customer_email,
delivery_address, delivery_lat, delivery_lng,
time_window_start, time_window_end,
priority, weight_kg, volume_m3, special_instructions,
status, assigned_driver_id,
created_at, updated_at, delivered_at,
order_value, payment_status,
requires_signature, is_fragile, requires_cold_storage
FROM orders
{where_sql}
ORDER BY {sort_by} {sort_order}
LIMIT %s OFFSET %s
"""
params.extend([limit, offset])
try:
results = execute_query(query, tuple(params))
if not results:
return {
"success": True,
"orders": [],
"count": 0,
"message": "No orders found matching criteria"
}
# Format orders for readability
orders = []
for row in results:
order = {
"order_id": row['order_id'],
"customer": {
"name": row['customer_name'],
"phone": row['customer_phone'],
"email": row['customer_email']
},
"delivery": {
"address": row['delivery_address'],
"latitude": float(row['delivery_lat']) if row['delivery_lat'] else None,
"longitude": float(row['delivery_lng']) if row['delivery_lng'] else None
},
"time_window": {
"start": str(row['time_window_start']) if row['time_window_start'] else None,
"end": str(row['time_window_end']) if row['time_window_end'] else None
},
"details": {
"priority": row['priority'],
"status": row['status'],
"weight_kg": float(row['weight_kg']) if row['weight_kg'] else None,
"volume_m3": float(row['volume_m3']) if row['volume_m3'] else None,
"special_instructions": row['special_instructions']
},
"flags": {
"requires_signature": row['requires_signature'],
"is_fragile": row['is_fragile'],
"requires_cold_storage": row['requires_cold_storage']
},
"payment": {
"order_value": float(row['order_value']) if row['order_value'] else None,
"payment_status": row['payment_status']
},
"assigned_driver_id": row['assigned_driver_id'],
"timestamps": {
"created_at": str(row['created_at']),
"updated_at": str(row['updated_at']) if row['updated_at'] else None,
"delivered_at": str(row['delivered_at']) if row['delivered_at'] else None
}
}
orders.append(order)
logger.info(f"Fetched {len(orders)} orders")
return {
"success": True,
"orders": orders,
"count": len(orders),
"message": f"Retrieved {len(orders)} order(s)"
}
except Exception as e:
logger.error(f"Database error fetching orders: {e}")
return {
"success": False,
"error": f"Failed to fetch orders: {str(e)}"
}
def handle_get_order_details(tool_input: dict, user_id: str = None) -> dict:
"""
Execute get order details tool
Args:
tool_input: Dict with order_id
user_id: ID of authenticated user
Returns:
Complete order details (only if owned by user)
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
order_id = tool_input.get("order_id")
if not order_id:
return {
"success": False,
"error": "order_id is required"
}
query = """
SELECT
order_id, customer_name, customer_phone, customer_email,
pickup_address, pickup_lat, pickup_lng,
delivery_address, delivery_lat, delivery_lng,
time_window_start, time_window_end, expected_delivery_time,
priority, weight_kg, volume_m3, special_instructions,
status, assigned_driver_id, delivery_status,
created_at, updated_at, delivered_at, sla_grace_period_minutes,
order_value, payment_status,
requires_signature, is_fragile, requires_cold_storage
FROM orders
WHERE order_id = %s AND user_id = %s
"""
try:
results = execute_query(query, (order_id, user_id))
if not results:
return {
"success": False,
"error": f"Order {order_id} not found"
}
row = results[0]
order = {
"order_id": row['order_id'],
"customer": {
"name": row['customer_name'],
"phone": row['customer_phone'],
"email": row['customer_email']
},
"pickup": {
"address": row['pickup_address'],
"latitude": float(row['pickup_lat']) if row['pickup_lat'] else None,
"longitude": float(row['pickup_lng']) if row['pickup_lng'] else None
} if row['pickup_address'] else None,
"delivery": {
"address": row['delivery_address'],
"latitude": float(row['delivery_lat']) if row['delivery_lat'] else None,
"longitude": float(row['delivery_lng']) if row['delivery_lng'] else None
},
"time_window": {
"start": str(row['time_window_start']) if row['time_window_start'] else None,
"end": str(row['time_window_end']) if row['time_window_end'] else None
},
"details": {
"priority": row['priority'],
"status": row['status'],
"weight_kg": float(row['weight_kg']) if row['weight_kg'] else None,
"volume_m3": float(row['volume_m3']) if row['volume_m3'] else None,
"special_instructions": row['special_instructions']
},
"delivery_status": row['delivery_status'],
"timing": {
"expected_delivery_time": str(row['expected_delivery_time']) if row['expected_delivery_time'] else None,
"delivered_at": str(row['delivered_at']) if row['delivered_at'] else None,
"sla_grace_period_minutes": row['sla_grace_period_minutes']
},
"flags": {
"requires_signature": row['requires_signature'],
"is_fragile": row['is_fragile'],
"requires_cold_storage": row['requires_cold_storage']
},
"payment": {
"order_value": float(row['order_value']) if row['order_value'] else None,
"payment_status": row['payment_status']
},
"assigned_driver_id": row['assigned_driver_id'],
"timestamps": {
"created_at": str(row['created_at']),
"updated_at": str(row['updated_at']) if row['updated_at'] else None
}
}
logger.info(f"Retrieved details for order: {order_id}")
return {
"success": True,
"order": order,
"message": f"Order {order_id} details retrieved"
}
except Exception as e:
logger.error(f"Database error getting order details: {e}")
return {
"success": False,
"error": f"Failed to get order details: {str(e)}"
}
def handle_search_orders(tool_input: dict, user_id: str = None) -> dict:
"""
Execute search orders tool
Args:
tool_input: Dict with search_term
user_id: Authenticated user ID
Returns:
List of matching orders
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
search_term = tool_input.get("search_term", "").strip()
if not search_term:
return {
"success": False,
"error": "search_term is required"
}
query = """
SELECT
order_id, customer_name, customer_phone, customer_email,
delivery_address, priority, status, created_at
FROM orders
WHERE
user_id = %s AND (
order_id ILIKE %s OR
customer_name ILIKE %s OR
customer_email ILIKE %s OR
customer_phone ILIKE %s
)
ORDER BY created_at DESC
LIMIT 50
"""
search_pattern = f"%{search_term}%"
params = (user_id, search_pattern, search_pattern, search_pattern, search_pattern)
try:
results = execute_query(query, params)
if not results:
return {
"success": True,
"orders": [],
"count": 0,
"message": f"No orders found matching '{search_term}'"
}
orders = []
for row in results:
orders.append({
"order_id": row['order_id'],
"customer_name": row['customer_name'],
"customer_phone": row['customer_phone'],
"customer_email": row['customer_email'],
"delivery_address": row['delivery_address'],
"priority": row['priority'],
"status": row['status'],
"created_at": str(row['created_at'])
})
logger.info(f"Search '{search_term}' found {len(orders)} orders")
return {
"success": True,
"orders": orders,
"count": len(orders),
"message": f"Found {len(orders)} order(s) matching '{search_term}'"
}
except Exception as e:
logger.error(f"Database error searching orders: {e}")
return {
"success": False,
"error": f"Failed to search orders: {str(e)}"
}
def handle_get_incomplete_orders(tool_input: dict, user_id: str = None) -> dict:
"""
Execute get incomplete orders tool
Args:
tool_input: Dict with optional limit
user_id: Authenticated user ID
Returns:
List of incomplete orders (pending, assigned, in_transit)
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
limit = min(tool_input.get("limit", 20), 100)
query = """
SELECT
order_id, customer_name, delivery_address,
priority, status, time_window_end, created_at,
assigned_driver_id
FROM orders
WHERE user_id = %s AND status IN ('pending', 'assigned', 'in_transit')
ORDER BY
CASE priority
WHEN 'urgent' THEN 1
WHEN 'express' THEN 2
WHEN 'standard' THEN 3
END,
time_window_end ASC
LIMIT %s
"""
try:
results = execute_query(query, (user_id, limit))
if not results:
return {
"success": True,
"orders": [],
"count": 0,
"message": "No incomplete orders found"
}
orders = []
for row in results:
orders.append({
"order_id": row['order_id'],
"customer_name": row['customer_name'],
"delivery_address": row['delivery_address'],
"priority": row['priority'],
"status": row['status'],
"time_window_end": str(row['time_window_end']) if row['time_window_end'] else None,
"created_at": str(row['created_at']),
"assigned_driver_id": row['assigned_driver_id']
})
logger.info(f"Retrieved {len(orders)} incomplete orders")
return {
"success": True,
"orders": orders,
"count": len(orders),
"message": f"Found {len(orders)} incomplete order(s)"
}
except Exception as e:
logger.error(f"Database error getting incomplete orders: {e}")
return {
"success": False,
"error": f"Failed to get incomplete orders: {str(e)}"
}
def handle_count_drivers(tool_input: dict, user_id: str = None) -> dict:
"""
Execute count drivers tool
Args:
tool_input: Dict with optional filter fields
user_id: Authenticated user ID
Returns:
Driver count result with breakdown
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
# Build WHERE clause based on filters
# IMPORTANT: Always filter by user_id FIRST
where_clauses = ["user_id = %s"]
params = [user_id]
if "status" in tool_input:
where_clauses.append("status = %s")
params.append(tool_input["status"])
if "vehicle_type" in tool_input:
where_clauses.append("vehicle_type = %s")
params.append(tool_input["vehicle_type"])
where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
# Total count query
count_query = f"SELECT COUNT(*) as total FROM drivers{where_sql}"
# Breakdown by status query
status_query = f"""
SELECT status, COUNT(*) as count
FROM drivers{where_sql}
GROUP BY status
ORDER BY count DESC
"""
# Breakdown by vehicle type query
vehicle_query = f"""
SELECT vehicle_type, COUNT(*) as count
FROM drivers{where_sql}
GROUP BY vehicle_type
ORDER BY count DESC
"""
try:
# Execute queries
total_result = execute_query(count_query, tuple(params) if params else None)
total = total_result[0]['total'] if total_result else 0
status_result = execute_query(status_query, tuple(params) if params else None)
vehicle_result = execute_query(vehicle_query, tuple(params) if params else None)
# Format breakdown
status_breakdown = {row['status']: row['count'] for row in status_result} if status_result else {}
vehicle_breakdown = {row['vehicle_type']: row['count'] for row in vehicle_result if row['vehicle_type']} if vehicle_result else {}
logger.info(f"Counted drivers: {total} total")
return {
"success": True,
"total": total,
"status_breakdown": status_breakdown,
"vehicle_breakdown": vehicle_breakdown,
"message": f"Found {total} driver(s)"
}
except Exception as e:
logger.error(f"Database error counting drivers: {e}")
return {
"success": False,
"error": f"Failed to count drivers: {str(e)}"
}
def handle_fetch_drivers(tool_input: dict, user_id: str = None) -> dict:
"""
Execute fetch drivers tool
Args:
tool_input: Dict with filter, pagination, and sorting options
user_id: ID of authenticated user (filters to only their drivers)
Returns:
List of drivers matching criteria (only user's drivers)
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
# Extract pagination and sorting
limit = min(tool_input.get("limit", 10), 100) # Cap at 100
offset = tool_input.get("offset", 0)
sort_by = tool_input.get("sort_by", "name")
sort_order = tool_input.get("sort_order", "ASC")
# Build WHERE clause based on filters
# IMPORTANT: Always filter by user_id FIRST for security
where_clauses = ["user_id = %s"]
params = [user_id]
if "status" in tool_input:
where_clauses.append("status = %s")
params.append(tool_input["status"])
if "vehicle_type" in tool_input:
where_clauses.append("vehicle_type = %s")
params.append(tool_input["vehicle_type"])
where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
# Build query
query = f"""
SELECT
driver_id, name, phone, email,
current_lat, current_lng, current_address, last_location_update,
status, vehicle_type, vehicle_plate,
capacity_kg, capacity_m3, skills,
created_at, updated_at
FROM drivers
{where_sql}
ORDER BY {sort_by} {sort_order}
LIMIT %s OFFSET %s
"""
params.extend([limit, offset])
try:
results = execute_query(query, tuple(params))
if not results:
return {
"success": True,
"drivers": [],
"count": 0,
"message": "No drivers found matching criteria"
}
# Format drivers for readability
drivers = []
for row in results:
# Parse skills JSON if present
skills = []
if row['skills']:
try:
import json
skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills']
except:
skills = []
driver = {
"driver_id": row['driver_id'],
"name": row['name'],
"contact": {
"phone": row['phone'],
"email": row['email']
},
"location": {
"latitude": float(row['current_lat']) if row['current_lat'] else None,
"longitude": float(row['current_lng']) if row['current_lng'] else None,
"address": row['current_address'],
"last_update": str(row['last_location_update']) if row['last_location_update'] else None
},
"status": row['status'],
"vehicle": {
"type": row['vehicle_type'],
"plate": row['vehicle_plate'],
"capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None,
"capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None
},
"skills": skills,
"timestamps": {
"created_at": str(row['created_at']),
"updated_at": str(row['updated_at']) if row['updated_at'] else None
}
}
drivers.append(driver)
logger.info(f"Fetched {len(drivers)} drivers")
return {
"success": True,
"drivers": drivers,
"count": len(drivers),
"message": f"Retrieved {len(drivers)} driver(s)"
}
except Exception as e:
logger.error(f"Database error fetching drivers: {e}")
return {
"success": False,
"error": f"Failed to fetch drivers: {str(e)}"
}
def handle_get_driver_details(tool_input: dict, user_id: str = None) -> dict:
"""
Execute get driver details tool
Args:
tool_input: Dict with driver_id
user_id: Authenticated user ID
Returns:
Complete driver details
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
driver_id = tool_input.get("driver_id")
if not driver_id:
return {
"success": False,
"error": "driver_id is required"
}
query = """
SELECT
driver_id, name, phone, email,
current_lat, current_lng, current_address, last_location_update,
status, vehicle_type, vehicle_plate,
capacity_kg, capacity_m3, skills,
created_at, updated_at
FROM drivers
WHERE driver_id = %s AND user_id = %s
"""
try:
results = execute_query(query, (driver_id, user_id))
if not results:
return {
"success": False,
"error": f"Driver {driver_id} not found"
}
row = results[0]
# Parse skills JSON if present
skills = []
if row['skills']:
try:
import json
skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills']
except:
skills = []
# Use stored address from database, or reverse geocode if not available
location_address = row['current_address']
if not location_address and row['current_lat'] and row['current_lng']:
try:
reverse_result = safe_reverse_geocode(
float(row['current_lat']),
float(row['current_lng'])
)
location_address = reverse_result.get('address', None)
logger.info(f"Reverse geocoded driver location: {location_address}")
except Exception as e:
logger.warning(f"Failed to reverse geocode driver location: {e}")
location_address = None
driver = {
"driver_id": row['driver_id'],
"name": row['name'],
"contact": {
"phone": row['phone'],
"email": row['email']
},
"location": {
"latitude": float(row['current_lat']) if row['current_lat'] else None,
"longitude": float(row['current_lng']) if row['current_lng'] else None,
"address": location_address,
"last_update": str(row['last_location_update']) if row['last_location_update'] else None
},
"status": row['status'],
"vehicle": {
"type": row['vehicle_type'],
"plate": row['vehicle_plate'],
"capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None,
"capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None
},
"skills": skills,
"timestamps": {
"created_at": str(row['created_at']),
"updated_at": str(row['updated_at']) if row['updated_at'] else None
}
}
logger.info(f"Retrieved details for driver: {driver_id}")
return {
"success": True,
"driver": driver,
"message": f"Driver {driver_id} details retrieved"
}
except Exception as e:
logger.error(f"Database error getting driver details: {e}")
return {
"success": False,
"error": f"Failed to get driver details: {str(e)}"
}
def handle_search_drivers(tool_input: dict, user_id: str = None) -> dict:
"""
Execute search drivers tool
Args:
tool_input: Dict with search_term
user_id: Authenticated user ID
Returns:
List of matching drivers
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
search_term = tool_input.get("search_term", "").strip()
if not search_term:
return {
"success": False,
"error": "search_term is required"
}
query = """
SELECT
driver_id, name, phone, email,
current_lat, current_lng, current_address,
vehicle_type, vehicle_plate, status, skills, created_at
FROM drivers
WHERE
user_id = %s AND (
driver_id ILIKE %s OR
name ILIKE %s OR
email ILIKE %s OR
phone ILIKE %s OR
vehicle_plate ILIKE %s
)
ORDER BY name ASC
LIMIT 50
"""
search_pattern = f"%{search_term}%"
params = (user_id, search_pattern, search_pattern, search_pattern, search_pattern, search_pattern)
try:
results = execute_query(query, params)
if not results:
return {
"success": True,
"drivers": [],
"count": 0,
"message": f"No drivers found matching '{search_term}'"
}
drivers = []
for row in results:
# Parse skills JSON if present
skills = []
if row['skills']:
try:
import json
skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills']
except:
skills = []
drivers.append({
"driver_id": row['driver_id'],
"name": row['name'],
"phone": row['phone'],
"email": row['email'],
"location": {
"latitude": float(row['current_lat']) if row['current_lat'] else None,
"longitude": float(row['current_lng']) if row['current_lng'] else None,
"address": row['current_address']
},
"vehicle_type": row['vehicle_type'],
"vehicle_plate": row['vehicle_plate'],
"status": row['status'],
"skills": skills,
"created_at": str(row['created_at'])
})
logger.info(f"Search '{search_term}' found {len(drivers)} drivers")
return {
"success": True,
"drivers": drivers,
"count": len(drivers),
"message": f"Found {len(drivers)} driver(s) matching '{search_term}'"
}
except Exception as e:
logger.error(f"Database error searching drivers: {e}")
return {
"success": False,
"error": f"Failed to search drivers: {str(e)}"
}
def handle_get_available_drivers(tool_input: dict, user_id: str = None) -> dict:
"""
Execute get available drivers tool
Args:
tool_input: Dict with optional limit
user_id: Authenticated user ID
Returns:
List of available drivers (active or offline)
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
limit = min(tool_input.get("limit", 20), 100)
query = """
SELECT
driver_id, name, phone, vehicle_type, vehicle_plate,
current_lat, current_lng, current_address, last_location_update,
status, capacity_kg, capacity_m3, skills
FROM drivers
WHERE user_id = %s AND status IN ('active', 'offline')
ORDER BY
CASE status
WHEN 'active' THEN 1
WHEN 'offline' THEN 2
END,
name ASC
LIMIT %s
"""
try:
results = execute_query(query, (user_id, limit))
if not results:
return {
"success": True,
"drivers": [],
"count": 0,
"message": "No available drivers found"
}
drivers = []
for row in results:
# Parse skills JSON if present
skills = []
if row['skills']:
try:
import json
skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills']
except:
skills = []
drivers.append({
"driver_id": row['driver_id'],
"name": row['name'],
"phone": row['phone'],
"location": {
"latitude": float(row['current_lat']) if row['current_lat'] else None,
"longitude": float(row['current_lng']) if row['current_lng'] else None,
"address": row['current_address'],
"last_update": str(row['last_location_update']) if row['last_location_update'] else None
},
"status": row['status'],
"vehicle": {
"type": row['vehicle_type'],
"plate": row['vehicle_plate'],
"capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None,
"capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None
},
"skills": skills
})
logger.info(f"Retrieved {len(drivers)} available drivers")
return {
"success": True,
"drivers": drivers,
"count": len(drivers),
"message": f"Found {len(drivers)} available driver(s)"
}
except Exception as e:
logger.error(f"Database error getting available drivers: {e}")
return {
"success": False,
"error": f"Failed to get available drivers: {str(e)}"
}
# ============================================================================
# ASSIGNMENT MANAGEMENT TOOLS
# ============================================================================
def handle_create_assignment(tool_input: dict, user_id: str = None) -> dict:
"""
Create assignment (assign order to driver)
Validates order and driver status, calculates route, creates assignment record,
and updates order/driver statuses.
Args:
tool_input: Dict with order_id and driver_id
user_id: Authenticated user ID
Returns:
Assignment creation result with route data
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
from datetime import datetime, timedelta
order_id = (tool_input.get("order_id") or "").strip()
driver_id = (tool_input.get("driver_id") or "").strip()
if not order_id or not driver_id:
return {
"success": False,
"error": "Both order_id and driver_id are required"
}
logger.info(f"Creating assignment: order={order_id}, driver={driver_id}")
try:
conn = get_db_connection()
cursor = conn.cursor()
# Step 1: Validate order exists, belongs to user, and status is "pending"
cursor.execute("""
SELECT status, delivery_lat, delivery_lng, delivery_address, assigned_driver_id
FROM orders
WHERE order_id = %s AND user_id = %s
""", (order_id, user_id))
order_row = cursor.fetchone()
if not order_row:
cursor.close()
conn.close()
return {
"success": False,
"error": f"Order not found: {order_id}"
}
order_status = order_row['status']
delivery_lat = order_row['delivery_lat']
delivery_lng = order_row['delivery_lng']
delivery_address = order_row['delivery_address']
current_driver = order_row['assigned_driver_id']
if order_status != "pending":
cursor.close()
conn.close()
# Provide helpful error message based on current status
if order_status == "assigned" and current_driver:
# Get current driver name for better error message
cursor2 = get_db_connection().cursor()
cursor2.execute("SELECT name FROM drivers WHERE driver_id = %s", (current_driver,))
driver_row = cursor2.fetchone()
driver_name = driver_row['name'] if driver_row else current_driver
cursor2.close()
return {
"success": False,
"error": f"Order {order_id} is already assigned to driver {driver_name}. Use 'unassign_order' first to reassign to a different driver."
}
else:
return {
"success": False,
"error": f"Order must be in 'pending' status to be assigned. Current status: '{order_status}'"
}
if not delivery_lat or not delivery_lng:
cursor.close()
conn.close()
return {
"success": False,
"error": "Order does not have delivery location coordinates"
}
# Step 2: Validate driver exists, belongs to user, and status is "active"
cursor.execute("""
SELECT status, current_lat, current_lng, vehicle_type, name
FROM drivers
WHERE driver_id = %s AND user_id = %s
""", (driver_id, user_id))
driver_row = cursor.fetchone()
if not driver_row:
cursor.close()
conn.close()
return {
"success": False,
"error": f"Driver not found: {driver_id}"
}
driver_status = driver_row['status']
driver_lat = driver_row['current_lat']
driver_lng = driver_row['current_lng']
vehicle_type = driver_row['vehicle_type']
driver_name = driver_row['name']
if driver_status not in ["active", "available"]:
cursor.close()
conn.close()
return {
"success": False,
"error": f"Driver must be 'active' or 'available'. Current status: {driver_status}"
}
if not driver_lat or not driver_lng:
cursor.close()
conn.close()
return {
"success": False,
"error": "Driver does not have current location"
}
# Step 3: Check if order already has active assignment
cursor.execute("""
SELECT assignment_id, driver_id
FROM assignments
WHERE order_id = %s AND status IN ('active', 'in_progress')
""", (order_id,))
existing_assignment = cursor.fetchone()
if existing_assignment:
cursor.close()
conn.close()
existing_asn_id = existing_assignment['assignment_id']
existing_driver_id = existing_assignment['driver_id']
# Get driver name for better error message
cursor2 = get_db_connection().cursor()
cursor2.execute("SELECT name FROM drivers WHERE driver_id = %s", (existing_driver_id,))
driver_row = cursor2.fetchone()
existing_driver_name = driver_row['name'] if driver_row else existing_driver_id
cursor2.close()
return {
"success": False,
"error": f"Order {order_id} is already assigned to driver {existing_driver_name} (Assignment: {existing_asn_id}). Use 'unassign_order' first to reassign."
}
# Step 4: Calculate route from driver location to delivery location
logger.info(f"Calculating route: ({driver_lat},{driver_lng}) -> ({delivery_lat},{delivery_lng})")
route_result = handle_calculate_route({
"origin": f"{driver_lat},{driver_lng}",
"destination": f"{delivery_lat},{delivery_lng}",
"vehicle_type": vehicle_type or "car",
"alternatives": False,
"include_steps": True # Get turn-by-turn directions
})
if not route_result.get("success"):
cursor.close()
conn.close()
return {
"success": False,
"error": f"Route calculation failed: {route_result.get('error', 'Unknown error')}"
}
# Step 5: Generate assignment ID
timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
assignment_id = f"ASN-{timestamp}"
# Step 6: Calculate estimated arrival
duration_seconds = route_result.get("duration_in_traffic", {}).get("seconds", 0)
estimated_arrival = datetime.now() + timedelta(seconds=duration_seconds)
# Step 7: Create assignment record
import json
# Extract route directions (turn-by-turn steps)
route_directions = route_result.get("steps", [])
route_directions_json = json.dumps(route_directions) if route_directions else None
cursor.execute("""
INSERT INTO assignments (
assignment_id, order_id, driver_id, user_id,
route_distance_meters, route_duration_seconds, route_duration_in_traffic_seconds,
route_summary, route_confidence, route_directions,
driver_start_location_lat, driver_start_location_lng,
delivery_location_lat, delivery_location_lng, delivery_address,
estimated_arrival, vehicle_type, traffic_delay_seconds,
status
) VALUES (
%s, %s, %s, %s,
%s, %s, %s,
%s, %s, %s,
%s, %s,
%s, %s, %s,
%s, %s, %s,
%s
)
""", (
assignment_id, order_id, driver_id, user_id,
route_result.get("distance", {}).get("meters", 0),
route_result.get("duration", {}).get("seconds", 0),
route_result.get("duration_in_traffic", {}).get("seconds", 0),
route_result.get("route_summary", ""),
route_result.get("confidence", ""),
route_directions_json,
driver_lat, driver_lng,
delivery_lat, delivery_lng, delivery_address,
estimated_arrival, vehicle_type,
route_result.get("traffic_delay", {}).get("seconds", 0),
"active"
))
# Step 8: Update order status and assigned driver
cursor.execute("""
UPDATE orders
SET status = 'assigned', assigned_driver_id = %s
WHERE order_id = %s
""", (driver_id, order_id))
# Step 9: Update driver status to busy
cursor.execute("""
UPDATE drivers
SET status = 'busy'
WHERE driver_id = %s
""", (driver_id,))
conn.commit()
cursor.close()
conn.close()
logger.info(f"Assignment created successfully: {assignment_id}")
return {
"success": True,
"assignment_id": assignment_id,
"order_id": order_id,
"driver_id": driver_id,
"driver_name": driver_name,
"route": {
"distance": route_result.get("distance", {}).get("text", ""),
"duration": route_result.get("duration", {}).get("text", ""),
"duration_in_traffic": route_result.get("duration_in_traffic", {}).get("text", ""),
"traffic_delay": route_result.get("traffic_delay", {}).get("text", ""),
"summary": route_result.get("route_summary", ""),
"directions": route_directions # Turn-by-turn navigation steps
},
"estimated_arrival": estimated_arrival.isoformat(),
"status": "active",
"message": f"Order {order_id} assigned to driver {driver_name} ({driver_id})"
}
except Exception as e:
logger.error(f"Failed to create assignment: {e}")
return {
"success": False,
"error": f"Failed to create assignment: {str(e)}"
}
def handle_auto_assign_order(tool_input: dict, user_id: str = None) -> dict:
"""
Automatically assign order to nearest available driver (distance + validation based).
Selection criteria:
1. Driver must be 'active' with valid location
2. Driver vehicle capacity must meet package weight/volume requirements
3. Driver must have required skills (fragile handling, cold storage, etc.)
4. Selects nearest driver by real-time route distance
Args:
tool_input: Dict with order_id
user_id: Authenticated user ID
Returns:
Assignment details with selected driver info and distance
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
order_id = (tool_input.get("order_id") or "").strip()
if not order_id:
return {
"success": False,
"error": "Missing required field: order_id"
}
try:
conn = get_db_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
# Step 1: Get order details with ALL requirements (filtered by user_id)
cursor.execute("""
SELECT
order_id, customer_name, delivery_address,
delivery_lat, delivery_lng, status,
weight_kg, volume_m3, is_fragile,
requires_cold_storage, requires_signature,
priority, assigned_driver_id
FROM orders
WHERE order_id = %s AND user_id = %s
""", (order_id, user_id))
order = cursor.fetchone()
if not order:
cursor.close()
conn.close()
return {
"success": False,
"error": f"Order not found: {order_id}"
}
if order['status'] != 'pending':
cursor.close()
conn.close()
return {
"success": False,
"error": f"Order must be 'pending' to auto-assign. Current status: {order['status']}"
}
if not order['delivery_lat'] or not order['delivery_lng']:
cursor.close()
conn.close()
return {
"success": False,
"error": "Order missing delivery coordinates. Cannot calculate routes."
}
# Extract order requirements
required_weight_kg = order['weight_kg'] or 0
required_volume_m3 = order['volume_m3'] or 0
needs_fragile_handling = order['is_fragile'] or False
needs_cold_storage = order['requires_cold_storage'] or False
# Step 2: Get all active drivers with valid locations (filtered by user_id)
cursor.execute("""
SELECT
driver_id, name, phone, current_lat, current_lng,
vehicle_type, capacity_kg, capacity_m3, skills
FROM drivers
WHERE user_id = %s
AND status = 'active'
AND current_lat IS NOT NULL
AND current_lng IS NOT NULL
""", (user_id,))
active_drivers = cursor.fetchall()
if not active_drivers:
cursor.close()
conn.close()
return {
"success": False,
"error": "No active drivers available with valid location"
}
# Step 3: Filter and score each driver
suitable_drivers = []
for driver in active_drivers:
# Validate capacity (weight and volume)
driver_capacity_kg = driver['capacity_kg'] or 0
driver_capacity_m3 = driver['capacity_m3'] or 0
if driver_capacity_kg < required_weight_kg:
logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Insufficient weight capacity: {driver_capacity_kg}kg < {required_weight_kg}kg")
continue
if driver_capacity_m3 < required_volume_m3:
logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Insufficient volume capacity: {driver_capacity_m3}m³ < {required_volume_m3}m³")
continue
# Validate skills
driver_skills = driver['skills'] or []
if needs_fragile_handling and "fragile_handler" not in driver_skills:
logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Missing fragile_handler skill")
continue
if needs_cold_storage and "refrigerated" not in driver_skills:
logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Missing refrigerated skill")
continue
# Step 4: Calculate real-time route distance
route_result = handle_calculate_route({
"origin": f"{driver['current_lat']},{driver['current_lng']}",
"destination": f"{order['delivery_lat']},{order['delivery_lng']}",
"vehicle_type": driver['vehicle_type'],
"include_steps": False # We don't need turn-by-turn for scoring
})
if not route_result.get("success"):
logger.warning(f"Driver {driver['driver_id']} ({driver['name']}) - Route calculation failed: {route_result.get('error')}")
continue
# Extract distance
distance_meters = route_result.get('distance_meters', 999999)
duration_seconds = route_result.get('duration_in_traffic_seconds', 0)
suitable_drivers.append({
"driver": driver,
"distance_meters": distance_meters,
"distance_km": distance_meters / 1000,
"duration_seconds": duration_seconds,
"duration_minutes": duration_seconds / 60,
"route_data": route_result
})
if not suitable_drivers:
cursor.close()
conn.close()
return {
"success": False,
"error": "No suitable drivers found. All active drivers failed capacity or skill requirements."
}
# Step 5: Sort by distance (nearest first)
suitable_drivers.sort(key=lambda x: x['distance_meters'])
# Step 6: Select nearest driver
best_match = suitable_drivers[0]
selected_driver = best_match['driver']
logger.info(f"Auto-assign: Selected driver {selected_driver['driver_id']} ({selected_driver['name']}) - {best_match['distance_km']:.2f}km away")
cursor.close()
conn.close()
# Step 7: Create assignment using existing function
assignment_result = handle_create_assignment({
"order_id": order_id,
"driver_id": selected_driver['driver_id']
}, user_id=user_id)
if not assignment_result.get("success"):
return assignment_result
# Step 8: Return enhanced response with selection info
return {
"success": True,
"assignment_id": assignment_result['assignment_id'],
"method": "auto_assignment",
"order_id": order_id,
"driver_id": selected_driver['driver_id'],
"driver_name": selected_driver['name'],
"driver_phone": selected_driver['phone'],
"driver_vehicle_type": selected_driver['vehicle_type'],
"selection_reason": "Nearest available driver meeting all requirements",
"distance_km": round(best_match['distance_km'], 2),
"distance_meters": best_match['distance_meters'],
"estimated_duration_minutes": round(best_match['duration_minutes'], 1),
"candidates_evaluated": len(active_drivers),
"suitable_candidates": len(suitable_drivers),
"route_summary": assignment_result.get('route_summary'),
"estimated_arrival": assignment_result.get('estimated_arrival'),
"assignment_details": assignment_result
}
except Exception as e:
logger.error(f"Failed to auto-assign order: {e}")
return {
"success": False,
"error": f"Failed to auto-assign order: {str(e)}"
}
def handle_intelligent_assign_order(tool_input: dict, user_id: str = None) -> dict:
"""
Intelligently assign order using Gemini AI to analyze all parameters.
Uses Google's Gemini AI to evaluate:
- Order characteristics (priority, weight, fragility, time constraints)
- All available drivers (location, capacity, skills, vehicle type)
- Real-time routing data (distance, traffic, weather)
- Complex tradeoffs and optimal matching
Returns assignment with AI reasoning explaining the selection.
Args:
tool_input: Dict with order_id
user_id: Authenticated user ID
Returns:
Assignment details with AI reasoning and selected driver info
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
import os
import json
import google.generativeai as genai
from datetime import datetime
order_id = (tool_input.get("order_id") or "").strip()
if not order_id:
return {
"success": False,
"error": "Missing required field: order_id"
}
# Check for Gemini API key
gemini_api_key = os.getenv("GOOGLE_API_KEY")
if not gemini_api_key:
return {
"success": False,
"error": "GOOGLE_API_KEY environment variable not set. Required for intelligent assignment."
}
try:
conn = get_db_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
# Step 1: Get complete order details (filtered by user_id)
cursor.execute("""
SELECT
order_id, customer_name, customer_phone, customer_email,
delivery_address, delivery_lat, delivery_lng,
pickup_address, pickup_lat, pickup_lng,
time_window_start, time_window_end, expected_delivery_time,
priority, weight_kg, volume_m3, order_value,
is_fragile, requires_cold_storage, requires_signature,
payment_status, special_instructions, status,
created_at, sla_grace_period_minutes
FROM orders
WHERE order_id = %s AND user_id = %s
""", (order_id, user_id))
order = cursor.fetchone()
if not order:
cursor.close()
conn.close()
return {
"success": False,
"error": f"Order not found: {order_id}"
}
if order['status'] != 'pending':
cursor.close()
conn.close()
return {
"success": False,
"error": f"Order must be 'pending' to assign. Current status: {order['status']}"
}
if not order['delivery_lat'] or not order['delivery_lng']:
cursor.close()
conn.close()
return {
"success": False,
"error": "Order missing delivery coordinates. Cannot calculate routes."
}
# Step 2: Get all active drivers with complete details (filtered by user_id)
cursor.execute("""
SELECT
driver_id, name, phone, email,
current_lat, current_lng, last_location_update,
vehicle_type, vehicle_plate, capacity_kg, capacity_m3,
skills, status, created_at, updated_at
FROM drivers
WHERE user_id = %s
AND status = 'active'
AND current_lat IS NOT NULL
AND current_lng IS NOT NULL
""", (user_id,))
active_drivers = cursor.fetchall()
if not active_drivers:
cursor.close()
conn.close()
return {
"success": False,
"error": "No active drivers available with valid location"
}
# Step 3: Calculate routing data for each driver
drivers_with_routes = []
for driver in active_drivers:
# Calculate route with traffic
route_result = handle_calculate_route({
"origin": f"{driver['current_lat']},{driver['current_lng']}",
"destination": f"{order['delivery_lat']},{order['delivery_lng']}",
"vehicle_type": driver['vehicle_type'],
"include_steps": False
})
# Get weather-aware routing if available
try:
intelligent_route = handle_calculate_intelligent_route({
"origin": f"{driver['current_lat']},{driver['current_lng']}",
"destination": f"{order['delivery_lat']},{order['delivery_lng']}",
"vehicle_type": driver['vehicle_type']
})
weather_data = intelligent_route.get('weather', {})
except:
weather_data = {}
if route_result.get("success"):
drivers_with_routes.append({
"driver_id": driver['driver_id'],
"name": driver['name'],
"phone": driver['phone'],
"vehicle_type": driver['vehicle_type'],
"vehicle_plate": driver['vehicle_plate'],
"capacity_kg": float(driver['capacity_kg']) if driver['capacity_kg'] else 0,
"capacity_m3": float(driver['capacity_m3']) if driver['capacity_m3'] else 0,
"skills": driver['skills'] or [],
"current_location": {
"lat": float(driver['current_lat']),
"lng": float(driver['current_lng'])
},
"route_to_delivery": {
"distance_km": round(route_result.get('distance_meters', 0) / 1000, 2),
"distance_meters": route_result.get('distance_meters', 0),
"duration_minutes": round(route_result.get('duration_in_traffic_seconds', 0) / 60, 1),
"traffic_delay_seconds": route_result.get('traffic_delay_seconds', 0),
"route_summary": route_result.get('route_summary', ''),
"has_tolls": route_result.get('has_tolls', False)
},
"weather_conditions": weather_data
})
if not drivers_with_routes:
cursor.close()
conn.close()
return {
"success": False,
"error": "Unable to calculate routes for any active drivers"
}
cursor.close()
conn.close()
# Step 4: Build comprehensive context for Gemini
order_context = {
"order_id": order['order_id'],
"customer": {
"name": order['customer_name'],
"phone": order['customer_phone']
},
"delivery": {
"address": order['delivery_address'],
"coordinates": {"lat": float(order['delivery_lat']), "lng": float(order['delivery_lng'])}
},
"time_constraints": {
"expected_delivery_time": str(order['expected_delivery_time']) if order['expected_delivery_time'] else None,
"time_window_start": str(order['time_window_start']) if order['time_window_start'] else None,
"time_window_end": str(order['time_window_end']) if order['time_window_end'] else None,
"sla_grace_period_minutes": order['sla_grace_period_minutes'],
"created_at": str(order['created_at'])
},
"package": {
"weight_kg": float(order['weight_kg']) if order['weight_kg'] else 0,
"volume_m3": float(order['volume_m3']) if order['volume_m3'] else 0,
"value": float(order['order_value']) if order['order_value'] else 0,
"is_fragile": order['is_fragile'] or False,
"requires_cold_storage": order['requires_cold_storage'] or False,
"requires_signature": order['requires_signature'] or False
},
"priority": order['priority'],
"payment_status": order['payment_status'],
"special_instructions": order['special_instructions']
}
# Step 5: Call Gemini AI for intelligent decision
genai.configure(api_key=gemini_api_key)
model = genai.GenerativeModel('gemini-2.0-flash-exp')
prompt = f"""You are an intelligent fleet management AI. Analyze the following delivery order and available drivers to select the BEST driver for this assignment.
**ORDER DETAILS:**
{json.dumps(order_context, indent=2)}
**AVAILABLE DRIVERS ({len(drivers_with_routes)}):**
{json.dumps(drivers_with_routes, indent=2)}
**CURRENT TIME:** {datetime.now().isoformat()}
**YOUR TASK:**
Analyze ALL parameters comprehensively:
1. **Distance & Route Efficiency**: Consider route distance, traffic delays, tolls
2. **Vehicle Matching**: Match vehicle type and capacity to package requirements
3. **Skills Requirements**: Ensure driver has necessary skills (fragile handling, cold storage)
4. **Time Constraints**: Evaluate ability to meet expected delivery time
5. **Priority Level**: Factor in order priority (urgent > express > standard)
6. **Weather Conditions**: Consider weather impact on delivery safety and speed
7. **Special Requirements**: Account for signature requirements, special instructions
8. **Cost Efficiency**: Consider fuel costs, toll roads, driver utilization
**RESPONSE FORMAT (JSON only, no markdown):**
{{
"selected_driver_id": "DRV-XXXXXXXXX",
"confidence_score": 0.95,
"reasoning": {{
"primary_factors": ["Nearest driver (5.2km)", "Has fragile_handler skill", "Sufficient capacity"],
"trade_offs_considered": ["Driver A was 1km closer but lacked required skills", "Driver B had larger capacity but 15min further"],
"risk_assessment": "Low risk - clear weather, light traffic, experienced driver",
"decision_summary": "Selected Driver X because they offer the best balance of proximity (5.2km), required skills (fragile_handler), and adequate capacity (10kg) for this urgent fragile delivery."
}},
"alternatives": [
{{"driver_id": "DRV-YYY", "reason_not_selected": "Missing fragile_handler skill"}},
{{"driver_id": "DRV-ZZZ", "reason_not_selected": "15 minutes further away"}}
]
}}
**IMPORTANT:** Return ONLY valid JSON. Do not include markdown formatting, code blocks, or explanatory text outside the JSON."""
# Call Gemini with timeout protection (30 seconds max)
try:
future = _blocking_executor.submit(model.generate_content, prompt)
response = future.result(timeout=30)
except FuturesTimeoutError:
logger.error("Gemini AI call timed out after 30 seconds")
return {
"success": False,
"error": "AI assignment timed out. Please try auto_assign_order instead."
}
response_text = response.text.strip()
# Clean response (remove markdown code blocks if present)
if response_text.startswith("```json"):
response_text = response_text[7:]
if response_text.startswith("```"):
response_text = response_text[3:]
if response_text.endswith("```"):
response_text = response_text[:-3]
response_text = response_text.strip()
# Parse Gemini response
try:
ai_decision = json.loads(response_text)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Gemini response: {e}")
logger.error(f"Response text: {response_text}")
return {
"success": False,
"error": f"Failed to parse AI response. Invalid JSON returned by Gemini: {str(e)}"
}
selected_driver_id = ai_decision.get("selected_driver_id")
if not selected_driver_id:
return {
"success": False,
"error": "AI did not select a driver"
}
# Validate selected driver is still available
selected_driver = next((d for d in drivers_with_routes if d["driver_id"] == selected_driver_id), None)
if not selected_driver:
return {
"success": False,
"error": f"AI selected driver {selected_driver_id} but driver not found in available list"
}
# Step 6: Create assignment using existing function
logger.info(f"Intelligent-assign: AI selected driver {selected_driver_id} ({selected_driver['name']})")
assignment_result = handle_create_assignment({
"order_id": order_id,
"driver_id": selected_driver_id
}, user_id=user_id)
if not assignment_result.get("success"):
return assignment_result
# Step 7: Return enhanced response with AI reasoning
return {
"success": True,
"assignment_id": assignment_result['assignment_id'],
"method": "intelligent_assignment",
"ai_provider": "Google Gemini 2.0 Flash",
"ai_model": "gemini-2.0-flash-exp",
"order_id": order_id,
"driver_id": selected_driver_id,
"driver_name": selected_driver['name'],
"driver_phone": selected_driver['phone'],
"driver_vehicle_type": selected_driver['vehicle_type'],
"distance_km": selected_driver['route_to_delivery']['distance_km'],
"estimated_duration_minutes": selected_driver['route_to_delivery']['duration_minutes'],
"ai_reasoning": ai_decision.get('reasoning', {}),
"confidence_score": ai_decision.get('confidence_score', 0),
"alternatives_considered": ai_decision.get('alternatives', []),
"candidates_evaluated": len(drivers_with_routes),
"route_summary": assignment_result.get('route_summary'),
"estimated_arrival": assignment_result.get('estimated_arrival'),
"assignment_details": assignment_result
}
except Exception as e:
logger.error(f"Failed to intelligently assign order: {e}")
return {
"success": False,
"error": f"Failed to intelligently assign order: {str(e)}"
}
def handle_get_assignment_details(tool_input: dict, user_id: str = None) -> dict:
"""
Get assignment details
Can query by assignment_id, order_id, or driver_id.
Returns assignment with route data and related order/driver info.
Args:
tool_input: Dict with assignment_id, order_id, or driver_id
user_id: Authenticated user ID
Returns:
Assignment details or list of assignments
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
assignment_id = (tool_input.get("assignment_id") or "").strip()
order_id = (tool_input.get("order_id") or "").strip()
driver_id = (tool_input.get("driver_id") or "").strip()
if not assignment_id and not order_id and not driver_id:
return {
"success": False,
"error": "Provide at least one of: assignment_id, order_id, or driver_id"
}
try:
conn = get_db_connection()
cursor = conn.cursor()
# Build query based on provided parameters (filtered by user_id)
query = """
SELECT
a.assignment_id, a.order_id, a.driver_id, a.status,
a.assigned_at, a.updated_at, a.estimated_arrival, a.actual_arrival,
a.route_distance_meters, a.route_duration_seconds, a.route_duration_in_traffic_seconds,
a.route_summary, a.route_confidence, a.traffic_delay_seconds, a.route_directions,
a.driver_start_location_lat, a.driver_start_location_lng,
a.delivery_location_lat, a.delivery_location_lng, a.delivery_address,
a.vehicle_type, a.sequence_number, a.notes, a.failure_reason,
o.customer_name, o.status as order_status,
d.name as driver_name, d.status as driver_status, d.phone as driver_phone
FROM assignments a
LEFT JOIN orders o ON a.order_id = o.order_id
LEFT JOIN drivers d ON a.driver_id = d.driver_id
WHERE a.user_id = %s
"""
params = [user_id]
if assignment_id:
query += " AND a.assignment_id = %s"
params.append(assignment_id)
if order_id:
query += " AND a.order_id = %s"
params.append(order_id)
if driver_id:
query += " AND a.driver_id = %s"
params.append(driver_id)
query += " ORDER BY a.assigned_at DESC"
cursor.execute(query, params)
rows = cursor.fetchall()
cursor.close()
conn.close()
if not rows:
return {
"success": False,
"error": "No assignments found matching criteria"
}
# Format results
assignments = []
for row in rows:
assignment = {
"assignment_id": row['assignment_id'],
"order_id": row['order_id'],
"driver_id": row['driver_id'],
"status": row['status'],
"assigned_at": row['assigned_at'].isoformat() if row['assigned_at'] else None,
"updated_at": row['updated_at'].isoformat() if row['updated_at'] else None,
"estimated_arrival": row['estimated_arrival'].isoformat() if row['estimated_arrival'] else None,
"actual_arrival": row['actual_arrival'].isoformat() if row['actual_arrival'] else None,
"route": {
"distance_meters": row['route_distance_meters'],
"distance_km": round(row['route_distance_meters'] / 1000, 2) if row['route_distance_meters'] else 0,
"duration_seconds": row['route_duration_seconds'],
"duration_minutes": round(row['route_duration_seconds'] / 60, 1) if row['route_duration_seconds'] else 0,
"duration_in_traffic_seconds": row['route_duration_in_traffic_seconds'],
"duration_in_traffic_minutes": round(row['route_duration_in_traffic_seconds'] / 60, 1) if row['route_duration_in_traffic_seconds'] else 0,
"summary": row['route_summary'],
"confidence": row['route_confidence'],
"traffic_delay_seconds": row['traffic_delay_seconds'],
"traffic_delay_minutes": round(row['traffic_delay_seconds'] / 60, 1) if row['traffic_delay_seconds'] else 0,
"directions": row['route_directions'] # Turn-by-turn navigation steps
},
"driver_start_location": {
"lat": float(row['driver_start_location_lat']) if row['driver_start_location_lat'] else None,
"lng": float(row['driver_start_location_lng']) if row['driver_start_location_lng'] else None
},
"delivery_location": {
"lat": float(row['delivery_location_lat']) if row['delivery_location_lat'] else None,
"lng": float(row['delivery_location_lng']) if row['delivery_location_lng'] else None,
"address": row['delivery_address']
},
"vehicle_type": row['vehicle_type'],
"sequence_number": row['sequence_number'],
"notes": row['notes'],
"failure_reason": row['failure_reason'],
"order": {
"customer_name": row['customer_name'],
"status": row['order_status']
},
"driver": {
"name": row['driver_name'],
"status": row['driver_status'],
"phone": row['driver_phone']
}
}
assignments.append(assignment)
if assignment_id and len(assignments) == 1:
# Single assignment query
return {
"success": True,
"assignment": assignments[0]
}
else:
# Multiple assignments
return {
"success": True,
"count": len(assignments),
"assignments": assignments
}
except Exception as e:
logger.error(f"Failed to get assignment details: {e}")
return {
"success": False,
"error": f"Failed to get assignment details: {str(e)}"
}
def handle_update_assignment(tool_input: dict, user_id: str = None) -> dict:
"""
Update assignment status
Allows updating assignment status and actual metrics.
Manages cascading updates to order and driver statuses.
Args:
tool_input: Dict with assignment_id, status (optional), actual_arrival (optional), notes (optional)
user_id: Authenticated user ID
Returns:
Update result
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
from datetime import datetime
assignment_id = (tool_input.get("assignment_id") or "").strip()
new_status = (tool_input.get("status") or "").strip().lower()
actual_arrival = tool_input.get("actual_arrival")
notes = (tool_input.get("notes") or "").strip()
if not assignment_id:
return {
"success": False,
"error": "assignment_id is required"
}
if not new_status and not actual_arrival and not notes:
return {
"success": False,
"error": "Provide at least one field to update: status, actual_arrival, or notes"
}
# Validate status if provided
valid_statuses = ["active", "in_progress", "completed", "failed", "cancelled"]
if new_status and new_status not in valid_statuses:
return {
"success": False,
"error": f"Invalid status. Must be one of: {', '.join(valid_statuses)}"
}
logger.info(f"Updating assignment: {assignment_id}, status={new_status}")
try:
conn = get_db_connection()
cursor = conn.cursor()
# Get current assignment details (filtered by user_id)
cursor.execute("""
SELECT status, order_id, driver_id
FROM assignments
WHERE assignment_id = %s AND user_id = %s
""", (assignment_id, user_id))
assignment_row = cursor.fetchone()
if not assignment_row:
cursor.close()
conn.close()
return {
"success": False,
"error": f"Assignment not found: {assignment_id}"
}
current_status = assignment_row['status']
order_id = assignment_row['order_id']
driver_id = assignment_row['driver_id']
# Validate status transitions
if new_status:
# Cannot go backwards
if current_status == "completed" and new_status in ["active", "in_progress"]:
cursor.close()
conn.close()
return {
"success": False,
"error": "Cannot change status from 'completed' back to 'active' or 'in_progress'"
}
if current_status == "failed" and new_status != "failed":
cursor.close()
conn.close()
return {
"success": False,
"error": "Cannot change status from 'failed'"
}
if current_status == "cancelled" and new_status != "cancelled":
cursor.close()
conn.close()
return {
"success": False,
"error": "Cannot change status from 'cancelled'"
}
# Build update query
updates = []
params = []
if new_status:
updates.append("status = %s")
params.append(new_status)
if actual_arrival:
updates.append("actual_arrival = %s")
params.append(actual_arrival)
if notes:
updates.append("notes = %s")
params.append(notes)
params.append(assignment_id)
# Update assignment
cursor.execute(f"""
UPDATE assignments
SET {', '.join(updates)}
WHERE assignment_id = %s
""", params)
# Handle cascading updates based on new status
if new_status:
if new_status in ["completed", "failed", "cancelled"]:
# Update order status
if new_status == "completed":
cursor.execute("""
UPDATE orders
SET status = 'delivered'
WHERE order_id = %s
""", (order_id,))
elif new_status == "failed":
cursor.execute("""
UPDATE orders
SET status = 'failed'
WHERE order_id = %s
""", (order_id,))
elif new_status == "cancelled":
cursor.execute("""
UPDATE orders
SET status = 'cancelled', assigned_driver_id = NULL
WHERE order_id = %s
""", (order_id,))
# Check if driver has other active assignments
cursor.execute("""
SELECT COUNT(*) as count
FROM assignments
WHERE driver_id = %s AND status IN ('active', 'in_progress') AND assignment_id != %s
""", (driver_id, assignment_id))
other_assignments_count = cursor.fetchone()['count']
# If no other active assignments, set driver back to active
if other_assignments_count == 0:
cursor.execute("""
UPDATE drivers
SET status = 'active'
WHERE driver_id = %s
""", (driver_id,))
conn.commit()
cursor.close()
conn.close()
logger.info(f"Assignment updated successfully: {assignment_id}")
return {
"success": True,
"assignment_id": assignment_id,
"updated_fields": {
"status": new_status if new_status else current_status,
"actual_arrival": actual_arrival if actual_arrival else "not updated",
"notes": notes if notes else "not updated"
},
"message": f"Assignment {assignment_id} updated successfully"
}
except Exception as e:
logger.error(f"Failed to update assignment: {e}")
return {
"success": False,
"error": f"Failed to update assignment: {str(e)}"
}
def handle_unassign_order(tool_input: dict, user_id: str = None) -> dict:
"""
Unassign order (delete assignment)
Removes assignment and reverts order/driver to original states.
Args:
tool_input: Dict with order_id or assignment_id, and confirm flag
user_id: Authenticated user ID
Returns:
Unassignment result
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
order_id = (tool_input.get("order_id") or "").strip()
assignment_id = (tool_input.get("assignment_id") or "").strip()
confirm = tool_input.get("confirm", False)
if not order_id and not assignment_id:
return {
"success": False,
"error": "Provide either order_id or assignment_id"
}
if not confirm:
return {
"success": False,
"error": "Unassignment requires confirm=true for safety"
}
logger.info(f"Unassigning: order_id={order_id}, assignment_id={assignment_id}")
try:
conn = get_db_connection()
cursor = conn.cursor()
# Find assignment (filtered by user_id)
if assignment_id:
cursor.execute("""
SELECT order_id, driver_id, status
FROM assignments
WHERE assignment_id = %s AND user_id = %s
""", (assignment_id, user_id))
else:
cursor.execute("""
SELECT assignment_id, driver_id, status
FROM assignments
WHERE order_id = %s AND user_id = %s AND status IN ('active', 'in_progress')
ORDER BY assigned_at DESC
LIMIT 1
""", (order_id, user_id))
assignment_row = cursor.fetchone()
if not assignment_row:
cursor.close()
conn.close()
return {
"success": False,
"error": "No active assignment found"
}
if assignment_id:
found_order_id = assignment_row['order_id']
driver_id = assignment_row['driver_id']
status = assignment_row['status']
else:
assignment_id = assignment_row['assignment_id']
driver_id = assignment_row['driver_id']
status = assignment_row['status']
found_order_id = order_id
# Validate status (cannot unassign if in_progress without force)
if status == "in_progress":
cursor.close()
conn.close()
return {
"success": False,
"error": "Cannot unassign order with 'in_progress' status. Complete or fail the delivery first."
}
# Delete assignment
cursor.execute("""
DELETE FROM assignments
WHERE assignment_id = %s
""", (assignment_id,))
# Revert order status to pending and clear assigned driver
cursor.execute("""
UPDATE orders
SET status = 'pending', assigned_driver_id = NULL
WHERE order_id = %s
""", (found_order_id,))
# Check if driver has other active assignments
cursor.execute("""
SELECT COUNT(*)
FROM assignments
WHERE driver_id = %s AND status IN ('active', 'in_progress')
""", (driver_id,))
other_assignments_count = cursor.fetchone()[0]
# If no other active assignments, set driver back to active
if other_assignments_count == 0:
cursor.execute("""
UPDATE drivers
SET status = 'active'
WHERE driver_id = %s
""", (driver_id,))
conn.commit()
cursor.close()
conn.close()
logger.info(f"Assignment removed successfully: {assignment_id}")
return {
"success": True,
"assignment_id": assignment_id,
"order_id": found_order_id,
"driver_id": driver_id,
"message": f"Order {found_order_id} unassigned from driver {driver_id}. Order status reverted to 'pending'."
}
except Exception as e:
logger.error(f"Failed to unassign order: {e}")
return {
"success": False,
"error": f"Failed to unassign order: {str(e)}"
}
def handle_complete_delivery(tool_input: dict, user_id: str = None) -> dict:
"""
Complete a delivery and automatically update driver location
Marks delivery as completed, updates order/driver statuses, and moves
driver location to the delivery address.
Args:
tool_input: Dict with assignment_id, confirm flag, and optional fields
user_id: Authenticated user ID
Returns:
Completion result
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
from datetime import datetime
assignment_id = (tool_input.get("assignment_id") or "").strip()
confirm = tool_input.get("confirm", False)
actual_distance_meters = tool_input.get("actual_distance_meters")
notes = (tool_input.get("notes") or "").strip()
if not assignment_id:
return {
"success": False,
"error": "assignment_id is required"
}
if not confirm:
return {
"success": False,
"error": "Delivery completion requires confirm=true for safety"
}
logger.info(f"Completing delivery: assignment_id={assignment_id}")
try:
conn = get_db_connection()
cursor = conn.cursor()
# Get assignment and order details including timing fields (filtered by user_id)
cursor.execute("""
SELECT
a.status, a.order_id, a.driver_id,
a.delivery_location_lat, a.delivery_location_lng, a.delivery_address,
o.customer_name, o.expected_delivery_time, o.sla_grace_period_minutes,
d.name as driver_name
FROM assignments a
JOIN orders o ON a.order_id = o.order_id
JOIN drivers d ON a.driver_id = d.driver_id
WHERE a.assignment_id = %s AND a.user_id = %s
""", (assignment_id, user_id))
assignment_row = cursor.fetchone()
if not assignment_row:
cursor.close()
conn.close()
return {
"success": False,
"error": f"Assignment not found: {assignment_id}"
}
status = assignment_row['status']
order_id = assignment_row['order_id']
driver_id = assignment_row['driver_id']
delivery_lat = assignment_row['delivery_location_lat']
delivery_lng = assignment_row['delivery_location_lng']
delivery_address = assignment_row['delivery_address']
customer_name = assignment_row['customer_name']
driver_name = assignment_row['driver_name']
expected_delivery_time = assignment_row['expected_delivery_time']
sla_grace_period_minutes = assignment_row['sla_grace_period_minutes'] or 15
# Validate status
if status not in ["active", "in_progress"]:
cursor.close()
conn.close()
return {
"success": False,
"error": f"Cannot complete delivery: assignment status is '{status}'. Must be 'active' or 'in_progress'."
}
# Validate delivery location exists
if not delivery_lat or not delivery_lng:
cursor.close()
conn.close()
return {
"success": False,
"error": "Cannot complete delivery: delivery location coordinates are missing"
}
# Current timestamp for completion
completion_time = datetime.now()
# Step 1: Update assignment to completed
update_fields = ["status = %s", "actual_arrival = %s", "updated_at = %s"]
params = ["completed", completion_time, completion_time]
if actual_distance_meters:
update_fields.append("actual_distance_meters = %s")
params.append(actual_distance_meters)
if notes:
update_fields.append("notes = %s")
params.append(notes)
params.append(assignment_id)
cursor.execute(f"""
UPDATE assignments
SET {', '.join(update_fields)}
WHERE assignment_id = %s
""", tuple(params))
# Step 2: Update driver location to delivery address (including address text)
cursor.execute("""
UPDATE drivers
SET current_lat = %s,
current_lng = %s,
current_address = %s,
last_location_update = %s,
updated_at = %s
WHERE driver_id = %s
""", (delivery_lat, delivery_lng, delivery_address, completion_time, completion_time, driver_id))
logger.info(f"Driver {driver_id} location updated to delivery address: {delivery_address} ({delivery_lat}, {delivery_lng})")
# Step 3: Calculate delivery performance status
delivery_status = "on_time" # Default
timing_info = {
"expected_delivery_time": expected_delivery_time.isoformat() if expected_delivery_time else None,
"actual_delivery_time": completion_time.isoformat(),
"sla_grace_period_minutes": sla_grace_period_minutes
}
if expected_delivery_time:
# Calculate grace period deadline
from datetime import timedelta
grace_deadline = expected_delivery_time + timedelta(minutes=sla_grace_period_minutes)
if completion_time <= expected_delivery_time:
delivery_status = "on_time"
timing_info["status"] = "On-time delivery"
timing_info["delay_minutes"] = 0
elif completion_time <= grace_deadline:
delivery_status = "late"
delay_minutes = int((completion_time - expected_delivery_time).total_seconds() / 60)
timing_info["status"] = f"Late (within grace period)"
timing_info["delay_minutes"] = delay_minutes
else:
delivery_status = "very_late"
delay_minutes = int((completion_time - expected_delivery_time).total_seconds() / 60)
timing_info["status"] = f"Very late (SLA violation)"
timing_info["delay_minutes"] = delay_minutes
# Step 4: Update order status to delivered with timing info
cursor.execute("""
UPDATE orders
SET status = 'delivered',
delivered_at = %s,
delivery_status = %s,
updated_at = %s
WHERE order_id = %s
""", (completion_time, delivery_status, completion_time, order_id))
logger.info(f"Order {order_id} marked as delivered with status '{delivery_status}'")
# Step 4: Check if driver has other active assignments
cursor.execute("""
SELECT COUNT(*) as count FROM assignments
WHERE driver_id = %s AND status IN ('active', 'in_progress')
AND assignment_id != %s
""", (driver_id, assignment_id))
other_assignments_count = cursor.fetchone()['count']
# Step 5: If no other active assignments, set driver to active
cascading_actions = []
if other_assignments_count == 0:
cursor.execute("""
UPDATE drivers
SET status = 'active', updated_at = %s
WHERE driver_id = %s
""", (completion_time, driver_id))
cascading_actions.append(f"Driver {driver_name} set to 'active' (no other assignments)")
else:
cascading_actions.append(f"Driver {driver_name} remains 'busy' ({other_assignments_count} other active assignment(s))")
conn.commit()
cursor.close()
conn.close()
logger.info(f"Delivery completed successfully: {assignment_id}")
return {
"success": True,
"assignment_id": assignment_id,
"order_id": order_id,
"driver_id": driver_id,
"customer_name": customer_name,
"driver_name": driver_name,
"completed_at": completion_time.isoformat(),
"delivery_status": delivery_status,
"timing": timing_info,
"delivery_location": {
"lat": float(delivery_lat),
"lng": float(delivery_lng),
"address": delivery_address
},
"driver_updated": {
"new_location": f"{delivery_lat}, {delivery_lng}",
"location_updated_at": completion_time.isoformat()
},
"cascading_actions": cascading_actions,
"message": f"Delivery completed! Order {order_id} delivered by {driver_name}. Status: {timing_info.get('status', delivery_status)}. Driver location updated to delivery address."
}
except Exception as e:
logger.error(f"Failed to complete delivery: {e}")
return {
"success": False,
"error": f"Failed to complete delivery: {str(e)}"
}
def handle_fail_delivery(tool_input: dict, user_id: str = None) -> dict:
"""
Mark delivery as failed with mandatory location and reason
Driver must provide current GPS location and failure reason.
Updates driver location to reported coordinates and sets statuses accordingly.
Args:
tool_input: Dict with assignment_id, current_lat, current_lng, failure_reason,
confirm flag, and optional notes
user_id: Authenticated user ID
Returns:
Failure recording result
"""
# Authentication check - allow dev mode (only in non-production environments)
if not user_id:
# SECURITY: Only allow SKIP_AUTH in development environments
env = os.getenv("ENV", "production").lower()
skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true"
if skip_auth and env != "production":
user_id = "dev-user"
else:
return {
"success": False,
"error": "Authentication required. Please login first.",
"auth_required": True
}
from datetime import datetime
assignment_id = (tool_input.get("assignment_id") or "").strip()
current_address = (tool_input.get("current_address") or "").strip()
current_lat = tool_input.get("current_lat")
current_lng = tool_input.get("current_lng")
failure_reason = (tool_input.get("failure_reason") or "").strip()
confirm = tool_input.get("confirm", False)
notes = (tool_input.get("notes") or "").strip()
# Validation
if not assignment_id:
return {
"success": False,
"error": "assignment_id is required"
}
if not confirm:
return {
"success": False,
"error": "Delivery failure requires confirm=true for safety"
}
if not current_address or current_lat is None or current_lng is None:
return {
"success": False,
"error": "Driver must provide current location (current_address, current_lat, and current_lng required)"
}
if not failure_reason:
return {
"success": False,
"error": "Failure reason is required. Valid reasons: customer_not_available, wrong_address, refused_delivery, damaged_goods, payment_issue, vehicle_breakdown, access_restricted, weather_conditions, other"
}
# Validate failure_reason is one of the allowed values
valid_reasons = [
"customer_not_available",
"wrong_address",
"refused_delivery",
"damaged_goods",
"payment_issue",
"vehicle_breakdown",
"access_restricted",
"weather_conditions",
"other"
]
if failure_reason not in valid_reasons:
return {
"success": False,
"error": f"Invalid failure_reason '{failure_reason}'. Must be one of: {', '.join(valid_reasons)}"
}
# Validate coordinates are valid
try:
current_lat = float(current_lat)
current_lng = float(current_lng)
if not (-90 <= current_lat <= 90) or not (-180 <= current_lng <= 180):
return {
"success": False,
"error": "Invalid GPS coordinates. Latitude must be -90 to 90, longitude must be -180 to 180"
}
except (ValueError, TypeError):
return {
"success": False,
"error": "current_lat and current_lng must be valid numbers"
}
logger.info(f"Failing delivery: assignment_id={assignment_id}, reason={failure_reason}")
try:
conn = get_db_connection()
cursor = conn.cursor()
# Get assignment and order details including timing fields (filtered by user_id)
cursor.execute("""
SELECT
a.status, a.order_id, a.driver_id,
a.delivery_address,
o.customer_name, o.expected_delivery_time, o.sla_grace_period_minutes,
d.name as driver_name
FROM assignments a
JOIN orders o ON a.order_id = o.order_id
JOIN drivers d ON a.driver_id = d.driver_id
WHERE a.assignment_id = %s AND a.user_id = %s
""", (assignment_id, user_id))
assignment_row = cursor.fetchone()
if not assignment_row:
cursor.close()
conn.close()
return {
"success": False,
"error": f"Assignment not found: {assignment_id}"
}
status = assignment_row['status']
order_id = assignment_row['order_id']
driver_id = assignment_row['driver_id']
delivery_address = assignment_row['delivery_address']
customer_name = assignment_row['customer_name']
driver_name = assignment_row['driver_name']
expected_delivery_time = assignment_row['expected_delivery_time']
sla_grace_period_minutes = assignment_row['sla_grace_period_minutes'] or 15
# Validate status
if status not in ["active", "in_progress"]:
cursor.close()
conn.close()
return {
"success": False,
"error": f"Cannot fail delivery: assignment status is '{status}'. Must be 'active' or 'in_progress'."
}
# Current timestamp for failure
failure_time = datetime.now()
# Step 1: Update assignment to failed
update_fields = [
"status = %s",
"failure_reason = %s",
"actual_arrival = %s",
"updated_at = %s"
]
params = ["failed", failure_reason, failure_time, failure_time]
if notes:
update_fields.append("notes = %s")
params.append(notes)
params.append(assignment_id)
cursor.execute(f"""
UPDATE assignments
SET {', '.join(update_fields)}
WHERE assignment_id = %s
""", tuple(params))
# Step 2: Update driver location to reported current location (address provided by user)
cursor.execute("""
UPDATE drivers
SET current_lat = %s,
current_lng = %s,
current_address = %s,
last_location_update = %s,
updated_at = %s
WHERE driver_id = %s
""", (current_lat, current_lng, current_address, failure_time, failure_time, driver_id))
logger.info(f"Driver {driver_id} location updated to reported position: {current_address} ({current_lat}, {current_lng})")
# Step 3: Calculate delivery performance status for failure
delivery_status = "failed_on_time" # Default - failed but before deadline
timing_info = {
"expected_delivery_time": expected_delivery_time.isoformat() if expected_delivery_time else None,
"failure_time": failure_time.isoformat(),
"sla_grace_period_minutes": sla_grace_period_minutes
}
if expected_delivery_time:
if failure_time <= expected_delivery_time:
delivery_status = "failed_on_time"
timing_info["status"] = "Failed before deadline (attempted delivery on time)"
else:
delivery_status = "failed_late"
delay_minutes = int((failure_time - expected_delivery_time).total_seconds() / 60)
timing_info["status"] = f"Failed after deadline (late attempt)"
timing_info["delay_minutes"] = delay_minutes
# Step 4: Update order status to failed with timing info
cursor.execute("""
UPDATE orders
SET status = 'failed',
delivered_at = %s,
delivery_status = %s,
updated_at = %s
WHERE order_id = %s
""", (failure_time, delivery_status, failure_time, order_id))
logger.info(f"Order {order_id} marked as failed with status '{delivery_status}'")
# Step 4: Check if driver has other active assignments
cursor.execute("""
SELECT COUNT(*) as count FROM assignments
WHERE driver_id = %s AND status IN ('active', 'in_progress')
AND assignment_id != %s
""", (driver_id, assignment_id))
other_assignments_count = cursor.fetchone()['count']
# Step 5: If no other active assignments, set driver to active
cascading_actions = []
if other_assignments_count == 0:
cursor.execute("""
UPDATE drivers
SET status = 'active', updated_at = %s
WHERE driver_id = %s
""", (failure_time, driver_id))
cascading_actions.append(f"Driver {driver_name} set to 'active' (no other assignments)")
else:
cascading_actions.append(f"Driver {driver_name} remains 'busy' ({other_assignments_count} other active assignment(s))")
conn.commit()
cursor.close()
conn.close()
logger.info(f"Delivery marked as failed: {assignment_id}")
# Format failure reason for display
reason_display = failure_reason.replace("_", " ").title()
return {
"success": True,
"assignment_id": assignment_id,
"order_id": order_id,
"driver_id": driver_id,
"customer_name": customer_name,
"driver_name": driver_name,
"failed_at": failure_time.isoformat(),
"failure_reason": failure_reason,
"failure_reason_display": reason_display,
"delivery_status": delivery_status,
"timing": timing_info,
"delivery_address": delivery_address,
"driver_location": {
"lat": current_lat,
"lng": current_lng,
"address": current_address,
"updated_at": failure_time.isoformat()
},
"cascading_actions": cascading_actions,
"message": f"Delivery failed for order {order_id}. Reason: {reason_display}. Timing: {timing_info.get('status', delivery_status)}. Driver {driver_name} location updated to {current_address or f'({current_lat}, {current_lng})'}."
}
except Exception as e:
logger.error(f"Failed to record delivery failure: {e}")
return {
"success": False,
"error": f"Failed to record delivery failure: {str(e)}"
}
def get_tools_list() -> list:
"""Get list of available tools"""
return [tool["name"] for tool in TOOLS_SCHEMA]
def get_tool_description(tool_name: str) -> str:
"""Get description for a specific tool"""
for tool in TOOLS_SCHEMA:
if tool["name"] == tool_name:
return tool["description"]
return ""