MnemoCore / src /mnemocore /agent_interface.py
Granis87's picture
Upload folder using huggingface_hub
c3a3710 verified
"""
Cognitive Memory Client
=======================
The high-level facade for autonomous agents to interact with the MnemoCore AGI Memory Substrate.
Provides easy methods for observation, episodic sequence tracking, and working memory recall.
"""
from typing import List, Optional, Any, Tuple
import logging
from .core.engine import HAIMEngine
from .core.working_memory import WorkingMemoryService, WorkingMemoryItem
from .core.episodic_store import EpisodicStoreService
from .core.semantic_store import SemanticStoreService
from .core.procedural_store import ProceduralStoreService
from .core.meta_memory import MetaMemoryService, SelfImprovementProposal
from .core.memory_model import Procedure
logger = logging.getLogger(__name__)
class CognitiveMemoryClient:
"""
Plug-and-play cognitive memory facade for agent frameworks (LangGraph, AutoGen, OpenClaw, etc.).
"""
def __init__(
self,
engine: HAIMEngine,
wm: WorkingMemoryService,
episodic: EpisodicStoreService,
semantic: SemanticStoreService,
procedural: ProceduralStoreService,
meta: MetaMemoryService,
):
self.engine = engine
self.wm = wm
self.episodic = episodic
self.semantic = semantic
self.procedural = procedural
self.meta = meta
# --- Observation & WM ---
def observe(self, agent_id: str, content: str, kind: str = "observation", importance: float = 0.5, tags: Optional[List[str]] = None, **meta) -> str:
"""
Push a new observation or thought directly into the agent's short-term Working Memory.
"""
import uuid
from datetime import datetime
item_id = f"wm_{uuid.uuid4().hex[:8]}"
item = WorkingMemoryItem(
id=item_id,
agent_id=agent_id,
created_at=datetime.utcnow(),
ttl_seconds=3600, # 1 hour default
content=content,
kind=kind, # type: ignore
importance=importance,
tags=tags or [],
hdv=None # Could encode via engine in future
)
self.wm.push_item(agent_id, item)
logger.debug(f"Agent {agent_id} observed: {content[:30]}...")
return item_id
def get_working_context(self, agent_id: str, limit: int = 16) -> List[WorkingMemoryItem]:
"""
Read the active, un-pruned context out of the agent's working memory buffer.
"""
state = self.wm.get_state(agent_id)
if not state:
return []
return state.items[-limit:]
# --- Episodic ---
def start_episode(self, agent_id: str, goal: str, context: Optional[str] = None) -> str:
"""Begin a new temporally-linked event sequence."""
return self.episodic.start_episode(agent_id, goal=goal, context=context)
def append_event(self, episode_id: str, kind: str, content: str, **meta) -> None:
"""Log an action or outcome to an ongoing episode."""
self.episodic.append_event(episode_id, kind, content, meta)
def end_episode(self, episode_id: str, outcome: str, reward: Optional[float] = None) -> None:
"""Seal an episode, logging its final success or failure state."""
self.episodic.end_episode(episode_id, outcome, reward)
# --- Semantic / Retrieval ---
async def recall(
self,
agent_id: str,
query: str,
context: Optional[str] = None,
top_k: int = 8,
modes: Tuple[str, ...] = ("episodic", "semantic")
) -> List[dict]:
"""
A unified query interface that checks Working Memory, Episodic History, and the Semantic Vector Store.
Currently delegates heavily to the backing HAIMEngine, but can be augmented to return semantic concepts.
"""
results = []
# 1. Broad retrieval via existing HAIM engine (SM / general memories)
if "semantic" in modes:
engine_results = await self.engine.query(query, top_k=top_k)
for mem_id, score in engine_results:
node = await self.engine.tier_manager.get_memory(mem_id) # Fix: tier_manager.get_memory is async
if node:
results.append({"source": "semantic/engine", "content": node.content, "score": score})
# 2. Local episodic retrieval
if "episodic" in modes:
recent_eps = self.episodic.get_recent(agent_id, limit=top_k, context=context)
for ep in recent_eps:
summary = f"Episode(goal={ep.goal}, outcome={ep.outcome}, events={len(ep.events)})"
results.append({"source": "episodic", "content": summary, "score": ep.reliability})
# Sort and trim mixed results
results.sort(key=lambda x: x.get("score", 0.0), reverse=True)
return results[:top_k]
# --- Procedural ---
def suggest_procedures(self, agent_id: str, query: str, top_k: int = 5) -> List[Procedure]:
"""Fetch executable tool-patterns based on the agent's intent."""
return self.procedural.find_applicable_procedures(query, agent_id=agent_id, top_k=top_k)
def record_procedure_outcome(self, proc_id: str, success: bool) -> None:
"""Report on the utility of a chosen procedure."""
self.procedural.record_procedure_outcome(proc_id, success)
# --- Meta / Self-awareness ---
def get_knowledge_gaps(self, agent_id: str, lookback_hours: int = 24) -> List[dict]:
"""Return currently open knowledge gaps identified by the Pulse loop."""
# Stubbed: Would interact with gap_detector
return []
def get_self_improvement_proposals(self) -> List[SelfImprovementProposal]:
"""Retrieve system-generated proposals to improve operation or prompt alignment."""
return self.meta.list_proposals()