"""
Cognitive Memory Client
=======================

The high-level facade for autonomous agents to interact with the MnemoCore
AGI Memory Substrate. Provides easy methods for observation, episodic
sequence tracking, and working memory recall.
"""
|
| |
|
# Standard library imports first, then package-local services (PEP 8 grouping).
import logging
from typing import Any, List, Optional, Tuple

from .core.engine import HAIMEngine
from .core.episodic_store import EpisodicStoreService
from .core.memory_model import Procedure
from .core.meta_memory import MetaMemoryService, SelfImprovementProposal
from .core.procedural_store import ProceduralStoreService
from .core.semantic_store import SemanticStoreService
from .core.working_memory import WorkingMemoryItem, WorkingMemoryService

# Module-level logger, named after the package path.
logger = logging.getLogger(__name__)
|
class CognitiveMemoryClient:
    """
    Plug-and-play cognitive memory facade for agent frameworks
    (LangGraph, AutoGen, OpenClaw, etc.).

    Wraps the HAIMEngine and the working/episodic/semantic/procedural/meta
    memory services behind a small, agent-oriented API. The client holds no
    state of its own; every call delegates to one of the injected services.
    """

    def __init__(
        self,
        engine: HAIMEngine,
        wm: WorkingMemoryService,
        episodic: EpisodicStoreService,
        semantic: SemanticStoreService,
        procedural: ProceduralStoreService,
        meta: MetaMemoryService,
    ):
        self.engine = engine
        self.wm = wm
        self.episodic = episodic
        self.semantic = semantic
        self.procedural = procedural
        self.meta = meta

    # ------------------------------------------------------------------
    # Working memory
    # ------------------------------------------------------------------

    def observe(
        self,
        agent_id: str,
        content: str,
        kind: str = "observation",
        importance: float = 0.5,
        tags: Optional[List[str]] = None,
        **meta,
    ) -> str:
        """
        Push a new observation or thought directly into the agent's
        short-term Working Memory.

        Args:
            agent_id: Owner of the working-memory buffer.
            content: Raw text of the observation/thought.
            kind: Item category (defaults to "observation").
            importance: Salience score used by pruning policies.
            tags: Optional labels attached to the item.
            **meta: Accepted for API symmetry with append_event() but
                currently ignored — no metadata slot is populated on the
                item. NOTE(review): confirm whether meta should be persisted.

        Returns:
            The generated working-memory item id (``"wm_"`` + 8 hex chars).
        """
        import uuid
        from datetime import datetime

        item_id = f"wm_{uuid.uuid4().hex[:8]}"
        item = WorkingMemoryItem(
            id=item_id,
            agent_id=agent_id,
            # TODO: utcnow() is deprecated; migrate to datetime.now(timezone.utc)
            # once downstream consumers are confirmed to handle aware datetimes.
            created_at=datetime.utcnow(),
            ttl_seconds=3600,  # items expire after one hour by default
            content=content,
            kind=kind,
            importance=importance,
            tags=tags or [],
            hdv=None,  # hyperdimensional vector is computed downstream, if at all
        )
        self.wm.push_item(agent_id, item)
        # Lazy %-style args: the slice/format only happens when DEBUG is enabled.
        logger.debug("Agent %s observed: %s...", agent_id, content[:30])
        return item_id

    def get_working_context(self, agent_id: str, limit: int = 16) -> List[WorkingMemoryItem]:
        """
        Read the active, un-pruned context out of the agent's working
        memory buffer.

        Returns the most recent ``limit`` items (oldest first), or an
        empty list when the agent has no working-memory state.
        """
        state = self.wm.get_state(agent_id)
        if not state:
            return []
        return state.items[-limit:]

    # ------------------------------------------------------------------
    # Episodic memory
    # ------------------------------------------------------------------

    def start_episode(self, agent_id: str, goal: str, context: Optional[str] = None) -> str:
        """Begin a new temporally-linked event sequence; returns the episode id."""
        return self.episodic.start_episode(agent_id, goal=goal, context=context)

    def append_event(self, episode_id: str, kind: str, content: str, **meta) -> None:
        """Log an action or outcome (plus arbitrary metadata) to an ongoing episode."""
        self.episodic.append_event(episode_id, kind, content, meta)

    def end_episode(self, episode_id: str, outcome: str, reward: Optional[float] = None) -> None:
        """Seal an episode, logging its final success or failure state."""
        self.episodic.end_episode(episode_id, outcome, reward)

    # ------------------------------------------------------------------
    # Recall
    # ------------------------------------------------------------------

    async def recall(
        self,
        agent_id: str,
        query: str,
        context: Optional[str] = None,
        top_k: int = 8,
        modes: Tuple[str, ...] = ("episodic", "semantic"),
    ) -> List[dict]:
        """
        A unified query interface over the enabled memory ``modes``.

        "semantic" queries the backing HAIMEngine's vector store;
        "episodic" summarizes the agent's recent episodes. Results from
        all modes are merged, sorted by score (descending), and truncated
        to ``top_k`` dicts of ``{"source", "content", "score"}``.
        """
        results: List[dict] = []

        if "semantic" in modes:
            engine_results = await self.engine.query(query, top_k=top_k)
            for mem_id, score in engine_results:
                node = await self.engine.tier_manager.get_memory(mem_id)
                # The tier manager may no longer hold the node; skip misses.
                if node:
                    results.append(
                        {"source": "semantic/engine", "content": node.content, "score": score}
                    )

        if "episodic" in modes:
            for ep in self.episodic.get_recent(agent_id, limit=top_k, context=context):
                summary = f"Episode(goal={ep.goal}, outcome={ep.outcome}, events={len(ep.events)})"
                results.append({"source": "episodic", "content": summary, "score": ep.reliability})

        # Highest-scoring hits first, truncated to the requested budget.
        results.sort(key=lambda x: x.get("score", 0.0), reverse=True)
        return results[:top_k]

    # ------------------------------------------------------------------
    # Procedural memory
    # ------------------------------------------------------------------

    def suggest_procedures(self, agent_id: str, query: str, top_k: int = 5) -> List[Procedure]:
        """Fetch executable tool-patterns based on the agent's intent."""
        return self.procedural.find_applicable_procedures(query, agent_id=agent_id, top_k=top_k)

    def record_procedure_outcome(self, proc_id: str, success: bool) -> None:
        """Report on the utility of a chosen procedure."""
        self.procedural.record_procedure_outcome(proc_id, success)

    # ------------------------------------------------------------------
    # Meta / self-improvement
    # ------------------------------------------------------------------

    def get_knowledge_gaps(self, agent_id: str, lookback_hours: int = 24) -> List[dict]:
        """
        Return currently open knowledge gaps identified by the Pulse loop.

        TODO: not yet implemented — both parameters are accepted for
        forward compatibility but ignored; always returns an empty list.
        """
        return []

    def get_self_improvement_proposals(self) -> List[SelfImprovementProposal]:
        """Retrieve system-generated proposals to improve operation or prompt alignment."""
        return self.meta.list_proposals()