""" Tool definitions and execution handlers for FleetMind chat Simulates MCP tools using Claude's tool calling feature """ import os import sys from pathlib import Path from datetime import datetime, timedelta import logging from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) from database.connection import execute_write, execute_query, get_db_connection from chat.geocoding import GeocodingService, GEOCODING_TIMEOUT from psycopg2.extras import RealDictCursor logger = logging.getLogger(__name__) # Thread pool for running blocking operations (geocoding, external APIs) _blocking_executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="blocking_ops") # Initialize geocoding service geocoding_service = GeocodingService() def safe_geocode(address: str) -> dict: """ Thread-safe geocoding with timeout protection. Runs geocoding in a thread pool to prevent blocking the main event loop. Args: address: Address to geocode Returns: Geocoding result dict """ try: future = _blocking_executor.submit(geocoding_service.geocode, address) result = future.result(timeout=GEOCODING_TIMEOUT + 2) return result except FuturesTimeoutError: logger.error(f"Geocoding timed out for: {address}, using mock fallback") return geocoding_service._geocode_mock(address) except Exception as e: logger.error(f"Geocoding failed for {address}: {e}, using mock fallback") return geocoding_service._geocode_mock(address) def safe_reverse_geocode(lat: float, lng: float) -> dict: """ Thread-safe reverse geocoding with timeout protection. Runs reverse geocoding in a thread pool to prevent blocking. Args: lat: Latitude lng: Longitude Returns: Reverse geocoding result dict """ try: future = _blocking_executor.submit(geocoding_service.reverse_geocode, lat, lng) result = future.result(timeout=GEOCODING_TIMEOUT + 2) return result except FuturesTimeoutError: logger.error(f"Reverse geocoding timed out for ({lat}, {lng}), using mock fallback") return geocoding_service._reverse_geocode_mock(lat, lng) except Exception as e: logger.error(f"Reverse geocoding failed for ({lat}, {lng}): {e}, using mock fallback") return geocoding_service._reverse_geocode_mock(lat, lng) # Tool schemas for Claude TOOLS_SCHEMA = [ { "name": "geocode_address", "description": "Convert a delivery address to GPS coordinates and validate the address format. Use this before creating an order to ensure the address is valid.", "input_schema": { "type": "object", "properties": { "address": { "type": "string", "description": "The full delivery address to geocode (e.g., '123 Main St, San Francisco, CA')" } }, "required": ["address"] } }, { "name": "create_order", "description": "Create a new delivery order in the database. Only call this after geocoding the address successfully.", "input_schema": { "type": "object", "properties": { "customer_name": { "type": "string", "description": "Full name of the customer" }, "customer_phone": { "type": "string", "description": "Customer phone number (optional)" }, "customer_email": { "type": "string", "description": "Customer email address (optional)" }, "delivery_address": { "type": "string", "description": "Full delivery address" }, "delivery_lat": { "type": "number", "description": "Latitude from geocoding" }, "delivery_lng": { "type": "number", "description": "Longitude from geocoding" }, "time_window_end": { "type": "string", "description": "Delivery deadline in ISO format (e.g., '2025-11-13T17:00:00'). If not specified by user, default to 6 hours from now." }, "priority": { "type": "string", "enum": ["standard", "express", "urgent"], "description": "Delivery priority. Default to 'standard' unless user specifies urgent/express." }, "special_instructions": { "type": "string", "description": "Any special delivery instructions (optional)" }, "weight_kg": { "type": "number", "description": "Package weight in kilograms (optional, default to 5.0)" } }, "required": ["customer_name", "delivery_address", "delivery_lat", "delivery_lng"] } }, { "name": "create_driver", "description": "Create a new delivery driver in the database. Use this to onboard new drivers to the fleet. Requires driver location (address + coordinates).", "input_schema": { "type": "object", "properties": { "name": { "type": "string", "description": "Full name of the driver" }, "phone": { "type": "string", "description": "Driver phone number (optional)" }, "email": { "type": "string", "description": "Driver email address (optional)" }, "vehicle_type": { "type": "string", "description": "Type of vehicle: van, truck, car, motorcycle" }, "vehicle_plate": { "type": "string", "description": "Vehicle license plate number (optional)" }, "current_address": { "type": "string", "description": "Driver's current location address (e.g., '123 Main St, New York, NY')" }, "current_lat": { "type": "number", "description": "Driver's current latitude coordinate" }, "current_lng": { "type": "number", "description": "Driver's current longitude coordinate" }, "capacity_kg": { "type": "number", "description": "Vehicle cargo capacity in kilograms (default: 1000.0)" }, "capacity_m3": { "type": "number", "description": "Vehicle cargo volume in cubic meters (default: 12.0)" }, "skills": { "type": "array", "description": "List of driver skills/certifications: refrigerated, medical_certified, fragile_handler, overnight, express_delivery", "items": { "type": "string" } }, "status": { "type": "string", "enum": ["active", "busy", "offline", "unavailable"], "description": "Driver status (default: active)" } }, "required": ["name", "vehicle_type", "current_address", "current_lat", "current_lng"] } }, { "name": "count_orders", "description": "Count total orders in the database with optional filters. Use this when user asks 'how many orders', 'fetch orders', or wants to know order statistics.", "input_schema": { "type": "object", "properties": { "status": { "type": "string", "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"], "description": "Filter by order status (optional)" }, "priority": { "type": "string", "enum": ["standard", "express", "urgent"], "description": "Filter by priority level (optional)" }, "payment_status": { "type": "string", "enum": ["pending", "paid", "cod"], "description": "Filter by payment status (optional)" }, "assigned_driver_id": { "type": "string", "description": "Filter by assigned driver ID (optional)" }, "is_fragile": { "type": "boolean", "description": "Filter fragile packages only (optional)" }, "requires_signature": { "type": "boolean", "description": "Filter orders requiring signature (optional)" }, "requires_cold_storage": { "type": "boolean", "description": "Filter orders requiring cold storage (optional)" } }, "required": [] } }, { "name": "fetch_orders", "description": "Fetch orders from the database with optional filters, pagination, and sorting. Use after counting to show specific number of orders.", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of orders to fetch (default: 10, max: 100)" }, "offset": { "type": "integer", "description": "Number of orders to skip for pagination (default: 0)" }, "status": { "type": "string", "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"], "description": "Filter by order status (optional)" }, "priority": { "type": "string", "enum": ["standard", "express", "urgent"], "description": "Filter by priority level (optional)" }, "payment_status": { "type": "string", "enum": ["pending", "paid", "cod"], "description": "Filter by payment status (optional)" }, "assigned_driver_id": { "type": "string", "description": "Filter by assigned driver ID (optional)" }, "is_fragile": { "type": "boolean", "description": "Filter fragile packages only (optional)" }, "requires_signature": { "type": "boolean", "description": "Filter orders requiring signature (optional)" }, "requires_cold_storage": { "type": "boolean", "description": "Filter orders requiring cold storage (optional)" }, "sort_by": { "type": "string", "enum": ["created_at", "priority", "time_window_start"], "description": "Field to sort by (default: created_at)" }, "sort_order": { "type": "string", "enum": ["ASC", "DESC"], "description": "Sort order (default: DESC for newest first)" } }, "required": [] } }, { "name": "get_order_details", "description": "Get complete details of a specific order by order ID. Use when user asks 'tell me about order X' or wants detailed information about a specific order.", "input_schema": { "type": "object", "properties": { "order_id": { "type": "string", "description": "The order ID to fetch details for (e.g., 'ORD-20251114163800')" } }, "required": ["order_id"] } }, { "name": "search_orders", "description": "Search for orders by customer name, email, phone, or order ID pattern. Use when user provides partial information to find orders.", "input_schema": { "type": "object", "properties": { "search_term": { "type": "string", "description": "Search term to match against customer_name, customer_email, customer_phone, or order_id" } }, "required": ["search_term"] } }, { "name": "get_incomplete_orders", "description": "Get all orders that are not yet completed (excludes delivered and cancelled orders). Shortcut for finding orders in progress (pending, assigned, in_transit).", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of orders to fetch (default: 20)" } }, "required": [] } }, { "name": "count_drivers", "description": "Count total drivers in the database with optional filters. Use this when user asks 'how many drivers', 'show drivers', or wants driver statistics.", "input_schema": { "type": "object", "properties": { "status": { "type": "string", "enum": ["active", "busy", "offline", "unavailable"], "description": "Filter by driver status (optional)" }, "vehicle_type": { "type": "string", "description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)" } }, "required": [] } }, { "name": "fetch_drivers", "description": "Fetch drivers from the database with optional filters, pagination, and sorting. Use after counting to show specific number of drivers.", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of drivers to fetch (default: 10, max: 100)" }, "offset": { "type": "integer", "description": "Number of drivers to skip for pagination (default: 0)" }, "status": { "type": "string", "enum": ["active", "busy", "offline", "unavailable"], "description": "Filter by driver status (optional)" }, "vehicle_type": { "type": "string", "description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)" }, "sort_by": { "type": "string", "enum": ["name", "status", "created_at", "last_location_update"], "description": "Field to sort by (default: name)" }, "sort_order": { "type": "string", "enum": ["ASC", "DESC"], "description": "Sort order (default: ASC for alphabetical)" } }, "required": [] } }, { "name": "get_driver_details", "description": "Get complete details of a specific driver by driver ID, including current location (latitude, longitude, and human-readable address), contact info, vehicle details, status, and skills. Use when user asks about a driver's location, coordinates, position, or any other driver information.", "input_schema": { "type": "object", "properties": { "driver_id": { "type": "string", "description": "The driver ID to fetch details for (e.g., 'DRV-20251114163800')" } }, "required": ["driver_id"] } }, { "name": "search_drivers", "description": "Search for drivers by name, email, phone, vehicle plate, or driver ID pattern. Use when user provides partial information to find drivers.", "input_schema": { "type": "object", "properties": { "search_term": { "type": "string", "description": "Search term to match against name, email, phone, vehicle_plate, or driver_id" } }, "required": ["search_term"] } }, { "name": "get_available_drivers", "description": "Get all drivers that are available for assignment (active or offline status, excludes busy and unavailable). Shortcut for finding drivers ready for dispatch.", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of drivers to fetch (default: 20)" } }, "required": [] } }, { "name": "update_order", "description": "Update an existing order's details. You can update any combination of fields. Only provide the fields you want to change.", "input_schema": { "type": "object", "properties": { "order_id": { "type": "string", "description": "Order ID to update (e.g., 'ORD-20250114123456')" }, "customer_name": { "type": "string", "description": "Updated customer name" }, "customer_phone": { "type": "string", "description": "Updated customer phone number" }, "customer_email": { "type": "string", "description": "Updated customer email address" }, "delivery_address": { "type": "string", "description": "Updated delivery address" }, "delivery_lat": { "type": "number", "description": "Updated delivery latitude (required if updating address)" }, "delivery_lng": { "type": "number", "description": "Updated delivery longitude (required if updating address)" }, "status": { "type": "string", "description": "Updated order status", "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] }, "priority": { "type": "string", "description": "Updated priority level", "enum": ["standard", "express", "urgent"] }, "special_instructions": { "type": "string", "description": "Updated special delivery instructions" }, "time_window_end": { "type": "string", "description": "Updated delivery deadline (ISO format datetime)" }, "payment_status": { "type": "string", "description": "Updated payment status", "enum": ["pending", "paid", "cod"] }, "weight_kg": { "type": "number", "description": "Updated package weight in kilograms" }, "order_value": { "type": "number", "description": "Updated order value in currency" } }, "required": ["order_id"] } }, { "name": "delete_order", "description": "Permanently delete an order from the database. This action cannot be undone. Use with caution.", "input_schema": { "type": "object", "properties": { "order_id": { "type": "string", "description": "Order ID to delete (e.g., 'ORD-20250114123456')" }, "confirm": { "type": "boolean", "description": "Must be set to true to confirm deletion" } }, "required": ["order_id", "confirm"] } }, { "name": "update_driver", "description": "Update an existing driver's details. You can update any combination of fields. Only provide the fields you want to change.", "input_schema": { "type": "object", "properties": { "driver_id": { "type": "string", "description": "Driver ID to update (e.g., 'DRV-20250114123456')" }, "name": { "type": "string", "description": "Updated driver name" }, "phone": { "type": "string", "description": "Updated phone number" }, "email": { "type": "string", "description": "Updated email address" }, "status": { "type": "string", "description": "Updated driver status", "enum": ["active", "busy", "offline", "unavailable"] }, "vehicle_type": { "type": "string", "description": "Updated vehicle type" }, "vehicle_plate": { "type": "string", "description": "Updated vehicle license plate" }, "capacity_kg": { "type": "number", "description": "Updated cargo capacity in kilograms" }, "capacity_m3": { "type": "number", "description": "Updated cargo capacity in cubic meters" }, "skills": { "type": "array", "items": {"type": "string"}, "description": "Updated list of driver skills/certifications" }, "current_address": { "type": "string", "description": "Updated current location address (e.g., '123 Main St, New York, NY')" }, "current_lat": { "type": "number", "description": "Updated current latitude" }, "current_lng": { "type": "number", "description": "Updated current longitude" } }, "required": ["driver_id"] } }, { "name": "delete_driver", "description": "Permanently delete a driver from the database. This action cannot be undone. Use with caution.", "input_schema": { "type": "object", "properties": { "driver_id": { "type": "string", "description": "Driver ID to delete (e.g., 'DRV-20250114123456')" }, "confirm": { "type": "boolean", "description": "Must be set to true to confirm deletion" } }, "required": ["driver_id", "confirm"] } } ] def execute_tool(tool_name: str, tool_input: dict) -> dict: """ Route tool execution to appropriate handler Args: tool_name: Name of the tool to execute tool_input: Tool input parameters Returns: Dict with tool execution results """ try: if tool_name == "geocode_address": return handle_geocode_address(tool_input) elif tool_name == "create_order": return handle_create_order(tool_input) elif tool_name == "create_driver": return handle_create_driver(tool_input) elif tool_name == "count_orders": return handle_count_orders(tool_input) elif tool_name == "fetch_orders": return handle_fetch_orders(tool_input) elif tool_name == "get_order_details": return handle_get_order_details(tool_input) elif tool_name == "search_orders": return handle_search_orders(tool_input) elif tool_name == "get_incomplete_orders": return handle_get_incomplete_orders(tool_input) elif tool_name == "count_drivers": return handle_count_drivers(tool_input) elif tool_name == "fetch_drivers": return handle_fetch_drivers(tool_input) elif tool_name == "get_driver_details": return handle_get_driver_details(tool_input) elif tool_name == "search_drivers": return handle_search_drivers(tool_input) elif tool_name == "get_available_drivers": return handle_get_available_drivers(tool_input) elif tool_name == "update_order": return handle_update_order(tool_input) elif tool_name == "delete_order": return handle_delete_order(tool_input) elif tool_name == "update_driver": return handle_update_driver(tool_input) elif tool_name == "delete_driver": return handle_delete_driver(tool_input) else: return { "success": False, "error": f"Unknown tool: {tool_name}" } except Exception as e: logger.error(f"Tool execution error ({tool_name}): {e}") return { "success": False, "error": str(e) } def handle_geocode_address(tool_input: dict) -> dict: """ Execute geocoding tool Args: tool_input: Dict with 'address' key Returns: Geocoding result """ address = tool_input.get("address", "") if not address: return { "success": False, "error": "Address is required" } logger.info(f"Geocoding address: {address}") # Use safe_geocode with timeout protection result = safe_geocode(address) return { "success": True, "latitude": result["lat"], "longitude": result["lng"], "formatted_address": result["formatted_address"], "confidence": result["confidence"], "message": f"Address geocoded successfully ({result['confidence']})" } def handle_calculate_route(tool_input: dict) -> dict: """ Execute route calculation tool Args: tool_input: Dict with origin, destination, mode, vehicle_type, alternatives, include_steps Returns: Route calculation result with distance, duration, and optional directions """ import math from datetime import datetime origin = tool_input.get("origin", "") destination = tool_input.get("destination", "") mode = tool_input.get("mode", "driving") vehicle_type = tool_input.get("vehicle_type", "car") alternatives = tool_input.get("alternatives", False) include_steps = tool_input.get("include_steps", False) if not origin or not destination: return { "success": False, "error": "Both origin and destination are required" } # Map vehicle type to travel mode VEHICLE_TYPE_TO_MODE = { "motorcycle": "TWO_WHEELER", # Use proper TWO_WHEELER mode for motorcycle-specific routing "bicycle": "bicycling", "car": "driving", "van": "driving", "truck": "driving" # Note: No truck-specific routing available in API } # Override mode if vehicle_type is provided if vehicle_type in VEHICLE_TYPE_TO_MODE: mode = VEHICLE_TYPE_TO_MODE[vehicle_type] logger.info(f"Vehicle type '{vehicle_type}' mapped to mode '{mode}'") logger.info(f"Calculating route: {origin} → {destination} (mode: {mode}, vehicle: {vehicle_type})") # Triple fallback: Routes API → Directions API → Mock if geocoding_service.use_mock: logger.info("Using mock route calculation (no API key)") result = _calculate_route_mock(origin, destination, mode) else: try: # Try Routes API first (recommended, more accurate) logger.info("Attempting Routes API (recommended)") result = _calculate_route_routes_api(origin, destination, mode, alternatives, include_steps, vehicle_type, tool_input) except Exception as e: logger.warning(f"Routes API failed: {e}") try: # Fall back to Directions API (legacy) logger.info("Falling back to Directions API (legacy)") result = _calculate_route_google(origin, destination, mode, alternatives, include_steps) except Exception as e2: # Fall back to mock calculation logger.error(f"Directions API also failed: {e2}, falling back to mock") result = _calculate_route_mock(origin, destination, mode) # Add vehicle type to result for use in intelligent routing result["vehicle_type"] = vehicle_type return result def _calculate_route_google(origin: str, destination: str, mode: str, alternatives: bool, include_steps: bool) -> dict: """Calculate route using Google Maps Directions API""" try: # Map our mode to Google Maps mode mode_mapping = { "driving": "driving", "walking": "walking", "bicycling": "bicycling", "transit": "transit" } gmaps_mode = mode_mapping.get(mode, "driving") # Call Google Maps Directions API result = geocoding_service.gmaps_client.directions( origin=origin, destination=destination, mode=gmaps_mode, alternatives=alternatives, departure_time="now" # Get real-time traffic data ) if not result: logger.warning(f"Google Maps Directions API found no routes for: {origin} → {destination}") return _calculate_route_mock(origin, destination, mode) # Get first (best) route route = result[0] leg = route['legs'][0] # First leg (direct route) # Extract route information distance_meters = leg['distance']['value'] distance_text = leg['distance']['text'] duration_seconds = leg['duration']['value'] duration_text = leg['duration']['text'] # Get traffic-aware duration if available duration_in_traffic = leg.get('duration_in_traffic') if duration_in_traffic: traffic_duration_seconds = duration_in_traffic['value'] traffic_duration_text = duration_in_traffic['text'] else: traffic_duration_seconds = duration_seconds traffic_duration_text = duration_text # Get route summary route_summary = route.get('summary', 'Via main roads') # Prepare response response = { "success": True, "origin": leg['start_address'], "destination": leg['end_address'], "distance": { "meters": distance_meters, "text": distance_text }, "duration": { "seconds": duration_seconds, "text": duration_text }, "duration_in_traffic": { "seconds": traffic_duration_seconds, "text": traffic_duration_text }, "mode": mode, "route_summary": route_summary, "confidence": "high (Google Maps API)" } # Add turn-by-turn steps if requested if include_steps and 'steps' in leg: steps = [] for step in leg['steps']: steps.append({ "instruction": step.get('html_instructions', '').replace('', '').replace('', ''), "distance": step['distance']['text'], "duration": step['duration']['text'] }) response["steps"] = steps response["total_steps"] = len(steps) # Add alternative routes if requested if alternatives and len(result) > 1: alt_routes = [] for alt_route in result[1:]: # Skip first route (already returned) alt_leg = alt_route['legs'][0] alt_routes.append({ "route_summary": alt_route.get('summary', 'Alternative route'), "distance": alt_leg['distance']['text'], "duration": alt_leg['duration']['text'] }) response["alternatives"] = alt_routes response["alternatives_count"] = len(alt_routes) logger.info(f"Route calculated: {distance_text}, {traffic_duration_text}") return response except Exception as e: logger.error(f"Google Maps Directions API error: {e}") raise def _location_to_latlng(location: str) -> dict: """ Convert location (address or coordinates) to lat/lng dict for Routes API Args: location: Either an address string or "lat,lng" coordinates Returns: Dict with {"latitude": float, "longitude": float} """ # Check if already in "lat,lng" format if ',' in location: parts = location.split(',') if len(parts) == 2: try: lat = float(parts[0].strip()) lng = float(parts[1].strip()) return {"latitude": lat, "longitude": lng} except ValueError: pass # Not valid coordinates, treat as address # Geocode the address using safe version with timeout geocoded = safe_geocode(location) return { "latitude": geocoded["lat"], "longitude": geocoded["lng"] } def _calculate_route_routes_api(origin: str, destination: str, mode: str, alternatives: bool, include_steps: bool, vehicle_type: str = "car", tool_input: dict = None) -> dict: """ Calculate route using Google Routes API (new, recommended) This uses the modern Routes API which provides better accuracy, real-time traffic data, vehicle-specific routing, and additional features. Args: origin: Starting location (address or "lat,lng") destination: Ending location (address or "lat,lng") mode: Travel mode (driving, walking, bicycling, transit, TWO_WHEELER) alternatives: Whether to return alternative routes include_steps: Whether to include turn-by-turn directions vehicle_type: Vehicle type (motorcycle, bicycle, car, van, truck) tool_input: Original tool input dict for route modifiers Returns: Route calculation result dict with vehicle-specific data """ if tool_input is None: tool_input = {} import requests import re try: # Convert locations to lat/lng origin_latlng = _location_to_latlng(origin) dest_latlng = _location_to_latlng(destination) # Map travel modes to Routes API format mode_mapping = { "driving": "DRIVE", "walking": "WALK", "bicycling": "BICYCLE", "transit": "TRANSIT", "TWO_WHEELER": "TWO_WHEELER" # Motorcycle-specific routing } routes_mode = mode_mapping.get(mode, "DRIVE") # Prepare API request url = "https://routes.googleapis.com/directions/v2:computeRoutes" # Build enhanced field mask for vehicle-specific data field_mask_parts = [ "routes.duration", "routes.staticDuration", # Duration without traffic "routes.distanceMeters", "routes.polyline.encodedPolyline", "routes.legs", "routes.description", "routes.localizedValues", "routes.routeLabels", # Get route type labels (FUEL_EFFICIENT, etc.) "routes.travelAdvisory.speedReadingIntervals", # Traffic segments "routes.travelAdvisory.tollInfo" # Toll information ] # Add fuel consumption for DRIVE mode if routes_mode == "DRIVE": field_mask_parts.append("routes.travelAdvisory.fuelConsumptionMicroliters") headers = { "Content-Type": "application/json", "X-Goog-Api-Key": geocoding_service.google_maps_key, "X-Goog-FieldMask": ",".join(field_mask_parts) } # Build request body body = { "origin": { "location": { "latLng": origin_latlng } }, "destination": { "location": { "latLng": dest_latlng } }, "travelMode": routes_mode, "computeAlternativeRoutes": alternatives, "languageCode": "en-US", "units": "METRIC" } # Add routing preference only for DRIVE and TWO_WHEELER (not for WALK/BICYCLE) if routes_mode in ["DRIVE", "TWO_WHEELER"]: body["routingPreference"] = "TRAFFIC_AWARE" # Add route modifiers based on vehicle type route_modifiers = {} # Vehicle emission type for DRIVE mode (cars, vans, trucks) if routes_mode == "DRIVE": emission_type = tool_input.get("emission_type", "GASOLINE").upper() if emission_type in ["GASOLINE", "ELECTRIC", "HYBRID", "DIESEL"]: route_modifiers["vehicleInfo"] = { "emissionType": emission_type } # Avoid options (applicable to DRIVE and TWO_WHEELER) if routes_mode in ["DRIVE", "TWO_WHEELER"]: if tool_input.get("avoid_tolls", False): route_modifiers["avoidTolls"] = True if tool_input.get("avoid_highways", False): route_modifiers["avoidHighways"] = True if tool_input.get("avoid_ferries", False): route_modifiers["avoidFerries"] = True if route_modifiers: body["routeModifiers"] = route_modifiers # Add extra computations for enhanced data extra_computations = [] # Traffic data for DRIVE and TWO_WHEELER if routes_mode in ["DRIVE", "TWO_WHEELER"]: extra_computations.append("TRAFFIC_ON_POLYLINE") # Toll information (unless avoiding tolls) if not tool_input.get("avoid_tolls", False): extra_computations.append("TOLLS") # Fuel consumption for DRIVE mode only if routes_mode == "DRIVE": extra_computations.append("FUEL_CONSUMPTION") if extra_computations: body["extraComputations"] = extra_computations # Request fuel-efficient alternative for DRIVE mode if routes_mode == "DRIVE" and tool_input.get("request_fuel_efficient", False): body["requestedReferenceRoutes"] = ["FUEL_EFFICIENT"] # Make API request logger.info(f"Calling Routes API: {origin} → {destination} (mode: {routes_mode})") response = requests.post(url, headers=headers, json=body, timeout=10) if response.status_code != 200: logger.error(f"Routes API error: {response.status_code} - {response.text}") raise Exception(f"Routes API returned {response.status_code}: {response.text[:200]}") data = response.json() if not data.get("routes"): logger.warning(f"Routes API found no routes for: {origin} → {destination}") return _calculate_route_google(origin, destination, mode, alternatives, include_steps) # Get first (best) route route = data["routes"][0] # Extract distance distance_meters = route.get("distanceMeters", 0) if distance_meters >= 1000: distance_text = f"{distance_meters/1000:.1f} km" else: distance_text = f"{distance_meters} m" # Helper function to format duration def format_duration(seconds): hours = seconds // 3600 minutes = (seconds % 3600) // 60 if hours > 0: return f"{hours} hour{'s' if hours > 1 else ''} {minutes} min{'s' if minutes != 1 else ''}" else: return f"{minutes} min{'s' if minutes != 1 else ''}" # Extract duration WITH traffic (format: "123s" or "123.456s") duration_str = route.get("duration", "0s") duration_with_traffic_seconds = int(float(re.sub(r'[^\d.]', '', duration_str))) # Extract static duration (WITHOUT traffic) static_duration_str = route.get("staticDuration", duration_str) static_duration_seconds = int(float(re.sub(r'[^\d.]', '', static_duration_str))) # Calculate traffic delay traffic_delay_seconds = duration_with_traffic_seconds - static_duration_seconds # Get route description/summary and labels route_summary = route.get("description", "Route via Routes API") route_labels = route.get("routeLabels", []) # Extract travel advisory information travel_advisory = route.get("travelAdvisory", {}) # Toll information toll_info = travel_advisory.get("tollInfo") has_tolls = toll_info is not None # Fuel consumption (DRIVE mode only) fuel_consumption_ml = travel_advisory.get("fuelConsumptionMicroliters") fuel_consumption_liters = None if fuel_consumption_ml: fuel_consumption_liters = float(fuel_consumption_ml) / 1_000_000 # Traffic segments speed_intervals = travel_advisory.get("speedReadingIntervals", []) has_traffic_data = len(speed_intervals) > 0 # Get origin and destination addresses (geocode if needed) - use safe version with timeout origin_geocoded = safe_geocode(origin) dest_geocoded = safe_geocode(destination) # Build enhanced response with vehicle-specific data response_data = { "success": True, "origin": origin_geocoded["formatted_address"], "destination": dest_geocoded["formatted_address"], "distance": { "meters": distance_meters, "text": distance_text }, "duration": { "seconds": static_duration_seconds, "text": format_duration(static_duration_seconds) }, "duration_in_traffic": { "seconds": duration_with_traffic_seconds, "text": format_duration(duration_with_traffic_seconds) }, "traffic_delay": { "seconds": traffic_delay_seconds, "text": format_duration(traffic_delay_seconds) if traffic_delay_seconds > 0 else "No delay" }, "mode": mode, "vehicle_type": vehicle_type, "route_summary": route_summary, "route_labels": route_labels, "confidence": "high (Routes API with real-time traffic)" } # Add toll information if available if has_tolls: response_data["toll_info"] = { "has_tolls": True, "details": "Toll roads on route" } else: response_data["toll_info"] = {"has_tolls": False} # Add fuel consumption if available (DRIVE mode) if fuel_consumption_liters is not None: response_data["fuel_consumption"] = { "liters": round(fuel_consumption_liters, 2), "text": f"{fuel_consumption_liters:.2f} L" } # Add traffic data availability indicator if has_traffic_data: response_data["traffic_data_available"] = True response_data["traffic_segments_count"] = len(speed_intervals) # Add beta warnings for specific modes if routes_mode == "TWO_WHEELER": response_data["warning"] = ( "Motorcycle routing uses TWO_WHEELER mode (beta). " "May occasionally miss clear paths. Billed at higher rate." ) elif routes_mode == "BICYCLE": response_data["warning"] = ( "Bicycle routing is in beta and may occasionally miss clear bike paths." ) # Add turn-by-turn steps if requested if include_steps and route.get("legs"): steps = [] for leg in route["legs"]: if leg.get("steps"): for step in leg["steps"]: # Routes API has different step format, adapt as needed steps.append({ "instruction": step.get("navigationInstruction", {}).get("instructions", "Continue"), "distance": step.get("distanceMeters", 0), "duration": step.get("staticDuration", "0s") }) if steps: response_data["steps"] = steps response_data["steps_count"] = len(steps) # Add alternative routes if requested and available if alternatives and len(data["routes"]) > 1: alt_routes = [] for alt_route in data["routes"][1:]: alt_distance = alt_route.get("distanceMeters", 0) alt_duration_str = alt_route.get("duration", "0s") alt_duration_sec = int(float(re.sub(r'[^\d.]', '', alt_duration_str))) alt_hours = alt_duration_sec // 3600 alt_minutes = (alt_duration_sec % 3600) // 60 if alt_hours > 0: alt_duration_text = f"{alt_hours} hour{'s' if alt_hours > 1 else ''} {alt_minutes} min" else: alt_duration_text = f"{alt_minutes} min" alt_routes.append({ "route_summary": alt_route.get("description", "Alternative route"), "distance": f"{alt_distance/1000:.1f} km" if alt_distance >= 1000 else f"{alt_distance} m", "duration": alt_duration_text }) response_data["alternatives"] = alt_routes response_data["alternatives_count"] = len(alt_routes) logger.info(f"Routes API: {distance_text}, {format_duration(duration_with_traffic_seconds)}") return response_data except Exception as e: logger.error(f"Routes API error: {e}") raise # City-specific traffic profiles for realistic routing CITY_PROFILES = { "dhaka": { "name": "Dhaka, Bangladesh", "peak_speed_kmh": 8, # 8 km/h during peak hours (7-10 AM, 5-9 PM) "offpeak_speed_kmh": 18, # 18 km/h during off-peak hours "night_speed_kmh": 25, # 25 km/h at night (10 PM - 6 AM) "signals_per_km": 4, # 4 traffic signals per km in urban areas "signal_delay_sec": 50, # 50 seconds average per signal "intersection_delay_per_km": 30, # 30 seconds per km for intersections "congestion_multiplier": 2.5, # Heavy congestion factor "keywords": ["dhaka", "bangladesh"] }, "default": { "name": "Default Urban Area", "peak_speed_kmh": 20, # 20 km/h during peak hours "offpeak_speed_kmh": 30, # 30 km/h during off-peak hours "night_speed_kmh": 40, # 40 km/h at night "signals_per_km": 2, # 2 traffic signals per km "signal_delay_sec": 45, # 45 seconds average per signal "intersection_delay_per_km": 20, # 20 seconds per km "congestion_multiplier": 1.5, # Moderate congestion "keywords": [] } } def _calculate_route_mock(origin: str, destination: str, mode: str) -> dict: """Mock route calculation with realistic urban traffic modeling""" import math from datetime import datetime # Try to geocode both locations to get coordinates (using safe version) try: origin_geocoded = safe_geocode(origin) dest_geocoded = safe_geocode(destination) origin_lat = origin_geocoded["lat"] origin_lng = origin_geocoded["lng"] dest_lat = dest_geocoded["lat"] dest_lng = dest_geocoded["lng"] # Detect city from destination address dest_address_lower = dest_geocoded["formatted_address"].lower() city_profile = CITY_PROFILES["default"] for city_key, profile in CITY_PROFILES.items(): if city_key != "default": for keyword in profile["keywords"]: if keyword in dest_address_lower: city_profile = profile logger.info(f"Detected city: {profile['name']}") break if city_profile != CITY_PROFILES["default"]: break # Detect time of day current_hour = datetime.now().hour if 7 <= current_hour < 10 or 17 <= current_hour < 21: time_period = "peak" speed_kmh = city_profile["peak_speed_kmh"] elif 22 <= current_hour or current_hour < 6: time_period = "night" speed_kmh = city_profile["night_speed_kmh"] else: time_period = "offpeak" speed_kmh = city_profile["offpeak_speed_kmh"] logger.info(f"Time period: {time_period}, base speed: {speed_kmh} km/h") # Calculate straight-line distance using Haversine formula R = 6371000 # Earth radius in meters phi1 = math.radians(origin_lat) phi2 = math.radians(dest_lat) delta_phi = math.radians(dest_lat - origin_lat) delta_lambda = math.radians(dest_lng - origin_lng) a = math.sin(delta_phi/2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda/2)**2 c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) distance_meters = R * c # Estimate driving distance based on mode if mode == "driving": distance_meters *= 1.3 # 30% longer for road network speed_mps = speed_kmh / 3.6 # Convert km/h to m/s elif mode == "walking": distance_meters *= 1.2 speed_mps = 1.4 # ~5 km/h walking speed elif mode == "bicycling": distance_meters *= 1.25 speed_mps = 4.5 # ~16 km/h cycling speed elif mode == "transit": distance_meters *= 1.4 speed_mps = 8.9 # ~32 km/h transit speed else: speed_mps = speed_kmh / 3.6 # Calculate base duration from speed base_duration_seconds = int(distance_meters / speed_mps) # Add realistic urban delays for driving mode traffic_duration_seconds = base_duration_seconds if mode == "driving": distance_km = distance_meters / 1000.0 # Add traffic signal delays num_signals = int(distance_km * city_profile["signals_per_km"]) signal_delay = num_signals * city_profile["signal_delay_sec"] # Add intersection delays intersection_delay = int(distance_km * city_profile["intersection_delay_per_km"]) # Apply congestion multiplier for peak hours if time_period == "peak": congestion_delay = int(base_duration_seconds * (city_profile["congestion_multiplier"] - 1.0)) else: congestion_delay = 0 # Calculate total traffic-aware duration traffic_duration_seconds = base_duration_seconds + signal_delay + intersection_delay + congestion_delay # Apply minimum travel time (2 minutes) MIN_TRAVEL_TIME = 120 if traffic_duration_seconds < MIN_TRAVEL_TIME: traffic_duration_seconds = MIN_TRAVEL_TIME logger.info(f"Urban delays - Signals: {signal_delay}s, Intersections: {intersection_delay}s, Congestion: {congestion_delay}s") # Format distance if distance_meters >= 1000: distance_text = f"{distance_meters/1000:.1f} km" else: distance_text = f"{int(distance_meters)} m" # Format base duration hours = base_duration_seconds // 3600 minutes = (base_duration_seconds % 3600) // 60 if hours > 0: base_duration_text = f"{hours} hour{'s' if hours > 1 else ''} {minutes} min{'s' if minutes != 1 else ''}" else: base_duration_text = f"{minutes} min{'s' if minutes != 1 else ''}" # Format traffic-aware duration hours = traffic_duration_seconds // 3600 minutes = (traffic_duration_seconds % 3600) // 60 if hours > 0: traffic_duration_text = f"{hours} hour{'s' if hours > 1 else ''} {minutes} min{'s' if minutes != 1 else ''}" else: traffic_duration_text = f"{minutes} min{'s' if minutes != 1 else ''}" logger.info(f"Mock route calculated: {distance_text}, {traffic_duration_text} (base: {base_duration_text}, city: {city_profile['name']})") return { "success": True, "origin": origin_geocoded["formatted_address"], "destination": dest_geocoded["formatted_address"], "distance": { "meters": int(distance_meters), "text": distance_text }, "duration": { "seconds": base_duration_seconds, "text": base_duration_text }, "duration_in_traffic": { "seconds": traffic_duration_seconds, "text": traffic_duration_text }, "mode": mode, "route_summary": f"Direct route via {city_profile['name']} ({time_period} traffic)", "confidence": "low (mock calculation with urban traffic modeling)" } except Exception as e: logger.error(f"Mock route calculation failed: {e}") return { "success": False, "error": f"Could not calculate route: {str(e)}" } def handle_create_order(tool_input: dict, user_id: str = None) -> dict: """ Execute order creation tool Args: tool_input: Dict with order fields (expected_delivery_time now REQUIRED) user_id: ID of authenticated user creating the order Returns: Order creation result """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # Development mode - use default user # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } # Extract fields with defaults customer_name = tool_input.get("customer_name") customer_phone = tool_input.get("customer_phone") customer_email = tool_input.get("customer_email") delivery_address = tool_input.get("delivery_address") delivery_lat = tool_input.get("delivery_lat") delivery_lng = tool_input.get("delivery_lng") expected_delivery_time_str = tool_input.get("expected_delivery_time") priority = tool_input.get("priority", "standard") special_instructions = tool_input.get("special_instructions") weight_kg = tool_input.get("weight_kg", 5.0) volume_m3 = tool_input.get("volume_m3", 1.0) is_fragile = tool_input.get("is_fragile", False) requires_cold_storage = tool_input.get("requires_cold_storage", False) requires_signature = tool_input.get("requires_signature", False) sla_grace_period_minutes = tool_input.get("sla_grace_period_minutes", 15) # Validate required fields (expected_delivery_time is now MANDATORY) if not all([customer_name, delivery_address, delivery_lat, delivery_lng, expected_delivery_time_str]): return { "success": False, "error": "Missing required fields: customer_name, delivery_address, delivery_lat, delivery_lng, expected_delivery_time" } # Generate order ID with microseconds to prevent collisions now = datetime.now() order_id = f"ORD-{now.strftime('%Y%m%d%H%M%S%f')[:18]}" # YYYYMMDDHHMMSSμμμμμμ (18 chars) # Parse and validate expected_delivery_time try: expected_delivery_time = datetime.fromisoformat(expected_delivery_time_str.replace('Z', '+00:00')) # Validate it's in the future if expected_delivery_time <= now: return { "success": False, "error": f"expected_delivery_time must be in the future. Provided: {expected_delivery_time_str}, Current time: {now.isoformat()}" } except (ValueError, AttributeError) as e: return { "success": False, "error": f"Invalid expected_delivery_time format. Must be ISO 8601 format (e.g., '2025-11-15T18:00:00'). Error: {str(e)}" } # Handle time window (kept for backward compatibility) time_window_end_str = tool_input.get("time_window_end") if time_window_end_str: try: time_window_end = datetime.fromisoformat(time_window_end_str.replace('Z', '+00:00')) except: time_window_end = expected_delivery_time # Use expected time as fallback else: time_window_end = expected_delivery_time # Default to expected delivery time time_window_start = now + timedelta(hours=2) # Insert into database query = """ INSERT INTO orders ( order_id, customer_name, customer_phone, customer_email, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, expected_delivery_time, priority, weight_kg, volume_m3, is_fragile, requires_cold_storage, requires_signature, status, special_instructions, sla_grace_period_minutes, user_id ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ params = ( order_id, customer_name, customer_phone, customer_email, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, expected_delivery_time, priority, weight_kg, volume_m3, is_fragile, requires_cold_storage, requires_signature, "pending", special_instructions, sla_grace_period_minutes, user_id # Add user_id to track ownership ) try: execute_write(query, params) logger.info(f"Order created: {order_id}, expected delivery: {expected_delivery_time.strftime('%Y-%m-%d %H:%M')}") return { "success": True, "order_id": order_id, "status": "pending", "customer": customer_name, "address": delivery_address, "expected_delivery": expected_delivery_time.strftime("%Y-%m-%d %H:%M"), "sla_grace_period_minutes": sla_grace_period_minutes, "priority": priority, "message": f"Order {order_id} created successfully! Expected delivery: {expected_delivery_time.strftime('%Y-%m-%d %H:%M')}" } except Exception as e: logger.error(f"Database error creating order: {e}") return { "success": False, "error": f"Failed to create order: {str(e)}" } def handle_create_driver(tool_input: dict, user_id: str = None) -> dict: """ Execute driver creation tool Args: tool_input: Dict with driver fields user_id: ID of authenticated user creating the driver Returns: Driver creation result """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } # Extract fields with defaults name = tool_input.get("name") phone = tool_input.get("phone") email = tool_input.get("email") vehicle_type = tool_input.get("vehicle_type") # No default - REQUIRED vehicle_plate = tool_input.get("vehicle_plate") capacity_kg = tool_input.get("capacity_kg", 1000.0) capacity_m3 = tool_input.get("capacity_m3", 12.0) current_lat = tool_input.get("current_lat") # No default - REQUIRED current_lng = tool_input.get("current_lng") # No default - REQUIRED current_address = tool_input.get("current_address") # No default - REQUIRED # Convert skills to regular list (handles protobuf RepeatedComposite) skills_raw = tool_input.get("skills", []) skills = list(skills_raw) if skills_raw else [] # Define valid skills VALID_SKILLS = [ "refrigerated", "medical_certified", "fragile_handler", "overnight", "express_delivery" ] # Validate skills if skills: invalid_skills = [s for s in skills if s not in VALID_SKILLS] if invalid_skills: return { "success": False, "error": f"Invalid skills: {invalid_skills}. Valid skills are: {VALID_SKILLS}" } status = tool_input.get("status", "active") # Validate ALL required fields (name, vehicle_type, current_address, current_lat, current_lng) if not all([name, vehicle_type, current_address, current_lat is not None, current_lng is not None]): return { "success": False, "error": "Missing required fields: name, vehicle_type, current_address, current_lat, current_lng. All fields are mandatory." } # Validate coordinates are valid numbers try: current_lat = float(current_lat) current_lng = float(current_lng) except (ValueError, TypeError): return { "success": False, "error": "current_lat and current_lng must be valid numbers" } # Validate coordinates are within valid ranges if not (-90 <= current_lat <= 90): return { "success": False, "error": f"Invalid latitude {current_lat}. Must be between -90 and 90" } if not (-180 <= current_lng <= 180): return { "success": False, "error": f"Invalid longitude {current_lng}. Must be between -180 and 180" } # Generate driver ID with microseconds to prevent collisions now = datetime.now() driver_id = f"DRV-{now.strftime('%Y%m%d%H%M%S%f')[:18]}" # YYYYMMDDHHMMSSμμμμμμ (18 chars) # Insert into database query = """ INSERT INTO drivers ( driver_id, name, phone, email, current_lat, current_lng, current_address, last_location_update, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills, user_id ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ # Convert skills list to JSON import json skills_json = json.dumps(skills) if skills else json.dumps([]) params = ( driver_id, name, phone, email, current_lat, current_lng, current_address, now, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills_json, user_id # Add user_id to track ownership ) try: execute_write(query, params) logger.info(f"Driver created: {driver_id}") return { "success": True, "driver_id": driver_id, "name": name, "phone": phone, "status": status, "vehicle_type": vehicle_type, "vehicle_plate": vehicle_plate, "capacity_kg": capacity_kg, "skills": skills, "location": { "latitude": current_lat, "longitude": current_lng, "address": current_address }, "message": f"Driver {driver_id} ({name}) created successfully!" } except Exception as e: logger.error(f"Database error creating driver: {e}") return { "success": False, "error": f"Failed to create driver: {str(e)}" } def handle_update_order(tool_input: dict, user_id: str = None) -> dict: """ Execute order update tool with assignment cascading logic Args: tool_input: Dict with order_id and fields to update user_id: Authenticated user ID Returns: Update result """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } import json order_id = tool_input.get("order_id") # Validate required field if not order_id: return { "success": False, "error": "Missing required field: order_id" } # Check if order exists and belongs to user check_query = "SELECT order_id, status, assigned_driver_id FROM orders WHERE order_id = %s AND user_id = %s" existing = execute_query(check_query, (order_id, user_id)) if not existing: return { "success": False, "error": f"Order {order_id} not found" } current_status = existing[0].get("status") current_assigned_driver = existing[0].get("assigned_driver_id") # Auto-geocode if delivery address is updated without coordinates # Use safe_geocode with timeout protection if "delivery_address" in tool_input and ("delivery_lat" not in tool_input or "delivery_lng" not in tool_input): try: geocode_result = safe_geocode(tool_input["delivery_address"]) tool_input["delivery_lat"] = geocode_result["lat"] tool_input["delivery_lng"] = geocode_result["lng"] logger.info(f"Auto-geocoded delivery address: {geocode_result['formatted_address']}") except Exception as e: logger.warning(f"Failed to geocode address, skipping coordinate update: {e}") # Handle status changes with assignment cascading logic new_status = tool_input.get("status") cascading_actions = [] if new_status and new_status != current_status: # Check if order has active assignment assignment_check = execute_query(""" SELECT assignment_id, status, driver_id FROM assignments WHERE order_id = %s AND status IN ('active', 'in_progress') LIMIT 1 """, (order_id,)) has_active_assignment = len(assignment_check) > 0 # Validate status transitions based on assignment state if new_status == "pending" and current_status == "assigned": if has_active_assignment: # Changing assigned order back to pending - must cancel assignment assignment_id = assignment_check[0]["assignment_id"] driver_id = assignment_check[0]["driver_id"] # Cancel the assignment execute_write(""" UPDATE assignments SET status = 'cancelled', updated_at = %s WHERE assignment_id = %s """, (datetime.now(), assignment_id)) # Clear assigned_driver_id from order execute_write(""" UPDATE orders SET assigned_driver_id = NULL WHERE order_id = %s """, (order_id,)) # Check if driver has other active assignments other_assignments = execute_query(""" SELECT COUNT(*) as count FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') AND assignment_id != %s """, (driver_id, assignment_id)) if other_assignments[0]["count"] == 0: # Set driver back to active if no other assignments execute_write(""" UPDATE drivers SET status = 'active', updated_at = %s WHERE driver_id = %s """, (datetime.now(), driver_id)) cascading_actions.append(f"Driver {driver_id} set to active (no other assignments)") cascading_actions.append(f"Assignment {assignment_id} cancelled and removed") elif new_status == "cancelled": if has_active_assignment: # Cancel active assignment when order is cancelled assignment_id = assignment_check[0]["assignment_id"] driver_id = assignment_check[0]["driver_id"] execute_write(""" UPDATE assignments SET status = 'cancelled', updated_at = %s WHERE assignment_id = %s """, (datetime.now(), assignment_id)) # Clear assigned_driver_id execute_write(""" UPDATE orders SET assigned_driver_id = NULL WHERE order_id = %s """, (order_id,)) # Check if driver has other active assignments other_assignments = execute_query(""" SELECT COUNT(*) as count FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') AND assignment_id != %s """, (driver_id, assignment_id)) if other_assignments[0]["count"] == 0: execute_write(""" UPDATE drivers SET status = 'active', updated_at = %s WHERE driver_id = %s """, (datetime.now(), driver_id)) cascading_actions.append(f"Driver {driver_id} set to active") cascading_actions.append(f"Assignment {assignment_id} cancelled") elif new_status in ["delivered", "failed"] and has_active_assignment: # Note: This should normally be handled by update_assignment tool # but we allow it here for flexibility assignment_id = assignment_check[0]["assignment_id"] final_status = "completed" if new_status == "delivered" else "failed" execute_write(""" UPDATE assignments SET status = %s, updated_at = %s WHERE assignment_id = %s """, (final_status, datetime.now(), assignment_id)) cascading_actions.append(f"Assignment {assignment_id} marked as {final_status}") # Build UPDATE query dynamically based on provided fields update_fields = [] params = [] # Map of field names to their database columns updateable_fields = { "customer_name": "customer_name", "customer_phone": "customer_phone", "customer_email": "customer_email", "delivery_address": "delivery_address", "delivery_lat": "delivery_lat", "delivery_lng": "delivery_lng", "status": "status", "priority": "priority", "special_instructions": "special_instructions", "time_window_end": "time_window_end", "payment_status": "payment_status", "weight_kg": "weight_kg", "order_value": "order_value" } for field, column in updateable_fields.items(): if field in tool_input: update_fields.append(f"{column} = %s") params.append(tool_input[field]) if not update_fields: return { "success": False, "error": "No fields provided to update" } # Always update the updated_at timestamp update_fields.append("updated_at = %s") params.append(datetime.now()) # Add order_id and user_id for WHERE clause params.append(order_id) params.append(user_id) # Execute update query = f""" UPDATE orders SET {', '.join(update_fields)} WHERE order_id = %s AND user_id = %s """ try: execute_write(query, tuple(params)) logger.info(f"Order updated: {order_id}") result = { "success": True, "order_id": order_id, "updated_fields": list(updateable_fields.keys() & tool_input.keys()), "message": f"Order {order_id} updated successfully!" } if cascading_actions: result["cascading_actions"] = cascading_actions return result except Exception as e: logger.error(f"Database error updating order: {e}") return { "success": False, "error": f"Failed to update order: {str(e)}" } def handle_delete_all_orders(tool_input: dict, user_id: str = None) -> dict: """ Delete all orders (bulk delete) for the authenticated user Args: tool_input: Dict with confirm flag and optional status filter user_id: Authenticated user ID Returns: Deletion result with count """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } confirm = tool_input.get("confirm", False) status_filter = tool_input.get("status") # Optional: delete only specific status if not confirm: return { "success": False, "error": "Bulk deletion requires confirm=true for safety" } try: # Check for active assignments first (only for this user's orders) active_assignments = execute_query(""" SELECT COUNT(*) as count FROM assignments WHERE user_id = %s AND status IN ('active', 'in_progress') """, (user_id,)) active_count = active_assignments[0]['count'] if active_count > 0: return { "success": False, "error": f"Cannot delete orders: {active_count} active assignment(s) exist. Cancel or complete them first." } # Build delete query based on status filter (always filter by user_id) if status_filter: count_query = "SELECT COUNT(*) as count FROM orders WHERE user_id = %s AND status = %s" delete_query = "DELETE FROM orders WHERE user_id = %s AND status = %s" params = (user_id, status_filter) else: count_query = "SELECT COUNT(*) as count FROM orders WHERE user_id = %s" delete_query = "DELETE FROM orders WHERE user_id = %s" params = (user_id,) # Get count before deletion count_result = execute_query(count_query, params) total_count = count_result[0]['count'] if total_count == 0: return { "success": True, "deleted_count": 0, "message": "No orders to delete" } # Execute bulk delete execute_write(delete_query, params) logger.info(f"Bulk deleted {total_count} orders") return { "success": True, "deleted_count": total_count, "message": f"Successfully deleted {total_count} order(s)" } except Exception as e: logger.error(f"Database error bulk deleting orders: {e}") return { "success": False, "error": f"Failed to bulk delete orders: {str(e)}" } def handle_delete_order(tool_input: dict, user_id: str = None) -> dict: """ Execute order deletion tool with assignment safety checks Args: tool_input: Dict with order_id and confirm flag user_id: Authenticated user ID Returns: Deletion result """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } order_id = tool_input.get("order_id") confirm = tool_input.get("confirm", False) # Validate required fields if not order_id: return { "success": False, "error": "Missing required field: order_id" } if not confirm: return { "success": False, "error": "Deletion not confirmed. Set confirm=true to proceed." } # Check if order exists and belongs to user check_query = "SELECT order_id, status FROM orders WHERE order_id = %s AND user_id = %s" existing = execute_query(check_query, (order_id, user_id)) if not existing: return { "success": False, "error": f"Order {order_id} not found" } order_status = existing[0].get("status") # Check for active assignments assignment_check = execute_query(""" SELECT assignment_id, status, driver_id FROM assignments WHERE order_id = %s AND status IN ('active', 'in_progress') """, (order_id,)) if assignment_check: # Warn about active assignments that will be cascade deleted assignment_count = len(assignment_check) assignment_ids = [a["assignment_id"] for a in assignment_check] return { "success": False, "error": f"Cannot delete order {order_id}: it has {assignment_count} active assignment(s): {', '.join(assignment_ids)}. Please cancel or complete the assignment(s) first using update_assignment or unassign_order.", "active_assignments": assignment_ids } # Check for any completed assignments (these will be cascade deleted) completed_assignments = execute_query(""" SELECT COUNT(*) as count FROM assignments WHERE order_id = %s AND status IN ('completed', 'failed', 'cancelled') """, (order_id,)) cascading_info = [] if completed_assignments[0]["count"] > 0: cascading_info.append(f"{completed_assignments[0]['count']} completed/failed/cancelled assignment(s) will be cascade deleted") # Delete the order (will cascade to assignments via FK) query = "DELETE FROM orders WHERE order_id = %s AND user_id = %s" try: execute_write(query, (order_id, user_id)) logger.info(f"Order deleted: {order_id}") result = { "success": True, "order_id": order_id, "message": f"Order {order_id} has been permanently deleted." } if cascading_info: result["cascading_info"] = cascading_info return result except Exception as e: logger.error(f"Database error deleting order: {e}") return { "success": False, "error": f"Failed to delete order: {str(e)}" } def handle_update_driver(tool_input: dict, user_id: str = None) -> dict: """ Execute driver update tool with assignment validation Args: tool_input: Dict with driver_id and fields to update user_id: Authenticated user ID Returns: Update result """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } import json driver_id = tool_input.get("driver_id") # Validate required field if not driver_id: return { "success": False, "error": "Missing required field: driver_id" } # Check if driver exists and belongs to user check_query = "SELECT driver_id, status FROM drivers WHERE driver_id = %s AND user_id = %s" existing = execute_query(check_query, (driver_id, user_id)) if not existing: return { "success": False, "error": f"Driver {driver_id} not found" } current_status = existing[0].get("status") # Validate status changes against active assignments new_status = tool_input.get("status") if new_status and new_status != current_status: # Check for active assignments assignment_check = execute_query(""" SELECT assignment_id, status, order_id FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') """, (driver_id,)) has_active_assignments = len(assignment_check) > 0 # Prevent setting driver to offline/inactive when they have active assignments if new_status in ["offline", "inactive"] and has_active_assignments: assignment_count = len(assignment_check) assignment_ids = [a["assignment_id"] for a in assignment_check] return { "success": False, "error": f"Cannot set driver {driver_id} to '{new_status}': driver has {assignment_count} active assignment(s): {', '.join(assignment_ids)}. Please complete or cancel assignments first.", "active_assignments": assignment_ids } # Note: Setting driver to 'active' when they have assignments is allowed # The system manages 'busy' status automatically via assignment creation # But we allow manual override to 'active' for edge cases # Build UPDATE query dynamically based on provided fields update_fields = [] params = [] # Map of field names to their database columns updateable_fields = { "name": "name", "phone": "phone", "email": "email", "status": "status", "vehicle_type": "vehicle_type", "vehicle_plate": "vehicle_plate", "capacity_kg": "capacity_kg", "capacity_m3": "capacity_m3", "current_address": "current_address", "current_lat": "current_lat", "current_lng": "current_lng" } for field, column in updateable_fields.items(): if field in tool_input: update_fields.append(f"{column} = %s") params.append(tool_input[field]) # Handle skills array specially (convert to JSON) if "skills" in tool_input: skills = list(tool_input.get("skills", [])) update_fields.append("skills = %s") params.append(json.dumps(skills)) if not update_fields: return { "success": False, "error": "No fields provided to update" } # Always update the updated_at timestamp update_fields.append("updated_at = %s") params.append(datetime.now()) # Update location timestamp if lat/lng changed if "current_lat" in tool_input or "current_lng" in tool_input: update_fields.append("last_location_update = %s") params.append(datetime.now()) # Add driver_id and user_id for WHERE clause params.append(driver_id) params.append(user_id) # Execute update query = f""" UPDATE drivers SET {', '.join(update_fields)} WHERE driver_id = %s AND user_id = %s """ try: execute_write(query, tuple(params)) logger.info(f"Driver updated: {driver_id}") updated_list = list(updateable_fields.keys() & tool_input.keys()) if "skills" in tool_input: updated_list.append("skills") return { "success": True, "driver_id": driver_id, "updated_fields": updated_list, "message": f"Driver {driver_id} updated successfully!" } except Exception as e: logger.error(f"Database error updating driver: {e}") return { "success": False, "error": f"Failed to update driver: {str(e)}" } def handle_delete_all_drivers(tool_input: dict, user_id: str = None) -> dict: """ Delete all drivers (bulk delete) for the authenticated user Args: tool_input: Dict with confirm flag and optional status filter user_id: Authenticated user ID Returns: Deletion result with count """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } confirm = tool_input.get("confirm", False) status_filter = tool_input.get("status") # Optional: delete only specific status if not confirm: return { "success": False, "error": "Bulk deletion requires confirm=true for safety" } try: # Check for ANY assignments for this user (RESTRICT constraint will block if any exist) assignments = execute_query(""" SELECT COUNT(*) as count FROM assignments WHERE user_id = %s """, (user_id,)) assignment_count = assignments[0]['count'] if assignment_count > 0: return { "success": False, "error": f"Cannot delete drivers: {assignment_count} assignment(s) exist in database. Database RESTRICT constraint prevents driver deletion when assignments exist." } # Build delete query based on status filter (always filter by user_id) if status_filter: count_query = "SELECT COUNT(*) as count FROM drivers WHERE user_id = %s AND status = %s" delete_query = "DELETE FROM drivers WHERE user_id = %s AND status = %s" params = (user_id, status_filter) else: count_query = "SELECT COUNT(*) as count FROM drivers WHERE user_id = %s" delete_query = "DELETE FROM drivers WHERE user_id = %s" params = (user_id,) # Get count before deletion count_result = execute_query(count_query, params) total_count = count_result[0]['count'] if total_count == 0: return { "success": True, "deleted_count": 0, "message": "No drivers to delete" } # Execute bulk delete execute_write(delete_query, params) logger.info(f"Bulk deleted {total_count} drivers") return { "success": True, "deleted_count": total_count, "message": f"Successfully deleted {total_count} driver(s)" } except Exception as e: logger.error(f"Database error bulk deleting drivers: {e}") # Provide more context if it's a FK constraint error error_message = str(e) if "foreign key" in error_message.lower() or "violates" in error_message.lower(): error_message = f"Cannot delete drivers due to database constraint (assignments exist). Error: {error_message}" return { "success": False, "error": f"Failed to bulk delete drivers: {error_message}" } def handle_delete_driver(tool_input: dict, user_id: str = None) -> dict: """ Execute driver deletion tool with assignment safety checks Args: tool_input: Dict with driver_id and confirm flag user_id: Authenticated user ID Returns: Deletion result """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } driver_id = tool_input.get("driver_id") confirm = tool_input.get("confirm", False) # Validate required fields if not driver_id: return { "success": False, "error": "Missing required field: driver_id" } if not confirm: return { "success": False, "error": "Deletion not confirmed. Set confirm=true to proceed." } # Check if driver exists and belongs to user check_query = "SELECT driver_id, name FROM drivers WHERE driver_id = %s AND user_id = %s" existing = execute_query(check_query, (driver_id, user_id)) if not existing: return { "success": False, "error": f"Driver {driver_id} not found" } driver_name = existing[0]["name"] # Check for ANY assignments (active or completed) # FK constraint with ON DELETE RESTRICT will prevent deletion if ANY assignments exist assignment_check = execute_query(""" SELECT assignment_id, status, order_id FROM assignments WHERE driver_id = %s """, (driver_id,)) if assignment_check: # Count active vs completed assignments active_assignments = [a for a in assignment_check if a["status"] in ("active", "in_progress")] completed_assignments = [a for a in assignment_check if a["status"] in ("completed", "failed", "cancelled")] total_count = len(assignment_check) active_count = len(active_assignments) completed_count = len(completed_assignments) error_msg = f"Cannot delete driver {driver_id} ({driver_name}): driver has {total_count} assignment(s)" if active_count > 0: active_ids = [a["assignment_id"] for a in active_assignments] error_msg += f" ({active_count} active: {', '.join(active_ids)})" if completed_count > 0: error_msg += f" ({completed_count} completed/failed/cancelled)" error_msg += ". The database has RESTRICT constraint preventing driver deletion when assignments exist. Please cancel/complete active assignments and consider archiving the driver instead of deleting." return { "success": False, "error": error_msg, "total_assignments": total_count, "active_assignments": [a["assignment_id"] for a in active_assignments], "completed_assignments": [a["assignment_id"] for a in completed_assignments] } # Check for orders that reference this driver in assigned_driver_id # FK constraint with ON DELETE SET NULL will set these to NULL assigned_orders = execute_query(""" SELECT order_id FROM orders WHERE assigned_driver_id = %s """, (driver_id,)) cascading_info = [] if assigned_orders: order_count = len(assigned_orders) cascading_info.append(f"{order_count} order(s) will have assigned_driver_id set to NULL") # Delete the driver query = "DELETE FROM drivers WHERE driver_id = %s AND user_id = %s" try: execute_write(query, (driver_id, user_id)) logger.info(f"Driver deleted: {driver_id}") result = { "success": True, "driver_id": driver_id, "message": f"Driver {driver_id} ({driver_name}) has been permanently deleted." } if cascading_info: result["cascading_info"] = cascading_info return result except Exception as e: logger.error(f"Database error deleting driver: {e}") # Provide more context if it's a FK constraint error error_message = str(e) if "foreign key" in error_message.lower() or "violates" in error_message.lower(): error_message = f"Cannot delete driver due to database constraint (likely has related assignments). Error: {error_message}" return { "success": False, "error": f"Failed to delete driver: {error_message}" } def handle_count_orders(tool_input: dict, user_id: str = None) -> dict: """ Execute count orders tool Args: tool_input: Dict with optional filter fields user_id: ID of authenticated user Returns: Order count result with breakdown (only user's orders) """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } # Build WHERE clause based on filters # IMPORTANT: Always filter by user_id FIRST where_clauses = ["user_id = %s"] params = [user_id] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "priority" in tool_input: where_clauses.append("priority = %s") params.append(tool_input["priority"]) if "payment_status" in tool_input: where_clauses.append("payment_status = %s") params.append(tool_input["payment_status"]) if "assigned_driver_id" in tool_input: where_clauses.append("assigned_driver_id = %s") params.append(tool_input["assigned_driver_id"]) if "is_fragile" in tool_input: where_clauses.append("is_fragile = %s") params.append(tool_input["is_fragile"]) if "requires_signature" in tool_input: where_clauses.append("requires_signature = %s") params.append(tool_input["requires_signature"]) if "requires_cold_storage" in tool_input: where_clauses.append("requires_cold_storage = %s") params.append(tool_input["requires_cold_storage"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Total count query count_query = f"SELECT COUNT(*) as total FROM orders{where_sql}" # Breakdown by status query breakdown_query = f""" SELECT status, COUNT(*) as count FROM orders{where_sql} GROUP BY status ORDER BY count DESC """ # Breakdown by priority query priority_query = f""" SELECT priority, COUNT(*) as count FROM orders{where_sql} GROUP BY priority ORDER BY CASE priority WHEN 'urgent' THEN 1 WHEN 'express' THEN 2 WHEN 'standard' THEN 3 END """ try: # Execute queries total_result = execute_query(count_query, tuple(params) if params else None) total = total_result[0]['total'] if total_result else 0 status_result = execute_query(breakdown_query, tuple(params) if params else None) priority_result = execute_query(priority_query, tuple(params) if params else None) # Format breakdown status_breakdown = {row['status']: row['count'] for row in status_result} if status_result else {} priority_breakdown = {row['priority']: row['count'] for row in priority_result} if priority_result else {} logger.info(f"Counted orders: {total} total") return { "success": True, "total": total, "status_breakdown": status_breakdown, "priority_breakdown": priority_breakdown, "message": f"Found {total} order(s)" } except Exception as e: logger.error(f"Database error counting orders: {e}") return { "success": False, "error": f"Failed to count orders: {str(e)}" } def handle_fetch_orders(tool_input: dict, user_id: str = None) -> dict: """ Execute fetch orders tool Args: tool_input: Dict with filter, pagination, and sorting options user_id: ID of authenticated user (filters to only their orders) Returns: List of orders matching criteria (only user's orders) """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # Development mode - use default user # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } # Extract pagination and sorting limit = min(tool_input.get("limit", 10), 100) # Cap at 100 offset = tool_input.get("offset", 0) sort_by = tool_input.get("sort_by", "created_at") sort_order = tool_input.get("sort_order", "DESC") # Build WHERE clause based on filters # IMPORTANT: Always filter by user_id FIRST for security where_clauses = ["user_id = %s"] params = [user_id] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "priority" in tool_input: where_clauses.append("priority = %s") params.append(tool_input["priority"]) if "payment_status" in tool_input: where_clauses.append("payment_status = %s") params.append(tool_input["payment_status"]) if "assigned_driver_id" in tool_input: where_clauses.append("assigned_driver_id = %s") params.append(tool_input["assigned_driver_id"]) if "is_fragile" in tool_input: where_clauses.append("is_fragile = %s") params.append(tool_input["is_fragile"]) if "requires_signature" in tool_input: where_clauses.append("requires_signature = %s") params.append(tool_input["requires_signature"]) if "requires_cold_storage" in tool_input: where_clauses.append("requires_cold_storage = %s") params.append(tool_input["requires_cold_storage"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Build query query = f""" SELECT order_id, customer_name, customer_phone, customer_email, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, priority, weight_kg, volume_m3, special_instructions, status, assigned_driver_id, created_at, updated_at, delivered_at, order_value, payment_status, requires_signature, is_fragile, requires_cold_storage FROM orders {where_sql} ORDER BY {sort_by} {sort_order} LIMIT %s OFFSET %s """ params.extend([limit, offset]) try: results = execute_query(query, tuple(params)) if not results: return { "success": True, "orders": [], "count": 0, "message": "No orders found matching criteria" } # Format orders for readability orders = [] for row in results: order = { "order_id": row['order_id'], "customer": { "name": row['customer_name'], "phone": row['customer_phone'], "email": row['customer_email'] }, "delivery": { "address": row['delivery_address'], "latitude": float(row['delivery_lat']) if row['delivery_lat'] else None, "longitude": float(row['delivery_lng']) if row['delivery_lng'] else None }, "time_window": { "start": str(row['time_window_start']) if row['time_window_start'] else None, "end": str(row['time_window_end']) if row['time_window_end'] else None }, "details": { "priority": row['priority'], "status": row['status'], "weight_kg": float(row['weight_kg']) if row['weight_kg'] else None, "volume_m3": float(row['volume_m3']) if row['volume_m3'] else None, "special_instructions": row['special_instructions'] }, "flags": { "requires_signature": row['requires_signature'], "is_fragile": row['is_fragile'], "requires_cold_storage": row['requires_cold_storage'] }, "payment": { "order_value": float(row['order_value']) if row['order_value'] else None, "payment_status": row['payment_status'] }, "assigned_driver_id": row['assigned_driver_id'], "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None, "delivered_at": str(row['delivered_at']) if row['delivered_at'] else None } } orders.append(order) logger.info(f"Fetched {len(orders)} orders") return { "success": True, "orders": orders, "count": len(orders), "message": f"Retrieved {len(orders)} order(s)" } except Exception as e: logger.error(f"Database error fetching orders: {e}") return { "success": False, "error": f"Failed to fetch orders: {str(e)}" } def handle_get_order_details(tool_input: dict, user_id: str = None) -> dict: """ Execute get order details tool Args: tool_input: Dict with order_id user_id: ID of authenticated user Returns: Complete order details (only if owned by user) """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } order_id = tool_input.get("order_id") if not order_id: return { "success": False, "error": "order_id is required" } query = """ SELECT order_id, customer_name, customer_phone, customer_email, pickup_address, pickup_lat, pickup_lng, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, expected_delivery_time, priority, weight_kg, volume_m3, special_instructions, status, assigned_driver_id, delivery_status, created_at, updated_at, delivered_at, sla_grace_period_minutes, order_value, payment_status, requires_signature, is_fragile, requires_cold_storage FROM orders WHERE order_id = %s AND user_id = %s """ try: results = execute_query(query, (order_id, user_id)) if not results: return { "success": False, "error": f"Order {order_id} not found" } row = results[0] order = { "order_id": row['order_id'], "customer": { "name": row['customer_name'], "phone": row['customer_phone'], "email": row['customer_email'] }, "pickup": { "address": row['pickup_address'], "latitude": float(row['pickup_lat']) if row['pickup_lat'] else None, "longitude": float(row['pickup_lng']) if row['pickup_lng'] else None } if row['pickup_address'] else None, "delivery": { "address": row['delivery_address'], "latitude": float(row['delivery_lat']) if row['delivery_lat'] else None, "longitude": float(row['delivery_lng']) if row['delivery_lng'] else None }, "time_window": { "start": str(row['time_window_start']) if row['time_window_start'] else None, "end": str(row['time_window_end']) if row['time_window_end'] else None }, "details": { "priority": row['priority'], "status": row['status'], "weight_kg": float(row['weight_kg']) if row['weight_kg'] else None, "volume_m3": float(row['volume_m3']) if row['volume_m3'] else None, "special_instructions": row['special_instructions'] }, "delivery_status": row['delivery_status'], "timing": { "expected_delivery_time": str(row['expected_delivery_time']) if row['expected_delivery_time'] else None, "delivered_at": str(row['delivered_at']) if row['delivered_at'] else None, "sla_grace_period_minutes": row['sla_grace_period_minutes'] }, "flags": { "requires_signature": row['requires_signature'], "is_fragile": row['is_fragile'], "requires_cold_storage": row['requires_cold_storage'] }, "payment": { "order_value": float(row['order_value']) if row['order_value'] else None, "payment_status": row['payment_status'] }, "assigned_driver_id": row['assigned_driver_id'], "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None } } logger.info(f"Retrieved details for order: {order_id}") return { "success": True, "order": order, "message": f"Order {order_id} details retrieved" } except Exception as e: logger.error(f"Database error getting order details: {e}") return { "success": False, "error": f"Failed to get order details: {str(e)}" } def handle_search_orders(tool_input: dict, user_id: str = None) -> dict: """ Execute search orders tool Args: tool_input: Dict with search_term user_id: Authenticated user ID Returns: List of matching orders """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } search_term = tool_input.get("search_term", "").strip() if not search_term: return { "success": False, "error": "search_term is required" } query = """ SELECT order_id, customer_name, customer_phone, customer_email, delivery_address, priority, status, created_at FROM orders WHERE user_id = %s AND ( order_id ILIKE %s OR customer_name ILIKE %s OR customer_email ILIKE %s OR customer_phone ILIKE %s ) ORDER BY created_at DESC LIMIT 50 """ search_pattern = f"%{search_term}%" params = (user_id, search_pattern, search_pattern, search_pattern, search_pattern) try: results = execute_query(query, params) if not results: return { "success": True, "orders": [], "count": 0, "message": f"No orders found matching '{search_term}'" } orders = [] for row in results: orders.append({ "order_id": row['order_id'], "customer_name": row['customer_name'], "customer_phone": row['customer_phone'], "customer_email": row['customer_email'], "delivery_address": row['delivery_address'], "priority": row['priority'], "status": row['status'], "created_at": str(row['created_at']) }) logger.info(f"Search '{search_term}' found {len(orders)} orders") return { "success": True, "orders": orders, "count": len(orders), "message": f"Found {len(orders)} order(s) matching '{search_term}'" } except Exception as e: logger.error(f"Database error searching orders: {e}") return { "success": False, "error": f"Failed to search orders: {str(e)}" } def handle_get_incomplete_orders(tool_input: dict, user_id: str = None) -> dict: """ Execute get incomplete orders tool Args: tool_input: Dict with optional limit user_id: Authenticated user ID Returns: List of incomplete orders (pending, assigned, in_transit) """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } limit = min(tool_input.get("limit", 20), 100) query = """ SELECT order_id, customer_name, delivery_address, priority, status, time_window_end, created_at, assigned_driver_id FROM orders WHERE user_id = %s AND status IN ('pending', 'assigned', 'in_transit') ORDER BY CASE priority WHEN 'urgent' THEN 1 WHEN 'express' THEN 2 WHEN 'standard' THEN 3 END, time_window_end ASC LIMIT %s """ try: results = execute_query(query, (user_id, limit)) if not results: return { "success": True, "orders": [], "count": 0, "message": "No incomplete orders found" } orders = [] for row in results: orders.append({ "order_id": row['order_id'], "customer_name": row['customer_name'], "delivery_address": row['delivery_address'], "priority": row['priority'], "status": row['status'], "time_window_end": str(row['time_window_end']) if row['time_window_end'] else None, "created_at": str(row['created_at']), "assigned_driver_id": row['assigned_driver_id'] }) logger.info(f"Retrieved {len(orders)} incomplete orders") return { "success": True, "orders": orders, "count": len(orders), "message": f"Found {len(orders)} incomplete order(s)" } except Exception as e: logger.error(f"Database error getting incomplete orders: {e}") return { "success": False, "error": f"Failed to get incomplete orders: {str(e)}" } def handle_count_drivers(tool_input: dict, user_id: str = None) -> dict: """ Execute count drivers tool Args: tool_input: Dict with optional filter fields user_id: Authenticated user ID Returns: Driver count result with breakdown """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } # Build WHERE clause based on filters # IMPORTANT: Always filter by user_id FIRST where_clauses = ["user_id = %s"] params = [user_id] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "vehicle_type" in tool_input: where_clauses.append("vehicle_type = %s") params.append(tool_input["vehicle_type"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Total count query count_query = f"SELECT COUNT(*) as total FROM drivers{where_sql}" # Breakdown by status query status_query = f""" SELECT status, COUNT(*) as count FROM drivers{where_sql} GROUP BY status ORDER BY count DESC """ # Breakdown by vehicle type query vehicle_query = f""" SELECT vehicle_type, COUNT(*) as count FROM drivers{where_sql} GROUP BY vehicle_type ORDER BY count DESC """ try: # Execute queries total_result = execute_query(count_query, tuple(params) if params else None) total = total_result[0]['total'] if total_result else 0 status_result = execute_query(status_query, tuple(params) if params else None) vehicle_result = execute_query(vehicle_query, tuple(params) if params else None) # Format breakdown status_breakdown = {row['status']: row['count'] for row in status_result} if status_result else {} vehicle_breakdown = {row['vehicle_type']: row['count'] for row in vehicle_result if row['vehicle_type']} if vehicle_result else {} logger.info(f"Counted drivers: {total} total") return { "success": True, "total": total, "status_breakdown": status_breakdown, "vehicle_breakdown": vehicle_breakdown, "message": f"Found {total} driver(s)" } except Exception as e: logger.error(f"Database error counting drivers: {e}") return { "success": False, "error": f"Failed to count drivers: {str(e)}" } def handle_fetch_drivers(tool_input: dict, user_id: str = None) -> dict: """ Execute fetch drivers tool Args: tool_input: Dict with filter, pagination, and sorting options user_id: ID of authenticated user (filters to only their drivers) Returns: List of drivers matching criteria (only user's drivers) """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } # Extract pagination and sorting limit = min(tool_input.get("limit", 10), 100) # Cap at 100 offset = tool_input.get("offset", 0) sort_by = tool_input.get("sort_by", "name") sort_order = tool_input.get("sort_order", "ASC") # Build WHERE clause based on filters # IMPORTANT: Always filter by user_id FIRST for security where_clauses = ["user_id = %s"] params = [user_id] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "vehicle_type" in tool_input: where_clauses.append("vehicle_type = %s") params.append(tool_input["vehicle_type"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Build query query = f""" SELECT driver_id, name, phone, email, current_lat, current_lng, current_address, last_location_update, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills, created_at, updated_at FROM drivers {where_sql} ORDER BY {sort_by} {sort_order} LIMIT %s OFFSET %s """ params.extend([limit, offset]) try: results = execute_query(query, tuple(params)) if not results: return { "success": True, "drivers": [], "count": 0, "message": "No drivers found matching criteria" } # Format drivers for readability drivers = [] for row in results: # Parse skills JSON if present skills = [] if row['skills']: try: import json skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] except: skills = [] driver = { "driver_id": row['driver_id'], "name": row['name'], "contact": { "phone": row['phone'], "email": row['email'] }, "location": { "latitude": float(row['current_lat']) if row['current_lat'] else None, "longitude": float(row['current_lng']) if row['current_lng'] else None, "address": row['current_address'], "last_update": str(row['last_location_update']) if row['last_location_update'] else None }, "status": row['status'], "vehicle": { "type": row['vehicle_type'], "plate": row['vehicle_plate'], "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None }, "skills": skills, "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None } } drivers.append(driver) logger.info(f"Fetched {len(drivers)} drivers") return { "success": True, "drivers": drivers, "count": len(drivers), "message": f"Retrieved {len(drivers)} driver(s)" } except Exception as e: logger.error(f"Database error fetching drivers: {e}") return { "success": False, "error": f"Failed to fetch drivers: {str(e)}" } def handle_get_driver_details(tool_input: dict, user_id: str = None) -> dict: """ Execute get driver details tool Args: tool_input: Dict with driver_id user_id: Authenticated user ID Returns: Complete driver details """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } driver_id = tool_input.get("driver_id") if not driver_id: return { "success": False, "error": "driver_id is required" } query = """ SELECT driver_id, name, phone, email, current_lat, current_lng, current_address, last_location_update, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills, created_at, updated_at FROM drivers WHERE driver_id = %s AND user_id = %s """ try: results = execute_query(query, (driver_id, user_id)) if not results: return { "success": False, "error": f"Driver {driver_id} not found" } row = results[0] # Parse skills JSON if present skills = [] if row['skills']: try: import json skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] except: skills = [] # Use stored address from database, or reverse geocode if not available location_address = row['current_address'] if not location_address and row['current_lat'] and row['current_lng']: try: reverse_result = safe_reverse_geocode( float(row['current_lat']), float(row['current_lng']) ) location_address = reverse_result.get('address', None) logger.info(f"Reverse geocoded driver location: {location_address}") except Exception as e: logger.warning(f"Failed to reverse geocode driver location: {e}") location_address = None driver = { "driver_id": row['driver_id'], "name": row['name'], "contact": { "phone": row['phone'], "email": row['email'] }, "location": { "latitude": float(row['current_lat']) if row['current_lat'] else None, "longitude": float(row['current_lng']) if row['current_lng'] else None, "address": location_address, "last_update": str(row['last_location_update']) if row['last_location_update'] else None }, "status": row['status'], "vehicle": { "type": row['vehicle_type'], "plate": row['vehicle_plate'], "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None }, "skills": skills, "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None } } logger.info(f"Retrieved details for driver: {driver_id}") return { "success": True, "driver": driver, "message": f"Driver {driver_id} details retrieved" } except Exception as e: logger.error(f"Database error getting driver details: {e}") return { "success": False, "error": f"Failed to get driver details: {str(e)}" } def handle_search_drivers(tool_input: dict, user_id: str = None) -> dict: """ Execute search drivers tool Args: tool_input: Dict with search_term user_id: Authenticated user ID Returns: List of matching drivers """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } search_term = tool_input.get("search_term", "").strip() if not search_term: return { "success": False, "error": "search_term is required" } query = """ SELECT driver_id, name, phone, email, current_lat, current_lng, current_address, vehicle_type, vehicle_plate, status, skills, created_at FROM drivers WHERE user_id = %s AND ( driver_id ILIKE %s OR name ILIKE %s OR email ILIKE %s OR phone ILIKE %s OR vehicle_plate ILIKE %s ) ORDER BY name ASC LIMIT 50 """ search_pattern = f"%{search_term}%" params = (user_id, search_pattern, search_pattern, search_pattern, search_pattern, search_pattern) try: results = execute_query(query, params) if not results: return { "success": True, "drivers": [], "count": 0, "message": f"No drivers found matching '{search_term}'" } drivers = [] for row in results: # Parse skills JSON if present skills = [] if row['skills']: try: import json skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] except: skills = [] drivers.append({ "driver_id": row['driver_id'], "name": row['name'], "phone": row['phone'], "email": row['email'], "location": { "latitude": float(row['current_lat']) if row['current_lat'] else None, "longitude": float(row['current_lng']) if row['current_lng'] else None, "address": row['current_address'] }, "vehicle_type": row['vehicle_type'], "vehicle_plate": row['vehicle_plate'], "status": row['status'], "skills": skills, "created_at": str(row['created_at']) }) logger.info(f"Search '{search_term}' found {len(drivers)} drivers") return { "success": True, "drivers": drivers, "count": len(drivers), "message": f"Found {len(drivers)} driver(s) matching '{search_term}'" } except Exception as e: logger.error(f"Database error searching drivers: {e}") return { "success": False, "error": f"Failed to search drivers: {str(e)}" } def handle_get_available_drivers(tool_input: dict, user_id: str = None) -> dict: """ Execute get available drivers tool Args: tool_input: Dict with optional limit user_id: Authenticated user ID Returns: List of available drivers (active or offline) """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } limit = min(tool_input.get("limit", 20), 100) query = """ SELECT driver_id, name, phone, vehicle_type, vehicle_plate, current_lat, current_lng, current_address, last_location_update, status, capacity_kg, capacity_m3, skills FROM drivers WHERE user_id = %s AND status IN ('active', 'offline') ORDER BY CASE status WHEN 'active' THEN 1 WHEN 'offline' THEN 2 END, name ASC LIMIT %s """ try: results = execute_query(query, (user_id, limit)) if not results: return { "success": True, "drivers": [], "count": 0, "message": "No available drivers found" } drivers = [] for row in results: # Parse skills JSON if present skills = [] if row['skills']: try: import json skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] except: skills = [] drivers.append({ "driver_id": row['driver_id'], "name": row['name'], "phone": row['phone'], "location": { "latitude": float(row['current_lat']) if row['current_lat'] else None, "longitude": float(row['current_lng']) if row['current_lng'] else None, "address": row['current_address'], "last_update": str(row['last_location_update']) if row['last_location_update'] else None }, "status": row['status'], "vehicle": { "type": row['vehicle_type'], "plate": row['vehicle_plate'], "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None }, "skills": skills }) logger.info(f"Retrieved {len(drivers)} available drivers") return { "success": True, "drivers": drivers, "count": len(drivers), "message": f"Found {len(drivers)} available driver(s)" } except Exception as e: logger.error(f"Database error getting available drivers: {e}") return { "success": False, "error": f"Failed to get available drivers: {str(e)}" } # ============================================================================ # ASSIGNMENT MANAGEMENT TOOLS # ============================================================================ def handle_create_assignment(tool_input: dict, user_id: str = None) -> dict: """ Create assignment (assign order to driver) Validates order and driver status, calculates route, creates assignment record, and updates order/driver statuses. Args: tool_input: Dict with order_id and driver_id user_id: Authenticated user ID Returns: Assignment creation result with route data """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } from datetime import datetime, timedelta order_id = (tool_input.get("order_id") or "").strip() driver_id = (tool_input.get("driver_id") or "").strip() if not order_id or not driver_id: return { "success": False, "error": "Both order_id and driver_id are required" } logger.info(f"Creating assignment: order={order_id}, driver={driver_id}") try: conn = get_db_connection() cursor = conn.cursor() # Step 1: Validate order exists, belongs to user, and status is "pending" cursor.execute(""" SELECT status, delivery_lat, delivery_lng, delivery_address, assigned_driver_id FROM orders WHERE order_id = %s AND user_id = %s """, (order_id, user_id)) order_row = cursor.fetchone() if not order_row: cursor.close() conn.close() return { "success": False, "error": f"Order not found: {order_id}" } order_status = order_row['status'] delivery_lat = order_row['delivery_lat'] delivery_lng = order_row['delivery_lng'] delivery_address = order_row['delivery_address'] current_driver = order_row['assigned_driver_id'] if order_status != "pending": cursor.close() conn.close() # Provide helpful error message based on current status if order_status == "assigned" and current_driver: # Get current driver name for better error message cursor2 = get_db_connection().cursor() cursor2.execute("SELECT name FROM drivers WHERE driver_id = %s", (current_driver,)) driver_row = cursor2.fetchone() driver_name = driver_row['name'] if driver_row else current_driver cursor2.close() return { "success": False, "error": f"Order {order_id} is already assigned to driver {driver_name}. Use 'unassign_order' first to reassign to a different driver." } else: return { "success": False, "error": f"Order must be in 'pending' status to be assigned. Current status: '{order_status}'" } if not delivery_lat or not delivery_lng: cursor.close() conn.close() return { "success": False, "error": "Order does not have delivery location coordinates" } # Step 2: Validate driver exists, belongs to user, and status is "active" cursor.execute(""" SELECT status, current_lat, current_lng, vehicle_type, name FROM drivers WHERE driver_id = %s AND user_id = %s """, (driver_id, user_id)) driver_row = cursor.fetchone() if not driver_row: cursor.close() conn.close() return { "success": False, "error": f"Driver not found: {driver_id}" } driver_status = driver_row['status'] driver_lat = driver_row['current_lat'] driver_lng = driver_row['current_lng'] vehicle_type = driver_row['vehicle_type'] driver_name = driver_row['name'] if driver_status not in ["active", "available"]: cursor.close() conn.close() return { "success": False, "error": f"Driver must be 'active' or 'available'. Current status: {driver_status}" } if not driver_lat or not driver_lng: cursor.close() conn.close() return { "success": False, "error": "Driver does not have current location" } # Step 3: Check if order already has active assignment cursor.execute(""" SELECT assignment_id, driver_id FROM assignments WHERE order_id = %s AND status IN ('active', 'in_progress') """, (order_id,)) existing_assignment = cursor.fetchone() if existing_assignment: cursor.close() conn.close() existing_asn_id = existing_assignment['assignment_id'] existing_driver_id = existing_assignment['driver_id'] # Get driver name for better error message cursor2 = get_db_connection().cursor() cursor2.execute("SELECT name FROM drivers WHERE driver_id = %s", (existing_driver_id,)) driver_row = cursor2.fetchone() existing_driver_name = driver_row['name'] if driver_row else existing_driver_id cursor2.close() return { "success": False, "error": f"Order {order_id} is already assigned to driver {existing_driver_name} (Assignment: {existing_asn_id}). Use 'unassign_order' first to reassign." } # Step 4: Calculate route from driver location to delivery location logger.info(f"Calculating route: ({driver_lat},{driver_lng}) -> ({delivery_lat},{delivery_lng})") route_result = handle_calculate_route({ "origin": f"{driver_lat},{driver_lng}", "destination": f"{delivery_lat},{delivery_lng}", "vehicle_type": vehicle_type or "car", "alternatives": False, "include_steps": True # Get turn-by-turn directions }) if not route_result.get("success"): cursor.close() conn.close() return { "success": False, "error": f"Route calculation failed: {route_result.get('error', 'Unknown error')}" } # Step 5: Generate assignment ID timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f") assignment_id = f"ASN-{timestamp}" # Step 6: Calculate estimated arrival duration_seconds = route_result.get("duration_in_traffic", {}).get("seconds", 0) estimated_arrival = datetime.now() + timedelta(seconds=duration_seconds) # Step 7: Create assignment record import json # Extract route directions (turn-by-turn steps) route_directions = route_result.get("steps", []) route_directions_json = json.dumps(route_directions) if route_directions else None cursor.execute(""" INSERT INTO assignments ( assignment_id, order_id, driver_id, user_id, route_distance_meters, route_duration_seconds, route_duration_in_traffic_seconds, route_summary, route_confidence, route_directions, driver_start_location_lat, driver_start_location_lng, delivery_location_lat, delivery_location_lng, delivery_address, estimated_arrival, vehicle_type, traffic_delay_seconds, status ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """, ( assignment_id, order_id, driver_id, user_id, route_result.get("distance", {}).get("meters", 0), route_result.get("duration", {}).get("seconds", 0), route_result.get("duration_in_traffic", {}).get("seconds", 0), route_result.get("route_summary", ""), route_result.get("confidence", ""), route_directions_json, driver_lat, driver_lng, delivery_lat, delivery_lng, delivery_address, estimated_arrival, vehicle_type, route_result.get("traffic_delay", {}).get("seconds", 0), "active" )) # Step 8: Update order status and assigned driver cursor.execute(""" UPDATE orders SET status = 'assigned', assigned_driver_id = %s WHERE order_id = %s """, (driver_id, order_id)) # Step 9: Update driver status to busy cursor.execute(""" UPDATE drivers SET status = 'busy' WHERE driver_id = %s """, (driver_id,)) conn.commit() cursor.close() conn.close() logger.info(f"Assignment created successfully: {assignment_id}") return { "success": True, "assignment_id": assignment_id, "order_id": order_id, "driver_id": driver_id, "driver_name": driver_name, "route": { "distance": route_result.get("distance", {}).get("text", ""), "duration": route_result.get("duration", {}).get("text", ""), "duration_in_traffic": route_result.get("duration_in_traffic", {}).get("text", ""), "traffic_delay": route_result.get("traffic_delay", {}).get("text", ""), "summary": route_result.get("route_summary", ""), "directions": route_directions # Turn-by-turn navigation steps }, "estimated_arrival": estimated_arrival.isoformat(), "status": "active", "message": f"Order {order_id} assigned to driver {driver_name} ({driver_id})" } except Exception as e: logger.error(f"Failed to create assignment: {e}") return { "success": False, "error": f"Failed to create assignment: {str(e)}" } def handle_auto_assign_order(tool_input: dict, user_id: str = None) -> dict: """ Automatically assign order to nearest available driver (distance + validation based). Selection criteria: 1. Driver must be 'active' with valid location 2. Driver vehicle capacity must meet package weight/volume requirements 3. Driver must have required skills (fragile handling, cold storage, etc.) 4. Selects nearest driver by real-time route distance Args: tool_input: Dict with order_id user_id: Authenticated user ID Returns: Assignment details with selected driver info and distance """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } order_id = (tool_input.get("order_id") or "").strip() if not order_id: return { "success": False, "error": "Missing required field: order_id" } try: conn = get_db_connection() cursor = conn.cursor(cursor_factory=RealDictCursor) # Step 1: Get order details with ALL requirements (filtered by user_id) cursor.execute(""" SELECT order_id, customer_name, delivery_address, delivery_lat, delivery_lng, status, weight_kg, volume_m3, is_fragile, requires_cold_storage, requires_signature, priority, assigned_driver_id FROM orders WHERE order_id = %s AND user_id = %s """, (order_id, user_id)) order = cursor.fetchone() if not order: cursor.close() conn.close() return { "success": False, "error": f"Order not found: {order_id}" } if order['status'] != 'pending': cursor.close() conn.close() return { "success": False, "error": f"Order must be 'pending' to auto-assign. Current status: {order['status']}" } if not order['delivery_lat'] or not order['delivery_lng']: cursor.close() conn.close() return { "success": False, "error": "Order missing delivery coordinates. Cannot calculate routes." } # Extract order requirements required_weight_kg = order['weight_kg'] or 0 required_volume_m3 = order['volume_m3'] or 0 needs_fragile_handling = order['is_fragile'] or False needs_cold_storage = order['requires_cold_storage'] or False # Step 2: Get all active drivers with valid locations (filtered by user_id) cursor.execute(""" SELECT driver_id, name, phone, current_lat, current_lng, vehicle_type, capacity_kg, capacity_m3, skills FROM drivers WHERE user_id = %s AND status = 'active' AND current_lat IS NOT NULL AND current_lng IS NOT NULL """, (user_id,)) active_drivers = cursor.fetchall() if not active_drivers: cursor.close() conn.close() return { "success": False, "error": "No active drivers available with valid location" } # Step 3: Filter and score each driver suitable_drivers = [] for driver in active_drivers: # Validate capacity (weight and volume) driver_capacity_kg = driver['capacity_kg'] or 0 driver_capacity_m3 = driver['capacity_m3'] or 0 if driver_capacity_kg < required_weight_kg: logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Insufficient weight capacity: {driver_capacity_kg}kg < {required_weight_kg}kg") continue if driver_capacity_m3 < required_volume_m3: logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Insufficient volume capacity: {driver_capacity_m3}m³ < {required_volume_m3}m³") continue # Validate skills driver_skills = driver['skills'] or [] if needs_fragile_handling and "fragile_handler" not in driver_skills: logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Missing fragile_handler skill") continue if needs_cold_storage and "refrigerated" not in driver_skills: logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Missing refrigerated skill") continue # Step 4: Calculate real-time route distance route_result = handle_calculate_route({ "origin": f"{driver['current_lat']},{driver['current_lng']}", "destination": f"{order['delivery_lat']},{order['delivery_lng']}", "vehicle_type": driver['vehicle_type'], "include_steps": False # We don't need turn-by-turn for scoring }) if not route_result.get("success"): logger.warning(f"Driver {driver['driver_id']} ({driver['name']}) - Route calculation failed: {route_result.get('error')}") continue # Extract distance distance_meters = route_result.get('distance_meters', 999999) duration_seconds = route_result.get('duration_in_traffic_seconds', 0) suitable_drivers.append({ "driver": driver, "distance_meters": distance_meters, "distance_km": distance_meters / 1000, "duration_seconds": duration_seconds, "duration_minutes": duration_seconds / 60, "route_data": route_result }) if not suitable_drivers: cursor.close() conn.close() return { "success": False, "error": "No suitable drivers found. All active drivers failed capacity or skill requirements." } # Step 5: Sort by distance (nearest first) suitable_drivers.sort(key=lambda x: x['distance_meters']) # Step 6: Select nearest driver best_match = suitable_drivers[0] selected_driver = best_match['driver'] logger.info(f"Auto-assign: Selected driver {selected_driver['driver_id']} ({selected_driver['name']}) - {best_match['distance_km']:.2f}km away") cursor.close() conn.close() # Step 7: Create assignment using existing function assignment_result = handle_create_assignment({ "order_id": order_id, "driver_id": selected_driver['driver_id'] }, user_id=user_id) if not assignment_result.get("success"): return assignment_result # Step 8: Return enhanced response with selection info return { "success": True, "assignment_id": assignment_result['assignment_id'], "method": "auto_assignment", "order_id": order_id, "driver_id": selected_driver['driver_id'], "driver_name": selected_driver['name'], "driver_phone": selected_driver['phone'], "driver_vehicle_type": selected_driver['vehicle_type'], "selection_reason": "Nearest available driver meeting all requirements", "distance_km": round(best_match['distance_km'], 2), "distance_meters": best_match['distance_meters'], "estimated_duration_minutes": round(best_match['duration_minutes'], 1), "candidates_evaluated": len(active_drivers), "suitable_candidates": len(suitable_drivers), "route_summary": assignment_result.get('route_summary'), "estimated_arrival": assignment_result.get('estimated_arrival'), "assignment_details": assignment_result } except Exception as e: logger.error(f"Failed to auto-assign order: {e}") return { "success": False, "error": f"Failed to auto-assign order: {str(e)}" } def handle_intelligent_assign_order(tool_input: dict, user_id: str = None) -> dict: """ Intelligently assign order using Gemini AI to analyze all parameters. Uses Google's Gemini AI to evaluate: - Order characteristics (priority, weight, fragility, time constraints) - All available drivers (location, capacity, skills, vehicle type) - Real-time routing data (distance, traffic, weather) - Complex tradeoffs and optimal matching Returns assignment with AI reasoning explaining the selection. Args: tool_input: Dict with order_id user_id: Authenticated user ID Returns: Assignment details with AI reasoning and selected driver info """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } import os import json import google.generativeai as genai from datetime import datetime order_id = (tool_input.get("order_id") or "").strip() if not order_id: return { "success": False, "error": "Missing required field: order_id" } # Check for Gemini API key gemini_api_key = os.getenv("GOOGLE_API_KEY") if not gemini_api_key: return { "success": False, "error": "GOOGLE_API_KEY environment variable not set. Required for intelligent assignment." } try: conn = get_db_connection() cursor = conn.cursor(cursor_factory=RealDictCursor) # Step 1: Get complete order details (filtered by user_id) cursor.execute(""" SELECT order_id, customer_name, customer_phone, customer_email, delivery_address, delivery_lat, delivery_lng, pickup_address, pickup_lat, pickup_lng, time_window_start, time_window_end, expected_delivery_time, priority, weight_kg, volume_m3, order_value, is_fragile, requires_cold_storage, requires_signature, payment_status, special_instructions, status, created_at, sla_grace_period_minutes FROM orders WHERE order_id = %s AND user_id = %s """, (order_id, user_id)) order = cursor.fetchone() if not order: cursor.close() conn.close() return { "success": False, "error": f"Order not found: {order_id}" } if order['status'] != 'pending': cursor.close() conn.close() return { "success": False, "error": f"Order must be 'pending' to assign. Current status: {order['status']}" } if not order['delivery_lat'] or not order['delivery_lng']: cursor.close() conn.close() return { "success": False, "error": "Order missing delivery coordinates. Cannot calculate routes." } # Step 2: Get all active drivers with complete details (filtered by user_id) cursor.execute(""" SELECT driver_id, name, phone, email, current_lat, current_lng, last_location_update, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills, status, created_at, updated_at FROM drivers WHERE user_id = %s AND status = 'active' AND current_lat IS NOT NULL AND current_lng IS NOT NULL """, (user_id,)) active_drivers = cursor.fetchall() if not active_drivers: cursor.close() conn.close() return { "success": False, "error": "No active drivers available with valid location" } # Step 3: Calculate routing data for each driver drivers_with_routes = [] for driver in active_drivers: # Calculate route with traffic route_result = handle_calculate_route({ "origin": f"{driver['current_lat']},{driver['current_lng']}", "destination": f"{order['delivery_lat']},{order['delivery_lng']}", "vehicle_type": driver['vehicle_type'], "include_steps": False }) # Get weather-aware routing if available try: intelligent_route = handle_calculate_intelligent_route({ "origin": f"{driver['current_lat']},{driver['current_lng']}", "destination": f"{order['delivery_lat']},{order['delivery_lng']}", "vehicle_type": driver['vehicle_type'] }) weather_data = intelligent_route.get('weather', {}) except: weather_data = {} if route_result.get("success"): drivers_with_routes.append({ "driver_id": driver['driver_id'], "name": driver['name'], "phone": driver['phone'], "vehicle_type": driver['vehicle_type'], "vehicle_plate": driver['vehicle_plate'], "capacity_kg": float(driver['capacity_kg']) if driver['capacity_kg'] else 0, "capacity_m3": float(driver['capacity_m3']) if driver['capacity_m3'] else 0, "skills": driver['skills'] or [], "current_location": { "lat": float(driver['current_lat']), "lng": float(driver['current_lng']) }, "route_to_delivery": { "distance_km": round(route_result.get('distance_meters', 0) / 1000, 2), "distance_meters": route_result.get('distance_meters', 0), "duration_minutes": round(route_result.get('duration_in_traffic_seconds', 0) / 60, 1), "traffic_delay_seconds": route_result.get('traffic_delay_seconds', 0), "route_summary": route_result.get('route_summary', ''), "has_tolls": route_result.get('has_tolls', False) }, "weather_conditions": weather_data }) if not drivers_with_routes: cursor.close() conn.close() return { "success": False, "error": "Unable to calculate routes for any active drivers" } cursor.close() conn.close() # Step 4: Build comprehensive context for Gemini order_context = { "order_id": order['order_id'], "customer": { "name": order['customer_name'], "phone": order['customer_phone'] }, "delivery": { "address": order['delivery_address'], "coordinates": {"lat": float(order['delivery_lat']), "lng": float(order['delivery_lng'])} }, "time_constraints": { "expected_delivery_time": str(order['expected_delivery_time']) if order['expected_delivery_time'] else None, "time_window_start": str(order['time_window_start']) if order['time_window_start'] else None, "time_window_end": str(order['time_window_end']) if order['time_window_end'] else None, "sla_grace_period_minutes": order['sla_grace_period_minutes'], "created_at": str(order['created_at']) }, "package": { "weight_kg": float(order['weight_kg']) if order['weight_kg'] else 0, "volume_m3": float(order['volume_m3']) if order['volume_m3'] else 0, "value": float(order['order_value']) if order['order_value'] else 0, "is_fragile": order['is_fragile'] or False, "requires_cold_storage": order['requires_cold_storage'] or False, "requires_signature": order['requires_signature'] or False }, "priority": order['priority'], "payment_status": order['payment_status'], "special_instructions": order['special_instructions'] } # Step 5: Call Gemini AI for intelligent decision genai.configure(api_key=gemini_api_key) model = genai.GenerativeModel('gemini-2.0-flash-exp') prompt = f"""You are an intelligent fleet management AI. Analyze the following delivery order and available drivers to select the BEST driver for this assignment. **ORDER DETAILS:** {json.dumps(order_context, indent=2)} **AVAILABLE DRIVERS ({len(drivers_with_routes)}):** {json.dumps(drivers_with_routes, indent=2)} **CURRENT TIME:** {datetime.now().isoformat()} **YOUR TASK:** Analyze ALL parameters comprehensively: 1. **Distance & Route Efficiency**: Consider route distance, traffic delays, tolls 2. **Vehicle Matching**: Match vehicle type and capacity to package requirements 3. **Skills Requirements**: Ensure driver has necessary skills (fragile handling, cold storage) 4. **Time Constraints**: Evaluate ability to meet expected delivery time 5. **Priority Level**: Factor in order priority (urgent > express > standard) 6. **Weather Conditions**: Consider weather impact on delivery safety and speed 7. **Special Requirements**: Account for signature requirements, special instructions 8. **Cost Efficiency**: Consider fuel costs, toll roads, driver utilization **RESPONSE FORMAT (JSON only, no markdown):** {{ "selected_driver_id": "DRV-XXXXXXXXX", "confidence_score": 0.95, "reasoning": {{ "primary_factors": ["Nearest driver (5.2km)", "Has fragile_handler skill", "Sufficient capacity"], "trade_offs_considered": ["Driver A was 1km closer but lacked required skills", "Driver B had larger capacity but 15min further"], "risk_assessment": "Low risk - clear weather, light traffic, experienced driver", "decision_summary": "Selected Driver X because they offer the best balance of proximity (5.2km), required skills (fragile_handler), and adequate capacity (10kg) for this urgent fragile delivery." }}, "alternatives": [ {{"driver_id": "DRV-YYY", "reason_not_selected": "Missing fragile_handler skill"}}, {{"driver_id": "DRV-ZZZ", "reason_not_selected": "15 minutes further away"}} ] }} **IMPORTANT:** Return ONLY valid JSON. Do not include markdown formatting, code blocks, or explanatory text outside the JSON.""" # Call Gemini with timeout protection (30 seconds max) try: future = _blocking_executor.submit(model.generate_content, prompt) response = future.result(timeout=30) except FuturesTimeoutError: logger.error("Gemini AI call timed out after 30 seconds") return { "success": False, "error": "AI assignment timed out. Please try auto_assign_order instead." } response_text = response.text.strip() # Clean response (remove markdown code blocks if present) if response_text.startswith("```json"): response_text = response_text[7:] if response_text.startswith("```"): response_text = response_text[3:] if response_text.endswith("```"): response_text = response_text[:-3] response_text = response_text.strip() # Parse Gemini response try: ai_decision = json.loads(response_text) except json.JSONDecodeError as e: logger.error(f"Failed to parse Gemini response: {e}") logger.error(f"Response text: {response_text}") return { "success": False, "error": f"Failed to parse AI response. Invalid JSON returned by Gemini: {str(e)}" } selected_driver_id = ai_decision.get("selected_driver_id") if not selected_driver_id: return { "success": False, "error": "AI did not select a driver" } # Validate selected driver is still available selected_driver = next((d for d in drivers_with_routes if d["driver_id"] == selected_driver_id), None) if not selected_driver: return { "success": False, "error": f"AI selected driver {selected_driver_id} but driver not found in available list" } # Step 6: Create assignment using existing function logger.info(f"Intelligent-assign: AI selected driver {selected_driver_id} ({selected_driver['name']})") assignment_result = handle_create_assignment({ "order_id": order_id, "driver_id": selected_driver_id }, user_id=user_id) if not assignment_result.get("success"): return assignment_result # Step 7: Return enhanced response with AI reasoning return { "success": True, "assignment_id": assignment_result['assignment_id'], "method": "intelligent_assignment", "ai_provider": "Google Gemini 2.0 Flash", "ai_model": "gemini-2.0-flash-exp", "order_id": order_id, "driver_id": selected_driver_id, "driver_name": selected_driver['name'], "driver_phone": selected_driver['phone'], "driver_vehicle_type": selected_driver['vehicle_type'], "distance_km": selected_driver['route_to_delivery']['distance_km'], "estimated_duration_minutes": selected_driver['route_to_delivery']['duration_minutes'], "ai_reasoning": ai_decision.get('reasoning', {}), "confidence_score": ai_decision.get('confidence_score', 0), "alternatives_considered": ai_decision.get('alternatives', []), "candidates_evaluated": len(drivers_with_routes), "route_summary": assignment_result.get('route_summary'), "estimated_arrival": assignment_result.get('estimated_arrival'), "assignment_details": assignment_result } except Exception as e: logger.error(f"Failed to intelligently assign order: {e}") return { "success": False, "error": f"Failed to intelligently assign order: {str(e)}" } def handle_get_assignment_details(tool_input: dict, user_id: str = None) -> dict: """ Get assignment details Can query by assignment_id, order_id, or driver_id. Returns assignment with route data and related order/driver info. Args: tool_input: Dict with assignment_id, order_id, or driver_id user_id: Authenticated user ID Returns: Assignment details or list of assignments """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } assignment_id = (tool_input.get("assignment_id") or "").strip() order_id = (tool_input.get("order_id") or "").strip() driver_id = (tool_input.get("driver_id") or "").strip() if not assignment_id and not order_id and not driver_id: return { "success": False, "error": "Provide at least one of: assignment_id, order_id, or driver_id" } try: conn = get_db_connection() cursor = conn.cursor() # Build query based on provided parameters (filtered by user_id) query = """ SELECT a.assignment_id, a.order_id, a.driver_id, a.status, a.assigned_at, a.updated_at, a.estimated_arrival, a.actual_arrival, a.route_distance_meters, a.route_duration_seconds, a.route_duration_in_traffic_seconds, a.route_summary, a.route_confidence, a.traffic_delay_seconds, a.route_directions, a.driver_start_location_lat, a.driver_start_location_lng, a.delivery_location_lat, a.delivery_location_lng, a.delivery_address, a.vehicle_type, a.sequence_number, a.notes, a.failure_reason, o.customer_name, o.status as order_status, d.name as driver_name, d.status as driver_status, d.phone as driver_phone FROM assignments a LEFT JOIN orders o ON a.order_id = o.order_id LEFT JOIN drivers d ON a.driver_id = d.driver_id WHERE a.user_id = %s """ params = [user_id] if assignment_id: query += " AND a.assignment_id = %s" params.append(assignment_id) if order_id: query += " AND a.order_id = %s" params.append(order_id) if driver_id: query += " AND a.driver_id = %s" params.append(driver_id) query += " ORDER BY a.assigned_at DESC" cursor.execute(query, params) rows = cursor.fetchall() cursor.close() conn.close() if not rows: return { "success": False, "error": "No assignments found matching criteria" } # Format results assignments = [] for row in rows: assignment = { "assignment_id": row['assignment_id'], "order_id": row['order_id'], "driver_id": row['driver_id'], "status": row['status'], "assigned_at": row['assigned_at'].isoformat() if row['assigned_at'] else None, "updated_at": row['updated_at'].isoformat() if row['updated_at'] else None, "estimated_arrival": row['estimated_arrival'].isoformat() if row['estimated_arrival'] else None, "actual_arrival": row['actual_arrival'].isoformat() if row['actual_arrival'] else None, "route": { "distance_meters": row['route_distance_meters'], "distance_km": round(row['route_distance_meters'] / 1000, 2) if row['route_distance_meters'] else 0, "duration_seconds": row['route_duration_seconds'], "duration_minutes": round(row['route_duration_seconds'] / 60, 1) if row['route_duration_seconds'] else 0, "duration_in_traffic_seconds": row['route_duration_in_traffic_seconds'], "duration_in_traffic_minutes": round(row['route_duration_in_traffic_seconds'] / 60, 1) if row['route_duration_in_traffic_seconds'] else 0, "summary": row['route_summary'], "confidence": row['route_confidence'], "traffic_delay_seconds": row['traffic_delay_seconds'], "traffic_delay_minutes": round(row['traffic_delay_seconds'] / 60, 1) if row['traffic_delay_seconds'] else 0, "directions": row['route_directions'] # Turn-by-turn navigation steps }, "driver_start_location": { "lat": float(row['driver_start_location_lat']) if row['driver_start_location_lat'] else None, "lng": float(row['driver_start_location_lng']) if row['driver_start_location_lng'] else None }, "delivery_location": { "lat": float(row['delivery_location_lat']) if row['delivery_location_lat'] else None, "lng": float(row['delivery_location_lng']) if row['delivery_location_lng'] else None, "address": row['delivery_address'] }, "vehicle_type": row['vehicle_type'], "sequence_number": row['sequence_number'], "notes": row['notes'], "failure_reason": row['failure_reason'], "order": { "customer_name": row['customer_name'], "status": row['order_status'] }, "driver": { "name": row['driver_name'], "status": row['driver_status'], "phone": row['driver_phone'] } } assignments.append(assignment) if assignment_id and len(assignments) == 1: # Single assignment query return { "success": True, "assignment": assignments[0] } else: # Multiple assignments return { "success": True, "count": len(assignments), "assignments": assignments } except Exception as e: logger.error(f"Failed to get assignment details: {e}") return { "success": False, "error": f"Failed to get assignment details: {str(e)}" } def handle_update_assignment(tool_input: dict, user_id: str = None) -> dict: """ Update assignment status Allows updating assignment status and actual metrics. Manages cascading updates to order and driver statuses. Args: tool_input: Dict with assignment_id, status (optional), actual_arrival (optional), notes (optional) user_id: Authenticated user ID Returns: Update result """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } from datetime import datetime assignment_id = (tool_input.get("assignment_id") or "").strip() new_status = (tool_input.get("status") or "").strip().lower() actual_arrival = tool_input.get("actual_arrival") notes = (tool_input.get("notes") or "").strip() if not assignment_id: return { "success": False, "error": "assignment_id is required" } if not new_status and not actual_arrival and not notes: return { "success": False, "error": "Provide at least one field to update: status, actual_arrival, or notes" } # Validate status if provided valid_statuses = ["active", "in_progress", "completed", "failed", "cancelled"] if new_status and new_status not in valid_statuses: return { "success": False, "error": f"Invalid status. Must be one of: {', '.join(valid_statuses)}" } logger.info(f"Updating assignment: {assignment_id}, status={new_status}") try: conn = get_db_connection() cursor = conn.cursor() # Get current assignment details (filtered by user_id) cursor.execute(""" SELECT status, order_id, driver_id FROM assignments WHERE assignment_id = %s AND user_id = %s """, (assignment_id, user_id)) assignment_row = cursor.fetchone() if not assignment_row: cursor.close() conn.close() return { "success": False, "error": f"Assignment not found: {assignment_id}" } current_status = assignment_row['status'] order_id = assignment_row['order_id'] driver_id = assignment_row['driver_id'] # Validate status transitions if new_status: # Cannot go backwards if current_status == "completed" and new_status in ["active", "in_progress"]: cursor.close() conn.close() return { "success": False, "error": "Cannot change status from 'completed' back to 'active' or 'in_progress'" } if current_status == "failed" and new_status != "failed": cursor.close() conn.close() return { "success": False, "error": "Cannot change status from 'failed'" } if current_status == "cancelled" and new_status != "cancelled": cursor.close() conn.close() return { "success": False, "error": "Cannot change status from 'cancelled'" } # Build update query updates = [] params = [] if new_status: updates.append("status = %s") params.append(new_status) if actual_arrival: updates.append("actual_arrival = %s") params.append(actual_arrival) if notes: updates.append("notes = %s") params.append(notes) params.append(assignment_id) # Update assignment cursor.execute(f""" UPDATE assignments SET {', '.join(updates)} WHERE assignment_id = %s """, params) # Handle cascading updates based on new status if new_status: if new_status in ["completed", "failed", "cancelled"]: # Update order status if new_status == "completed": cursor.execute(""" UPDATE orders SET status = 'delivered' WHERE order_id = %s """, (order_id,)) elif new_status == "failed": cursor.execute(""" UPDATE orders SET status = 'failed' WHERE order_id = %s """, (order_id,)) elif new_status == "cancelled": cursor.execute(""" UPDATE orders SET status = 'cancelled', assigned_driver_id = NULL WHERE order_id = %s """, (order_id,)) # Check if driver has other active assignments cursor.execute(""" SELECT COUNT(*) as count FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') AND assignment_id != %s """, (driver_id, assignment_id)) other_assignments_count = cursor.fetchone()['count'] # If no other active assignments, set driver back to active if other_assignments_count == 0: cursor.execute(""" UPDATE drivers SET status = 'active' WHERE driver_id = %s """, (driver_id,)) conn.commit() cursor.close() conn.close() logger.info(f"Assignment updated successfully: {assignment_id}") return { "success": True, "assignment_id": assignment_id, "updated_fields": { "status": new_status if new_status else current_status, "actual_arrival": actual_arrival if actual_arrival else "not updated", "notes": notes if notes else "not updated" }, "message": f"Assignment {assignment_id} updated successfully" } except Exception as e: logger.error(f"Failed to update assignment: {e}") return { "success": False, "error": f"Failed to update assignment: {str(e)}" } def handle_unassign_order(tool_input: dict, user_id: str = None) -> dict: """ Unassign order (delete assignment) Removes assignment and reverts order/driver to original states. Args: tool_input: Dict with order_id or assignment_id, and confirm flag user_id: Authenticated user ID Returns: Unassignment result """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } order_id = (tool_input.get("order_id") or "").strip() assignment_id = (tool_input.get("assignment_id") or "").strip() confirm = tool_input.get("confirm", False) if not order_id and not assignment_id: return { "success": False, "error": "Provide either order_id or assignment_id" } if not confirm: return { "success": False, "error": "Unassignment requires confirm=true for safety" } logger.info(f"Unassigning: order_id={order_id}, assignment_id={assignment_id}") try: conn = get_db_connection() cursor = conn.cursor() # Find assignment (filtered by user_id) if assignment_id: cursor.execute(""" SELECT order_id, driver_id, status FROM assignments WHERE assignment_id = %s AND user_id = %s """, (assignment_id, user_id)) else: cursor.execute(""" SELECT assignment_id, driver_id, status FROM assignments WHERE order_id = %s AND user_id = %s AND status IN ('active', 'in_progress') ORDER BY assigned_at DESC LIMIT 1 """, (order_id, user_id)) assignment_row = cursor.fetchone() if not assignment_row: cursor.close() conn.close() return { "success": False, "error": "No active assignment found" } if assignment_id: found_order_id = assignment_row['order_id'] driver_id = assignment_row['driver_id'] status = assignment_row['status'] else: assignment_id = assignment_row['assignment_id'] driver_id = assignment_row['driver_id'] status = assignment_row['status'] found_order_id = order_id # Validate status (cannot unassign if in_progress without force) if status == "in_progress": cursor.close() conn.close() return { "success": False, "error": "Cannot unassign order with 'in_progress' status. Complete or fail the delivery first." } # Delete assignment cursor.execute(""" DELETE FROM assignments WHERE assignment_id = %s """, (assignment_id,)) # Revert order status to pending and clear assigned driver cursor.execute(""" UPDATE orders SET status = 'pending', assigned_driver_id = NULL WHERE order_id = %s """, (found_order_id,)) # Check if driver has other active assignments cursor.execute(""" SELECT COUNT(*) FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') """, (driver_id,)) other_assignments_count = cursor.fetchone()[0] # If no other active assignments, set driver back to active if other_assignments_count == 0: cursor.execute(""" UPDATE drivers SET status = 'active' WHERE driver_id = %s """, (driver_id,)) conn.commit() cursor.close() conn.close() logger.info(f"Assignment removed successfully: {assignment_id}") return { "success": True, "assignment_id": assignment_id, "order_id": found_order_id, "driver_id": driver_id, "message": f"Order {found_order_id} unassigned from driver {driver_id}. Order status reverted to 'pending'." } except Exception as e: logger.error(f"Failed to unassign order: {e}") return { "success": False, "error": f"Failed to unassign order: {str(e)}" } def handle_complete_delivery(tool_input: dict, user_id: str = None) -> dict: """ Complete a delivery and automatically update driver location Marks delivery as completed, updates order/driver statuses, and moves driver location to the delivery address. Args: tool_input: Dict with assignment_id, confirm flag, and optional fields user_id: Authenticated user ID Returns: Completion result """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } from datetime import datetime assignment_id = (tool_input.get("assignment_id") or "").strip() confirm = tool_input.get("confirm", False) actual_distance_meters = tool_input.get("actual_distance_meters") notes = (tool_input.get("notes") or "").strip() if not assignment_id: return { "success": False, "error": "assignment_id is required" } if not confirm: return { "success": False, "error": "Delivery completion requires confirm=true for safety" } logger.info(f"Completing delivery: assignment_id={assignment_id}") try: conn = get_db_connection() cursor = conn.cursor() # Get assignment and order details including timing fields (filtered by user_id) cursor.execute(""" SELECT a.status, a.order_id, a.driver_id, a.delivery_location_lat, a.delivery_location_lng, a.delivery_address, o.customer_name, o.expected_delivery_time, o.sla_grace_period_minutes, d.name as driver_name FROM assignments a JOIN orders o ON a.order_id = o.order_id JOIN drivers d ON a.driver_id = d.driver_id WHERE a.assignment_id = %s AND a.user_id = %s """, (assignment_id, user_id)) assignment_row = cursor.fetchone() if not assignment_row: cursor.close() conn.close() return { "success": False, "error": f"Assignment not found: {assignment_id}" } status = assignment_row['status'] order_id = assignment_row['order_id'] driver_id = assignment_row['driver_id'] delivery_lat = assignment_row['delivery_location_lat'] delivery_lng = assignment_row['delivery_location_lng'] delivery_address = assignment_row['delivery_address'] customer_name = assignment_row['customer_name'] driver_name = assignment_row['driver_name'] expected_delivery_time = assignment_row['expected_delivery_time'] sla_grace_period_minutes = assignment_row['sla_grace_period_minutes'] or 15 # Validate status if status not in ["active", "in_progress"]: cursor.close() conn.close() return { "success": False, "error": f"Cannot complete delivery: assignment status is '{status}'. Must be 'active' or 'in_progress'." } # Validate delivery location exists if not delivery_lat or not delivery_lng: cursor.close() conn.close() return { "success": False, "error": "Cannot complete delivery: delivery location coordinates are missing" } # Current timestamp for completion completion_time = datetime.now() # Step 1: Update assignment to completed update_fields = ["status = %s", "actual_arrival = %s", "updated_at = %s"] params = ["completed", completion_time, completion_time] if actual_distance_meters: update_fields.append("actual_distance_meters = %s") params.append(actual_distance_meters) if notes: update_fields.append("notes = %s") params.append(notes) params.append(assignment_id) cursor.execute(f""" UPDATE assignments SET {', '.join(update_fields)} WHERE assignment_id = %s """, tuple(params)) # Step 2: Update driver location to delivery address (including address text) cursor.execute(""" UPDATE drivers SET current_lat = %s, current_lng = %s, current_address = %s, last_location_update = %s, updated_at = %s WHERE driver_id = %s """, (delivery_lat, delivery_lng, delivery_address, completion_time, completion_time, driver_id)) logger.info(f"Driver {driver_id} location updated to delivery address: {delivery_address} ({delivery_lat}, {delivery_lng})") # Step 3: Calculate delivery performance status delivery_status = "on_time" # Default timing_info = { "expected_delivery_time": expected_delivery_time.isoformat() if expected_delivery_time else None, "actual_delivery_time": completion_time.isoformat(), "sla_grace_period_minutes": sla_grace_period_minutes } if expected_delivery_time: # Calculate grace period deadline from datetime import timedelta grace_deadline = expected_delivery_time + timedelta(minutes=sla_grace_period_minutes) if completion_time <= expected_delivery_time: delivery_status = "on_time" timing_info["status"] = "On-time delivery" timing_info["delay_minutes"] = 0 elif completion_time <= grace_deadline: delivery_status = "late" delay_minutes = int((completion_time - expected_delivery_time).total_seconds() / 60) timing_info["status"] = f"Late (within grace period)" timing_info["delay_minutes"] = delay_minutes else: delivery_status = "very_late" delay_minutes = int((completion_time - expected_delivery_time).total_seconds() / 60) timing_info["status"] = f"Very late (SLA violation)" timing_info["delay_minutes"] = delay_minutes # Step 4: Update order status to delivered with timing info cursor.execute(""" UPDATE orders SET status = 'delivered', delivered_at = %s, delivery_status = %s, updated_at = %s WHERE order_id = %s """, (completion_time, delivery_status, completion_time, order_id)) logger.info(f"Order {order_id} marked as delivered with status '{delivery_status}'") # Step 4: Check if driver has other active assignments cursor.execute(""" SELECT COUNT(*) as count FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') AND assignment_id != %s """, (driver_id, assignment_id)) other_assignments_count = cursor.fetchone()['count'] # Step 5: If no other active assignments, set driver to active cascading_actions = [] if other_assignments_count == 0: cursor.execute(""" UPDATE drivers SET status = 'active', updated_at = %s WHERE driver_id = %s """, (completion_time, driver_id)) cascading_actions.append(f"Driver {driver_name} set to 'active' (no other assignments)") else: cascading_actions.append(f"Driver {driver_name} remains 'busy' ({other_assignments_count} other active assignment(s))") conn.commit() cursor.close() conn.close() logger.info(f"Delivery completed successfully: {assignment_id}") return { "success": True, "assignment_id": assignment_id, "order_id": order_id, "driver_id": driver_id, "customer_name": customer_name, "driver_name": driver_name, "completed_at": completion_time.isoformat(), "delivery_status": delivery_status, "timing": timing_info, "delivery_location": { "lat": float(delivery_lat), "lng": float(delivery_lng), "address": delivery_address }, "driver_updated": { "new_location": f"{delivery_lat}, {delivery_lng}", "location_updated_at": completion_time.isoformat() }, "cascading_actions": cascading_actions, "message": f"Delivery completed! Order {order_id} delivered by {driver_name}. Status: {timing_info.get('status', delivery_status)}. Driver location updated to delivery address." } except Exception as e: logger.error(f"Failed to complete delivery: {e}") return { "success": False, "error": f"Failed to complete delivery: {str(e)}" } def handle_fail_delivery(tool_input: dict, user_id: str = None) -> dict: """ Mark delivery as failed with mandatory location and reason Driver must provide current GPS location and failure reason. Updates driver location to reported coordinates and sets statuses accordingly. Args: tool_input: Dict with assignment_id, current_lat, current_lng, failure_reason, confirm flag, and optional notes user_id: Authenticated user ID Returns: Failure recording result """ # Authentication check - allow dev mode (only in non-production environments) if not user_id: # SECURITY: Only allow SKIP_AUTH in development environments env = os.getenv("ENV", "production").lower() skip_auth = os.getenv("SKIP_AUTH", "false").lower() == "true" if skip_auth and env != "production": user_id = "dev-user" else: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } from datetime import datetime assignment_id = (tool_input.get("assignment_id") or "").strip() current_address = (tool_input.get("current_address") or "").strip() current_lat = tool_input.get("current_lat") current_lng = tool_input.get("current_lng") failure_reason = (tool_input.get("failure_reason") or "").strip() confirm = tool_input.get("confirm", False) notes = (tool_input.get("notes") or "").strip() # Validation if not assignment_id: return { "success": False, "error": "assignment_id is required" } if not confirm: return { "success": False, "error": "Delivery failure requires confirm=true for safety" } if not current_address or current_lat is None or current_lng is None: return { "success": False, "error": "Driver must provide current location (current_address, current_lat, and current_lng required)" } if not failure_reason: return { "success": False, "error": "Failure reason is required. Valid reasons: customer_not_available, wrong_address, refused_delivery, damaged_goods, payment_issue, vehicle_breakdown, access_restricted, weather_conditions, other" } # Validate failure_reason is one of the allowed values valid_reasons = [ "customer_not_available", "wrong_address", "refused_delivery", "damaged_goods", "payment_issue", "vehicle_breakdown", "access_restricted", "weather_conditions", "other" ] if failure_reason not in valid_reasons: return { "success": False, "error": f"Invalid failure_reason '{failure_reason}'. Must be one of: {', '.join(valid_reasons)}" } # Validate coordinates are valid try: current_lat = float(current_lat) current_lng = float(current_lng) if not (-90 <= current_lat <= 90) or not (-180 <= current_lng <= 180): return { "success": False, "error": "Invalid GPS coordinates. Latitude must be -90 to 90, longitude must be -180 to 180" } except (ValueError, TypeError): return { "success": False, "error": "current_lat and current_lng must be valid numbers" } logger.info(f"Failing delivery: assignment_id={assignment_id}, reason={failure_reason}") try: conn = get_db_connection() cursor = conn.cursor() # Get assignment and order details including timing fields (filtered by user_id) cursor.execute(""" SELECT a.status, a.order_id, a.driver_id, a.delivery_address, o.customer_name, o.expected_delivery_time, o.sla_grace_period_minutes, d.name as driver_name FROM assignments a JOIN orders o ON a.order_id = o.order_id JOIN drivers d ON a.driver_id = d.driver_id WHERE a.assignment_id = %s AND a.user_id = %s """, (assignment_id, user_id)) assignment_row = cursor.fetchone() if not assignment_row: cursor.close() conn.close() return { "success": False, "error": f"Assignment not found: {assignment_id}" } status = assignment_row['status'] order_id = assignment_row['order_id'] driver_id = assignment_row['driver_id'] delivery_address = assignment_row['delivery_address'] customer_name = assignment_row['customer_name'] driver_name = assignment_row['driver_name'] expected_delivery_time = assignment_row['expected_delivery_time'] sla_grace_period_minutes = assignment_row['sla_grace_period_minutes'] or 15 # Validate status if status not in ["active", "in_progress"]: cursor.close() conn.close() return { "success": False, "error": f"Cannot fail delivery: assignment status is '{status}'. Must be 'active' or 'in_progress'." } # Current timestamp for failure failure_time = datetime.now() # Step 1: Update assignment to failed update_fields = [ "status = %s", "failure_reason = %s", "actual_arrival = %s", "updated_at = %s" ] params = ["failed", failure_reason, failure_time, failure_time] if notes: update_fields.append("notes = %s") params.append(notes) params.append(assignment_id) cursor.execute(f""" UPDATE assignments SET {', '.join(update_fields)} WHERE assignment_id = %s """, tuple(params)) # Step 2: Update driver location to reported current location (address provided by user) cursor.execute(""" UPDATE drivers SET current_lat = %s, current_lng = %s, current_address = %s, last_location_update = %s, updated_at = %s WHERE driver_id = %s """, (current_lat, current_lng, current_address, failure_time, failure_time, driver_id)) logger.info(f"Driver {driver_id} location updated to reported position: {current_address} ({current_lat}, {current_lng})") # Step 3: Calculate delivery performance status for failure delivery_status = "failed_on_time" # Default - failed but before deadline timing_info = { "expected_delivery_time": expected_delivery_time.isoformat() if expected_delivery_time else None, "failure_time": failure_time.isoformat(), "sla_grace_period_minutes": sla_grace_period_minutes } if expected_delivery_time: if failure_time <= expected_delivery_time: delivery_status = "failed_on_time" timing_info["status"] = "Failed before deadline (attempted delivery on time)" else: delivery_status = "failed_late" delay_minutes = int((failure_time - expected_delivery_time).total_seconds() / 60) timing_info["status"] = f"Failed after deadline (late attempt)" timing_info["delay_minutes"] = delay_minutes # Step 4: Update order status to failed with timing info cursor.execute(""" UPDATE orders SET status = 'failed', delivered_at = %s, delivery_status = %s, updated_at = %s WHERE order_id = %s """, (failure_time, delivery_status, failure_time, order_id)) logger.info(f"Order {order_id} marked as failed with status '{delivery_status}'") # Step 4: Check if driver has other active assignments cursor.execute(""" SELECT COUNT(*) as count FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') AND assignment_id != %s """, (driver_id, assignment_id)) other_assignments_count = cursor.fetchone()['count'] # Step 5: If no other active assignments, set driver to active cascading_actions = [] if other_assignments_count == 0: cursor.execute(""" UPDATE drivers SET status = 'active', updated_at = %s WHERE driver_id = %s """, (failure_time, driver_id)) cascading_actions.append(f"Driver {driver_name} set to 'active' (no other assignments)") else: cascading_actions.append(f"Driver {driver_name} remains 'busy' ({other_assignments_count} other active assignment(s))") conn.commit() cursor.close() conn.close() logger.info(f"Delivery marked as failed: {assignment_id}") # Format failure reason for display reason_display = failure_reason.replace("_", " ").title() return { "success": True, "assignment_id": assignment_id, "order_id": order_id, "driver_id": driver_id, "customer_name": customer_name, "driver_name": driver_name, "failed_at": failure_time.isoformat(), "failure_reason": failure_reason, "failure_reason_display": reason_display, "delivery_status": delivery_status, "timing": timing_info, "delivery_address": delivery_address, "driver_location": { "lat": current_lat, "lng": current_lng, "address": current_address, "updated_at": failure_time.isoformat() }, "cascading_actions": cascading_actions, "message": f"Delivery failed for order {order_id}. Reason: {reason_display}. Timing: {timing_info.get('status', delivery_status)}. Driver {driver_name} location updated to {current_address or f'({current_lat}, {current_lng})'}." } except Exception as e: logger.error(f"Failed to record delivery failure: {e}") return { "success": False, "error": f"Failed to record delivery failure: {str(e)}" } def get_tools_list() -> list: """Get list of available tools""" return [tool["name"] for tool in TOOLS_SCHEMA] def get_tool_description(tool_name: str) -> str: """Get description for a specific tool""" for tool in TOOLS_SCHEMA: if tool["name"] == tool_name: return tool["description"] return ""