""" LifeAdmin AI - Core Agent Logic Final stable version (No async-generators – fully HF compatible) Includes async helpers used by the UI: - process_files_to_rag(files) - manual_tool_call(tool_name, args) """ import asyncio import json import time from typing import List, Dict, Any, Optional from dataclasses import dataclass, asdict from enum import Enum from pathlib import Path from agent.mcp_client import MCPClient from agent.rag_engine import RAGEngine from agent.memory import MemoryStore from utils.llm_utils import get_llm_response # ============================= # DATA MODELS # ============================= class TaskStatus(Enum): PENDING = "pending" IN_PROGRESS = "in_progress" COMPLETED = "completed" FAILED = "failed" @dataclass class AgentThought: step: int type: str # 'planning', 'tool_call', 'reflection', 'answer' content: str tool_name: Optional[str] = None tool_args: Optional[Dict] = None tool_result: Optional[Any] = None timestamp: float = None def __post_init__(self): if self.timestamp is None: self.timestamp = time.time() @dataclass class AgentTask: id: str description: str tool: str args: Dict[str, Any] status: TaskStatus = TaskStatus.PENDING result: Optional[Any] = None error: Optional[str] = None # ============================= # MAIN AGENT CLASS # ============================= class LifeAdminAgent: def __init__(self): self.mcp_client = MCPClient() self.rag_engine = RAGEngine() self.memory = MemoryStore() self.thoughts: List[AgentThought] = [] # ----------------------- # UTIL # ----------------------- def reset(self): self.thoughts = [] # ----------------------- # PLANNING # ----------------------- async def plan(self, user_request: str, files: List[str] = None) -> List[AgentTask]: self.thoughts.append(AgentThought( step=len(self.thoughts) + 1, type="planning", content=f"Analyzing: {user_request}" )) tools = await self.mcp_client.list_tools() tool_desc = "\n".join([f"- {t['name']}: {t['description']}" for t in tools]) rag_docs = [] if user_request.strip(): rag_docs = await self.rag_engine.search(user_request, k=3) rag_context = "\n".join([d.get("text", "")[:200] for d in rag_docs]) if rag_docs else "None" memory_context = self.memory.get_relevant_memories(user_request) prompt = f""" You are a task planner. REQUEST: {user_request} FILES: {files or []} TOOLS: {tool_desc} RAG CONTEXT: {rag_context} MEMORY: {memory_context} Return ONLY JSON list: [ {{ "id": "task1", "description": "Extract text", "tool": "ocr_extract_text", "args": {{"file_path": "x.pdf"}} }} ] """ response = await get_llm_response(prompt, temperature=0.2) text = response.strip() if "```json" in text: text = text.split("```json")[1].split("```")[0].strip() elif "```" in text: text = text.split("```")[1].split("```")[0].strip() try: plan_data = json.loads(text) tasks = [AgentTask(**t) for t in plan_data] except Exception: self.thoughts.append(AgentThought( step=len(self.thoughts)+1, type="planning", content="Planning failed (invalid JSON)" )) return [] self.thoughts.append(AgentThought( step=len(self.thoughts)+1, type="planning", content=f"Created {len(tasks)} tasks" )) return tasks # ----------------------- # EXECUTION # ----------------------- async def execute_task(self, task: AgentTask): self.thoughts.append(AgentThought( step=len(self.thoughts)+1, type="tool_call", content=f"Executing: {task.description}", tool_name=task.tool, tool_args=task.args )) task.status = TaskStatus.IN_PROGRESS try: result = await self.mcp_client.call_tool(task.tool, task.args) task.result = result task.status = TaskStatus.COMPLETED self.thoughts.append(AgentThought( step=len(self.thoughts)+1, type="tool_call", content=f"✓ Completed", tool_name=task.tool, tool_result=result )) except Exception as e: task.status = TaskStatus.FAILED task.error = str(e) self.thoughts.append(AgentThought( step=len(self.thoughts)+1, type="tool_call", content=f"✗ Failed: {e}", tool_name=task.tool )) return task # ----------------------- # REFLECTION # ----------------------- async def reflect(self, tasks: List[AgentTask], original: str) -> str: self.thoughts.append(AgentThought( step=len(self.thoughts)+1, type="reflection", content="Summarizing results..." )) results = [] for t in tasks: if t.status == TaskStatus.COMPLETED: results.append(f"✓ {t.description}: {str(t.result)[:200]}") else: results.append(f"✗ {t.description}: {t.error}") prompt = f""" Provide a helpful summary for the user. REQUEST: {original} RESULTS: {chr(10).join(results)} Write a clear, friendly answer. """ answer = await get_llm_response(prompt, temperature=0.4) self.thoughts.append(AgentThought( step=len(self.thoughts)+1, type="answer", content=answer )) # store memory try: self.memory.add_memory( content=f"Request: {original}\nAnswer: {answer}", metadata={"timestamp": time.time()} ) except Exception: # don't break on memory errors pass return answer # ----------------------- # MAIN EXECUTION (FULLY FIXED) # ----------------------- async def execute(self, user_request: str, files: List[str] = None): """ A simple coroutine returning final_answer, thoughts No yields → No async generator → No syntax errors """ self.reset() tasks = await self.plan(user_request, files) if not tasks: # return is allowed now return "Could not generate plan. Try rephrasing.", self.thoughts executed = [] for t in tasks: executed.append(await self.execute_task(t)) final_answer = await self.reflect(executed, user_request) return final_answer, self.thoughts # ----------------------- # EXPORT THOUGHTS # ----------------------- def get_thought_trace(self): return [asdict(t) for t in self.thoughts] # ----------------------- # Additional helpers expected by UI # ----------------------- async def process_files_to_rag(self, files: List[Dict[str, str]]): """ Process uploaded files and add text to RAG. files: List[{'path': '/abs/path', 'name': 'file.pdf'}] """ for file_info in files: try: path = file_info.get('path') if not path: continue # small heuristic on extension if path.lower().endswith('.pdf'): from utils.pdf_utils import extract_text_from_pdf text = extract_text_from_pdf(path) elif path.lower().endswith(('.png', '.jpg', '.jpeg')): # Use OCR tool via MCPClient (local fallback) try: res = await self.mcp_client.call_tool('ocr_extract_text', {'file_path': path, 'language': 'en'}) text = res.get('text', '') except Exception: # Last-resort: empty text text = "" else: # treat as text file try: with open(path, 'r', encoding='utf-8') as f: text = f.read() except Exception: text = "" # add to RAG if text: await self.rag_engine.add_document(text=text, metadata={'filename': file_info.get('name'), 'path': path}) except Exception as e: # log (print) but don't raise print(f"Error processing {file_info.get('name')}: {e}") async def manual_tool_call(self, tool_name: str, args: Dict[str, Any]) -> Dict[str, Any]: """ Direct tool call helper used by UI buttons. Returns dict with 'success' key. """ try: result = await self.mcp_client.call_tool(tool_name, args) return {'success': True, 'result': result} except Exception as e: return {'success': False, 'error': str(e)}