MnemoCore / tests /test_engine_methods.py
Granis87's picture
Initial upload of MnemoCore
dbb04e4 verified
"""
Tests for Extracted Engine Methods
===================================
Unit tests for the refactored private helper methods in HAIMEngine:
- _encode_input()
- _evaluate_tier()
- _persist_memory()
- _trigger_post_store()
"""
import os
from collections import deque
import pytest
import pytest_asyncio
from unittest.mock import AsyncMock, patch, MagicMock
from mnemocore.core.config import get_config, reset_config
from mnemocore.core.engine import HAIMEngine
from mnemocore.core.binary_hdv import BinaryHDV
from mnemocore.core.node import MemoryNode
@pytest.fixture
def test_engine(tmp_path):
"""Create a test engine with isolated configuration."""
reset_config()
data_dir = tmp_path / "data"
data_dir.mkdir()
os.environ["HAIM_DATA_DIR"] = str(data_dir)
os.environ["HAIM_MEMORY_FILE"] = str(data_dir / "memory.jsonl")
os.environ["HAIM_CODEBOOK_FILE"] = str(data_dir / "codebook.json")
os.environ["HAIM_SYNAPSES_FILE"] = str(data_dir / "synapses.json")
os.environ["HAIM_WARM_MMAP_DIR"] = str(data_dir / "warm")
os.environ["HAIM_COLD_ARCHIVE_DIR"] = str(data_dir / "cold")
os.environ["HAIM_ENCODING_MODE"] = "binary"
os.environ["HAIM_DIMENSIONALITY"] = "1024"
reset_config()
engine = HAIMEngine()
yield engine
# Cleanup
for key in [
"HAIM_DATA_DIR",
"HAIM_MEMORY_FILE",
"HAIM_CODEBOOK_FILE",
"HAIM_SYNAPSES_FILE",
"HAIM_WARM_MMAP_DIR",
"HAIM_COLD_ARCHIVE_DIR",
"HAIM_ENCODING_MODE",
"HAIM_DIMENSIONALITY",
]:
if key in os.environ:
del os.environ[key]
reset_config()
# =============================================================================
# Tests for _encode_input()
# =============================================================================
@pytest.mark.asyncio
class TestEncodeInput:
"""Test suite for _encode_input method."""
async def test_encode_input_basic(self, test_engine):
"""Test basic encoding without goal_id."""
await test_engine.initialize()
encoded_vec, metadata = await test_engine._encode_input("test content")
assert isinstance(encoded_vec, BinaryHDV)
assert encoded_vec.dimension == test_engine.dimension
assert metadata == {}
async def test_encode_input_with_metadata(self, test_engine):
"""Test encoding with existing metadata."""
await test_engine.initialize()
existing_metadata = {"key": "value", "number": 42}
encoded_vec, metadata = await test_engine._encode_input(
"test content", metadata=existing_metadata
)
assert isinstance(encoded_vec, BinaryHDV)
assert metadata["key"] == "value"
assert metadata["number"] == 42
async def test_encode_input_with_goal_id(self, test_engine):
"""Test encoding with goal context binding."""
await test_engine.initialize()
encoded_vec, metadata = await test_engine._encode_input(
"test content", goal_id="goal-123"
)
assert isinstance(encoded_vec, BinaryHDV)
assert metadata["goal_context"] == "goal-123"
async def test_encode_input_with_goal_and_metadata(self, test_engine):
"""Test encoding with both goal_id and existing metadata."""
await test_engine.initialize()
existing_metadata = {"priority": "high"}
encoded_vec, metadata = await test_engine._encode_input(
"test content", metadata=existing_metadata, goal_id="goal-456"
)
assert isinstance(encoded_vec, BinaryHDV)
assert metadata["priority"] == "high"
assert metadata["goal_context"] == "goal-456"
async def test_encode_input_deterministic(self, test_engine):
"""Test that same content produces same encoding."""
await test_engine.initialize()
encoded_vec1, _ = await test_engine._encode_input("identical content")
encoded_vec2, _ = await test_engine._encode_input("identical content")
# Same content should produce identical vectors
assert encoded_vec1.data.tobytes() == encoded_vec2.data.tobytes()
async def test_encode_input_different_content(self, test_engine):
"""Test that different content produces different encodings."""
await test_engine.initialize()
encoded_vec1, _ = await test_engine._encode_input("content A")
encoded_vec2, _ = await test_engine._encode_input("completely different content B")
# Different content should produce different vectors
similarity = encoded_vec1.similarity(encoded_vec2)
# Similarity should be less than 1.0 for different content
assert similarity < 1.0
# =============================================================================
# Tests for _evaluate_tier()
# =============================================================================
@pytest.mark.asyncio
class TestEvaluateTier:
"""Test suite for _evaluate_tier method."""
async def test_evaluate_tier_with_epistemic_drive(self, test_engine):
"""Test EIG calculation when epistemic drive is active."""
await test_engine.initialize()
test_engine.epistemic_drive_active = True
encoded_vec = BinaryHDV.random(test_engine.dimension)
metadata = {}
updated_metadata = await test_engine._evaluate_tier(encoded_vec, metadata)
assert "eig" in updated_metadata
assert isinstance(updated_metadata["eig"], float)
assert 0.0 <= updated_metadata["eig"] <= 1.0
async def test_evaluate_tier_without_epistemic_drive(self, test_engine):
"""Test that EIG is set to 0 when epistemic drive is inactive."""
await test_engine.initialize()
test_engine.epistemic_drive_active = False
encoded_vec = BinaryHDV.random(test_engine.dimension)
metadata = {}
updated_metadata = await test_engine._evaluate_tier(encoded_vec, metadata)
assert updated_metadata["eig"] == 0.0
async def test_evaluate_tier_high_eig_tags(self, test_engine):
"""Test that high EIG adds epistemic_high tag."""
await test_engine.initialize()
test_engine.epistemic_drive_active = True
test_engine.surprise_threshold = 0.1 # Low threshold to trigger tagging
# Create a random vector that will likely be different from context
encoded_vec = BinaryHDV.random(test_engine.dimension)
metadata = {}
updated_metadata = await test_engine._evaluate_tier(encoded_vec, metadata)
if updated_metadata["eig"] >= test_engine.surprise_threshold:
assert "epistemic_high" in updated_metadata.get("tags", [])
async def test_evaluate_tier_preserves_existing_tags(self, test_engine):
"""Test that existing tags are preserved when adding epistemic_high."""
await test_engine.initialize()
test_engine.epistemic_drive_active = True
test_engine.surprise_threshold = 0.0 # Guarantee tagging
encoded_vec = BinaryHDV.random(test_engine.dimension)
metadata = {"tags": ["existing_tag"]}
updated_metadata = await test_engine._evaluate_tier(encoded_vec, metadata)
assert "existing_tag" in updated_metadata["tags"]
assert "epistemic_high" in updated_metadata["tags"]
async def test_evaluate_tier_low_eig_no_tag(self, test_engine):
"""Test that low EIG does not add epistemic_high tag."""
await test_engine.initialize()
test_engine.epistemic_drive_active = True
test_engine.surprise_threshold = 1.0 # Impossibly high threshold
encoded_vec = BinaryHDV.random(test_engine.dimension)
metadata = {}
updated_metadata = await test_engine._evaluate_tier(encoded_vec, metadata)
assert "tags" not in updated_metadata or "epistemic_high" not in updated_metadata.get("tags", [])
# =============================================================================
# Tests for _persist_memory()
# =============================================================================
@pytest.mark.asyncio
class TestPersistMemory:
"""Test suite for _persist_memory method."""
async def test_persist_memory_creates_node(self, test_engine):
"""Test that _persist_memory creates a valid MemoryNode."""
await test_engine.initialize()
encoded_vec = BinaryHDV.random(test_engine.dimension)
metadata = {"eig": 0.5}
node = await test_engine._persist_memory("test content", encoded_vec, metadata)
assert isinstance(node, MemoryNode)
assert node.content == "test content"
assert node.hdv.data.tobytes() == encoded_vec.data.tobytes()
assert node.metadata == metadata
async def test_persist_memory_stores_in_tier_manager(self, test_engine):
"""Test that node is stored in tier manager (HOT tier)."""
await test_engine.initialize()
encoded_vec = BinaryHDV.random(test_engine.dimension)
metadata = {"eig": 0.5}
node = await test_engine._persist_memory("test content", encoded_vec, metadata)
# Verify node is in HOT tier
async with test_engine.tier_manager.lock:
assert node.id in test_engine.tier_manager.hot
assert test_engine.tier_manager.hot[node.id].id == node.id
async def test_persist_memory_sets_epistemic_value(self, test_engine):
"""Test that epistemic_value is correctly set from metadata."""
await test_engine.initialize()
encoded_vec = BinaryHDV.random(test_engine.dimension)
metadata = {"eig": 0.75}
node = await test_engine._persist_memory("test content", encoded_vec, metadata)
assert node.epistemic_value == 0.75
async def test_persist_memory_calculates_ltp(self, test_engine):
"""Test that LTP is calculated after persistence."""
await test_engine.initialize()
encoded_vec = BinaryHDV.random(test_engine.dimension)
metadata = {"eig": 0.5}
node = await test_engine._persist_memory("test content", encoded_vec, metadata)
# LTP should be calculated (non-zero with default config)
assert hasattr(node, "ltp_strength")
assert node.ltp_strength >= 0.0
async def test_persist_memory_writes_to_disk(self, test_engine):
"""Test that memory is appended to persistence log."""
await test_engine.initialize()
encoded_vec = BinaryHDV.random(test_engine.dimension)
metadata = {"eig": 0.5}
node = await test_engine._persist_memory("test content", encoded_vec, metadata)
# Check persistence file exists and contains the node
assert os.path.exists(test_engine.persist_path)
# =============================================================================
# Tests for _trigger_post_store()
# =============================================================================
@pytest.mark.asyncio
class TestTriggerPostStore:
"""Test suite for _trigger_post_store method."""
async def test_trigger_post_store_adds_to_subconscious_queue(self, test_engine):
"""Test that node ID is added to subconscious queue."""
await test_engine.initialize()
# Pre-populate queue to prevent dream from consuming our node
test_engine.subconscious_queue.clear()
test_engine.subconscious_queue.append("placeholder")
node = MemoryNode(
id="test-node-id",
hdv=BinaryHDV.random(test_engine.dimension),
content="test content",
metadata={},
)
metadata = {}
await test_engine._trigger_post_store(node, metadata)
# Our node should have been added (dream may have popped placeholder)
assert "test-node-id" in test_engine.subconscious_queue
async def test_trigger_post_store_skips_dream_for_gap_fill(self, test_engine):
"""Test that background dream is skipped for gap-filled memories."""
await test_engine.initialize()
test_engine.subconscious_queue.clear()
node = MemoryNode(
id="gap-fill-node",
hdv=BinaryHDV.random(test_engine.dimension),
content="generated content",
metadata={},
)
metadata = {"source": "llm_gap_fill"}
# Should not raise any errors
await test_engine._trigger_post_store(node, metadata)
# For gap fill, node should remain in queue since dream is skipped
assert "gap-fill-node" in test_engine.subconscious_queue
async def test_trigger_post_store_triggers_dream_for_normal_memory(self, test_engine):
"""Test that background dream is triggered for normal memories."""
await test_engine.initialize()
# Pre-populate to test that dream is triggered
test_engine.subconscious_queue.clear()
test_engine.subconscious_queue.append("pre-existing")
node = MemoryNode(
id="normal-node",
hdv=BinaryHDV.random(test_engine.dimension),
content="normal content",
metadata={},
)
metadata = {}
# The dream should be triggered and process the queue
await test_engine._trigger_post_store(node, metadata)
# Either node was added and dream consumed it, or it's still there
# The key test is that no error was raised
assert True
async def test_trigger_post_store_with_empty_subconscious_queue(self, test_engine):
"""Test behavior when subconscious queue is initially empty."""
await test_engine.initialize()
test_engine.subconscious_queue.clear()
node = MemoryNode(
id="first-node",
hdv=BinaryHDV.random(test_engine.dimension),
content="first content",
metadata={},
)
metadata = {}
await test_engine._trigger_post_store(node, metadata)
# Queue may be empty after dream consumes, but node was added
# The test verifies no exception was raised
assert True
async def test_trigger_post_store_gap_fill_not_consumed(self, test_engine):
"""Test that gap-filled nodes remain in queue since dream is skipped."""
await test_engine.initialize()
test_engine.subconscious_queue.clear()
# Gap fill should NOT trigger dream, so node should remain
node = MemoryNode(
id="gap-node",
hdv=BinaryHDV.random(test_engine.dimension),
content="gap fill content",
metadata={},
)
metadata = {"source": "llm_gap_fill"}
await test_engine._trigger_post_store(node, metadata)
# Gap fill skips dream, so node should be in queue
assert "gap-node" in test_engine.subconscious_queue
async def test_trigger_post_store_multiple_gap_fills(self, test_engine):
"""Test multiple gap fill calls add multiple entries to queue."""
await test_engine.initialize()
test_engine.subconscious_queue.clear()
for i in range(3):
node = MemoryNode(
id=f"gap-node-{i}",
hdv=BinaryHDV.random(test_engine.dimension),
content=f"gap content {i}",
metadata={},
)
# Gap fill source skips dream, so nodes accumulate
await test_engine._trigger_post_store(node, {"source": "llm_gap_fill"})
assert len(test_engine.subconscious_queue) == 3
async def test_subconscious_queue_respects_maxlen_config(self, tmp_path):
"""Queue should drop oldest items when maxlen is configured."""
reset_config()
data_dir = tmp_path / "data"
data_dir.mkdir()
os.environ["HAIM_DATA_DIR"] = str(data_dir)
os.environ["HAIM_MEMORY_FILE"] = str(data_dir / "memory.jsonl")
os.environ["HAIM_CODEBOOK_FILE"] = str(data_dir / "codebook.json")
os.environ["HAIM_SYNAPSES_FILE"] = str(data_dir / "synapses.json")
os.environ["HAIM_WARM_MMAP_DIR"] = str(data_dir / "warm")
os.environ["HAIM_COLD_ARCHIVE_DIR"] = str(data_dir / "cold")
os.environ["HAIM_ENCODING_MODE"] = "binary"
os.environ["HAIM_DIMENSIONALITY"] = "1024"
os.environ["HAIM_DREAM_LOOP_SUBCONSCIOUS_QUEUE_MAXLEN"] = "2"
reset_config()
engine = HAIMEngine()
assert isinstance(engine.subconscious_queue, deque)
engine.subconscious_queue.append("id-1")
engine.subconscious_queue.append("id-2")
engine.subconscious_queue.append("id-3")
assert list(engine.subconscious_queue) == ["id-2", "id-3"]
for key in [
"HAIM_DATA_DIR",
"HAIM_MEMORY_FILE",
"HAIM_CODEBOOK_FILE",
"HAIM_SYNAPSES_FILE",
"HAIM_WARM_MMAP_DIR",
"HAIM_COLD_ARCHIVE_DIR",
"HAIM_ENCODING_MODE",
"HAIM_DIMENSIONALITY",
"HAIM_DREAM_LOOP_SUBCONSCIOUS_QUEUE_MAXLEN",
]:
if key in os.environ:
del os.environ[key]
reset_config()
# =============================================================================
# Integration Tests for store() orchestration
# =============================================================================
@pytest.mark.asyncio
class TestStoreOrchestration:
"""Integration tests for the refactored store() method."""
async def test_store_returns_valid_id(self, test_engine):
"""Test that store() returns a valid UUID string."""
await test_engine.initialize()
node_id = await test_engine.store("test memory content")
assert isinstance(node_id, str)
assert len(node_id) == 36 # UUID format
async def test_store_with_all_parameters(self, test_engine):
"""Test store() with all optional parameters."""
await test_engine.initialize()
metadata = {"priority": "high", "category": "test"}
node_id = await test_engine.store(
content="complete test",
metadata=metadata,
goal_id="goal-789",
)
node = await test_engine.get_memory(node_id)
assert node is not None
assert node.metadata["priority"] == "high"
assert node.metadata["category"] == "test"
assert node.metadata["goal_context"] == "goal-789"
assert "eig" in node.metadata
async def test_store_pipeline_integration(self, test_engine):
"""Test complete pipeline from encoding to persistence."""
await test_engine.initialize()
content = "integration test content"
node_id = await test_engine.store(content)
# Verify node exists in tier manager
node = await test_engine.tier_manager.get_memory(node_id)
assert node is not None
assert node.content == content
# Node starts in hot (may be demoted based on config, so just check it exists)
assert node.tier in ["hot", "warm"]
# Verify persistence
assert os.path.exists(test_engine.persist_path)