""" Plugin management service implementation. """ import os import importlib.util import importlib import sys from typing import Dict, List, Optional, Any from ..interfaces.service_interfaces import IPluginService from ..interfaces.plugin_interfaces import ( IPlugin, PluginMetadata, PluginError, PluginInitializationError, PluginDependencyError ) class PluginService(IPluginService): """Service for managing the plugin system.""" def __init__(self, plugins_directory: str = "plugins"): self.plugins_directory = plugins_directory self.loaded_plugins: Dict[str, IPlugin] = {} self.plugin_modules: Dict[str, Any] = {} self.game_context: Dict[str, Any] = {} def set_game_context(self, context: Dict[str, Any]): """Set the game context for plugins.""" self.game_context = context def load_plugins(self) -> int: """Load all plugins from plugins directory.""" if not os.path.exists(self.plugins_directory): os.makedirs(self.plugins_directory) print(f"[PluginService] Created plugins directory: {self.plugins_directory}") return 0 loaded_count = 0 plugin_files = [f for f in os.listdir(self.plugins_directory) if f.endswith('.py') and not f.startswith('__')] for plugin_file in plugin_files: plugin_path = os.path.join(self.plugins_directory, plugin_file) if self.load_plugin(plugin_path): loaded_count += 1 print(f"[PluginService] Loaded {loaded_count}/{len(plugin_files)} plugins") return loaded_count def load_plugin(self, plugin_path: str) -> bool: """Load a specific plugin.""" try: if not os.path.exists(plugin_path): print(f"[PluginService] Plugin file not found: {plugin_path}") return False # Extract plugin name from file path plugin_name = os.path.splitext(os.path.basename(plugin_path))[0] # Load module spec = importlib.util.spec_from_file_location(plugin_name, plugin_path) if not spec or not spec.loader: print(f"[PluginService] Failed to create spec for {plugin_path}") return False module = importlib.util.module_from_spec(spec) # Execute module spec.loader.exec_module(module) # Find plugin class (should implement IPlugin) plugin_class = None plugin_candidates = [] for attr_name in dir(module): attr = getattr(module, attr_name) if (isinstance(attr, type) and issubclass(attr, IPlugin) and attr != IPlugin): # Skip interface classes (they usually start with 'I' and are imported) if not (attr_name.startswith('I') and hasattr(attr, '__abstractmethods__') and attr.__abstractmethods__): plugin_candidates.append((attr_name, attr)) # Prefer classes defined in this module (not imported interfaces) for attr_name, attr in plugin_candidates: if attr.__module__ == module.__name__: plugin_class = attr break # If no module-local class found, take the first candidate if not plugin_class and plugin_candidates: plugin_class = plugin_candidates[0][1] if not plugin_class: print(f"[PluginService] No IPlugin implementation found in {plugin_path}") return False # Instantiate plugin plugin_instance = plugin_class() # Check metadata metadata = plugin_instance.metadata if not isinstance(metadata, PluginMetadata): print(f"[PluginService] Invalid metadata in plugin {plugin_name}") return False # Check dependencies if not self._check_dependencies(metadata.dependencies): print(f"[PluginService] Dependencies not met for plugin {metadata.id}") return False # Initialize plugin if not plugin_instance.initialize(self.game_context): print(f"[PluginService] Failed to initialize plugin {metadata.id}") return False # Store plugin self.loaded_plugins[metadata.id] = plugin_instance self.plugin_modules[metadata.id] = module print(f"[PluginService] Successfully loaded plugin: {metadata.name} v{metadata.version}") return True except Exception as e: print(f"[PluginService] Error loading plugin {plugin_path}: {e}") return False def unload_plugin(self, plugin_id: str) -> bool: """Unload a plugin.""" try: if plugin_id not in self.loaded_plugins: return False plugin = self.loaded_plugins[plugin_id] # Shutdown plugin plugin.shutdown() # Remove from loaded plugins del self.loaded_plugins[plugin_id] # Remove module from sys.modules if present if plugin_id in self.plugin_modules: module = self.plugin_modules[plugin_id] module_name = getattr(module, '__name__', None) if module_name and module_name in sys.modules: del sys.modules[module_name] del self.plugin_modules[plugin_id] print(f"[PluginService] Unloaded plugin: {plugin_id}") return True except Exception as e: print(f"[PluginService] Error unloading plugin {plugin_id}: {e}") return False def get_loaded_plugins(self) -> List[str]: """Get list of loaded plugin IDs.""" return list(self.loaded_plugins.keys()) def reload_plugin(self, plugin_id: str) -> bool: """Reload a plugin (hot-reload).""" try: if plugin_id not in self.loaded_plugins: return False # Get plugin file path plugin_files = [f for f in os.listdir(self.plugins_directory) if f.endswith('.py')] plugin_file = None for f in plugin_files: plugin_path = os.path.join(self.plugins_directory, f) # Try to match by loading and checking metadata # This is a simplified approach if plugin_id in f.lower(): plugin_file = plugin_path break if not plugin_file: print(f"[PluginService] Could not find file for plugin {plugin_id}") return False # Unload current plugin self.unload_plugin(plugin_id) # Load updated plugin return self.load_plugin(plugin_file) except Exception as e: print(f"[PluginService] Error reloading plugin {plugin_id}: {e}") return False def get_plugin(self, plugin_id: str) -> Optional[IPlugin]: """Get a loaded plugin instance.""" return self.loaded_plugins.get(plugin_id) def get_plugins_by_type(self, plugin_type: str) -> List[IPlugin]: """Get all plugins of a specific type.""" result = [] for plugin in self.loaded_plugins.values(): if plugin.metadata.plugin_type.value == plugin_type: result.append(plugin) return result def call_plugin_event(self, event_name: str, *args, **kwargs): """Call an event on all loaded plugins.""" for plugin in self.loaded_plugins.values(): try: if hasattr(plugin, event_name): method = getattr(plugin, event_name) if callable(method): method(*args, **kwargs) except Exception as e: print(f"[PluginService] Error calling {event_name} on plugin {plugin.metadata.id}: {e}") def get_plugin_status(self, plugin_id: str) -> Optional[Dict[str, Any]]: """Get status of a specific plugin.""" plugin = self.loaded_plugins.get(plugin_id) if not plugin: return None try: status = plugin.get_status() status['metadata'] = { 'id': plugin.metadata.id, 'name': plugin.metadata.name, 'version': plugin.metadata.version, 'type': plugin.metadata.plugin_type.value, 'author': plugin.metadata.author } return status except Exception as e: return {'error': str(e)} def get_all_plugin_status(self) -> Dict[str, Dict[str, Any]]: """Get status of all loaded plugins.""" result = {} for plugin_id in self.loaded_plugins: result[plugin_id] = self.get_plugin_status(plugin_id) return result def _check_dependencies(self, dependencies: List[str]) -> bool: """Check if plugin dependencies are satisfied.""" # For now, just check if dependency plugins are loaded for dep in dependencies: if dep not in self.loaded_plugins: return False return True def create_sample_plugin(self) -> bool: """Create a sample plugin file for demonstration.""" try: sample_plugin_content = '''""" Sample plugin demonstrating the plugin system. """ from src.interfaces.plugin_interfaces import IPlugin, PluginMetadata, PluginType from typing import Dict, Any class SamplePlugin(IPlugin): """Sample plugin implementation.""" @property def metadata(self) -> PluginMetadata: return PluginMetadata( id="sample_plugin", name="Sample Plugin", version="1.0.0", description="A sample plugin demonstrating the plugin system", author="MMORPG System", plugin_type=PluginType.EVENT, dependencies=[], config={} ) def initialize(self, context: Dict[str, Any]) -> bool: """Initialize the plugin.""" self.context = context print("[SamplePlugin] Plugin initialized!") return True def shutdown(self) -> bool: """Shutdown the plugin.""" print("[SamplePlugin] Plugin shutdown!") return True def get_status(self) -> Dict[str, Any]: """Get plugin status.""" return { "status": "active", "message": "Sample plugin is running normally" } def on_player_join(self, player_id: str) -> None: """Called when a player joins.""" print(f"[SamplePlugin] Player {player_id} joined!") def on_player_leave(self, player_id: str) -> None: """Called when a player leaves.""" print(f"[SamplePlugin] Player {player_id} left!") def on_chat_message(self, sender_id: str, message: str, message_type: str) -> None: """Called when a chat message is sent.""" if "hello plugin" in message.lower(): print(f"[SamplePlugin] Detected greeting from {sender_id}") ''' sample_path = os.path.join(self.plugins_directory, "sample_plugin.py") with open(sample_path, 'w', encoding='utf-8') as f: f.write(sample_plugin_content) print(f"[PluginService] Created sample plugin at {sample_path}") return True except Exception as e: print(f"[PluginService] Error creating sample plugin: {e}") return False