"""
LifeAdmin AI - Core Agent Logic

Autonomous planning, tool orchestration, and execution.
"""

import asyncio
import json
import time
from typing import List, Dict, Any, AsyncIterator, Optional
from dataclasses import dataclass, asdict
from enum import Enum

from agent.mcp_client import MCPClient
from agent.rag_engine import RAGEngine
from agent.memory import MemoryStore
from utils.llm_utils import get_llm_response


class TaskStatus(Enum):
    """Lifecycle states of an :class:`AgentTask`."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class AgentThought:
    """Represents a thought/step in agent reasoning"""

    step: int
    type: str  # 'planning', 'tool_call', 'reflection', 'answer'
    content: str
    tool_name: Optional[str] = None
    tool_args: Optional[Dict] = None
    tool_result: Optional[Any] = None
    # Filled in with the creation time by __post_init__ when left as None.
    timestamp: Optional[float] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()


@dataclass
class AgentTask:
    """Represents a task to be executed"""

    id: str
    description: str
    tool: str
    args: Dict[str, Any]
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Any] = None
    error: Optional[str] = None


class LifeAdminAgent:
    """Main autonomous agent with planning, tool calling, and reflection"""

    def __init__(self):
        self.mcp_client = MCPClient()
        self.rag_engine = RAGEngine()
        self.memory = MemoryStore()
        self.thoughts: List[AgentThought] = []
        self.current_context = {}

    def reset_context(self):
        """Reset agent context for new task"""
        self.thoughts = []
        self.current_context = {}

    def _think(self, kind: str, content: str, **details: Any) -> AgentThought:
        """Append a new thought (numbered from 1) to the trace and return it."""
        thought = AgentThought(
            step=len(self.thoughts) + 1,
            type=kind,
            content=content,
            **details,
        )
        self.thoughts.append(thought)
        return thought

    @staticmethod
    def _extract_json_text(response: str) -> str:
        """Strip an optional Markdown code fence from an LLM response."""
        text = response.strip()
        # Check the more specific fence first so '```json' is not treated
        # as a bare '```' fence followed by the word "json".
        for fence in ('```json', '```'):
            if fence in text:
                return text.split(fence)[1].split('```')[0].strip()
        return text

    @staticmethod
    def _build_tasks(tasks_data: Any) -> List[AgentTask]:
        """Convert decoded plan JSON into pending AgentTask objects.

        Unknown keys in a task are ignored and a missing/null ``args``
        becomes an empty dict, so a slightly off-schema plan is still usable.

        Raises:
            ValueError: if the payload is not a list of objects, or a task
                lacks one of ``id``, ``description`` or ``tool``.
        """
        if not isinstance(tasks_data, list):
            raise ValueError("Plan must be a JSON array of tasks")

        tasks = []
        for raw in tasks_data:
            if not isinstance(raw, dict):
                raise ValueError("Each task in the plan must be a JSON object")
            missing = [key for key in ('id', 'description', 'tool') if key not in raw]
            if missing:
                raise ValueError(
                    f"Task is missing required field(s): {', '.join(missing)}"
                )
            tasks.append(AgentTask(
                id=raw['id'],
                description=raw['description'],
                tool=raw['tool'],
                args=raw.get('args') or {},
                status=TaskStatus.PENDING,
            ))
        return tasks

    async def plan(self, user_request: str,
                   available_files: Optional[List[str]] = None) -> List[AgentTask]:
        """
        Create execution plan from user request

        Args:
            user_request: Natural language request from user
            available_files: List of uploaded files

        Returns:
            List of tasks to execute; empty if the LLM call fails or its
            response cannot be turned into tasks (the failure is recorded
            as a 'planning' thought).
        """
        self._think('planning', f"Analyzing request: {user_request}")

        # Get available tools
        tools = await self.mcp_client.list_tools()
        tool_descriptions = "\n".join(
            f"- {tool['name']}: {tool.get('description', '')}"
            for tool in tools
        )

        # Search RAG for relevant context
        relevant_docs = []
        if user_request:
            relevant_docs = await self.rag_engine.search(user_request, k=3)

        if relevant_docs:
            # Only the first 200 characters of each document go in the prompt.
            context = "\n".join(doc['text'][:200] for doc in relevant_docs)
        else:
            context = "No previous documents"

        # Get memory
        memory_context = self.memory.get_relevant_memories(user_request)

        # Create planning prompt
        planning_prompt = f"""You are an autonomous life admin agent. Create a step-by-step execution plan.

USER REQUEST: {user_request}

AVAILABLE FILES: {', '.join(available_files) if available_files else 'None'}

AVAILABLE TOOLS:
{tool_descriptions}

RELEVANT CONTEXT:
{context}

MEMORY:
{memory_context}

Create a JSON plan with tasks. Each task should have:
- id: unique identifier
- description: what this task does
- tool: which tool to use
- args: arguments for the tool (as a dict)

Return ONLY valid JSON array of tasks, no other text.

Example format:
[
  {{
    "id": "task_1",
    "description": "Extract text from document",
    "tool": "ocr_extract_text",
    "args": {{"file_path": "document.pdf", "language": "en"}}
  }}
]
"""

        self._think('planning', "Creating execution plan with LLM...")

        try:
            plan_response = await get_llm_response(planning_prompt, temperature=0.3)

            # Extract JSON from response
            tasks_data = json.loads(self._extract_json_text(plan_response))
            tasks = self._build_tasks(tasks_data)
        except Exception as e:
            # Boundary with the LLM: any failure here means "no plan", which
            # the caller reports to the user instead of crashing.
            self._think('planning', f"Planning failed: {str(e)}")
            return []

        self._think('planning', f"Created plan with {len(tasks)} tasks")
        return tasks

    async def execute_task(self, task: AgentTask) -> AgentTask:
        """Execute a single task using MCP tools

        The task is updated in place (status, result / error) and returned.
        Tool failures are captured on the task rather than raised.
        """
        self._think(
            'tool_call',
            f"Executing: {task.description}",
            tool_name=task.tool,
            tool_args=task.args,
        )

        task.status = TaskStatus.IN_PROGRESS

        try:
            # Call MCP tool
            result = await self.mcp_client.call_tool(task.tool, task.args)
        except Exception as e:
            task.error = str(e)
            task.status = TaskStatus.FAILED
            self._think(
                'tool_call',
                f"✗ Failed: {task.description} - {str(e)}",
                tool_name=task.tool,
            )
            return task

        task.result = result
        task.status = TaskStatus.COMPLETED
        self._think(
            'tool_call',
            f"✓ Completed: {task.description}",
            tool_name=task.tool,
            tool_result=result,
        )
        return task

    async def reflect(self, tasks: List[AgentTask], original_request: str) -> str:
        """
        Reflect on execution results and create final answer

        Args:
            tasks: Executed tasks
            original_request: Original user request

        Returns:
            Final answer string, or a "Reflection failed: ..." message if
            the LLM call or the memory write raises.
        """
        self._think('reflection', "Analyzing results and creating response...")

        # Compile results
        results_summary = []
        for task in tasks:
            if task.status == TaskStatus.COMPLETED:
                # Results are truncated to 200 characters to bound prompt size.
                results_summary.append(
                    f"✓ {task.description}: {str(task.result)[:200]}"
                )
            else:
                results_summary.append(f"✗ {task.description}: {task.error}")

        reflection_prompt = f"""You are an autonomous life admin agent. Review the execution results and create a helpful response.

ORIGINAL REQUEST: {original_request}

EXECUTION RESULTS:
{chr(10).join(results_summary)}

Provide a clear, helpful response to the user about what was accomplished.
Be specific about:
1. What tasks were completed successfully
2. What outputs were created (files, calendar events, etc.)
3. Any issues encountered
4. Next steps if applicable

Keep response concise but informative.
"""

        try:
            final_answer = await get_llm_response(reflection_prompt, temperature=0.7)

            self._think('answer', final_answer)

            # Store in memory
            self.memory.add_memory(
                f"Request: {original_request}\nResult: {final_answer}",
                metadata={'type': 'task_completion', 'timestamp': time.time()}
            )

            return final_answer

        except Exception as e:
            error_msg = f"Reflection failed: {str(e)}"
            self._think('answer', error_msg)
            return error_msg

    async def execute(self, user_request: str, files: Optional[List[str]] = None,
                      stream_thoughts: bool = False) -> AsyncIterator[Any]:
        """
        Main execution loop - plan, execute, reflect

        This is an async generator; consume it with ``async for``. Python
        does not allow an async generator to ``return`` a value, so the
        outcome is always delivered through yielded items.

        Args:
            user_request: User's natural language request
            files: Uploaded files to process
            stream_thoughts: Whether to yield thoughts as they happen

        Yields:
            If stream_thoughts=True: every AgentThought, in order, after
            each phase/task finishes. The last one has type 'answer' and
            its ``content`` is the final answer.
            If stream_thoughts=False: exactly one tuple
            ``(final_answer, thoughts)`` once the run is finished.
        """
        self.reset_context()
        streamed = 0  # index of the first thought not yet yielded

        # Phase 1: Planning
        tasks = await self.plan(user_request, files)
        if stream_thoughts:
            for thought in self.thoughts[streamed:]:
                yield thought
            streamed = len(self.thoughts)

        if tasks:
            # Phase 2: Execution
            executed_tasks = []
            for task in tasks:
                executed_tasks.append(await self.execute_task(task))
                if stream_thoughts:
                    for thought in self.thoughts[streamed:]:
                        yield thought
                    streamed = len(self.thoughts)

            # Phase 3: Reflection
            final_answer = await self.reflect(executed_tasks, user_request)
        else:
            final_answer = self._think(
                'answer',
                "Could not create execution plan. Please rephrase your request.",
            ).content

        if stream_thoughts:
            for thought in self.thoughts[streamed:]:
                yield thought
        else:
            yield final_answer, self.thoughts

    def get_thought_trace(self) -> List[Dict]:
        """Get formatted thought trace for UI display"""
        return [asdict(thought) for thought in self.thoughts]

    async def process_files_to_rag(self, files: List[Dict[str, str]]):
        """Process uploaded files and add to RAG engine

        Each entry needs 'path' and 'name' keys. PDFs go through the PDF
        text extractor, PNG/JPEG images through the 'ocr_extract_text' tool,
        and anything else is read as UTF-8 text. A failure on one file is
        printed and does not stop the remaining files.
        """
        for file_info in files:
            try:
                path = file_info['path']
                # Match extensions case-insensitively so "SCAN.PDF" is not
                # mistakenly opened as a text file.
                suffix_source = path.lower()

                # Extract text based on file type
                if suffix_source.endswith('.pdf'):
                    from utils.pdf_utils import extract_text_from_pdf
                    text = extract_text_from_pdf(path)
                elif suffix_source.endswith(('.png', '.jpg', '.jpeg')):
                    # Use OCR tool
                    result = await self.mcp_client.call_tool(
                        'ocr_extract_text',
                        {'file_path': path, 'language': 'en'}
                    )
                    text = result.get('text', '')
                else:
                    with open(path, 'r', encoding='utf-8') as f:
                        text = f.read()

                # Add to RAG
                await self.rag_engine.add_document(
                    text=text,
                    metadata={'filename': file_info['name'], 'path': path}
                )
            except Exception as e:
                print(f"Error processing {file_info['name']}: {e}")

    async def manual_tool_call(self, tool_name: str, args: Dict[str, Any]) -> Any:
        """Direct tool call for manual mode

        Returns ``{'success': True, 'result': ...}`` on success, or
        ``{'success': False, 'error': <message>}`` if the tool call raises.
        """
        try:
            result = await self.mcp_client.call_tool(tool_name, args)
        except Exception as e:
            return {'success': False, 'error': str(e)}
        return {'success': True, 'result': result}