diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,2342 +1,126 @@ -#!/usr/bin/env python3 -""" -COMPLETE MMORPG with MCP Server Integration - ALL FEATURES RESTORED -Fixed deadlock issue while preserving all original functionality -""" - -import gradio as gr -import asyncio -import json -import time -import uuid -import random -import random -import threading -from datetime import datetime, timedelta -from typing import Dict, List, Any, Optional -from dataclasses import dataclass, asdict -from abc import ABC, abstractmethod -from mcp import ClientSession -from mcp.client.sse import sse_client -from contextlib import AsyncExitStack - -# ============================================================================ -# CORE GAME STATE MANAGEMENT -# ============================================================================ - -@dataclass -@dataclass -class Player: - id: str - name: str - type: str # "human" or "ai_agent" - x: int = 100 - y: int = 100 - level: int = 1 - hp: int = 100 - max_hp: int = 100 - gold: int = 50 - experience: int = 0 - last_active: float = 0 - session_hash: str = "" - ai_agent_id: str = "" # For AI agents connected via MCP - -class GameWorld: - def __init__(self): - self.players: Dict[str, Player] = {} - self.npcs: Dict[str, Dict] = { - 'read2burn_mailbox': { - 'id': 'read2burn_mailbox', - 'name': 'Secure Mailbox', - 'x': 200, 'y': 150, - 'char': '๐Ÿ“ฎ', - 'type': 'addon' - }, - 'merchant': { - 'id': 'merchant', - 'name': 'Tom the Trader', - 'x': 300, 'y': 200, - 'char': '๐Ÿช', - 'type': 'basic' - }, - 'weather_oracle': { - 'id': 'weather_oracle', - 'name': 'Weather Oracle', - 'x': 150, 'y': 300, - 'char': '๐ŸŒค๏ธ', - 'type': 'mcp' - }, - 'scholar': { - 'id': 'scholar', - 'name': 'Professor Wise', - 'x': 100, 'y': 100, - 'char': '๐Ÿ“š', - 'type': 'learning', - 'personality': 'wise_teacher' - }, - 'jester': { - 'id': 'jester', - 'name': 'Funny Pete', - 'x': 400, 'y': 100, - 'char': '๐Ÿƒ', - 'type': 'entertainment', - 'personality': 'comedian' - }, - 'warrior': { - 'id': 'warrior', - 'name': 'Captain Steel', - 'x': 350, 'y': 350, - 'char': 'โš”๏ธ', - 'type': 'combat', - 'personality': 'tough_trainer' - }, - 'healer': { - 'id': 'healer', - 'name': 'Sister Grace', - 'x': 50, 'y': 250, - 'char': '๐Ÿ’š', - 'type': 'healing', - 'personality': 'gentle_healer' - }, - 'wanderer': { - 'id': 'wanderer', - 'name': 'Roaming Rick', - 'x': 200, 'y': 200, - 'char': '๐Ÿšถ', - 'type': 'moving', - 'personality': 'traveler', - 'movement': { - 'speed': 2, - 'direction_x': 1, - 'direction_y': 1, - 'last_move': time.time() - } - }, - 'sage': { - 'id': 'sage', - 'name': 'Ancient Sage', - 'x': 450, 'y': 250, - 'char': '๐Ÿง™', - 'type': 'magic', - 'personality': 'mystical_sage' - } - } - self.chat_messages: List[Dict] = [] - self.world_events: List[Dict] = [] - self.addon_npcs: Dict[str, 'NPCAddon'] = {} - self._lock = threading.RLock() # FIXED: Use RLock to prevent deadlock - - # Add initial chat message - self.add_chat_message("System", "๐ŸŽฎ Game server started!") - - def add_player(self, player: Player) -> bool: - print(f"[GameWorld.add_player] Adding {player.name}") - with self._lock: - if len(self.players) >= 20: # Max players - return False - self.players[player.id] = player - self.add_chat_message("System", f"๐ŸŽฎ {player.name} ({player.type}) joined the game!") - print(f"[GameWorld] Player {player.name} added. Total players: {len(self.players)}") - return True - - def remove_player(self, player_id: str) -> bool: - with self._lock: - if player_id in self.players: - player = self.players[player_id] - self.add_chat_message("System", f"๐Ÿ‘‹ {player.name} left the game!") - del self.players[player_id] - return True - return False - - def move_player(self, player_id: str, direction: str) -> bool: - with self._lock: - if player_id not in self.players: - return False - - player = self.players[player_id] - old_x, old_y = player.x, player.y - - # Move player - #TODO this should be based on width and height of the game world - if direction == "up": - player.y = max(0, player.y - 25) - elif direction == "down": - player.y = min(575, player.y + 25) - elif direction == "left": - player.x = max(0, player.x - 25) - elif direction == "right": - player.x = min(775, player.x + 25) - - player.last_active = time.time() - - # Add small XP for movement - if old_x != player.x or old_y != player.y: - player.experience += 1 - if player.experience >= player.level * 100: - player.level += 1 - player.max_hp += 10 - player.hp = player.max_hp - player.gold += 10 - self.add_chat_message("System", f"๐ŸŽ‰ {player.name} reached level {player.level}!") - # Check for NPC interactions - self.check_npc_proximity(player_id) - - return old_x != player.x or old_y != player.y - def add_chat_message(self, sender: str, message: str, message_type: str = "public", target: str = None, sender_id: str = None): - with self._lock: - chat_msg = { - 'sender': sender, - 'message': message, - 'timestamp': time.strftime("%H:%M:%S"), - 'id': len(self.chat_messages), - 'type': message_type, - 'target': target, - 'sender_id': sender_id - } - self.chat_messages.append(chat_msg) - if len(self.chat_messages) > 50: - self.chat_messages = self.chat_messages[-50:] - - def check_npc_proximity(self, player_id: str): - """Check if player is near any NPCs""" - player = self.players.get(player_id) - if not player: - return [] - - nearby_entities = [] - - # Check NPCs - for npc_id, npc in self.npcs.items(): - distance = ((player.x - npc['x'])**2 + (player.y - npc['y'])**2)**0.5 - if distance < 50: # Close enough to interact - self.add_world_event(f"{player.name} is near {npc['name']}") - nearby_entities.append({ - 'type': 'npc', - 'id': npc_id, - 'name': npc['name'], - 'distance': distance - }) - - # Check other players - for other_id, other_player in self.players.items(): - if other_id != player_id: - distance = ((player.x - other_player.x)**2 + (player.y - other_player.y)**2)**0.5 - if distance < 50: # Close enough to chat privately - nearby_entities.append({ - 'type': 'player', - 'id': other_id, - 'name': other_player.name, - 'distance': distance - }) - - return nearby_entities - - def send_private_message(self, sender_id: str, target_id: str, message: str) -> tuple[bool, str]: - """Send private message between players or to NPC""" - sender = self.players.get(sender_id) - if not sender: - error_msg = f"Sender player {sender_id} not found" - print(f"[PRIVATE_ERROR] {error_msg}") - return False, error_msg - - print(f"[PRIVATE] Sending message from {sender.name} ({sender_id}) to {target_id}: {message}") - - # Check if target is an NPC - if target_id in self.npcs: - npc = self.npcs[target_id] - print(f"[PRIVATE] Found NPC: {npc['name']} (ID: {target_id})") - # Send player's message - self.add_chat_message( - f"๐Ÿ”’ {sender.name}", - f"[Private to {npc['name']}]: {message}", - "private_to_npc", - target_id, - sender_id - ) # Get and send NPC response - npc_response = self.get_npc_response(target_id, message, sender_id) - self.add_chat_message( - f"๐Ÿค– {npc['name']}", - npc_response, - "private_from_npc", - sender_id, - target_id - ) - print(f"[PRIVATE] NPC {npc['name']} responded: {npc_response}") - return True, f"Message sent to {npc['name']}" - - # Check if target is another player - target = self.players.get(target_id) - if target: - print(f"[PRIVATE] Found player: {target.name} (ID: {target_id})") - self.add_chat_message( - f"๐Ÿ”’ {sender.name}", - f"[Private to {target.name}]: {message}", - "private_to_player", - target_id, - sender_id ) - return True, f"Message sent to {target.name}" - - # Neither NPC nor player found - available_npcs = list(self.npcs.keys()) - available_players = list(self.players.keys()) - error_msg = f"Target '{target_id}' not found. Available NPCs: {available_npcs}, Available players: {available_players}" - print(f"[PRIVATE_ERROR] {error_msg}") - return False, error_msg - - def get_npc_response(self, npc_id: str, message: str, player_id: str = None) -> str: - """Generate NPC response - checks for addons first, then falls back to generic responses""" - npc = self.npcs.get(npc_id) - if not npc: - return "I don't understand." - - # Check if this NPC has an addon that can handle commands - if npc_id in self.addon_npcs and player_id: - addon = self.addon_npcs[npc_id] - print(f"[NPC_RESPONSE] Found addon for {npc_id}, delegating to handle_command") - return addon.handle_command(player_id, message) - - # Fall back to personality-based or ID-based responses - personality = npc.get('personality', npc_id) - - # Basic NPC responses based on NPC type or personality - responses = { - 'read2burn_mailbox': [ - "Would you like to send a secure message?", - "I can help you with encrypted messaging.", - "Your message will burn after reading!" - ], - 'merchant': [ - "Welcome to my shop! What would you like to buy?", - "I have the finest items in the realm!", - "Special discount today - 10% off all potions!" - ], - 'weather_oracle': [ - "The winds whisper of changes ahead...", - "I sense a storm approaching...", - "The weather spirits are restless today." - ], 'wise_teacher': [ - "Ah, a curious mind! What would you like to learn?", - "Knowledge is the greatest treasure. Ask me anything!", - "I have studied the ancient texts for decades.", - "Wisdom comes through questioning. What puzzles you?" - ], - 'comedian': [ - "Haha! Want to hear a joke? Why don't skeletons fight? They don't have the guts!", - "What do you call a sleeping bull? A bulldozer! *laughs*", - "I've got jokes for days! Life's too short not to laugh!", - "Why did the scarecrow win an award? He was outstanding in his field!" - ], - 'tough_trainer': [ - "Ready for combat training? Show me your stance!", - "A true warrior trains every day. Are you committed?", - "Strength comes from discipline and practice!", - "The blade is an extension of your will. Focus!" - ], - 'gentle_healer': [ - "Blessings upon you, traveler. Do you need healing?", - "The light guides my hands. I can mend your wounds.", - "Health of body and spirit go hand in hand.", - "May the divine light restore your vitality!" - ], - 'traveler': [ - "The road calls to me... always moving, always exploring!", - "I've seen wonders beyond imagination in my travels.", - "Adventure awaits around every corner!", - "Sometimes the journey is more important than the destination." - ], - 'mystical_sage': [ - "Magic flows through all things, young one.", - "The arcane arts require patience and understanding.", - "Ancient powers stir... can you feel them?", - "Wisdom and magic are closely intertwined." - ] - } - # Try personality first, then fall back to npc_id, then default - response_key = personality if personality in responses else npc_id - return random.choice(responses.get(response_key, [ - "Hello there, traveler!", - "How can I help you?", - "Nice weather we're having." - ])) - - def get_private_messages_for_player(self, player_id: str) -> List[Dict]: - """Get private messages for a specific player""" - player = self.players.get(player_id) - if not player: - return [] - - private_messages = [] - for msg in self.chat_messages[-20:]: # Last 20 messages - msg_type = msg.get('type', 'public') - if msg_type != 'public': - # Check if this message is for this player - if (msg.get('target') == player_id or - msg.get('sender_id') == player_id or - (msg_type in ['private_from_npc', 'private_to_npc'] and - (msg.get('target') == player_id or msg.get('sender_id') == player_id))): - - print(f"[PRIVATE_FILTER] Found private message for {player_id}: {msg['sender']}: {msg['message']} (type: {msg_type})") - private_messages.append(msg) - - print(f"[PRIVATE_DEBUG] Player {player_id} has {len(private_messages)} private messages") - return private_messages - - def add_world_event(self, event: str): - self.world_events.append({ - 'event': event, - 'timestamp': time.time() - }) - if len(self.world_events) > 20: - self.world_events = self.world_events[-20:] - - def update_moving_npcs(self): - """Update positions of moving NPCs like Roaming Rick""" - current_time = time.time() - - with self._lock: - for npc_id, npc in self.npcs.items(): - # Check if this NPC has movement configuration - if npc.get('type') == 'moving' and 'movement' in npc: - movement = npc['movement'] - - # Check if enough time has passed for movement (based on speed) - time_since_last_move = current_time - movement.get('last_move', 0) - movement_interval = 2.0 / movement.get('speed', 1) # Faster speed = shorter interval - - if time_since_last_move >= movement_interval: - # Store old position - old_x, old_y = npc['x'], npc['y'] - - # Calculate new position - move_distance = 25 # Same as player movement - direction_x = movement.get('direction_x', 1) - direction_y = movement.get('direction_y', 1) - - new_x = npc['x'] + (move_distance * direction_x) - new_y = npc['y'] + (move_distance * direction_y) - - # Check boundaries and bounce if needed - if new_x <= 0 or new_x >= 775: - direction_x *= -1 # Reverse X direction - new_x = max(0, min(775, new_x)) - - if new_y <= 0 or new_y >= 575: - direction_y *= -1 # Reverse Y direction - new_y = max(0, min(575, new_y)) - - # Update NPC position and movement data - npc['x'] = new_x - npc['y'] = new_y - movement['direction_x'] = direction_x - movement['direction_y'] = direction_y - movement['last_move'] = current_time - - # Add world event if NPC moved - if old_x != new_x or old_y != new_y: - self.add_world_event(f"๐Ÿšถ {npc['name']} roams to ({int(new_x)}, {int(new_y)})") - - # Check if any players are now near this moved NPC - for player_id, player in self.players.items(): - distance = ((player.x - new_x)**2 + (player.y - new_y)**2)**0.5 - if distance < 50: # Close enough to notice - self.add_world_event(f"๐Ÿ‘€ {player.name} notices {npc['name']} nearby") - -# Global game world -game_world = GameWorld() - -# ============================================================================ -# NPC ADDON SYSTEM - RESTORED -# ============================================================================ - -class NPCAddon(ABC): - """Base class for NPC add-ons""" - - @property - @abstractmethod - def addon_id(self) -> str: - pass - - @property - @abstractmethod - def addon_name(self) -> str: - pass - - @abstractmethod - def get_interface(self) -> gr.Component: - """Return Gradio interface for this add-on""" - pass - - @abstractmethod - def handle_command(self, player_id: str, command: str) -> str: - """Handle player commands""" - pass - -class Read2BurnMailboxAddon(NPCAddon): - """Self-destructing secure mailbox add-on - RESTORED""" - - def __init__(self): - self.messages: Dict[str, Dict] = {} - self.access_log: List[Dict] = [] - - @property - def addon_id(self) -> str: - return "read2burn_mailbox" - - @property - def addon_name(self) -> str: - return "๐Ÿ”ฅ Read2Burn Secure Mailbox" - - def get_interface(self) -> gr.Component: - with gr.Column() as interface: - gr.Markdown(""" - ## ๐Ÿ”ฅ Read2Burn Secure Mailbox - - **Features:** - - Messages self-destruct after reading - - End-to-end encryption simulation - - 24-hour expiration - - Anonymous delivery - - **Commands:** - - `create Your secret message here` - Create new message - - `read MESSAGE_ID` - Read message (destroys it!) - - `list` - Show your created messages - """) - - with gr.Row(): - command_input = gr.Textbox( - label="Command", - placeholder="create Hello, this message will self-destruct!", - scale=3 - ) - send_btn = gr.Button("Send", variant="primary", scale=1) - - result_output = gr.Textbox( - label="Mailbox Response", - lines=5, - interactive=False - ) - - # Message history - message_history = gr.Dataframe( - headers=["Message ID", "Created", "Status", "Reads Left"], - label="Your Messages", - interactive=False - ) - - # FIXED: Use a different approach for player identification - def handle_mailbox_command(command: str): - # Get the current player from the global state - current_players = list(game_world.players.keys()) - if not current_players: - return "โŒ No players in the game! Please join the game first.", [] - - # Use the most recently active player (simplified approach) - player_id = max(current_players, key=lambda pid: game_world.players[pid].last_active) - player_name = game_world.players[player_id].name - - print(f"[Read2Burn] Command '{command}' from player {player_name} ({player_id})") - - result = self.handle_command(player_id, command) - history = self.get_player_message_history(player_id) - - # Add player info to the result - result = f"**Player:** {player_name}\n\n{result}" - - return result, history - - send_btn.click( - handle_mailbox_command, - inputs=[command_input], - outputs=[result_output, message_history] - ) - - command_input.submit( - handle_mailbox_command, - inputs=[command_input], - outputs=[result_output, message_history] - ) - - return interface - - def handle_command(self, player_id: str, command: str) -> str: - """Handle Read2Burn mailbox commands""" - parts = command.strip().split(' ', 1) - cmd = parts[0].lower() - - if cmd == "create" and len(parts) > 1: - return self.create_message(player_id, parts[1]) - elif cmd == "read" and len(parts) > 1: - return self.read_message(player_id, parts[1]) - elif cmd == "list": - return self.list_player_messages(player_id) - else: - return "โ“ Invalid command. Try: create , read , or list" - - def create_message(self, creator_id: str, content: str) -> str: - """Create a new self-destructing message""" - message_id = self.generate_message_id() - - self.messages[message_id] = { - 'id': message_id, - 'creator': creator_id, - 'content': content, # In production, encrypt this - 'created_at': time.time(), - 'expires_at': time.time() + (24 * 3600), # 24 hours - 'reads_left': 1, - 'burned': False - } - - self.access_log.append({ - 'action': 'create', - 'message_id': message_id, - 'player_id': creator_id, - 'timestamp': time.time() - }) - - return f"โœ… **Message Created Successfully!**\n\n๐Ÿ“ **Message ID:** `{message_id}`\n๐Ÿ”— Share this ID with the recipient\nโฐ Expires in 24 hours\n๐Ÿ”ฅ Burns after 1 read" - - def read_message(self, reader_id: str, message_id: str) -> str: - """Read and burn a message""" - if message_id not in self.messages: - return "โŒ Message not found or already burned" - - message = self.messages[message_id] - - # Check expiry - if time.time() > message['expires_at']: - del self.messages[message_id] - return "โŒ Message expired and has been burned" - - # Check if already burned - if message['burned'] or message['reads_left'] <= 0: - del self.messages[message_id] - return "โŒ Message has already been burned" - - # Read the message - content = message['content'] - message['reads_left'] -= 1 - - self.access_log.append({ - 'action': 'read', - 'message_id': message_id, - 'player_id': reader_id, - 'timestamp': time.time() - }) - - # Burn if no reads left - if message['reads_left'] <= 0: - message['burned'] = True - del self.messages[message_id] - burn_notice = "\n\n๐Ÿ”ฅ **This message has been BURNED and deleted forever!**" - else: - burn_notice = f"\n\nโš ๏ธ **{message['reads_left']} reads remaining before burn**" - - return f"๐Ÿ“– **Message Content:**\n\n{content}{burn_notice}" - - def list_player_messages(self, player_id: str) -> str: - """List messages created by player""" - player_messages = [ - msg for msg in self.messages.values() - if msg['creator'] == player_id - ] - - if not player_messages: - return "๐Ÿ“ญ You have no active messages" - - result = "๐Ÿ“จ **Your Active Messages:**\n\n" - for msg in player_messages: - expires_in = int((msg['expires_at'] - time.time()) / 3600) - result += f"๐Ÿ†” `{msg['id']}` | โฑ๏ธ {expires_in}h left | ๐Ÿ‘๏ธ {msg['reads_left']} reads\n" - - return result - - def get_player_message_history(self, player_id: str) -> List[List]: - """Get message history for display""" - player_messages = [ - msg for msg in self.messages.values() - if msg['creator'] == player_id - ] - - history = [] - for msg in player_messages: - expires_in = int((msg['expires_at'] - time.time()) / 3600) - status = "Active" if not msg['burned'] else "Burned" - history.append([ - msg['id'][:8] + "...", - datetime.fromtimestamp(msg['created_at']).strftime("%H:%M"), - status, - str(msg['reads_left']) - ]) - - return history - - def generate_message_id(self) -> str: - """Generate a unique message ID""" - import string - chars = string.ascii_letters + string.digits - return ''.join(random.choice(chars) for _ in range(12)) - -# ============================================================================ -# MCP ADDON SYSTEM - RESTORED -# ============================================================================ - -# Setup event loop for MCP client -loop = asyncio.new_event_loop() -asyncio.set_event_loop(loop) - -class SimpleMCPClient: - """MCP client for weather services""" - def __init__(self): - self.session = None - self.connected = False - self.tools = [] - self.exit_stack = None - self.server_url = "https://chris4k-weather.hf.space/gradio_api/mcp/sse" - - def connect(self) -> str: - """Connect to the hardcoded MCP server""" - return loop.run_until_complete(self._connect()) - - async def _connect(self) -> str: - try: - # Clean up previous connection - if self.exit_stack: - await self.exit_stack.aclose() - - self.exit_stack = AsyncExitStack() - - # Connect to SSE MCP server - sse_transport = await self.exit_stack.enter_async_context( - sse_client(self.server_url) - ) - read_stream, write_callable = sse_transport - - self.session = await self.exit_stack.enter_async_context( - ClientSession(read_stream, write_callable) - ) - await self.session.initialize() - - # Get available tools - response = await self.session.list_tools() - self.tools = response.tools - - self.connected = True - tool_names = [tool.name for tool in self.tools] - return f"โœ… Connected to weather server!\nAvailable tools: {', '.join(tool_names)}" - - except Exception as e: - self.connected = False - return f"โŒ Connection failed: {str(e)}" - - def get_weather(self, location: str) -> str: - """Get weather for a location (city, country format)""" - if not self.connected: - # Try to auto-connect - connect_result = self.connect() - if not self.connected: - return f"โŒ Failed to connect to weather server: {connect_result}" - - if not location.strip(): - return "โŒ Please enter a location (e.g., 'Berlin, Germany')" - - return loop.run_until_complete(self._get_weather(location)) - - async def _get_weather(self, location: str) -> str: - try: - # Parse location - if ',' in location: - city, country = [part.strip() for part in location.split(',', 1)] - else: - city = location.strip() - country = "" - - # Find the weather tool - weather_tool = next((tool for tool in self.tools if 'weather' in tool.name.lower()), None) - if not weather_tool: - return "โŒ Weather tool not found on server" - - # Call the tool - params = {"city": city, "country": country} - result = await self.session.call_tool(weather_tool.name, params) - - # Extract content properly - content_text = "" - if hasattr(result, 'content') and result.content: - if isinstance(result.content, list): - for content_item in result.content: - if hasattr(content_item, 'text'): - content_text += content_item.text - elif hasattr(content_item, 'content'): - content_text += str(content_item.content) - else: - content_text += str(content_item) - elif hasattr(result.content, 'text'): - content_text = result.content.text - else: - content_text = str(result.content) - - if not content_text: - return "โŒ No content received from server" - - try: - # Try to parse as JSON - parsed = json.loads(content_text) - if isinstance(parsed, dict): - if 'error' in parsed: - return f"โŒ Error: {parsed['error']}" - - # Format weather data nicely - if 'current_weather' in parsed: - weather = parsed['current_weather'] - formatted = f"๐ŸŒ **{parsed.get('location', 'Unknown')}**\n\n" - formatted += f"๐ŸŒก๏ธ Temperature: {weather.get('temperature_celsius', 'N/A')}ยฐC\n" - formatted += f"๐ŸŒค๏ธ Conditions: {weather.get('weather_description', 'N/A')}\n" - formatted += f"๐Ÿ’จ Wind: {weather.get('wind_speed_kmh', 'N/A')} km/h\n" - formatted += f"๐Ÿ’ง Humidity: {weather.get('humidity_percent', 'N/A')}%\n" - return formatted - elif 'temperature (ยฐC)' in parsed: - # Handle the original format from your server - formatted = f"๐ŸŒ **{parsed.get('location', 'Unknown')}**\n\n" - formatted += f"๐ŸŒก๏ธ Temperature: {parsed.get('temperature (ยฐC)', 'N/A')}ยฐC\n" - formatted += f"๐ŸŒค๏ธ Weather Code: {parsed.get('weather_code', 'N/A')}\n" - formatted += f"๐Ÿ• Timezone: {parsed.get('timezone', 'N/A')}\n" - formatted += f"๐Ÿ•’ Local Time: {parsed.get('local_time', 'N/A')}\n" - return formatted - else: - return f"โœ… Weather data:\n```json\n{json.dumps(parsed, indent=2)}\n```" - - except json.JSONDecodeError: - # If not JSON, return as text - return f"โœ… Weather data:\n```\n{content_text}\n```" - - return f"โœ… Raw result:\n{content_text}" - - except Exception as e: - return f"โŒ Failed to get weather: {str(e)}" - -class MCPAddonWrapper(NPCAddon): - """Wrapper to use any MCP server as a game add-on""" - - def __init__(self, mcp_endpoint: str, addon_name: str): - self.mcp_endpoint = mcp_endpoint - self._addon_name = addon_name - self._addon_id = f"mcp_{addon_name.lower().replace(' ', '_')}" - - # Initialize MCP client - self.mcp_client = SimpleMCPClient() - # Auto-connect on initialization - try: - connection_result = self.mcp_client.connect() - print(f"[MCP] {addon_name} connection: {connection_result}") - except Exception as e: - print(f"[MCP] Failed to connect {addon_name}: {e}") - - @property - def addon_id(self) -> str: - return self._addon_id - - @property - def addon_name(self) -> str: - return self._addon_name - - def get_interface(self) -> gr.Component: - with gr.Column() as interface: - gr.Markdown(f"## ๐ŸŒค๏ธ {self.addon_name}") - gr.Markdown("*Ask for weather in any city! Format: 'City, Country' (e.g., 'Berlin, Germany')*") - - # Connection status - connection_status = gr.HTML( - value="
๐ŸŸข Auto-connecting to weather server...
" - ) - - location_input = gr.Textbox( - label="Location", - placeholder="e.g., Berlin, Germany", - lines=1 - ) - - weather_output = gr.Textbox( - label="Weather Information", - lines=8, - interactive=False - ) - - get_weather_btn = gr.Button("๐ŸŒก๏ธ Get Weather", variant="primary") - - # Examples - with gr.Row(): - gr.Examples( - examples=[ - ["Berlin, Germany"], - ["Tokyo, Japan"], - ["New York, USA"], - ["London, UK"], - ["Sydney, Australia"] - ], - inputs=[location_input] - ) - - def handle_weather_request(location: str): - # Get the current player from the global state - current_players = list(game_world.players.keys()) - if not current_players: - return "โŒ No players in the game! Please join the game first!" - - if not location.strip(): - return "โŒ Please enter a location (e.g., 'Berlin, Germany')" - - # Use the most recently active player - player_id = max(current_players, key=lambda pid: game_world.players[pid].last_active) - - # Get weather using MCP client - result = self.mcp_client.get_weather(location) - - # Log the interaction - player_name = game_world.players[player_id].name - print(f"[WEATHER] {player_name} requested weather for {location}") - - return result - - get_weather_btn.click( - handle_weather_request, - inputs=[location_input], - outputs=[weather_output] - ) - - location_input.submit( - handle_weather_request, - inputs=[location_input], - outputs=[weather_output] - ) - - return interface - - def handle_command(self, player_id: str, command: str) -> str: - """Handle weather commands from private messages""" - # Remove common command prefixes - clean_command = command.strip() - if clean_command.startswith('/'): - clean_command = clean_command[1:] - - # Check if it looks like a weather request - if not clean_command: - return "๐ŸŒค๏ธ Weather Oracle: Please tell me a location! Format: 'City, Country' (e.g., 'Berlin, Germany')" - - # Get weather for the location - result = self.mcp_client.get_weather(clean_command) - - # Log the interaction - if player_id in game_world.players: - player_name = game_world.players[player_id].name - print(f"[WEATHER_PM] {player_name} requested weather for {clean_command}") - - return f"๐ŸŒค๏ธ **Weather Oracle**: {result}" - -# ============================================================================ -# MCP TOOLS FOR AI AGENTS - RESTORED -# ============================================================================ - -class GradioMCPTools: - """MCP tools integrated directly into Gradio app""" - - def __init__(self, game_world: GameWorld): - self.game_world = game_world - self.ai_agents: Dict[str, Player] = {} - - def register_ai_agent(self, agent_name: str, mcp_client_id: str = None) -> str: - """Register an AI agent as a player""" - if mcp_client_id is None: - mcp_client_id = f"ai_{uuid.uuid4().hex[:8]}" - - agent_id = f"ai_{uuid.uuid4().hex[:8]}" - - agent_player = Player( - id=agent_id, - name=agent_name, - type="ai_agent", - x=random.randint(50, 450), - y=random.randint(50, 350), - ai_agent_id=mcp_client_id, - last_active=time.time() - ) - - if self.game_world.add_player(agent_player): - self.ai_agents[mcp_client_id] = agent_player - return agent_id - else: - raise Exception("Game is full, cannot add AI agent") - - def move_ai_agent(self, mcp_client_id: str, direction: str) -> Dict: - """Move AI agent in the game world""" - if mcp_client_id not in self.ai_agents: - return {"error": "AI agent not registered"} - - agent = self.ai_agents[mcp_client_id] - success = self.game_world.move_player(agent.id, direction) - - return { - "success": success, - "new_position": {"x": agent.x, "y": agent.y}, - "nearby_players": self.get_nearby_entities(agent.id), - "world_events": self.game_world.world_events[-5:] - } - - def ai_agent_chat(self, mcp_client_id: str, message: str) -> Dict: - """AI agent sends chat message""" - if mcp_client_id not in self.ai_agents: - return {"error": "AI agent not registered"} - - agent = self.ai_agents[mcp_client_id] - self.game_world.add_chat_message(f"๐Ÿค– {agent.name}", message) - - return {"success": True, "message": "Chat message sent"} - - def get_game_state_for_ai(self, mcp_client_id: str) -> Dict: - """Get current game state for AI agent""" - if mcp_client_id not in self.ai_agents: - return {"error": "AI agent not registered"} - - agent = self.ai_agents[mcp_client_id] - - return { - "agent_status": asdict(agent), - "nearby_entities": self.get_nearby_entities(agent.id), - "recent_chat": self.game_world.chat_messages[-10:], - "world_events": self.game_world.world_events[-5:], - "available_npcs": list(self.game_world.npcs.keys()) - } - - def get_nearby_entities(self, player_id: str, radius: int = 100) -> List[Dict]: - """Get entities near a player""" - player = self.game_world.players.get(player_id) - if not player: - return [] - - nearby = [] - - # Check other players - for other_player in self.game_world.players.values(): - if other_player.id == player_id: - continue - - distance = ((player.x - other_player.x)**2 + (player.y - other_player.y)**2)**0.5 - if distance <= radius: - nearby.append({ - "type": "player", - "name": other_player.name, - "player_type": other_player.type, - "position": {"x": other_player.x, "y": other_player.y}, - "distance": round(distance, 1) - }) - - # Check NPCs - for npc in self.game_world.npcs.values(): - distance = ((player.x - npc['x'])**2 + (player.y - npc['y'])**2)**0.5 - if distance <= radius: - nearby.append({ - "type": "npc", - "name": npc['name'], - "npc_id": npc['id'], - "position": {"x": npc['x'], "y": npc['y']}, - "distance": round(distance, 1) - }) - - return nearby - -# Global MCP tools -mcp_tools = GradioMCPTools(game_world) - -# ============================================================================ -# HELPER FUNCTIONS - RESTORED -# ============================================================================ - -def get_player_id_from_session(session_hash: str) -> Optional[str]: - """Get player ID from session hash""" - for player in game_world.players.values(): - if player.session_hash == session_hash: - return player.id - return None - -def create_game_world_html() -> str: - """Render the game world as HTML""" - html = f""" -
- """ - - # Draw grid pattern - html += """ -
-
- """ - - # Draw NPCs - for npc in game_world.npcs.values(): - html += f""" -
- {npc['char']} -
-
{npc['name']}
- """ - - # Draw players - for player in game_world.players.values(): - char = "๐Ÿค–" if player.type == "ai_agent" else "๐Ÿงโ€โ™‚๏ธ" - color = "gold" if player.type == "ai_agent" else "lightblue" - - html += f""" -
- {char} -
-
- {player.name} (Lv.{player.level}) -
- """ - - html += "
" - return html - -def get_player_stats_display(player_id: str) -> Dict: - """Get formatted player stats for display""" - if not player_id or player_id not in game_world.players: - return {"status": "โŒ Not connected", "info": "Join the game to see your stats"} - - player = game_world.players[player_id] - return { - "status": "๐ŸŸข Connected", - "name": player.name, - "type": player.type, - "level": player.level, - "hp": f"{player.hp}/{player.max_hp}", - "gold": player.gold, - "experience": f"{player.experience}/{player.level * 100}", - "position": f"({player.x}, {player.y})", - "last_update": time.strftime("%H:%M:%S"), - "session_id": player.id[:8] + "..." - } - -# ============================================================================ -# MAIN APPLICATION WITH ALL FEATURES - RESTORED -# ============================================================================ - -def create_mmorpg_interface(): - """Create the complete MMORPG interface with all features""" - - # Initialize add-ons - read2burn_addon = Read2BurnMailboxAddon() - game_world.addon_npcs['read2burn_mailbox'] = read2burn_addon - - # Example MCP add-on (simulated) - weather_mcp_addon = MCPAddonWrapper("http://localhost:8001/mcp", "Weather Oracle") - game_world.addon_npcs['weather_oracle'] = weather_mcp_addon - - with gr.Blocks( - title="๐ŸŽฎ MMORPG with Complete MCP Integration" - ) as demo: - # Keyboard status indicator - keyboard_status = gr.HTML( - value="
๐ŸŽฎ Keyboard loading...
" - ) - # Inject keyboard control script - gr.HTML(r""" - -""") - - gr.Markdown(""" -
-

๐ŸŽฎ MMORPG with Complete MCP Integration

-

All features restored! Keyboard controls, NPCs, MCP, Read2Burn & more!

-

๐ŸŒŸ Real-time multiplayer โ€ข ๐Ÿค– AI agent support โ€ข ๐Ÿ”ฅ Read2Burn messaging โ€ข ๐Ÿ”Œ MCP add-ons โ€ข โŒจ๏ธ Keyboard controls

-
- """) - - # Player session state - player_state = gr.State({}) - - with gr.Tabs(): - - # Main Game Tab - with gr.Tab("๐ŸŒ Game World"): - with gr.Row(): - with gr.Column(scale=2): - # Player registration - with gr.Group(): - gr.Markdown("### ๐ŸŽฎ Join the Adventure") - with gr.Row(): - player_name = gr.Textbox( - label="Player Name", - placeholder="Enter your character name", - scale=3 - ) - join_btn = gr.Button("Join Game", variant="primary", scale=1) - leave_btn = gr.Button("Leave Game", variant="secondary", scale=1) - - # Keyboard controls info - ENHANCED - gr.Markdown(""" - ### โŒจ๏ธ Controls - **Mouse:** Click movement buttons below - **Keyboard:** Use **WASD** or **Arrow Keys** for movement - ๐Ÿ’ก *Press F12 โ†’ Console to see keyboard debug info* - """) - - # Game world view - game_view = gr.HTML( - value=create_game_world_html() - ) - - # Movement controls - gr.Markdown("### ๐Ÿ•น๏ธ Movement Controls") - with gr.Row(): - gr.HTML("") - move_up = gr.Button("โ†‘", size="lg", scale=1) - gr.HTML("") - with gr.Row(): - move_left = gr.Button("โ†", size="lg", scale=1) - action_btn = gr.Button("โš”๏ธ", size="lg", scale=1, variant="secondary") - move_right = gr.Button("โ†’", size="lg", scale=1) - with gr.Row(): - gr.HTML("") - move_down = gr.Button("โ†“", size="lg", scale=1) - gr.HTML("") - - with gr.Column(scale=1): - # Player info - player_info = gr.JSON( - label="๐Ÿงโ€โ™‚๏ธ Player Stats", - value={"status": "Not connected", "info": "Join the game to see your stats"} - ) - - # Online players - online_players = gr.Dataframe( - headers=["Name", "Type", "Level"], - label="๐Ÿ‘ฅ Online Players", - interactive=False - ) - - # World events - world_events = gr.Textbox( - label="๐ŸŒ World Events & NPC Interactions", - lines=4, - interactive=False, - placeholder="World events will appear here...\n\n๐Ÿ’ก Tip: Walk near NPCs (๐Ÿ“ฎ๐Ÿช๐ŸŒค๏ธ) to interact with them!\nThen visit the 'NPC Add-ons' tab to use their features." - ) - - # Chat system - with gr.Row(): - with gr.Column(scale=4): - chat_display = gr.Chatbot( - label="๐Ÿ’ฌ Game Chat", height=200, - type='messages', - value=[{"role": "assistant", "content": "Welcome! Join the game to start chatting!"}] - ) - - with gr.Row(): - chat_input = gr.Textbox( - placeholder="Type your message...", - scale=4, - container=False - ) - chat_send = gr.Button("Send", scale=1, variant="primary") - # Private Chat Section - Multi-Tab Implementation - with gr.Column(scale=2): - # Auto-refresh toggle - with gr.Row(): - auto_refresh_enabled = gr.Checkbox( - label="Auto-refresh (2s)", - value=True, - info="Toggle to preserve manual selections" - ) - - proximity_info = gr.HTML( - value="
๐Ÿ” Move near NPCs or players to chat privately
", - label="๐Ÿ“ฑ Nearby Entities" - ) - - # Private chat interface with multi-tab support - with gr.Group(visible=False) as private_chat_group: - nearby_entities = gr.Dropdown( - label="๐Ÿ’ฌ Start new chat with", - choices=[], - interactive=True - ) - - with gr.Row(): - start_chat_btn = gr.Button("Start Chat", variant="primary", scale=1) - clear_all_tabs_btn = gr.Button("Clear All", variant="secondary", scale=1) - - # Chat tabs container - with gr.Column() as chat_tabs_container: - # This will be dynamically populated with chat tabs - chat_tabs_state = gr.State({}) # Store active chat tabs: {entity_id: {tab_info, history, pinned}} - active_tabs_display = gr.HTML( - value="
No active chats
", - label="Active Chats" - ) - - # Current active chat display - current_chat_display = gr.Chatbot( - label="๐Ÿ”’ Private Messages", - height=150, - type='messages', - value=[], - visible=False - ) - - with gr.Row(visible=False) as chat_input_row: - private_message_input = gr.Textbox( - placeholder="Type private message...", - scale=4, - container=False - ) - private_send_btn = gr.Button("Send", scale=1, variant="secondary") - - # NPC Add-ons Tab - RESTORED - with gr.Tab("๐Ÿค– NPC Add-ons"): - gr.Markdown("## Available NPC Add-ons") - gr.Markdown("*Extensible plugin system - each NPC can have unique functionality!*") - - with gr.Tabs() as addon_tabs: - # Read2Burn Mailbox Add-on - with gr.Tab("๐Ÿ”ฅ Read2Burn Mailbox"): - read2burn_addon.get_interface() - - # Weather Oracle MCP Add-on - with gr.Tab("๐ŸŒค๏ธ Weather Oracle (MCP)"): - weather_mcp_addon.get_interface() - - # Add-on Manager - with gr.Tab("โš™๏ธ Manage Add-ons"): - gr.Markdown("### Install New Add-ons") - - with gr.Row(): - addon_type = gr.Dropdown( - choices=["Python Plugin", "MCP Server", "Web Service"], - label="Add-on Type", - value="MCP Server" - ) - - with gr.Row(): - addon_url = gr.Textbox( - label="Add-on URL/Path", - placeholder="http://localhost:8001/mcp or path/to/plugin.py" - ) - - with gr.Row(): - addon_name = gr.Textbox( - label="Display Name", - placeholder="My Custom Add-on" - ) - - install_btn = gr.Button("Install Add-on", variant="primary") - install_status = gr.Textbox(label="Status", interactive=False) - - def install_addon(addon_type: str, addon_url: str, addon_name: str): - if not addon_url or not addon_name: - return "โŒ Please provide both URL and name" - - if addon_type == "MCP Server": - try: - new_addon = MCPAddonWrapper(addon_url, addon_name) - game_world.addon_npcs[new_addon.addon_id] = new_addon - return f"โœ… Successfully installed MCP add-on '{addon_name}'" - except Exception as e: - return f"โŒ Failed to install add-on: {str(e)}" - else: - return f"๐Ÿšง {addon_type} installation not yet implemented" - - install_btn.click( - install_addon, - inputs=[addon_type, addon_url, addon_name], - outputs=[install_status] - ) - - # MCP Integration Tab - RESTORED - with gr.Tab("๐Ÿ”Œ MCP Integration"): - gr.Markdown(""" - ## MCP Integration for AI Agents - - AI agents can connect to this game via MCP and participate as players! - - ### Available MCP Tools: - - `register_ai_agent(name)` - Join the game as an AI player - - `move_agent(direction)` - Move around the world - - `send_chat(message)` - Chat with other players - - `get_game_state()` - Get current world information - - `interact_with_npc(npc_id, message)` - Use NPC add-ons - - ### API Endpoints (when launched with api_open=True): - - `/api/register_ai_agent` - Register new AI agent - - `/api/move_ai_agent` - Move AI agent - - `/api/ai_agent_chat` - Send chat message - - `/api/get_game_state_for_ai` - Get game state - """) - - # MCP server info - mcp_info = gr.JSON( - label="MCP Server Information", - value={ - "server_status": "๐ŸŸข Active", - "server_url": "This Gradio app serves as MCP server", - "tools_available": [ - "register_ai_agent", - "move_agent", - "send_chat", - "get_game_state", - "interact_with_npc" - ], - "active_ai_agents": 0, - "total_players": 0 - } - ) - - # AI Agent simulator (for testing) - with gr.Group(): - gr.Markdown("### ๐Ÿงช Test AI Agent") - gr.Markdown("*Use this to simulate AI agent connections for testing*") - - with gr.Row(): - ai_name = gr.Textbox( - label="AI Agent Name", - placeholder="Claude the Explorer", - scale=3 - ) - register_ai_btn = gr.Button("Register AI Agent", variant="primary", scale=1) - - with gr.Row(): - ai_action = gr.Dropdown( - choices=["move up", "move down", "move left", "move right", "chat"], - label="AI Action", - scale=2 - ) - - ai_message = gr.Textbox( - label="AI Message (for chat)", - placeholder="Hello humans!", - scale=3 - ) - - execute_ai_btn = gr.Button("Execute AI Action", variant="secondary") - ai_result = gr.Textbox(label="AI Action Result", interactive=False, lines=5) - - # ==================================================================== - # KEYBOARD CONTROLS - RESTORED AND ENHANCED - # ==================================================================== - - # JavaScript code to inject into head (FIXED SYNTAX) - keyboard_js = """ - - """ - - gr.HTML(keyboard_js) - - # ==================================================================== - # EVENT HANDLERS - RESTORED - # ==================================================================== - - def join_game(name: str, current_state: Dict, request: gr.Request): - """Handle player joining the game""" - print(f"[JOIN] Called with name='{name}', state={current_state}") - - if not name.strip(): - return ( - current_state, - {"status": "โŒ Error", "info": "Please enter a valid name"}, - create_game_world_html(), - [], - "Enter a player name to join!" - ) - - # Check if already joined - if current_state.get("player_id"): - player_id = current_state["player_id"] - if player_id in game_world.players: - player_stats = get_player_stats_display(player_id) - return ( - current_state, - player_stats, - create_game_world_html(), - [[p.name, p.type, p.level] for p in game_world.players.values()], - "Already connected!" - ) - - # Create new player - player_id = str(uuid.uuid4()) - player = Player( - id=player_id, - name=name.strip(), - type="human", - session_hash=request.session_hash, - last_active=time.time() - ) - - if game_world.add_player(player): - new_state = {"player_id": player_id, "player_name": name} - player_display = get_player_stats_display(player_id) - # Update displays - world_html = create_game_world_html() - players_list = [ - [p.name, p.type, p.level] - for p in game_world.players.values() - ] - events = "\n".join([ - f"{e.get('event', '')}" - for e in game_world.world_events[-5:] - ]) - # Get proximity info for private chat - proximity_html, private_chat_visible, entity_choices, private_messages = get_proximity_status(new_state) - - return (new_state, player_display, world_html, players_list, events, - proximity_html, private_chat_visible, entity_choices, private_messages) - else: - # Game is full - return default proximity state - return ( - current_state, - {"status": "โŒ Error", "info": "Game is full (20/20 players)"}, - create_game_world_html(), - [], - "Game is full!", - "
๐Ÿ” Join the game to see nearby entities
", - gr.update(visible=False), - gr.update(choices=[]), - [] ) - - def leave_game(current_state: Dict): - """Handle player leaving the game""" - if current_state and current_state.get("player_id"): - player_id = current_state["player_id"] - game_world.remove_player(player_id) - return ( - {}, # Clear state - {"status": "Not connected", "info": "Join the game to see your stats"}, - create_game_world_html(), - [[p.name, p.type, p.level] for p in game_world.players.values()], - "\n".join([e.get('event', '') for e in game_world.world_events[-5:]]), - "
๐Ÿ” Join the game to see nearby entities
", - gr.update(visible=False), - gr.update(choices=[]), - [] - ) - else: - return ( - current_state, - {"status": "Not connected", "info": "You're not in the game"}, - create_game_world_html(), - [], - "Not connected", - "
๐Ÿ” Join the game to see nearby entities
", - gr.update(visible=False), - gr.update(choices=[]), - [] ) - - def handle_movement(direction: str, current_state: Dict): - """Handle player movement""" - print(f"[MOVE] Direction: {direction}, state: {current_state}") - player_id = current_state.get("player_id") - if not player_id: - return current_state, create_game_world_html(), [], get_player_stats_display(None) - - success = game_world.move_player(player_id, direction) - if success: - # Update displays - world_html = create_game_world_html() - players_list = [ - [p.name, p.type, p.level] - for p in game_world.players.values() - ] - player_stats = get_player_stats_display(player_id) - - # Get proximity info for private chat - proximity_html, private_chat_visible, entity_choices, private_messages = get_proximity_status(current_state) - - return (current_state, world_html, players_list, player_stats, - proximity_html, private_chat_visible, entity_choices, private_messages) - # No movement occurred - proximity_html, private_chat_visible, entity_choices, private_messages = get_proximity_status(current_state) - return (current_state, create_game_world_html(), [], get_player_stats_display(player_id), - proximity_html, private_chat_visible, entity_choices, private_messages) - - def handle_chat_command(message: str, player: Player, player_id: str) -> str: - """Handle chat commands like /heal, /stats, etc.""" - parts = message.split() - command = parts[0].lower() - - if command == "/heal": - # Healing command - if player.health < 100: - old_health = player.health - player.health = min(100, player.health + 25) - game_world.add_world_event(f"โœจ {player.name} used healing magic!") - return f"๐Ÿ’š {player.name} healed for {player.health - old_health} HP! Health: {player.health}/100" - else: - return f"๐Ÿ’š {player.name} is already at full health!" - - elif command == "/stats": - # Show player stats - return f"๐Ÿ“Š {player.name} - Health: {player.health}/100, Position: ({player.x}, {player.y}), Type: {player.type}" - - elif command == "/time": - # Show current game time - return f"๐Ÿ• Current time: {time.strftime('%H:%M:%S')}" - - elif command == "/players": - # List online players - players_list = [p.name for p in game_world.players.values()] - return f"๐Ÿ‘ฅ Online players ({len(players_list)}): {', '.join(players_list)}" - - elif command == "/npcs": - # List nearby NPCs - nearby_npcs = [] - for npc_id, npc in game_world.npcs.items(): - distance = ((player.x - npc['x'])**2 + (player.y - npc['y'])**2)**0.5 - if distance <= 100: - nearby_npcs.append(f"{npc['name']} ({npc['type']})") - if nearby_npcs: - return f"๐Ÿค– Nearby NPCs: {', '.join(nearby_npcs)}" - else: - return "๐Ÿค– No NPCs nearby. Move around to find them!" - - elif command == "/help": - # Show available commands - return """๐Ÿ“– Available Commands: -/heal - Restore 25 HP -/stats - Show your player stats -/time - Show current time -/players - List online players -/npcs - List nearby NPCs -/help - Show this help""" - - else: - return f"โ“ Unknown command: {command}. Use /help to see available commands." - - def handle_chat(message: str, history: List, current_state: Dict): - """Handle chat messages and commands""" - if not message.strip(): - return history, "" - - player_id = current_state.get("player_id") - if not player_id: - return history, "" - - player = game_world.players.get(player_id) - if not player: - return history, "" - - # Check if message is a command - if message.startswith('/'): - command_result = handle_chat_command(message, player, player_id) - if command_result: - game_world.add_chat_message("๐ŸŽฎ System", command_result) - else: - # Regular chat message - game_world.add_chat_message(player.name, message.strip()) - - # Update chat display with new message format - formatted_history = [] - for msg in game_world.chat_messages[-20:]: formatted_history.append({ - "role": "assistant", - "content": f"[{msg['timestamp']}] {msg['sender']}: {msg['message']}" - }) - - return formatted_history, "" - - def get_updated_private_messages(player_id: str): - """Get formatted private messages for display""" - if not player_id: - return [] - - private_messages = game_world.get_private_messages_for_player(player_id) - formatted_private = [] - for msg in private_messages: - formatted_private.append({ - "role": "assistant", - "content": f"[{msg['timestamp']}] {msg['sender']}: {msg['message']}" - }) - - print(f"[PRIVATE_UPDATE] Returning {len(formatted_private)} private messages for player {player_id}") - return formatted_private - - def handle_private_message(message: str, current_state: Dict, chat_tabs_state: Dict): - """Handle private messages using the active chat tab""" - if not message.strip(): - return [], "" - - if not current_state.get("player_id"): - return [], "" - - if not chat_tabs_state: - return [], "" - - print(f"[PRIVATE_MESSAGE_DEBUG] Chat tabs state: {chat_tabs_state}") - # Find the active chat tab - active_entity_id = None - for entity_id, tab_info in chat_tabs_state.items(): - print(f"[PRIVATE_MESSAGE_DEBUG] Checking tab: entity_id={entity_id}, tab_info={tab_info}") - if tab_info.get('active', False): - active_entity_id = entity_id - break - - print(f"[PRIVATE_MESSAGE_DEBUG] Active entity_id: {active_entity_id}") - - if not active_entity_id: - return [], "" - - player_id = current_state["player_id"] - print(f"[PRIVATE_MESSAGE_DEBUG] Sending message from player {player_id} to entity {active_entity_id}: '{message.strip()}'") - # Send the private message - success, error_message = game_world.send_private_message(player_id, active_entity_id, message.strip()) - - if success: - # Get updated messages for this entity - updated_messages = get_chat_messages_for_entity(player_id, active_entity_id) - return updated_messages, "" - else: - return [], "" - - def get_chat_messages_for_entity(player_id: str, entity_id: str) -> List: - """Get chat messages for a specific entity""" - messages = [] - for msg in game_world.chat_messages: - msg_type = msg.get('type', '') - # Check for private messages between player and entity - if msg_type in ['private_to_npc', 'private_from_npc', 'private_to_player', 'private_from_player']: - # Message is to/from this specific entity and player - if ((msg.get('sender_id') == player_id and msg.get('target') == entity_id) or - (msg.get('sender_id') == entity_id and msg.get('target') == player_id)): - messages.append({ - "role": "assistant" if msg.get('sender_id') != player_id else "user", - "content": f"[{msg['timestamp']}] {msg['sender']}: {msg['message']}" - }) - - print(f"[CHAT_MESSAGES] Found {len(messages)} messages between player {player_id} and entity {entity_id}") - return messages - - def start_new_chat(entity_id: str, entity_name: str, chat_tabs_state: Dict) -> tuple: - """Start a new chat tab with an entity""" - if not entity_id or not entity_name: - return chat_tabs_state, "No entity selected" - - # Create new tab if it doesn't exist - if entity_id not in chat_tabs_state: - chat_tabs_state[entity_id] = { - 'name': entity_name, - 'active': True, - 'pinned': False, - 'unread': 0 - } - # Deactivate other tabs - for other_id in chat_tabs_state: - if other_id != entity_id: - chat_tabs_state[other_id]['active'] = False - else: - # Switch to existing tab - for other_id in chat_tabs_state: - chat_tabs_state[other_id]['active'] = (other_id == entity_id) - - # Generate updated tabs HTML - tabs_html = generate_chat_tabs_html(chat_tabs_state) - return chat_tabs_state, tabs_html - - def close_chat_tab(entity_id: str, chat_tabs_state: Dict) -> tuple: - """Close a chat tab (respects pinned status)""" - if entity_id in chat_tabs_state: - if not chat_tabs_state[entity_id].get('pinned', False): - del chat_tabs_state[entity_id] - # If there are remaining tabs, activate the first one - if chat_tabs_state: - first_tab = next(iter(chat_tabs_state)) - chat_tabs_state[first_tab]['active'] = True - - tabs_html = generate_chat_tabs_html(chat_tabs_state) - return chat_tabs_state, tabs_html - - def toggle_pin_tab(entity_id: str, chat_tabs_state: Dict) -> tuple: - """Toggle pin status of a chat tab""" - if entity_id in chat_tabs_state: - chat_tabs_state[entity_id]['pinned'] = not chat_tabs_state[entity_id].get('pinned', False) - - tabs_html = generate_chat_tabs_html(chat_tabs_state) - return chat_tabs_state, tabs_html - - def clear_all_chat_tabs(chat_tabs_state: Dict) -> tuple: - """Clear all non-pinned chat tabs""" - # Keep only pinned tabs - pinned_tabs = {k: v for k, v in chat_tabs_state.items() if v.get('pinned', False)} - chat_tabs_state.clear() - chat_tabs_state.update(pinned_tabs) - # If any tabs remain, activate the first one - if chat_tabs_state: - first_tab = next(iter(chat_tabs_state)) - chat_tabs_state[first_tab]['active'] = True - tabs_html = generate_chat_tabs_html(chat_tabs_state) - return chat_tabs_state, tabs_html - - # New: handle starting a chat and show chat area - def handle_start_chat(selection: str, chat_tabs_state: Dict, current_state: Dict): - """Start a chat tab and show messages and input row""" - print(f"[HANDLE_START_CHAT] Called with selection='{selection}', type={type(selection)}") - - if not selection: - print(f"[HANDLE_START_CHAT] No selection provided") - return chat_tabs_state, generate_chat_tabs_html(chat_tabs_state), gr.update(value=[], visible=False), gr.update(visible=False) - - # The selection should be the entity_id from dropdown value (first part of tuple) - entity_id = selection - print(f"[HANDLE_START_CHAT] Using entity_id='{entity_id}'") - - # Extra validation: ensure we have a valid entity ID - if not entity_id or not isinstance(entity_id, str): - print(f"[HANDLE_START_CHAT] Invalid entity_id: {entity_id}") - return chat_tabs_state, generate_chat_tabs_html(chat_tabs_state), gr.update(value=[], visible=False), gr.update(visible=False) - - # Find the entity name by looking up the entity in the game world - entity_name = None - entity_type = None - - if entity_id in game_world.npcs: - entity_name = game_world.npcs[entity_id]['name'] - entity_type = "NPC" - print(f"[HANDLE_START_CHAT] Found NPC: {entity_id} -> {entity_name}") - elif entity_id in game_world.players: - entity_name = game_world.players[entity_id].name - entity_type = "Player" - print(f"[HANDLE_START_CHAT] Found Player: {entity_id} -> {entity_name}") - else: - print(f"[HANDLE_START_CHAT] ERROR: Entity '{entity_id}' not found in NPCs or players") - print(f"[HANDLE_START_CHAT] Available NPCs: {list(game_world.npcs.keys())}") - print(f"[HANDLE_START_CHAT] Available Players: {list(game_world.players.keys())}") - # Use entity_id as fallback name, but this shouldn't happen with proper dropdown - entity_name = entity_id - entity_type = "Unknown" - - print(f"[CHAT_START] Starting chat with entity_id: '{entity_id}', entity_name: '{entity_name}', type: {entity_type}") - - # Validate that we're not accidentally using a display name as entity_id - if entity_name and "(" in entity_id and ")" in entity_id: - print(f"[HANDLE_START_CHAT] WARNING: entity_id '{entity_id}' looks like a display name! This suggests a bug.") - - # create or switch tab - use entity_id as the key - chat_tabs_state, tabs_html = start_new_chat(entity_id, entity_name, chat_tabs_state) - # load messages - player_id = current_state.get("player_id") - messages = get_chat_messages_for_entity(player_id, entity_id) if player_id else [] - - print(f"[HANDLE_START_CHAT] Created/switched to tab with key: '{entity_id}' and loaded {len(messages)} messages") - return chat_tabs_state, tabs_html, gr.update(value=messages, visible=True), gr.update(visible=True) - - # ...existing generate_chat_tabs_html... - def generate_chat_tabs_html(chat_tabs_state: Dict) -> str: - """Generate HTML for chat tabs display""" - if not chat_tabs_state: - return "
No active chats
" - - tabs_html = "
" - for entity_id, tab_info in chat_tabs_state.items(): - active_style = "background: #e3f2fd; border: 2px solid #2196f3;" if tab_info.get('active') else "background: #f5f5f5; border: 1px solid #ccc;" - pin_icon = "๐Ÿ“Œ" if tab_info.get('pinned') else "" - unread_badge = f" ({tab_info.get('unread', 0)})" if tab_info.get('unread', 0) > 0 else "" - - tabs_html += f""" -
- {tab_info['name']}{unread_badge} - {pin_icon} - ร— -
- """ - - tabs_html += "
" - return tabs_html - - def switch_to_chat_tab(entity_id: str, chat_tabs_state: Dict, player_id: str) -> tuple: - """Switch to a specific chat tab""" - if entity_id in chat_tabs_state: - # Deactivate all tabs - for tab_id in chat_tabs_state: - chat_tabs_state[tab_id]['active'] = False - - # Activate selected tab - chat_tabs_state[entity_id]['active'] = True - chat_tabs_state[entity_id]['unread'] = 0 # Clear unread count - - # Get messages for this entity - messages = get_chat_messages_for_entity(player_id, entity_id) - tabs_html = generate_chat_tabs_html(chat_tabs_state) - - return chat_tabs_state, tabs_html, messages, True # Show chat input - - return chat_tabs_state, generate_chat_tabs_html(chat_tabs_state), [], False - - def get_proximity_status(current_state: Dict, preserve_selection: str = None): - """Get proximity status and nearby entities""" - player_id = current_state.get("player_id") - print(f"[PROXIMITY_DEBUG] Called with player_id={player_id}, preserve_selection={preserve_selection}") - - if not player_id: - print(f"[PROXIMITY_DEBUG] No player_id, returning empty") - return ( - "
๐Ÿ” Join the game to see nearby entities
", - gr.update(visible=False), - gr.update(choices=[]), - [] ) - # Get nearby entities - nearby_entities = game_world.check_npc_proximity(player_id) - print(f"[PROXIMITY_DEBUG] Found {len(nearby_entities)} nearby entities: {[e.get('name') for e in nearby_entities]}") - - if not nearby_entities: - print(f"[PROXIMITY_DEBUG] No nearby entities, returning empty choices") - return ( - "
๐Ÿ” Move near NPCs or players to chat privately
", - gr.update(visible=False), - gr.update(choices=[], value=None), - [] - ) - - # Format proximity info and create dropdown choices - entity_list = [] - dropdown_choices = [] - - for entity in nearby_entities: - entity_id = entity['id'] - entity_name = entity['name'] - entity_type = entity['type'] - - if entity_type == 'npc': - entity_list.append(f"๐Ÿค– {entity_name} (NPC)") - else: # player - entity_list.append(f"๐Ÿ‘ค {entity_name} (Player)") - - # Create dropdown choice as (label, value) tuple - Gradio format - # IMPORTANT: value must be entity_id, label is display name - display_label = f"{entity_name} ({entity_type.upper()})" - dropdown_choices.append((display_label, entity_id)) - - print(f"[PROXIMITY_DEBUG] Added dropdown choice: Label='{display_label}' -> Value='{entity_id}'") - - print(f"[PROXIMITY_DEBUG] Created {len(dropdown_choices)} dropdown choices:") - for choice in dropdown_choices: - print(f"[PROXIMITY_DEBUG] - Label: '{choice[0]}', Value: '{choice[1]}'") - print(f"[PROXIMITY_DEBUG] When user selects from dropdown, handle_start_chat will receive the Value (entity_id), not the Label") - - proximity_html = f""" -
-
๐Ÿ“ฑ Nearby for Private Chat:
- {'
'.join(entity_list)} -
- """ - - # Get private messages for display - private_messages = game_world.get_private_messages_for_player(player_id) - formatted_private = [] - for msg in private_messages: - formatted_private.append({ - "role": "assistant", - "content": f"[{msg['timestamp']}] {msg['sender']}: {msg['message']}" - }) - - # Determine dropdown value: preserve if still valid - dropdown_value = None - if preserve_selection: - valid_ids = [choice[0] for choice in dropdown_choices] - if preserve_selection in valid_ids: - dropdown_value = preserve_selection - print(f"[PROXIMITY_DEBUG] Preserving selection: {preserve_selection}") - else: - print(f"[PROXIMITY_DEBUG] Cannot preserve selection '{preserve_selection}' - not in valid_ids: {valid_ids}") - - print(f"[PROXIMITY_DEBUG] Returning dropdown with value={dropdown_value}, choices={dropdown_choices}") - return ( - proximity_html, - gr.update(visible=True), - gr.update(choices=dropdown_choices, value=dropdown_value), - formatted_private - ) - - def register_test_ai_agent(ai_name: str): - """Register a test AI agent""" - if not ai_name.strip(): - return "Please enter AI agent name" - - try: - test_client_id = f"test_{uuid.uuid4().hex[:8]}" - agent_id = mcp_tools.register_ai_agent(ai_name.strip(), test_client_id) - return f"โœ… AI agent '{ai_name}' registered with ID: {agent_id}\nClient ID: {test_client_id}" - except Exception as e: - return f"โŒ Failed to register AI agent: {str(e)}" - - def execute_ai_action(action: str, message: str): - """Execute AI agent action""" - if not mcp_tools.ai_agents: - return "โŒ No AI agents registered. Register an AI agent first!" - - # Use the first registered AI agent for demo - client_id = list(mcp_tools.ai_agents.keys())[0] - - if action.startswith("move"): - direction = action.split()[1] - result = mcp_tools.move_ai_agent(client_id, direction) - return f"๐Ÿค– Move result:\n{json.dumps(result, indent=2)}" - elif action == "chat": - if not message.strip(): - return "โŒ Please enter a message for chat" - result = mcp_tools.ai_agent_chat(client_id, message.strip()) - return f"๐Ÿ’ฌ Chat result:\n{json.dumps(result, indent=2)}" - - return "โ“ Unknown action" - - def auto_refresh(current_state: Dict, current_dropdown: str, auto_refresh_enabled: bool): - """Auto-refresh game displays""" - if not auto_refresh_enabled: - # Return current values without updating if auto-refresh is disabled - return (gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), - gr.update(), gr.update(), gr.update(), gr.update()) - - player_id = current_state.get("player_id") if current_state else None - - # Update moving NPCs before rendering - game_world.update_moving_npcs() - - world_html = create_game_world_html() - players_list = [ - [p.name, p.type, p.level] - for p in game_world.players.values() - ] - - # Format chat - formatted_chat = [] - for msg in game_world.chat_messages[-20:]: - formatted_chat.append({ - "role": "assistant", - "content": f"[{msg['timestamp']}] {msg['sender']}: {msg['message']}" - }) - - events = "\n".join([ - f"{e.get('event', '')}" - for e in game_world.world_events[-5:] - ]) if game_world.world_events else "No recent events" - - # Update player stats - player_stats = get_player_stats_display(player_id) - - # Update MCP info - mcp_status = { - "server_status": "๐ŸŸข Active", - "server_url": "This Gradio app serves as MCP server", - "tools_available": [ - "register_ai_agent", - "move_agent", - "send_chat", - "get_game_state", - "interact_with_npc" - ], - "active_ai_agents": len(mcp_tools.ai_agents), - "total_players": len(game_world.players) } - # Get proximity info for private chat - preserve dropdown selection during auto-refresh - proximity_html, private_chat_visible, entity_choices, _ = get_proximity_status(current_state, current_dropdown) - # Return gr.update() for private_chat_display to avoid overwriting it - return (world_html, players_list, formatted_chat, events, player_stats, mcp_status, - proximity_html, private_chat_visible, entity_choices, gr.update()) - - # Wire up event handlers - join_btn.click( - join_game, - inputs=[player_name, player_state], - outputs=[player_state, player_info, game_view, online_players, world_events, - proximity_info, private_chat_group, nearby_entities, current_chat_display] - ) - - leave_btn.click( - leave_game, - inputs=[player_state], - outputs=[player_state, player_info, game_view, online_players, world_events, - proximity_info, private_chat_group, nearby_entities, current_chat_display] ) - - # Movement buttons - FIXED - def move_up_handler(state): - return handle_movement("up", state) - - def move_down_handler(state): - return handle_movement("down", state) - - def move_left_handler(state): - return handle_movement("left", state) - - def move_right_handler(state): - return handle_movement("right", state) - - move_up.click( - move_up_handler, - inputs=[player_state], - outputs=[player_state, game_view, online_players, player_info, - proximity_info, private_chat_group, nearby_entities, current_chat_display] - ) - - move_down.click( - move_down_handler, - inputs=[player_state], - outputs=[player_state, game_view, online_players, player_info, - proximity_info, private_chat_group, nearby_entities, current_chat_display] - ) - - move_left.click( - move_left_handler, - inputs=[player_state], - outputs=[player_state, game_view, online_players, player_info, - proximity_info, private_chat_group, nearby_entities, current_chat_display] - ) - - move_right.click( - move_right_handler, - inputs=[player_state], - outputs=[player_state, game_view, online_players, player_info, - proximity_info, private_chat_group, nearby_entities, current_chat_display] ) - - # Chat - chat_send.click( - handle_chat, - inputs=[chat_input, chat_display, player_state], - outputs=[chat_display, chat_input] - ) - - chat_input.submit( - handle_chat, - inputs=[chat_input, chat_display, player_state], - outputs=[chat_display, chat_input] ) - - # Private Chat Handlers - private_send_btn.click( - handle_private_message, - inputs=[private_message_input, player_state, chat_tabs_state], - outputs=[current_chat_display, private_message_input] - ) - - private_message_input.submit( - handle_private_message, - inputs=[private_message_input, player_state, chat_tabs_state], - outputs=[current_chat_display, private_message_input] - ) - - # AI testing - register_ai_btn.click( - register_test_ai_agent, - inputs=[ai_name], - outputs=[ai_result] - ) - - execute_ai_btn.click( - execute_ai_action, - inputs=[ai_action, ai_message], - outputs=[ai_result] - ) - # Multi-Chat Tab Event Handlers - start_chat_btn.click( - handle_start_chat, - inputs=[nearby_entities, chat_tabs_state, player_state], - outputs=[chat_tabs_state, active_tabs_display, current_chat_display, chat_input_row] - ) - - clear_all_tabs_btn.click( - clear_all_chat_tabs, - inputs=[chat_tabs_state], - outputs=[chat_tabs_state, active_tabs_display] - ) # Auto-refresh - refresh_timer = gr.Timer(value=2) # Refresh every 2 seconds - refresh_timer.tick( - auto_refresh, - inputs=[player_state, nearby_entities, auto_refresh_enabled], - outputs=[game_view, online_players, chat_display, world_events, player_info, mcp_info, - proximity_info, private_chat_group, nearby_entities, current_chat_display] - ) - - return demo.queue() - -# ============================================================================ -# MAIN APPLICATION -# ============================================================================ - -if __name__ == "__main__": - print("๐ŸŽฎ Starting COMPLETE MMORPG with ALL FEATURES...") - print("\n๐Ÿ”ง Features Restored:") - print("โœ… FIXED: Deadlock issue resolved with RLock") - print("โœ… Human players can join and play") - print("โœ… AI agents can connect via MCP API") - print("โœ… Extensible NPC add-on system") - print("โœ… MCP servers can be used as add-ons") - print("โœ… Real-time multiplayer gameplay") - print("โœ… Read2Burn secure mailbox add-on") - print("โœ… Keyboard controls (WASD + Arrow Keys)") - print("โœ… Complete MCP integration") - print("โœ… All original features restored") - print("="*60) - - app = create_mmorpg_interface() - - # Launch with MCP API access print("\n๐Ÿš€ Launching complete game server...") - print("๐ŸŒ Human players: Access via web interface") - print("๐Ÿค– AI agents: Connect via MCP API endpoints") - print("โŒจ๏ธ Keyboard: Use WASD or Arrow Keys for movement") - print("๐Ÿ”Œ MCP Tools available for AI integration") - print("๐Ÿ”ฅ Read2Burn mailbox for secure messaging") - print("๐ŸŽฏ All features working with deadlock fix!") - print("="*60) - app.launch( - #server_name="127.0.0.1", - #server_port=7868, - mcp_server=True, - share=True, - show_error=True, - debug=True - ) \ No newline at end of file +#!/usr/bin/env python3 +""" +MMORPG Application - Refactored Clean Architecture +Main application entry point using clean architecture principles +""" + +import gradio as gr +import asyncio +import threading +from typing import Optional + +# Import our clean architecture components +from src.core.game_engine import GameEngine +from src.facades.game_facade import GameFacade +from src.ui.huggingface_ui import HuggingFaceUI +from src.ui.interface_manager import InterfaceManager +from src.mcp.mcp_tools import GradioMCPTools + +class MMORPGApplication: + """Main application class that orchestrates the MMORPG game.""" + + def __init__(self): + """Initialize the application with all necessary components.""" + # Initialize core game engine (singleton) + self.game_engine = GameEngine() + + # Initialize game facade for simplified operations + self.game_facade = GameFacade() + # Initialize UI components + self.ui = HuggingFaceUI(self.game_facade) + self.interface_manager = InterfaceManager(self.game_facade, self.ui) + + # Initialize MCP tools for AI agent integration + self.mcp_tools = GradioMCPTools(self.game_facade) + + # Gradio interface reference + self.gradio_interface: Optional[gr.Blocks] = None + + # Background tasks + self._cleanup_task = None + self._auto_refresh_task = None + + def create_gradio_interface(self) -> gr.Blocks: + """Create and configure the Gradio interface.""" + # Use the InterfaceManager to create the complete interface with proper event handling + self.interface_manager._create_interface_with_events() + self.gradio_interface = self.interface_manager.interface + return self.gradio_interface + + def start_background_tasks(self): + """Start background tasks for game maintenance.""" + + def cleanup_task(): + """Background task for cleaning up inactive players.""" + while True: + try: + self.game_facade.cleanup_inactive_players() + threading.Event().wait(30) # Wait 30 seconds + except Exception as e: + print(f"Error in cleanup task: {e}") + threading.Event().wait(5) # Wait before retry + + def auto_refresh_task(): + """Background task for auto-refreshing game state.""" + while True: + try: + # Trigger refresh for active sessions + # This would need session tracking for real implementation + threading.Event().wait(2) # Refresh every 2 seconds + except Exception as e: + print(f"Error in auto-refresh task: {e}") + threading.Event().wait(5) # Wait before retry + + # Start cleanup task + self._cleanup_task = threading.Thread(target=cleanup_task, daemon=True) + self._cleanup_task.start() + + # Start auto-refresh task + self._auto_refresh_task = threading.Thread(target=auto_refresh_task, daemon=True) + self._auto_refresh_task.start() + + def run(self, share: bool = False, server_port: int = 7860): + """Run the MMORPG application.""" + print("๐ŸŽฎ Starting MMORPG Application...") + print("๐Ÿ—๏ธ Initializing game engine...") + # Initialize game world and services + if not self.game_engine.start(): + print("โŒ Failed to start game engine") + return + + print("๐ŸŽจ Creating user interface...") + + # Create Gradio interface + interface = self.create_gradio_interface() + + print("๐Ÿ”ง Starting background tasks...") + + # Start background maintenance tasks + self.start_background_tasks() + + print("๐Ÿš€ Launching server...") + print(f"๐ŸŒ Server will be available at: http://localhost:{server_port}") + + if share: + print("๐Ÿ”— Public URL will be generated...") + + # Launch the interface + interface.launch( + share=share, + debug=True, + server_port=server_port, + mcp_server=True, # Enable MCP server integration + show_error=True, + quiet=False + ) + +def main(): + """Main entry point for the application.""" + # Create and run the application + app = MMORPGApplication() + # Run with default settings + # Change share=True to make it publicly accessible + app.run(share=True, server_port=7869) + +if __name__ == "__main__": + main()