# Spaces: Runtime error (status banner text, not Python code)
import logging
from typing import Dict, List, Optional

from crewai import Crew, Task

from agents.assessment_agent import AssessmentAgent
from agents.conversation_agent import ConversationAgent
from agents.crisis_agent import CrisisAgent
from agents.mindfulness_agent import MindfulnessAgent
from utils.log_manager import LogManager
class WellnessOrchestrator:
    """Coordinate the conversation, assessment, mindfulness and crisis agents.

    On construction the orchestrator builds its four agents and registers
    them with a CrewAI ``Crew``.  ``process_message`` then routes each user
    message to exactly one agent based on keywords found in the message.
    """

    # Phrases whose presence (case-insensitive substring match) routes a
    # message to the crisis agent.
    _CRISIS_INDICATORS = (
        "suicide", "kill myself", "end it all",
        "hurt myself", "give up", "can't go on",
        "emergency", "crisis", "urgent help",
    )

    def __init__(self, model_config: Dict):
        """Store the configuration, set up logging, then build agents and crew.

        Args:
            model_config: Model configuration mapping.  Its ``"conversation"``
                entry (if any) is forwarded to every agent as ``llm_config``.
        """
        self.model_config = model_config
        self.log_manager = LogManager()
        self.logger = self.log_manager.get_agent_logger("orchestrator")

        # Agents must exist before the crew, which is built from them.
        self.initialize_agents()
        self.initialize_crew()

    def initialize_agents(self):
        """Create the four agents used for routing.

        Raises:
            Exception: Any error raised while constructing an agent is logged
                and re-raised unchanged.
        """
        self.logger.info("Initializing agents")

        try:
            # Configuration shared by every agent.
            agent_config = {
                "llm_config": self.model_config.get("conversation", {}),
                "temperature": 0.7,
                "max_iterations": 3,
            }

            def build_agent():
                # Each call returns a fresh, independent agent instance.
                return ConversationAgent(
                    model_config=self.model_config,
                    **agent_config,
                )

            self.conversation_agent = build_agent()
            # Temporarily using ConversationAgent for the specialised roles
            # below instead of the dedicated agent classes.
            self.assessment_agent = build_agent()
            self.mindfulness_agent = build_agent()
            self.crisis_agent = build_agent()

            self.logger.info("All agents initialized successfully")
        except Exception as e:
            self.logger.error(f"Error initializing agents: {str(e)}")
            raise

    def initialize_crew(self):
        """Create the CrewAI ``Crew`` containing all four agents.

        Raises:
            Exception: Any error raised by ``Crew`` is logged and re-raised
                unchanged.
        """
        self.logger.info("Initializing CrewAI")

        try:
            self.crew = Crew(
                agents=[
                    self.conversation_agent,
                    self.assessment_agent,
                    self.mindfulness_agent,
                    self.crisis_agent,
                ]
            )
            self.logger.info("CrewAI initialized successfully")
        except Exception as e:
            self.logger.error(f"Error initializing CrewAI: {str(e)}")
            raise

    def process_message(self, message: str, context: Optional[Dict] = None) -> Dict:
        """Route a user message to one agent and return its response.

        Routing priority: crisis indicators first, then assessment keywords
        ("assess"/"evaluate"), then mindfulness keywords
        ("meditate"/"mindful"), otherwise the conversation agent.

        Args:
            message: The user's message text.
            context: Optional extra context passed to the task; defaults to
                an empty dict.

        Returns:
            A dict with keys ``"message"`` (the agent's response),
            ``"agent_type"`` and ``"task_type"``.  If anything fails, a
            generic apology is returned with ``agent_type`` ``"error"``
            instead of raising.
        """
        self.logger.info("Processing message through agents")
        context = context or {}

        try:
            agent_type, agent, expected_output = self._select_route(message)

            # NOTE(review): crewai documents Task.context as a list of other
            # Tasks; confirm that passing a plain dict here is accepted by
            # the installed crewai version / the agent implementation.
            task = Task(
                description=message,
                agent=agent,
                context=context,
                expected_output=expected_output,
            )
            response = agent.execute_task(task)

            return {
                "message": response,
                "agent_type": agent_type,
                "task_type": task.expected_output,
            }
        except Exception as e:
            # Top-level boundary: never surface an exception to the caller.
            self.logger.error(f"Error processing message: {str(e)}")
            return {
                "message": "I apologize, but I encountered an error. Please try again.",
                "agent_type": "error",
                "task_type": "error_handling",
            }

    def _select_route(self, message: str):
        """Return ``(agent_type, agent, expected_output)`` for *message*."""
        # Crisis handling always takes precedence over keyword routing.
        if self._is_crisis(message):
            return "crisis", self.crisis_agent, "Crisis intervention response"

        lowered = message.lower()
        if "assess" in lowered or "evaluate" in lowered:
            return (
                "assessment",
                self.assessment_agent,
                "Mental health assessment response",
            )
        if "meditate" in lowered or "mindful" in lowered:
            return (
                "mindfulness",
                self.mindfulness_agent,
                "Mindfulness guidance response",
            )
        return (
            "conversation",
            self.conversation_agent,
            "Therapeutic conversation response",
        )

    def _is_crisis(self, message: str) -> bool:
        """Return True if *message* contains any crisis indicator phrase.

        Matching is a case-insensitive substring test.  Before matching,
        typographic apostrophes are mapped to ``'`` and runs of whitespace
        (including line breaks) are collapsed to a single space, so inputs
        such as "can\u2019t go on" or "hurt   myself" are still detected.
        An empty or ``None`` message is never a crisis.
        """
        if not message:
            return False

        text = message.lower().replace("\u2019", "'").replace("\u2018", "'")
        text = " ".join(text.split())
        return any(indicator in text for indicator in self._CRISIS_INDICATORS)

    def get_status(self) -> Dict:
        """Return each agent's ``get_status()`` result keyed by agent role."""
        return {
            "conversation": self.conversation_agent.get_status(),
            "assessment": self.assessment_agent.get_status(),
            "mindfulness": self.mindfulness_agent.get_status(),
            "crisis": self.crisis_agent.get_status(),
        }