LifeAdmin-AI / agent /agent_core.py
Maheen001's picture
Update agent/agent_core.py
dd82506 verified
raw
history blame
9.25 kB
"""
LifeAdmin AI - Core Agent Logic
Final stable version (No async-generators – fully HF compatible)
Includes async helpers used by the UI:
- process_files_to_rag(files)
- manual_tool_call(tool_name, args)
"""
import asyncio
import json
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
from agent.mcp_client import MCPClient
from agent.rag_engine import RAGEngine
from agent.memory import MemoryStore
from utils.llm_utils import get_llm_response
# =============================
# DATA MODELS
# =============================
class TaskStatus(Enum):
    """Lifecycle states a planned task can be in."""

    # Initial state of a freshly created task.
    PENDING = "pending"
    # Set just before the tool call is issued.
    IN_PROGRESS = "in_progress"
    # The tool call returned without raising.
    COMPLETED = "completed"
    # The tool call raised; the task carries the error text.
    FAILED = "failed"
@dataclass
class AgentThought:
    """One entry in the agent's step-by-step reasoning trace.

    ``timestamp`` is filled with ``time.time()`` at construction when the
    caller does not supply one (or supplies ``None``).
    """

    step: int  # 1-based position in the trace
    type: str  # 'planning', 'tool_call', 'reflection', 'answer'
    content: str  # human-readable description of the step
    tool_name: Optional[str] = None
    tool_args: Optional[Dict[str, Any]] = None
    tool_result: Optional[Any] = None
    # Optional because None is the "not provided" sentinel resolved below.
    timestamp: Optional[float] = None

    def __post_init__(self):
        """Stamp the thought with the current time if none was given."""
        if self.timestamp is None:
            self.timestamp = time.time()
@dataclass
class AgentTask:
    """A single planned tool invocation together with its outcome."""

    id: str  # identifier taken from the plan (e.g. "task1")
    description: str  # short human-readable summary of the task
    tool: str  # name of the tool to call
    args: Dict[str, Any]  # arguments passed to the tool
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Any] = None  # tool return value once completed
    error: Optional[str] = None  # error text when the task failed
# =============================
# MAIN AGENT CLASS
# =============================
class LifeAdminAgent:
def __init__(self):
    """Create the agent's collaborators and start with an empty thought trace."""
    # Tool access, document retrieval and memory, constructed in this order.
    self.mcp_client = MCPClient()
    self.rag_engine = RAGEngine()
    self.memory = MemoryStore()
    # Reasoning trace; replaced with a fresh list by reset().
    self.thoughts: List[AgentThought] = []
# -----------------------
# UTIL
# -----------------------
def reset(self):
    """Discard the recorded thoughts by binding a fresh, empty list."""
    # Rebind instead of clearing in place so lists handed out earlier
    # are left untouched.
    self.thoughts = list()
# -----------------------
# PLANNING
# -----------------------
async def plan(self, user_request: str, files: Optional[List[str]] = None) -> List[AgentTask]:
    """Ask the LLM to break a user request into a list of tool tasks.

    The prompt combines the request, the given file list, the tool list
    reported by the MCP client, RAG search results for the request
    (requested with ``k=3``, each text cut to 200 characters) and whatever
    the memory store returns for the request.

    Args:
        user_request: Free-text request from the user.
        files: Optional list of file paths shown to the planner.

    Returns:
        The parsed tasks, or an empty list when the LLM reply cannot be
        turned into tasks (a "Planning failed" thought is recorded).
    """
    self.thoughts.append(AgentThought(
        step=len(self.thoughts) + 1,
        type="planning",
        content=f"Analyzing: {user_request}"
    ))

    tools = await self.mcp_client.list_tools()
    tool_desc = "\n".join(f"- {t['name']}: {t['description']}" for t in tools)

    rag_docs = []
    if user_request.strip():
        rag_docs = await self.rag_engine.search(user_request, k=3)
    if rag_docs:
        # `or ""` guards against a hit whose "text" is present but None.
        rag_context = "\n".join((d.get("text") or "")[:200] for d in rag_docs)
    else:
        rag_context = "None"

    memory_context = self.memory.get_relevant_memories(user_request)

    prompt = f"""
You are a task planner.
REQUEST:
{user_request}
FILES: {files or []}
TOOLS:
{tool_desc}
RAG CONTEXT:
{rag_context}
MEMORY:
{memory_context}
Return ONLY JSON list:
[
{{
"id": "task1",
"description": "Extract text",
"tool": "ocr_extract_text",
"args": {{"file_path": "x.pdf"}}
}}
]
"""
    response = await get_llm_response(prompt, temperature=0.2)
    # Treat a missing reply like an empty one so it fails as "invalid JSON".
    text = (response or "").strip()

    # Keep only the content of the first Markdown code fence, if any.
    if "```json" in text:
        text = text.split("```json")[1].split("```")[0].strip()
    elif "```" in text:
        text = text.split("```")[1].split("```")[0].strip()

    # Only these keys come from the planner; status/result/error are owned
    # by the executor and must not be overwritten by the LLM output.
    task_fields = ("id", "description", "tool", "args")
    try:
        plan_data = json.loads(text)
        if isinstance(plan_data, dict):
            # A single task returned as a bare object instead of a list.
            plan_data = [plan_data]
        tasks = [
            AgentTask(**{k: v for k, v in entry.items() if k in task_fields})
            for entry in plan_data
        ]
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed JSON; TypeError/AttributeError: valid JSON
        # that is not a list of task objects with the required fields.
        self.thoughts.append(AgentThought(
            step=len(self.thoughts) + 1,
            type="planning",
            content="Planning failed (invalid JSON)"
        ))
        return []

    self.thoughts.append(AgentThought(
        step=len(self.thoughts) + 1,
        type="planning",
        content=f"Created {len(tasks)} tasks"
    ))
    return tasks
# -----------------------
# EXECUTION
# -----------------------
async def execute_task(self, task: AgentTask) -> AgentTask:
    """Run one task through the MCP client and record the outcome.

    The task is mutated in place: on success ``status`` becomes COMPLETED
    and ``result`` holds the tool's return value; if the tool call raises,
    ``status`` becomes FAILED and ``error`` holds ``str(exception)``.
    A thought is appended before the call and another after it.

    Args:
        task: The task to execute.

    Returns:
        The same ``task`` object, updated.
    """
    self.thoughts.append(AgentThought(
        step=len(self.thoughts) + 1,
        type="tool_call",
        content=f"Executing: {task.description}",
        tool_name=task.tool,
        tool_args=task.args
    ))
    task.status = TaskStatus.IN_PROGRESS
    try:
        result = await self.mcp_client.call_tool(task.tool, task.args)
    except Exception as e:
        # Tool failures are recorded on the task instead of propagating,
        # so the remaining tasks can still run.
        task.status = TaskStatus.FAILED
        task.error = str(e)
        self.thoughts.append(AgentThought(
            step=len(self.thoughts) + 1,
            type="tool_call",
            content=f"✗ Failed: {e}",
            tool_name=task.tool
        ))
    else:
        task.result = result
        task.status = TaskStatus.COMPLETED
        self.thoughts.append(AgentThought(
            step=len(self.thoughts) + 1,
            type="tool_call",
            content="✓ Completed",
            tool_name=task.tool,
            tool_result=result
        ))
    return task
# -----------------------
# REFLECTION
# -----------------------
async def reflect(self, tasks: List[AgentTask], original: str) -> str:
    """Summarise the task outcomes into a final answer via the LLM.

    Each task contributes one line to the prompt: completed tasks show the
    first 200 characters of their result, all others show their error.
    The answer is recorded as a thought and stored in memory together with
    the original request; a failure to store memory is logged, not raised.

    Args:
        tasks: The executed tasks.
        original: The user's original request text.

    Returns:
        The LLM-written answer.
    """
    self.thoughts.append(AgentThought(
        step=len(self.thoughts) + 1,
        type="reflection",
        content="Summarizing results..."
    ))

    results = []
    for t in tasks:
        if t.status == TaskStatus.COMPLETED:
            results.append(f"✓ {t.description}: {str(t.result)[:200]}")
        else:
            results.append(f"✗ {t.description}: {t.error}")
    results_text = "\n".join(results)

    prompt = f"""
Provide a helpful summary for the user.
REQUEST:
{original}
RESULTS:
{results_text}
Write a clear, friendly answer.
"""
    answer = await get_llm_response(prompt, temperature=0.4)

    self.thoughts.append(AgentThought(
        step=len(self.thoughts) + 1,
        type="answer",
        content=answer
    ))

    # Storing the exchange is best-effort: the answer is still returned
    # when the memory store fails, but the failure is made visible.
    try:
        self.memory.add_memory(
            content=f"Request: {original}\nAnswer: {answer}",
            metadata={"timestamp": time.time()}
        )
    except Exception as e:
        print(f"Warning: could not store memory: {e}")
    return answer
# -----------------------
# MAIN EXECUTION (FULLY FIXED)
# -----------------------
async def execute(self, user_request: str, files: List[str] = None):
    """Run the full plan / execute / reflect cycle for one request.

    This is a plain coroutine (not an async generator): it resets the
    thought trace, plans, runs every planned task in order and asks for a
    final summary.

    Returns:
        A ``(final_answer, thoughts)`` tuple. When planning yields no
        tasks, ``final_answer`` is a fixed "could not plan" message.
    """
    self.reset()
    planned = await self.plan(user_request, files)
    if planned:
        # Tasks are awaited one after another, in plan order.
        finished = [await self.execute_task(item) for item in planned]
        summary = await self.reflect(finished, user_request)
        return summary, self.thoughts
    return "Could not generate plan. Try rephrasing.", self.thoughts
# -----------------------
# EXPORT THOUGHTS
# -----------------------
def get_thought_trace(self):
    """Return the recorded thoughts as a list of plain dicts."""
    # asdict converts each dataclass instance into a dict.
    return list(map(asdict, self.thoughts))
# -----------------------
# Additional helpers expected by UI
# -----------------------
async def process_files_to_rag(self, files: List[Dict[str, str]]):
    """Extract text from uploaded files and add it to the RAG engine.

    Extraction is chosen by file extension (case-insensitive):
    ``.pdf`` uses ``utils.pdf_utils.extract_text_from_pdf``; ``.png``,
    ``.jpg`` and ``.jpeg`` go through the ``ocr_extract_text`` tool; anything
    else is read as UTF-8 text. Files that yield no text are skipped.
    Errors are printed and never raised, so one bad file does not stop
    the rest.

    Args:
        files: List[{'path': '/abs/path', 'name': 'file.pdf'}]. A bare path
            string is also accepted in place of a dict. ``None`` is treated
            as an empty list.
    """
    for file_info in files or []:
        # Normalise the entry first so the error handler below can always
        # rely on having a name to report.
        if isinstance(file_info, (str, Path)):
            file_info = {'path': str(file_info), 'name': Path(file_info).name}
        elif not isinstance(file_info, dict):
            print(f"Error processing {file_info!r}: unsupported file entry")
            continue

        name = file_info.get('name')
        try:
            path = file_info.get('path')
            if not path:
                continue
            lowered = path.lower()

            # small heuristic on extension
            if lowered.endswith('.pdf'):
                from utils.pdf_utils import extract_text_from_pdf
                text = extract_text_from_pdf(path)
            elif lowered.endswith(('.png', '.jpg', '.jpeg')):
                # Use OCR tool via MCPClient (local fallback)
                try:
                    res = await self.mcp_client.call_tool(
                        'ocr_extract_text', {'file_path': path, 'language': 'en'}
                    )
                    text = res.get('text', '')
                except Exception as e:
                    # Last-resort: empty text, but say why.
                    print(f"OCR failed for {name}: {e}")
                    text = ""
            else:
                # treat as text file
                try:
                    with open(path, 'r', encoding='utf-8') as f:
                        text = f.read()
                except (OSError, ValueError) as e:
                    # ValueError covers UnicodeDecodeError for non-text files.
                    print(f"Could not read {name} as text: {e}")
                    text = ""

            # add to RAG
            if text:
                await self.rag_engine.add_document(
                    text=text, metadata={'filename': name, 'path': path}
                )
        except Exception as e:
            # log (print) but don't raise
            print(f"Error processing {name}: {e}")
async def manual_tool_call(self, tool_name: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """Call a tool directly (used by UI buttons) and wrap the outcome.

    Returns:
        ``{'success': True, 'result': ...}`` when the call returns, or
        ``{'success': False, 'error': '<message>'}`` when it raises.
    """
    try:
        outcome = await self.mcp_client.call_tool(tool_name, args)
    except Exception as exc:
        # Errors are reported in the returned dict rather than raised.
        return dict(success=False, error=str(exc))
    return dict(success=True, result=outcome)