""" FleetMind MCP - Gradio Web Interface Enhanced 3-tab dashboard: Chat, Orders, Drivers """ import sys from pathlib import Path # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) import gradio as gr from database.connection import execute_query, execute_write, test_connection from datetime import datetime, timedelta import json # Import chat functionality from chat.chat_engine import ChatEngine from chat.conversation import ConversationManager from chat.geocoding import GeocodingService import uuid # Global session storage SESSIONS = {} # Initialize chat engine and geocoding service chat_engine = ChatEngine() geocoding_service = GeocodingService() # ============================================ # STATISTICS FUNCTIONS # ============================================ def get_orders_stats(): """Get order statistics by status""" try: query = """ SELECT COUNT(*) as total, COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending, COUNT(CASE WHEN status = 'assigned' THEN 1 END) as assigned, COUNT(CASE WHEN status = 'in_transit' THEN 1 END) as in_transit, COUNT(CASE WHEN status = 'delivered' THEN 1 END) as delivered, COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed, COUNT(CASE WHEN status = 'cancelled' THEN 1 END) as cancelled FROM orders """ result = execute_query(query) if result: return result[0] return {"total": 0, "pending": 0, "assigned": 0, "in_transit": 0, "delivered": 0, "failed": 0, "cancelled": 0} except Exception as e: print(f"Error getting order stats: {e}") return {"total": 0, "pending": 0, "assigned": 0, "in_transit": 0, "delivered": 0, "failed": 0, "cancelled": 0} def get_drivers_stats(): """Get driver statistics by status""" try: query = """ SELECT COUNT(*) as total, COUNT(CASE WHEN status = 'active' THEN 1 END) as active, COUNT(CASE WHEN status = 'busy' THEN 1 END) as busy, COUNT(CASE WHEN status = 'offline' THEN 1 END) as offline, COUNT(CASE WHEN status = 'unavailable' THEN 1 END) as unavailable FROM drivers """ result = execute_query(query) if result: return result[0] return {"total": 0, "active": 0, "busy": 0, "offline": 0, "unavailable": 0} except Exception as e: print(f"Error getting driver stats: {e}") return {"total": 0, "active": 0, "busy": 0, "offline": 0, "unavailable": 0} # ============================================ # ORDERS FUNCTIONS # ============================================ def get_all_orders(status_filter="all", priority_filter="all", payment_filter="all", search_term=""): """Get orders with filters""" try: where_clauses = [] params = [] if status_filter and status_filter != "all": where_clauses.append("status = %s") params.append(status_filter) if priority_filter and priority_filter != "all": where_clauses.append("priority = %s") params.append(priority_filter) if payment_filter and payment_filter != "all": where_clauses.append("payment_status = %s") params.append(payment_filter) if search_term: where_clauses.append("(order_id ILIKE %s OR customer_name ILIKE %s OR customer_phone ILIKE %s)") search_pattern = f"%{search_term}%" params.extend([search_pattern, search_pattern, search_pattern]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" query = f""" SELECT order_id, customer_name, delivery_address, status, priority, assigned_driver_id, time_window_end, created_at FROM orders {where_sql} ORDER BY created_at DESC LIMIT 100 """ results = execute_query(query, tuple(params) if params else ()) if not results: return [["-", "-", "-", "-", "-", "-", "-"]] # Format data for table data = [] for row in results: # Truncate address if too long address = row['delivery_address'] if len(address) > 40: address = address[:37] + "..." # Format deadline deadline = row['time_window_end'].strftime("%Y-%m-%d %H:%M") if row['time_window_end'] else "No deadline" # Driver ID or "Unassigned" driver = row['assigned_driver_id'] if row['assigned_driver_id'] else "Unassigned" data.append([ row['order_id'], row['customer_name'], address, row['status'], row['priority'], driver, deadline ]) return data except Exception as e: print(f"Error fetching orders: {e}") return [[f"Error: {str(e)}", "", "", "", "", "", ""]] def get_order_details(order_id): """Get complete order details""" if not order_id or order_id == "-": return "Select an order from the table to view details" try: query = """ SELECT * FROM orders WHERE order_id = %s """ results = execute_query(query, (order_id,)) if not results: return f"Order {order_id} not found" order = results[0] # Format the details nicely details = f""" # Order Details: {order_id} ## Customer Information - **Name:** {order['customer_name']} - **Phone:** {order['customer_phone'] or 'N/A'} - **Email:** {order['customer_email'] or 'N/A'} ## Delivery Information - **Address:** {order['delivery_address']} - **Coordinates:** ({order['delivery_lat']}, {order['delivery_lng']}) - **Time Window:** {order['time_window_start']} to {order['time_window_end']} ## Package Details - **Weight:** {order['weight_kg'] or 'N/A'} kg - **Volume:** {order['volume_m3'] or 'N/A'} m³ - **Is Fragile:** {"Yes" if order['is_fragile'] else "No"} - **Requires Signature:** {"Yes" if order['requires_signature'] else "No"} - **Requires Cold Storage:** {"Yes" if order['requires_cold_storage'] else "No"} ## Order Status - **Status:** {order['status']} - **Priority:** {order['priority']} - **Assigned Driver:** {order['assigned_driver_id'] or 'Unassigned'} ## Payment - **Order Value:** ${order['order_value'] or '0.00'} - **Payment Status:** {order['payment_status']} ## Special Instructions {order['special_instructions'] or 'None'} ## Timestamps - **Created:** {order['created_at']} - **Updated:** {order['updated_at']} - **Delivered:** {order['delivered_at'] or 'Not delivered yet'} """ return details except Exception as e: return f"Error fetching order details: {str(e)}" # ============================================ # DRIVERS FUNCTIONS # ============================================ def get_all_drivers(status_filter="all", vehicle_filter="all", search_term=""): """Get drivers with filters""" try: where_clauses = [] params = [] if status_filter and status_filter != "all": where_clauses.append("status = %s") params.append(status_filter) if vehicle_filter and vehicle_filter != "all": where_clauses.append("vehicle_type = %s") params.append(vehicle_filter) if search_term: where_clauses.append("(driver_id ILIKE %s OR name ILIKE %s OR phone ILIKE %s OR vehicle_plate ILIKE %s)") search_pattern = f"%{search_term}%" params.extend([search_pattern, search_pattern, search_pattern, search_pattern]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" query = f""" SELECT driver_id, name, phone, status, vehicle_type, vehicle_plate, current_lat, current_lng, last_location_update FROM drivers {where_sql} ORDER BY name ASC LIMIT 100 """ results = execute_query(query, tuple(params) if params else ()) if not results: return [["-", "-", "-", "-", "-", "-", "-"]] # Format data for table data = [] for row in results: # Format location if row['current_lat'] and row['current_lng']: location = f"{row['current_lat']:.4f}, {row['current_lng']:.4f}" else: location = "No location" # Format last update last_update = row['last_location_update'].strftime("%Y-%m-%d %H:%M") if row['last_location_update'] else "Never" data.append([ row['driver_id'], row['name'], row['phone'] or "N/A", row['status'], f"{row['vehicle_type']} - {row['vehicle_plate'] or 'N/A'}", location, last_update ]) return data except Exception as e: print(f"Error fetching drivers: {e}") return [[f"Error: {str(e)}", "", "", "", "", "", ""]] def get_driver_details(driver_id): """Get complete driver details""" if not driver_id or driver_id == "-": return "Select a driver from the table to view details" try: query = """ SELECT * FROM drivers WHERE driver_id = %s """ results = execute_query(query, (driver_id,)) if not results: return f"Driver {driver_id} not found" driver = results[0] # Parse skills (handle both list and JSON string) if driver['skills']: if isinstance(driver['skills'], list): skills = driver['skills'] else: skills = json.loads(driver['skills']) else: skills = [] skills_str = ", ".join(skills) if skills else "None" # Format the details nicely details = f""" # Driver Details: {driver_id} ## Personal Information - **Name:** {driver['name']} - **Phone:** {driver['phone'] or 'N/A'} - **Email:** {driver['email'] or 'N/A'} ## Current Location - **Coordinates:** ({driver['current_lat']}, {driver['current_lng']}) - **Last Update:** {driver['last_location_update'] or 'Never updated'} ## Status - **Status:** {driver['status']} ## Vehicle Information - **Type:** {driver['vehicle_type']} - **Plate:** {driver['vehicle_plate'] or 'N/A'} - **Capacity (kg):** {driver['capacity_kg'] or 'N/A'} - **Capacity (m³):** {driver['capacity_m3'] or 'N/A'} ## Skills & Certifications {skills_str} ## Timestamps - **Created:** {driver['created_at']} - **Updated:** {driver['updated_at']} """ return details except Exception as e: return f"Error fetching driver details: {str(e)}" def update_order_ui(order_id, **fields): """Update order via UI""" from chat.tools import execute_tool if not order_id: return {"success": False, "message": "Order ID is required"} # Build tool input tool_input = {"order_id": order_id} tool_input.update(fields) # Call update tool result = execute_tool("update_order", tool_input) return result def delete_order_ui(order_id): """Delete order via UI""" from chat.tools import execute_tool if not order_id: return {"success": False, "message": "Order ID is required"} # Call delete tool with confirmation result = execute_tool("delete_order", {"order_id": order_id, "confirm": True}) return result def update_driver_ui(driver_id, **fields): """Update driver via UI""" from chat.tools import execute_tool if not driver_id: return {"success": False, "message": "Driver ID is required"} # Build tool input tool_input = {"driver_id": driver_id} tool_input.update(fields) # Call update tool result = execute_tool("update_driver", tool_input) return result def delete_driver_ui(driver_id): """Delete driver via UI""" from chat.tools import execute_tool if not driver_id: return {"success": False, "message": "Driver ID is required"} # Call delete tool with confirmation result = execute_tool("delete_driver", {"driver_id": driver_id, "confirm": True}) return result # ============================================ # CHAT FUNCTIONS # ============================================ def get_api_status(): """Get API status for chat""" full_status = chat_engine.get_full_status() selected = full_status["selected"] claude_status = full_status["claude"]["status"] gemini_status = full_status["gemini"]["status"] geocoding_status = geocoding_service.get_status() claude_marker = "🎯 **ACTIVE** - " if selected == "anthropic" else "" gemini_marker = "🎯 **ACTIVE** - " if selected == "gemini" else "" return f"""**AI Provider:** **Claude:** {claude_marker}{claude_status} **Gemini:** {gemini_marker}{gemini_status} **Geocoding:** {geocoding_status} """ def handle_chat_message(message, session_id): """Handle chat message from user""" if session_id not in SESSIONS: SESSIONS[session_id] = ConversationManager() welcome = chat_engine.get_welcome_message() SESSIONS[session_id].add_message("assistant", welcome) conversation = SESSIONS[session_id] if not message.strip(): return conversation.get_formatted_history(), conversation.get_tool_calls(), session_id response, tool_calls = chat_engine.process_message(message, conversation) return conversation.get_formatted_history(), conversation.get_tool_calls(), session_id def reset_conversation(session_id): """Reset conversation to start fresh""" new_session_id = str(uuid.uuid4()) new_conversation = ConversationManager() welcome = chat_engine.get_welcome_message() new_conversation.add_message("assistant", welcome) SESSIONS[new_session_id] = new_conversation return ( new_conversation.get_formatted_history(), [], new_session_id ) def get_initial_chat(): """Get initial chat state with welcome message""" session_id = str(uuid.uuid4()) conversation = ConversationManager() welcome = chat_engine.get_welcome_message() conversation.add_message("assistant", welcome) SESSIONS[session_id] = conversation return conversation.get_formatted_history(), [], session_id # ============================================ # GRADIO INTERFACE # ============================================ def create_interface(): """Create the Gradio interface with 3 enhanced tabs""" with gr.Blocks(theme=gr.themes.Soft(), title="FleetMind Dispatch System") as app: gr.Markdown("# 🚚 FleetMind Dispatch System") gr.Markdown("*AI-Powered Delivery Coordination*") with gr.Tabs(): # ========================================== # TAB 1: CHAT # ========================================== with gr.Tab("💬 Chat Assistant"): provider_name = chat_engine.get_provider_name() model_name = chat_engine.get_model_name() gr.Markdown(f"### AI Dispatch Assistant") gr.Markdown(f"*Powered by {provider_name} ({model_name})*") # Quick Action Buttons gr.Markdown("**Quick Actions:**") # Row 1: Create and View with gr.Row(): quick_create_order = gr.Button("📦 Create Order", size="sm") quick_view_orders = gr.Button("📋 View Orders", size="sm") quick_view_drivers = gr.Button("👥 View Drivers", size="sm") quick_check_status = gr.Button("📊 Check Status", size="sm") # Row 2: Order Management with gr.Row(): quick_update_order = gr.Button("✏️ Update Order", size="sm", variant="secondary") quick_delete_order = gr.Button("🗑️ Delete Order", size="sm", variant="secondary") quick_update_driver = gr.Button("✏️ Update Driver", size="sm", variant="secondary") quick_delete_driver = gr.Button("🗑️ Delete Driver", size="sm", variant="secondary") # Chat interface chatbot = gr.Chatbot( label="Chat with AI Assistant", height=600, type="messages", show_copy_button=True, avatar_images=("👤", "🤖") ) msg_input = gr.Textbox( placeholder="Type your message here... (e.g., 'Create order for John at 123 Main St' or 'Show me available drivers')", label="Your Message", lines=3 ) with gr.Row(): send_btn = gr.Button("📤 Send Message", variant="primary", scale=3) clear_btn = gr.Button("🗑️ Clear Chat", scale=1) # API Status in accordion with gr.Accordion("🔧 System Status", open=False): api_status = gr.Markdown(get_api_status()) # Tool usage display with gr.Accordion("🛠️ Tool Usage Log", open=False): gr.Markdown("*See what tools the AI is using behind the scenes*") tool_display = gr.JSON(label="Tools Called", value=[]) # Session state session_id_state = gr.State(value=None) # Event handlers def send_message(message, sess_id): if sess_id is None: sess_id = str(uuid.uuid4()) SESSIONS[sess_id] = ConversationManager() welcome = chat_engine.get_welcome_message() SESSIONS[sess_id].add_message("assistant", welcome) chat_history, tools, new_sess_id = handle_chat_message(message, sess_id) return chat_history, tools, new_sess_id, "" # Quick action functions def quick_action(prompt): return prompt quick_create_order.click( fn=lambda: "Create a new order", outputs=msg_input ) quick_view_orders.click( fn=lambda: "Show me all orders", outputs=msg_input ) quick_view_drivers.click( fn=lambda: "Show me all available drivers", outputs=msg_input ) quick_check_status.click( fn=lambda: "What is the current status of orders and drivers?", outputs=msg_input ) quick_update_order.click( fn=lambda: "Update order [ORDER_ID] - change status to [STATUS]", outputs=msg_input ) quick_delete_order.click( fn=lambda: "Delete order [ORDER_ID]", outputs=msg_input ) quick_update_driver.click( fn=lambda: "Update driver [DRIVER_ID] - change status to [STATUS]", outputs=msg_input ) quick_delete_driver.click( fn=lambda: "Delete driver [DRIVER_ID]", outputs=msg_input ) send_btn.click( fn=send_message, inputs=[msg_input, session_id_state], outputs=[chatbot, tool_display, session_id_state, msg_input] ) msg_input.submit( fn=send_message, inputs=[msg_input, session_id_state], outputs=[chatbot, tool_display, session_id_state, msg_input] ) clear_btn.click( fn=reset_conversation, inputs=[session_id_state], outputs=[chatbot, tool_display, session_id_state] ) # ========================================== # TAB 2: ORDERS # ========================================== with gr.Tab("📦 Orders Management"): gr.Markdown("### Orders Dashboard") # Statistics Cards def update_order_stats(): stats = get_orders_stats() return ( f"**Total:** {stats['total']}", f"**Pending:** {stats['pending']}", f"**In Transit:** {stats['in_transit']}", f"**Delivered:** {stats['delivered']}" ) with gr.Row(): stat_total = gr.Markdown("**Total:** 0") stat_pending = gr.Markdown("**Pending:** 0") stat_transit = gr.Markdown("**In Transit:** 0") stat_delivered = gr.Markdown("**Delivered:** 0") gr.Markdown("---") # Filters gr.Markdown("**Filters:**") with gr.Row(): status_filter = gr.Dropdown( choices=["all", "pending", "assigned", "in_transit", "delivered", "failed", "cancelled"], value="all", label="Status", scale=1 ) priority_filter = gr.Dropdown( choices=["all", "standard", "express", "urgent"], value="all", label="Priority", scale=1 ) payment_filter = gr.Dropdown( choices=["all", "pending", "paid", "cod"], value="all", label="Payment", scale=1 ) search_orders = gr.Textbox( placeholder="Search by Order ID, Customer, Phone...", label="Search", scale=2 ) with gr.Row(): apply_filters_btn = gr.Button("🔍 Apply Filters", variant="primary", scale=1) refresh_orders_btn = gr.Button("🔄 Refresh", scale=1) # Orders Table orders_table = gr.Dataframe( headers=["Order ID", "Customer", "Address", "Status", "Priority", "Driver", "Deadline"], datatype=["str", "str", "str", "str", "str", "str", "str"], label="Orders List (Click row to view details)", value=get_all_orders(), interactive=False, wrap=True ) # Order Details gr.Markdown("---") gr.Markdown("**Order Details:**") order_details = gr.Markdown("*Select an order from the table above to view full details*") # Edit/Delete Actions gr.Markdown("---") gr.Markdown("**Order Actions:**") with gr.Row(): selected_order_id_edit = gr.Textbox(label="Order ID to Edit", placeholder="ORD-XXXXXXXX", scale=2) edit_order_btn = gr.Button("✏️ Edit Order", variant="secondary", scale=1) delete_order_btn = gr.Button("🗑️ Delete Order", variant="stop", scale=1) # Edit Form (in accordion) with gr.Accordion("Edit Order Form", open=False) as edit_accordion: with gr.Row(): edit_customer_name = gr.Textbox(label="Customer Name") edit_customer_phone = gr.Textbox(label="Customer Phone") with gr.Row(): edit_status = gr.Dropdown( choices=["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"], label="Status" ) edit_priority = gr.Dropdown( choices=["standard", "express", "urgent"], label="Priority" ) with gr.Row(): edit_payment_status = gr.Dropdown( choices=["pending", "paid", "cod"], label="Payment Status" ) edit_weight_kg = gr.Number(label="Weight (kg)") edit_special_instructions = gr.Textbox(label="Special Instructions", lines=2) save_order_btn = gr.Button("💾 Save Changes", variant="primary") # Action Results action_result = gr.Markdown("") # Event handlers def filter_and_update_orders(status, priority, payment, search): return get_all_orders(status, priority, payment, search) def refresh_orders_and_stats(status, priority, payment, search): stats = update_order_stats() table = get_all_orders(status, priority, payment, search) return stats[0], stats[1], stats[2], stats[3], table def show_order_details(evt: gr.SelectData, table_data): try: # table_data is a pandas DataFrame from Gradio if hasattr(table_data, 'iloc'): # DataFrame - use iloc to access row and column order_id = table_data.iloc[evt.index[0], 0] else: # List of lists - use standard indexing order_id = table_data[evt.index[0]][0] return get_order_details(order_id) except Exception as e: return f"Error: {str(e)}" apply_filters_btn.click( fn=filter_and_update_orders, inputs=[status_filter, priority_filter, payment_filter, search_orders], outputs=orders_table ) refresh_orders_btn.click( fn=refresh_orders_and_stats, inputs=[status_filter, priority_filter, payment_filter, search_orders], outputs=[stat_total, stat_pending, stat_transit, stat_delivered, orders_table] ) orders_table.select( fn=show_order_details, inputs=[orders_table], outputs=order_details ) # Edit/Delete handlers def handle_edit_order(order_id): if not order_id: return "Please enter an Order ID" # Fetch order details and populate form query = "SELECT * FROM orders WHERE order_id = %s" from database.connection import execute_query results = execute_query(query, (order_id,)) if not results: return "Order not found" order = results[0] return ( order.get('customer_name', ''), order.get('customer_phone', ''), order.get('status', ''), order.get('priority', ''), order.get('payment_status', ''), order.get('weight_kg', 0), order.get('special_instructions', ''), "Order loaded. Update fields and click Save Changes." ) def handle_save_order(order_id, name, phone, status, priority, payment, weight, instructions, status_f, priority_f, payment_f, search): if not order_id: return "Please enter an Order ID", get_all_orders(status_f, priority_f, payment_f, search) fields = {} if name: fields['customer_name'] = name if phone: fields['customer_phone'] = phone if status: fields['status'] = status if priority: fields['priority'] = priority if payment: fields['payment_status'] = payment if weight: fields['weight_kg'] = float(weight) if instructions: fields['special_instructions'] = instructions result = update_order_ui(order_id, **fields) # Refresh table refreshed_table = get_all_orders(status_f, priority_f, payment_f, search) if result['success']: return f"✅ {result['message']}", refreshed_table else: return f"❌ {result.get('error', 'Update failed')}", refreshed_table def handle_delete_order(order_id, status_f, priority_f, payment_f, search): if not order_id: return "Please enter an Order ID", get_all_orders(status_f, priority_f, payment_f, search) result = delete_order_ui(order_id) # Refresh table refreshed_table = get_all_orders(status_f, priority_f, payment_f, search) if result['success']: return f"✅ {result['message']}", refreshed_table else: return f"❌ {result.get('error', 'Deletion failed')}", refreshed_table edit_order_btn.click( fn=handle_edit_order, inputs=[selected_order_id_edit], outputs=[edit_customer_name, edit_customer_phone, edit_status, edit_priority, edit_payment_status, edit_weight_kg, edit_special_instructions, action_result] ) save_order_btn.click( fn=handle_save_order, inputs=[selected_order_id_edit, edit_customer_name, edit_customer_phone, edit_status, edit_priority, edit_payment_status, edit_weight_kg, edit_special_instructions, status_filter, priority_filter, payment_filter, search_orders], outputs=[action_result, orders_table] ) delete_order_btn.click( fn=handle_delete_order, inputs=[selected_order_id_edit, status_filter, priority_filter, payment_filter, search_orders], outputs=[action_result, orders_table] ) # ========================================== # TAB 3: DRIVERS # ========================================== with gr.Tab("👥 Drivers Management"): gr.Markdown("### Drivers Dashboard") # Statistics Cards def update_driver_stats(): stats = get_drivers_stats() return ( f"**Total:** {stats['total']}", f"**Active:** {stats['active']}", f"**Busy:** {stats['busy']}", f"**Offline:** {stats['offline']}" ) with gr.Row(): driver_stat_total = gr.Markdown("**Total:** 0") driver_stat_active = gr.Markdown("**Active:** 0") driver_stat_busy = gr.Markdown("**Busy:** 0") driver_stat_offline = gr.Markdown("**Offline:** 0") gr.Markdown("---") # Filters gr.Markdown("**Filters:**") with gr.Row(): driver_status_filter = gr.Dropdown( choices=["all", "active", "busy", "offline", "unavailable"], value="all", label="Status", scale=1 ) vehicle_filter = gr.Dropdown( choices=["all", "van", "truck", "car", "motorcycle"], value="all", label="Vehicle Type", scale=1 ) search_drivers = gr.Textbox( placeholder="Search by Driver ID, Name, Phone, Plate...", label="Search", scale=2 ) with gr.Row(): apply_driver_filters_btn = gr.Button("🔍 Apply Filters", variant="primary", scale=1) refresh_drivers_btn = gr.Button("🔄 Refresh", scale=1) # Drivers Table drivers_table = gr.Dataframe( headers=["Driver ID", "Name", "Phone", "Status", "Vehicle", "Location", "Last Update"], datatype=["str", "str", "str", "str", "str", "str", "str"], label="Drivers List (Click row to view details)", value=get_all_drivers(), interactive=False, wrap=True ) # Driver Details gr.Markdown("---") gr.Markdown("**Driver Details:**") driver_details = gr.Markdown("*Select a driver from the table above to view full details*") # Edit/Delete Actions gr.Markdown("---") gr.Markdown("**Driver Actions:**") with gr.Row(): selected_driver_id_edit = gr.Textbox(label="Driver ID to Edit", placeholder="DRV-XXXXXXXX", scale=2) edit_driver_btn = gr.Button("✏️ Edit Driver", variant="secondary", scale=1) delete_driver_btn = gr.Button("🗑️ Delete Driver", variant="stop", scale=1) # Edit Form (in accordion) with gr.Accordion("Edit Driver Form", open=False) as driver_edit_accordion: with gr.Row(): edit_driver_name = gr.Textbox(label="Driver Name") edit_driver_phone = gr.Textbox(label="Phone") with gr.Row(): edit_driver_email = gr.Textbox(label="Email") edit_driver_status = gr.Dropdown( choices=["active", "busy", "offline", "unavailable"], label="Status" ) with gr.Row(): edit_vehicle_type = gr.Textbox(label="Vehicle Type") edit_vehicle_plate = gr.Textbox(label="Vehicle Plate") with gr.Row(): edit_capacity_kg = gr.Number(label="Capacity (kg)") edit_capacity_m3 = gr.Number(label="Capacity (m³)") save_driver_btn = gr.Button("💾 Save Changes", variant="primary") # Action Results driver_action_result = gr.Markdown("") # Event handlers def filter_and_update_drivers(status, vehicle, search): return get_all_drivers(status, vehicle, search) def refresh_drivers_and_stats(status, vehicle, search): stats = update_driver_stats() table = get_all_drivers(status, vehicle, search) return stats[0], stats[1], stats[2], stats[3], table def show_driver_details(evt: gr.SelectData, table_data): try: # table_data is a pandas DataFrame from Gradio if hasattr(table_data, 'iloc'): # DataFrame - use iloc to access row and column driver_id = table_data.iloc[evt.index[0], 0] else: # List of lists - use standard indexing driver_id = table_data[evt.index[0]][0] return get_driver_details(driver_id) except Exception as e: return f"Error: {str(e)}" apply_driver_filters_btn.click( fn=filter_and_update_drivers, inputs=[driver_status_filter, vehicle_filter, search_drivers], outputs=drivers_table ) refresh_drivers_btn.click( fn=refresh_drivers_and_stats, inputs=[driver_status_filter, vehicle_filter, search_drivers], outputs=[driver_stat_total, driver_stat_active, driver_stat_busy, driver_stat_offline, drivers_table] ) drivers_table.select( fn=show_driver_details, inputs=[drivers_table], outputs=driver_details ) # Edit/Delete handlers def handle_edit_driver(driver_id): if not driver_id: return "Please enter a Driver ID" # Fetch driver details and populate form query = "SELECT * FROM drivers WHERE driver_id = %s" from database.connection import execute_query results = execute_query(query, (driver_id,)) if not results: return "Driver not found" driver = results[0] return ( driver.get('name', ''), driver.get('phone', ''), driver.get('email', ''), driver.get('status', ''), driver.get('vehicle_type', ''), driver.get('vehicle_plate', ''), driver.get('capacity_kg', 0), driver.get('capacity_m3', 0), "Driver loaded. Update fields and click Save Changes." ) def handle_save_driver(driver_id, name, phone, email, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, status_f, vehicle_f, search): if not driver_id: return "Please enter a Driver ID", get_all_drivers(status_f, vehicle_f, search) fields = {} if name: fields['name'] = name if phone: fields['phone'] = phone if email: fields['email'] = email if status: fields['status'] = status if vehicle_type: fields['vehicle_type'] = vehicle_type if vehicle_plate: fields['vehicle_plate'] = vehicle_plate if capacity_kg: fields['capacity_kg'] = float(capacity_kg) if capacity_m3: fields['capacity_m3'] = float(capacity_m3) result = update_driver_ui(driver_id, **fields) # Refresh table refreshed_table = get_all_drivers(status_f, vehicle_f, search) if result['success']: return f"✅ {result['message']}", refreshed_table else: return f"❌ {result.get('error', 'Update failed')}", refreshed_table def handle_delete_driver(driver_id, status_f, vehicle_f, search): if not driver_id: return "Please enter a Driver ID", get_all_drivers(status_f, vehicle_f, search) result = delete_driver_ui(driver_id) # Refresh table refreshed_table = get_all_drivers(status_f, vehicle_f, search) if result['success']: return f"✅ {result['message']}", refreshed_table else: return f"❌ {result.get('error', 'Deletion failed')}", refreshed_table edit_driver_btn.click( fn=handle_edit_driver, inputs=[selected_driver_id_edit], outputs=[edit_driver_name, edit_driver_phone, edit_driver_email, edit_driver_status, edit_vehicle_type, edit_vehicle_plate, edit_capacity_kg, edit_capacity_m3, driver_action_result] ) save_driver_btn.click( fn=handle_save_driver, inputs=[selected_driver_id_edit, edit_driver_name, edit_driver_phone, edit_driver_email, edit_driver_status, edit_vehicle_type, edit_vehicle_plate, edit_capacity_kg, edit_capacity_m3, driver_status_filter, vehicle_filter, search_drivers], outputs=[driver_action_result, drivers_table] ) delete_driver_btn.click( fn=handle_delete_driver, inputs=[selected_driver_id_edit, driver_status_filter, vehicle_filter, search_drivers], outputs=[driver_action_result, drivers_table] ) gr.Markdown("---") gr.Markdown("*FleetMind v1.0 - AI-Powered Dispatch Coordination*") # Initialize chat and stats on load app.load( fn=get_initial_chat, outputs=[chatbot, tool_display, session_id_state] ).then( fn=update_order_stats, outputs=[stat_total, stat_pending, stat_transit, stat_delivered] ).then( fn=update_driver_stats, outputs=[driver_stat_total, driver_stat_active, driver_stat_busy, driver_stat_offline] ) return app # ============================================ # MAIN # ============================================ if __name__ == "__main__": print("=" * 60) print("FleetMind - Starting Enhanced UI") print("=" * 60) print("\nChecking database connection...") if test_connection(): print("✅ Database connected") else: print("❌ Database connection failed") print("\nStarting Gradio interface...") print("=" * 60) app = create_interface() app.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True, show_api=False )