| """ | |
| LifeAdmin AI - Core Agent Logic | |
| Final stable version (No async-generators β fully HF compatible) | |
| Includes async helpers used by the UI: | |
| - process_files_to_rag(files) | |
| - manual_tool_call(tool_name, args) | |
| """ | |
| import asyncio | |
| import json | |
| import time | |
| from typing import List, Dict, Any, Optional | |
| from dataclasses import dataclass, asdict | |
| from enum import Enum | |
| from pathlib import Path | |
| from agent.mcp_client import MCPClient | |
| from agent.rag_engine import RAGEngine | |
| from agent.memory import MemoryStore | |
| from utils.llm_utils import get_llm_response | |
| # ============================= | |
| # DATA MODELS | |
| # ============================= | |
class TaskStatus(Enum):
    """Lifecycle states for an AgentTask as it moves through execution."""
    PENDING = "pending"            # created by plan(), not yet run
    IN_PROGRESS = "in_progress"    # set at the start of execute_task()
    COMPLETED = "completed"        # tool call returned without raising
    FAILED = "failed"              # tool call raised; error stored on the task
@dataclass
class AgentThought:
    """One step in the agent's visible reasoning trace.

    Instances are created with keyword arguments and exported with
    dataclasses.asdict(), so the @dataclass decorator is required:
    without it there is no generated __init__, __post_init__ never
    runs, and asdict() raises TypeError.
    """
    step: int                          # 1-based position in the trace
    type: str                          # 'planning', 'tool_call', 'reflection', 'answer'
    content: str                       # human-readable description of the step
    tool_name: Optional[str] = None    # tool involved, for tool_call steps
    tool_args: Optional[Dict] = None   # arguments sent to the tool
    tool_result: Optional[Any] = None  # raw tool output, if any
    timestamp: Optional[float] = None  # epoch seconds; defaulted below

    def __post_init__(self):
        # Fill the timestamp lazily so callers may supply their own.
        if self.timestamp is None:
            self.timestamp = time.time()
@dataclass
class AgentTask:
    """A single planned unit of work: one MCP tool call with its arguments.

    Built from the planner's JSON via AgentTask(**entry), so the
    @dataclass decorator is required: without it there is no generated
    __init__ and keyword construction raises TypeError.
    """
    id: str                     # planner-assigned identifier, e.g. "task1"
    description: str            # human-readable description of the task
    tool: str                   # MCP tool name to invoke
    args: Dict[str, Any]        # keyword arguments passed to the tool
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Any] = None   # tool output on success
    error: Optional[str] = None    # stringified exception on failure
| # ============================= | |
| # MAIN AGENT CLASS | |
| # ============================= | |
class LifeAdminAgent:
    """Plan -> execute -> reflect agent orchestrating MCP tools, RAG and memory."""

    def __init__(self):
        self.mcp_client = MCPClient()    # MCP tool discovery / invocation
        self.rag_engine = RAGEngine()    # document search over uploaded files
        self.memory = MemoryStore()      # long-term request/answer store
        self.thoughts: List[AgentThought] = []  # visible reasoning trace
| # ----------------------- | |
| # UTIL | |
| # ----------------------- | |
    def reset(self):
        """Start a fresh reasoning trace.

        Rebinds (rather than clears) the list, so traces previously
        returned by execute() are left untouched.
        """
        self.thoughts = []
| # ----------------------- | |
| # PLANNING | |
| # ----------------------- | |
| async def plan(self, user_request: str, files: List[str] = None) -> List[AgentTask]: | |
| self.thoughts.append(AgentThought( | |
| step=len(self.thoughts) + 1, | |
| type="planning", | |
| content=f"Analyzing: {user_request}" | |
| )) | |
| tools = await self.mcp_client.list_tools() | |
| tool_desc = "\n".join([f"- {t['name']}: {t['description']}" for t in tools]) | |
| rag_docs = [] | |
| if user_request.strip(): | |
| rag_docs = await self.rag_engine.search(user_request, k=3) | |
| rag_context = "\n".join([d.get("text", "")[:200] for d in rag_docs]) if rag_docs else "None" | |
| memory_context = self.memory.get_relevant_memories(user_request) | |
| prompt = f""" | |
| You are a task planner. | |
| REQUEST: | |
| {user_request} | |
| FILES: {files or []} | |
| TOOLS: | |
| {tool_desc} | |
| RAG CONTEXT: | |
| {rag_context} | |
| MEMORY: | |
| {memory_context} | |
| Return ONLY JSON list: | |
| [ | |
| {{ | |
| "id": "task1", | |
| "description": "Extract text", | |
| "tool": "ocr_extract_text", | |
| "args": {{"file_path": "x.pdf"}} | |
| }} | |
| ] | |
| """ | |
| response = await get_llm_response(prompt, temperature=0.2) | |
| text = response.strip() | |
| if "```json" in text: | |
| text = text.split("```json")[1].split("```")[0].strip() | |
| elif "```" in text: | |
| text = text.split("```")[1].split("```")[0].strip() | |
| try: | |
| plan_data = json.loads(text) | |
| tasks = [AgentTask(**t) for t in plan_data] | |
| except Exception: | |
| self.thoughts.append(AgentThought( | |
| step=len(self.thoughts)+1, | |
| type="planning", | |
| content="Planning failed (invalid JSON)" | |
| )) | |
| return [] | |
| self.thoughts.append(AgentThought( | |
| step=len(self.thoughts)+1, | |
| type="planning", | |
| content=f"Created {len(tasks)} tasks" | |
| )) | |
| return tasks | |
| # ----------------------- | |
| # EXECUTION | |
| # ----------------------- | |
| async def execute_task(self, task: AgentTask): | |
| self.thoughts.append(AgentThought( | |
| step=len(self.thoughts)+1, | |
| type="tool_call", | |
| content=f"Executing: {task.description}", | |
| tool_name=task.tool, | |
| tool_args=task.args | |
| )) | |
| task.status = TaskStatus.IN_PROGRESS | |
| try: | |
| result = await self.mcp_client.call_tool(task.tool, task.args) | |
| task.result = result | |
| task.status = TaskStatus.COMPLETED | |
| self.thoughts.append(AgentThought( | |
| step=len(self.thoughts)+1, | |
| type="tool_call", | |
| content=f"β Completed", | |
| tool_name=task.tool, | |
| tool_result=result | |
| )) | |
| except Exception as e: | |
| task.status = TaskStatus.FAILED | |
| task.error = str(e) | |
| self.thoughts.append(AgentThought( | |
| step=len(self.thoughts)+1, | |
| type="tool_call", | |
| content=f"β Failed: {e}", | |
| tool_name=task.tool | |
| )) | |
| return task | |
| # ----------------------- | |
| # REFLECTION | |
| # ----------------------- | |
| async def reflect(self, tasks: List[AgentTask], original: str) -> str: | |
| self.thoughts.append(AgentThought( | |
| step=len(self.thoughts)+1, | |
| type="reflection", | |
| content="Summarizing results..." | |
| )) | |
| results = [] | |
| for t in tasks: | |
| if t.status == TaskStatus.COMPLETED: | |
| results.append(f"β {t.description}: {str(t.result)[:200]}") | |
| else: | |
| results.append(f"β {t.description}: {t.error}") | |
| prompt = f""" | |
| Provide a helpful summary for the user. | |
| REQUEST: | |
| {original} | |
| RESULTS: | |
| {chr(10).join(results)} | |
| Write a clear, friendly answer. | |
| """ | |
| answer = await get_llm_response(prompt, temperature=0.4) | |
| self.thoughts.append(AgentThought( | |
| step=len(self.thoughts)+1, | |
| type="answer", | |
| content=answer | |
| )) | |
| # store memory | |
| try: | |
| self.memory.add_memory( | |
| content=f"Request: {original}\nAnswer: {answer}", | |
| metadata={"timestamp": time.time()} | |
| ) | |
| except Exception: | |
| # don't break on memory errors | |
| pass | |
| return answer | |
| # ----------------------- | |
| # MAIN EXECUTION (FULLY FIXED) | |
| # ----------------------- | |
| async def execute(self, user_request: str, files: List[str] = None): | |
| """ | |
| A simple coroutine returning final_answer, thoughts | |
| No yields β No async generator β No syntax errors | |
| """ | |
| self.reset() | |
| tasks = await self.plan(user_request, files) | |
| if not tasks: | |
| # return is allowed now | |
| return "Could not generate plan. Try rephrasing.", self.thoughts | |
| executed = [] | |
| for t in tasks: | |
| executed.append(await self.execute_task(t)) | |
| final_answer = await self.reflect(executed, user_request) | |
| return final_answer, self.thoughts | |
| # ----------------------- | |
| # EXPORT THOUGHTS | |
| # ----------------------- | |
| def get_thought_trace(self): | |
| return [asdict(t) for t in self.thoughts] | |
| # ----------------------- | |
| # Additional helpers expected by UI | |
| # ----------------------- | |
| async def process_files_to_rag(self, files: List[Dict[str, str]]): | |
| """ | |
| Process uploaded files and add text to RAG. | |
| files: List[{'path': '/abs/path', 'name': 'file.pdf'}] | |
| """ | |
| for file_info in files: | |
| try: | |
| path = file_info.get('path') | |
| if not path: | |
| continue | |
| # small heuristic on extension | |
| if path.lower().endswith('.pdf'): | |
| from utils.pdf_utils import extract_text_from_pdf | |
| text = extract_text_from_pdf(path) | |
| elif path.lower().endswith(('.png', '.jpg', '.jpeg')): | |
| # Use OCR tool via MCPClient (local fallback) | |
| try: | |
| res = await self.mcp_client.call_tool('ocr_extract_text', {'file_path': path, 'language': 'en'}) | |
| text = res.get('text', '') | |
| except Exception: | |
| # Last-resort: empty text | |
| text = "" | |
| else: | |
| # treat as text file | |
| try: | |
| with open(path, 'r', encoding='utf-8') as f: | |
| text = f.read() | |
| except Exception: | |
| text = "" | |
| # add to RAG | |
| if text: | |
| await self.rag_engine.add_document(text=text, metadata={'filename': file_info.get('name'), 'path': path}) | |
| except Exception as e: | |
| # log (print) but don't raise | |
| print(f"Error processing {file_info.get('name')}: {e}") | |
| async def manual_tool_call(self, tool_name: str, args: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Direct tool call helper used by UI buttons. Returns dict with 'success' key. | |
| """ | |
| try: | |
| result = await self.mcp_client.call_tool(tool_name, args) | |
| return {'success': True, 'result': result} | |
| except Exception as e: | |
| return {'success': False, 'error': str(e)} | |