File size: 6,022 Bytes
c3a3710
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""

Cognitive Memory Client

=======================

The high-level facade for autonomous agents to interact with the MnemoCore AGI Memory Substrate.

Provides easy methods for observation, episodic sequence tracking, and working memory recall.

"""

from typing import List, Optional, Any, Tuple
import logging

from .core.engine import HAIMEngine
from .core.working_memory import WorkingMemoryService, WorkingMemoryItem
from .core.episodic_store import EpisodicStoreService
from .core.semantic_store import SemanticStoreService
from .core.procedural_store import ProceduralStoreService
from .core.meta_memory import MetaMemoryService, SelfImprovementProposal
from .core.memory_model import Procedure

logger = logging.getLogger(__name__)

class CognitiveMemoryClient:
    """Plug-and-play cognitive memory facade for agent frameworks.

    Intended for frameworks such as LangGraph, AutoGen and OpenClaw. The client
    keeps no state of its own beyond references to the injected services and
    forwards each call to the matching one.
    """

    def __init__(
        self,
        engine: HAIMEngine,
        wm: WorkingMemoryService,
        episodic: EpisodicStoreService,
        semantic: SemanticStoreService,
        procedural: ProceduralStoreService,
        meta: MetaMemoryService,
    ):
        """Store the injected memory services.

        Args:
            engine: HAIM engine; queried by ``recall`` for the "semantic" mode.
            wm: Working-memory service used by ``observe`` and
                ``get_working_context``.
            episodic: Episodic store used by the episode methods and by
                ``recall`` for the "episodic" mode.
            semantic: Semantic store. Kept on the instance but not used by
                any method of this class at present.
            procedural: Procedural store used by the procedure methods.
            meta: Meta-memory service used by
                ``get_self_improvement_proposals``.
        """
        self.engine = engine
        self.wm = wm
        self.episodic = episodic
        self.semantic = semantic
        self.procedural = procedural
        self.meta = meta

    # --- Observation & WM ---

    def observe(
        self,
        agent_id: str,
        content: str,
        kind: str = "observation",
        importance: float = 0.5,
        tags: Optional[List[str]] = None,
        ttl_seconds: int = 3600,
        **meta,
    ) -> str:
        """Push a new observation or thought into the agent's working memory.

        Args:
            agent_id: Agent whose working memory receives the item.
            content: Text of the observation.
            kind: Item kind passed through to ``WorkingMemoryItem``.
            importance: Importance value passed through to the item.
            tags: Optional tags; copied, so the caller's list is not shared
                with the stored item.
            ttl_seconds: Time-to-live stored on the item (default: one hour).
            **meta: Accepted for forward compatibility but currently NOT
                stored anywhere.

        Returns:
            The generated item id (``"wm_"`` followed by 8 hex characters).
        """
        import uuid
        from datetime import datetime, timezone

        item_id = f"wm_{uuid.uuid4().hex[:8]}"

        # Naive UTC timestamp, i.e. the same value datetime.utcnow() produced,
        # without calling the API deprecated since Python 3.12. It is kept
        # naive on purpose so comparisons against other naive timestamps
        # elsewhere keep working.
        created_at = datetime.now(timezone.utc).replace(tzinfo=None)

        item = WorkingMemoryItem(
            id=item_id,
            agent_id=agent_id,
            created_at=created_at,
            ttl_seconds=ttl_seconds,
            content=content,
            kind=kind,  # type: ignore
            importance=importance,
            tags=list(tags) if tags else [],
            hdv=None,  # Could encode via engine in future
        )
        self.wm.push_item(agent_id, item)
        # Lazy %-formatting: the message is only built when DEBUG is enabled.
        logger.debug("Agent %s observed: %s...", agent_id, content[:30])
        return item_id

    def get_working_context(self, agent_id: str, limit: int = 16) -> List[WorkingMemoryItem]:
        """Return the most recent items from the agent's working memory.

        Args:
            agent_id: Agent whose working-memory state is read.
            limit: Maximum number of items to return, taken from the end of
                the state's ``items`` sequence.

        Returns:
            Up to ``limit`` trailing items, or an empty list when the agent
            has no state or ``limit`` is not positive.
        """
        state = self.wm.get_state(agent_id)
        # Guard limit <= 0 explicitly: items[-0:] would return *everything*
        # and a negative limit would slice from the wrong end.
        if not state or limit <= 0:
            return []

        return state.items[-limit:]

    # --- Episodic ---

    def start_episode(self, agent_id: str, goal: str, context: Optional[str] = None) -> str:
        """Begin a new episode for ``agent_id`` in the episodic store.

        Returns:
            Whatever the episodic store returns from ``start_episode``
            (annotated here as the episode id).
        """
        return self.episodic.start_episode(agent_id, goal=goal, context=context)

    def append_event(self, episode_id: str, kind: str, content: str, **meta) -> None:
        """Log an action or outcome to an ongoing episode.

        Extra keyword arguments are collected into a dict and handed to the
        episodic store as a single positional argument.
        """
        self.episodic.append_event(episode_id, kind, content, meta)

    def end_episode(self, episode_id: str, outcome: str, reward: Optional[float] = None) -> None:
        """Close an episode, forwarding its outcome and optional reward."""
        self.episodic.end_episode(episode_id, outcome, reward)

    # --- Semantic / Retrieval ---

    async def recall(
        self,
        agent_id: str,
        query: str,
        context: Optional[str] = None,
        top_k: int = 8,
        modes: Tuple[str, ...] = ("episodic", "semantic"),
    ) -> List[dict]:
        """Query the HAIM engine and the episodic store through one interface.

        Working memory is NOT consulted here; use ``get_working_context`` for
        that.

        Args:
            agent_id: Agent whose recent episodes are considered.
            query: Query text sent to the engine ("semantic" mode only).
            context: Forwarded to the episodic store only.
            top_k: Maximum number of results. Each enabled source is asked
                for up to ``top_k`` hits before the merged list is trimmed.
            modes: Sources to consult: ``"semantic"`` and/or ``"episodic"``.

        Returns:
            Dicts with the keys ``"source"``, ``"content"`` and ``"score"``,
            sorted by score in descending order. An empty list is returned
            when ``top_k`` is not positive.
        """
        # A non-positive top_k cannot yield results, and a negative value
        # would make the final slice drop entries from the end instead.
        if top_k <= 0:
            return []

        results = []

        # 1. Broad retrieval via existing HAIM engine (SM / general memories)
        if "semantic" in modes:
            engine_results = await self.engine.query(query, top_k=top_k)
            for mem_id, score in engine_results:
                # tier_manager.get_memory is a coroutine and must be awaited.
                node = await self.engine.tier_manager.get_memory(mem_id)
                if node:
                    results.append({"source": "semantic/engine", "content": node.content, "score": score})

        # 2. Local episodic retrieval
        if "episodic" in modes:
            recent_eps = self.episodic.get_recent(agent_id, limit=top_k, context=context)
            for ep in recent_eps:
                summary = f"Episode(goal={ep.goal}, outcome={ep.outcome}, events={len(ep.events)})"
                results.append({"source": "episodic", "content": summary, "score": ep.reliability})

        # Sort and trim mixed results. A missing (None) score ranks as 0.0 so
        # the sort cannot fail on a None-vs-float comparison.
        # NOTE(review): engine scores and episode reliability are compared
        # directly; confirm they are on comparable scales.
        results.sort(
            key=lambda hit: hit["score"] if hit["score"] is not None else 0.0,
            reverse=True,
        )
        return results[:top_k]

    # --- Procedural ---

    def suggest_procedures(self, agent_id: str, query: str, top_k: int = 5) -> List[Procedure]:
        """Ask the procedural store for procedures applicable to ``query``."""
        return self.procedural.find_applicable_procedures(query, agent_id=agent_id, top_k=top_k)

    def record_procedure_outcome(self, proc_id: str, success: bool) -> None:
        """Report to the procedural store whether a procedure succeeded."""
        self.procedural.record_procedure_outcome(proc_id, success)

    # --- Meta / Self-awareness ---

    def get_knowledge_gaps(self, agent_id: str, lookback_hours: int = 24) -> List[dict]:
        """Return currently open knowledge gaps.

        Stub: always returns an empty list; both arguments are unused for
        now. Intended to interact with a gap detector later.
        """
        # Stubbed: Would interact with gap_detector
        return []

    def get_self_improvement_proposals(self) -> List[SelfImprovementProposal]:
        """Return the proposals listed by the meta-memory service."""
        return self.meta.list_proposals()