| """ | |
| LifeAdmin AI - Core Agent Logic | |
| Autonomous planning, tool orchestration, and execution | |
| """ | |
| import asyncio | |
| import json | |
| import time | |
| from typing import List, Dict, Any, Optional | |
| from dataclasses import dataclass, asdict | |
| from enum import Enum | |
| from agent.mcp_client import MCPClient | |
| from agent.rag_engine import RAGEngine | |
| from agent.memory import MemoryStore | |
| from utils.llm_utils import get_llm_response | |
class TaskStatus(Enum):
    """Lifecycle states of an AgentTask, from creation to terminal outcome."""

    PENDING = "pending"          # created, not yet started
    IN_PROGRESS = "in_progress"  # currently executing
    COMPLETED = "completed"      # finished successfully
    FAILED = "failed"            # raised an error during execution
@dataclass
class AgentThought:
    """One step in the agent's reasoning trace.

    Fix: the ``@dataclass`` decorator was missing. The class declares
    annotated fields with defaults, defines ``__post_init__`` (a hook only
    dataclasses invoke), and is constructed with keyword arguments and
    serialized via ``asdict`` elsewhere in this module — none of which work
    on a plain class with no generated ``__init__``.
    """
    step: int                              # 1-based position in the trace
    type: str                              # 'planning', 'tool_call', 'reflection', 'answer'
    content: str                           # human-readable description of the step
    tool_name: Optional[str] = None        # set for 'tool_call' thoughts
    tool_args: Optional[Dict] = None       # arguments passed to the tool
    tool_result: Optional[Any] = None      # tool output, when available
    timestamp: Optional[float] = None      # creation time; auto-filled if omitted

    def __post_init__(self):
        # Default the timestamp to creation time when the caller did not set one.
        if self.timestamp is None:
            self.timestamp = time.time()
@dataclass
class AgentTask:
    """A planned unit of work: one MCP tool invocation with its outcome.

    Fix: the ``@dataclass`` decorator was missing; without it the class has
    no generated ``__init__``, so the keyword construction used by the
    planner (``AgentTask(**{...})``) would raise TypeError.
    """
    id: str                                  # unique identifier from the plan
    description: str                         # human-readable purpose of the task
    tool: str                                # MCP tool name to invoke
    args: Dict[str, Any]                     # keyword arguments for the tool
    status: TaskStatus = TaskStatus.PENDING  # lifecycle state
    result: Optional[Any] = None             # tool output on success
    error: Optional[str] = None              # failure message on error
class LifeAdminAgent:
    """Main autonomous agent with planning, tool calling, and reflection.

    Lifecycle per request: ``plan()`` -> ``execute_task()`` for each task ->
    ``reflect()``. Every reasoning step is appended to ``self.thoughts`` so
    the UI can display a full trace via ``get_thought_trace()``.
    """

    def __init__(self):
        self.mcp_client = MCPClient()    # MCP tool transport
        self.rag_engine = RAGEngine()    # document retrieval over uploaded files
        self.memory = MemoryStore()      # long-term memory across requests
        self.thoughts: List[AgentThought] = []
        self.current_context: Dict[str, Any] = {}

    def reset_context(self):
        """Clear the thought trace and scratch context before a new request."""
        self.thoughts = []
        self.current_context = {}

    def _add_thought(self, type: str, content: str, **kwargs) -> AgentThought:
        """Append a thought with the next step number; returns the new thought."""
        thought = AgentThought(
            step=len(self.thoughts) + 1,
            type=type,
            content=content,
            **kwargs,
        )
        self.thoughts.append(thought)
        return thought

    @staticmethod
    def _extract_json(text: str) -> str:
        """Strip an optional markdown code fence from an LLM response."""
        text = text.strip()
        if '```json' in text:
            return text.split('```json')[1].split('```')[0].strip()
        if '```' in text:
            return text.split('```')[1].split('```')[0].strip()
        return text

    async def plan(self, user_request: str,
                   available_files: Optional[List[str]] = None) -> List[AgentTask]:
        """Create an execution plan from a natural-language request.

        Args:
            user_request: Natural language request from user.
            available_files: Names of uploaded files, if any.

        Returns:
            List of tasks to execute; empty list when planning fails.
        """
        self._add_thought('planning', f"Analyzing request: {user_request}")

        # Describe available MCP tools for the planner prompt.
        tools = await self.mcp_client.list_tools()
        tool_descriptions = "\n".join(
            f"- {tool['name']}: {tool.get('description', '')}" for tool in tools
        )

        # Pull relevant document snippets from RAG (truncated to keep the
        # prompt small) plus any related long-term memories.
        relevant_docs = []
        if user_request:
            relevant_docs = await self.rag_engine.search(user_request, k=3)
        context = (
            "\n".join(doc['text'][:200] for doc in relevant_docs)
            if relevant_docs else "No previous documents"
        )
        memory_context = self.memory.get_relevant_memories(user_request)

        planning_prompt = f"""You are an autonomous life admin agent. Create a step-by-step execution plan.
USER REQUEST: {user_request}
AVAILABLE FILES: {', '.join(available_files) if available_files else 'None'}
AVAILABLE TOOLS:
{tool_descriptions}
RELEVANT CONTEXT:
{context}
MEMORY:
{memory_context}
Create a JSON plan with tasks. Each task should have:
- id: unique identifier
- description: what this task does
- tool: which tool to use
- args: arguments for the tool (as a dict)
Return ONLY valid JSON array of tasks, no other text.
Example format:
[
  {{
    "id": "task_1",
    "description": "Extract text from document",
    "tool": "ocr_extract_text",
    "args": {{"file_path": "document.pdf", "language": "en"}}
  }}
]
"""
        self._add_thought('planning', "Creating execution plan with LLM...")

        try:
            plan_response = await get_llm_response(planning_prompt, temperature=0.3)
            tasks_data = json.loads(self._extract_json(plan_response))
            tasks = [
                AgentTask(**{**task, 'status': TaskStatus.PENDING})
                for task in tasks_data
            ]
            self._add_thought('planning', f"Created plan with {len(tasks)} tasks")
            return tasks
        except Exception as e:
            # Planning is best-effort: record the failure as a thought and
            # return an empty plan rather than crashing the request.
            self._add_thought('planning', f"Planning failed: {str(e)}")
            return []

    async def execute_task(self, task: AgentTask) -> AgentTask:
        """Execute one task via its MCP tool, recording status/result/error."""
        self._add_thought('tool_call', f"Executing: {task.description}",
                          tool_name=task.tool, tool_args=task.args)
        task.status = TaskStatus.IN_PROGRESS
        try:
            result = await self.mcp_client.call_tool(task.tool, task.args)
            task.result = result
            task.status = TaskStatus.COMPLETED
            self._add_thought('tool_call', f"✓ Completed: {task.description}",
                              tool_name=task.tool, tool_result=result)
        except Exception as e:
            # Record the failure on the task; reflection reports it to the user.
            task.error = str(e)
            task.status = TaskStatus.FAILED
            self._add_thought('tool_call',
                              f"✗ Failed: {task.description} - {str(e)}",
                              tool_name=task.tool)
        return task

    async def reflect(self, tasks: List[AgentTask], original_request: str) -> str:
        """Summarize execution results into a user-facing answer.

        Args:
            tasks: Executed tasks.
            original_request: Original user request.

        Returns:
            Final answer string (an error message if the LLM call fails).
        """
        self._add_thought('reflection', "Analyzing results and creating response...")

        # One line per task: success with truncated result, or failure reason.
        results_summary = []
        for task in tasks:
            if task.status == TaskStatus.COMPLETED:
                results_summary.append(f"✓ {task.description}: {str(task.result)[:200]}")
            else:
                results_summary.append(f"✗ {task.description}: {task.error}")

        reflection_prompt = f"""You are an autonomous life admin agent. Review the execution results and create a helpful response.
ORIGINAL REQUEST: {original_request}
EXECUTION RESULTS:
{chr(10).join(results_summary)}
Provide a clear, helpful response to the user about what was accomplished. Be specific about:
1. What tasks were completed successfully
2. What outputs were created (files, calendar events, etc.)
3. Any issues encountered
4. Next steps if applicable
Keep response concise but informative.
"""
        try:
            final_answer = await get_llm_response(reflection_prompt, temperature=0.7)
            self._add_thought('answer', final_answer)
            # Persist the interaction so future requests can reference it.
            self.memory.add_memory(
                f"Request: {original_request}\nResult: {final_answer}",
                metadata={'type': 'task_completion', 'timestamp': time.time()}
            )
            return final_answer
        except Exception as e:
            error_msg = f"Reflection failed: {str(e)}"
            self._add_thought('answer', error_msg)
            return error_msg

    async def execute(self, user_request: str, files: Optional[List[str]] = None,
                      stream_thoughts: bool = False):
        """Main execution loop: plan, execute each task, reflect.

        Fix: this is an async generator, and the previous version used
        ``return <value>`` inside it, which is a SyntaxError in Python
        (async generators cannot return values). It also yielded
        ``self.thoughts[-1]`` immediately after ``reset_context()``, which
        was always ``None``.

        Behavior now: when ``stream_thoughts`` is True, yields AgentThought
        items as phases complete; in ALL cases the final yielded item is a
        ``(final_answer, thoughts)`` tuple, so callers can either consume
        the stream or just take the last item.

        Args:
            user_request: User's natural language request.
            files: Uploaded files to process.
            stream_thoughts: Whether to yield thoughts as they happen.
        """
        self.reset_context()

        # Phase 1: Planning
        tasks = await self.plan(user_request, files)
        if stream_thoughts:
            for thought in self.thoughts:
                yield thought

        if not tasks:
            error_thought = self._add_thought(
                'answer',
                "Could not create execution plan. Please rephrase your request."
            )
            if stream_thoughts:
                yield error_thought
            yield error_thought.content, self.thoughts
            return

        # Phase 2: Execution
        executed_tasks = []
        for task in tasks:
            executed_tasks.append(await self.execute_task(task))
            if stream_thoughts:
                yield self.thoughts[-1]  # latest tool-call thought

        # Phase 3: Reflection
        final_answer = await self.reflect(executed_tasks, user_request)
        if stream_thoughts:
            yield self.thoughts[-1]  # final answer thought
        yield final_answer, self.thoughts

    def get_thought_trace(self) -> List[Dict]:
        """Return the thought trace as plain dicts for UI display."""
        return [asdict(thought) for thought in self.thoughts]

    async def process_files_to_rag(self, files: List[Dict[str, str]]):
        """Extract text from uploaded files and index them in the RAG engine.

        Each entry in ``files`` is expected to carry 'path' and 'name' keys.
        PDFs use the local extractor, images go through the OCR tool, and
        anything else is read as UTF-8 text. Per-file failures are logged
        and skipped so one bad file does not abort the batch.
        """
        for file_info in files:
            try:
                path = file_info['path']
                if path.endswith('.pdf'):
                    from utils.pdf_utils import extract_text_from_pdf
                    text = extract_text_from_pdf(path)
                elif path.endswith(('.png', '.jpg', '.jpeg')):
                    # Images need OCR to become searchable text.
                    result = await self.mcp_client.call_tool(
                        'ocr_extract_text',
                        {'file_path': path, 'language': 'en'}
                    )
                    text = result.get('text', '')
                else:
                    with open(path, 'r', encoding='utf-8') as f:
                        text = f.read()

                await self.rag_engine.add_document(
                    text=text,
                    metadata={'filename': file_info['name'], 'path': path}
                )
            except Exception as e:
                print(f"Error processing {file_info['name']}: {e}")

    async def manual_tool_call(self, tool_name: str, args: Dict[str, Any]) -> Any:
        """Direct tool call for manual mode; never raises to the caller."""
        try:
            result = await self.mcp_client.call_tool(tool_name, args)
            return {'success': True, 'result': result}
        except Exception as e:
            return {'success': False, 'error': str(e)}