# MnemoCore / tests / test_dream_loop.py
# Granis87's picture
# Upload folder using huggingface_hub
# c3a3710 verified
"""
Tests for Dream Loop (Subconscious Daemon)
Tests configurability, graceful shutdown, non-blocking behavior, and metrics.
"""
import asyncio
import importlib
import sys
import time
from dataclasses import dataclass
from typing import Optional
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

# Ask pytest to load the pytest-asyncio plugin; the tests in this module are
# marked with @pytest.mark.asyncio.
pytest_plugins = ['pytest_asyncio']
@dataclass(frozen=True)
class MockDreamLoopConfig:
    """Frozen stand-in for the dream loop section of the configuration.

    Field order is part of the interface (positional construction), so it is
    kept exactly as declared.
    """

    # Whether the loop is switched on.
    enabled: bool = True
    # Interval between cycles, in seconds; deliberately short for testing.
    frequency_seconds: int = 1
    batch_size: int = 10
    max_iterations: int = 0
    # Generation endpoint and model name handed to the daemon.
    ollama_url: str = "http://localhost:11434/api/generate"
    model: str = "gemma3:1b"
@dataclass(frozen=True)
class MockRedisConfig:
    """Frozen stand-in for the Redis section of the configuration."""

    url: str = "redis://localhost:6379/0"
    stream_key: str = "haim:subconscious"
    max_connections: int = 10
    socket_timeout: int = 5
    # Defaults to None, so the annotation must admit None as well as str.
    password: Optional[str] = None
@dataclass(frozen=True)
class MockConfig:
    """Frozen top-level mock configuration bundling dream loop and Redis settings.

    Either section may be omitted (or passed as ``None``); the missing section
    is replaced with a default-constructed mock in ``__post_init__``.
    """

    # Both fields default to None, so they are annotated as Optional.
    dream_loop: Optional[MockDreamLoopConfig] = None
    redis: Optional[MockRedisConfig] = None

    def __post_init__(self):
        """Fill in default sub-configurations for any section left as None."""
        # The dataclass is frozen, so normal attribute assignment would raise;
        # object.__setattr__ bypasses the frozen guard during initialisation.
        if self.dream_loop is None:
            object.__setattr__(self, 'dream_loop', MockDreamLoopConfig())
        if self.redis is None:
            object.__setattr__(self, 'redis', MockRedisConfig())
@pytest.fixture
def mock_config():
    """Provide a MockConfig with the dream loop enabled and a one-second frequency."""
    dream_loop_settings = MockDreamLoopConfig(
        enabled=True,
        frequency_seconds=1,
        batch_size=10,
        max_iterations=0,
    )
    redis_settings = MockRedisConfig()
    return MockConfig(dream_loop=dream_loop_settings, redis=redis_settings)
@pytest.fixture
def mock_config_disabled():
    """Provide a MockConfig whose dream loop section has ``enabled=False``."""
    dream_loop_settings = MockDreamLoopConfig(
        enabled=False,
        frequency_seconds=1,
    )
    redis_settings = MockRedisConfig()
    return MockConfig(dream_loop=dream_loop_settings, redis=redis_settings)
@pytest.fixture
def mock_config_limited_iterations():
    """Provide a MockConfig whose dream loop section sets ``max_iterations=2``."""
    dream_loop_settings = MockDreamLoopConfig(
        enabled=True,
        frequency_seconds=1,
        max_iterations=2,
    )
    redis_settings = MockRedisConfig()
    return MockConfig(dream_loop=dream_loop_settings, redis=redis_settings)
@pytest.fixture
def mock_storage():
    """Build a MagicMock standing in for AsyncRedisStorage.

    The mock carries a ``redis_client`` attribute and four awaitable methods
    with fixed return values.
    """
    fake_storage = MagicMock()
    fake_storage.redis_client = MagicMock()

    # Awaitable method name -> value returned when awaited.
    async_stubs = (
        ("check_health", True),
        ("publish_event", None),
        ("retrieve_memory", None),
        ("close", None),
    )
    for method_name, result in async_stubs:
        setattr(fake_storage, method_name, AsyncMock(return_value=result))

    return fake_storage
@pytest.fixture
def daemon_module():
    """Yield a freshly imported ``mnemocore.subconscious.daemon`` module.

    Mock objects are registered in ``sys.modules`` before the import, any
    cached copy of the daemon module is dropped so the import runs again, and
    on teardown the previous ``sys.modules`` entries are put back and the
    daemon module is evicted once more.
    """
    # Stand-in for the aiohttp package.
    aiohttp_stub = MagicMock()

    # Stand-ins for the DREAM_LOOP metric objects.
    total_metric = MagicMock()
    total_metric.labels = MagicMock(return_value=MagicMock())
    iteration_seconds_metric = MagicMock()
    iteration_seconds_metric.observe = MagicMock()
    insights_metric = MagicMock()
    insights_metric.labels = MagicMock(return_value=MagicMock())
    active_metric = MagicMock()
    active_metric.set = MagicMock()

    # NOTE(review): the dotted ``...daemon.DREAM_LOOP_*`` names are registered
    # as sys.modules entries; whether the daemon resolves its metrics through
    # these entries is not visible from this file - confirm.
    patches = {
        'aiohttp': aiohttp_stub,
        'mnemocore.subconscious.daemon.aiohttp': aiohttp_stub,
        'mnemocore.subconscious.daemon.DREAM_LOOP_TOTAL': total_metric,
        'mnemocore.subconscious.daemon.DREAM_LOOP_ITERATION_SECONDS': iteration_seconds_metric,
        'mnemocore.subconscious.daemon.DREAM_LOOP_INSIGHTS_GENERATED': insights_metric,
        'mnemocore.subconscious.daemon.DREAM_LOOP_ACTIVE': active_metric,
    }

    # Remember whatever was registered under the patched names, then install the mocks.
    saved_entries = {name: sys.modules[name] for name in patches if name in sys.modules}
    sys.modules.update(patches)

    # Drop any cached daemon module so the import below executes it again.
    sys.modules.pop('mnemocore.subconscious.daemon', None)

    try:
        import mnemocore.subconscious.daemon as dm
        yield dm
    finally:
        # Put back prior entries; names that did not exist before are removed.
        for name in patches:
            if name in saved_entries:
                sys.modules[name] = saved_entries[name]
            else:
                sys.modules.pop(name, None)
        # Evict the daemon module that was imported against the mocks.
        sys.modules.pop('mnemocore.subconscious.daemon', None)
class TestDreamLoopStartsAndStops:
    """Lifecycle checks: the daemon starts, reports running, and stops on request."""

    @pytest.mark.asyncio
    async def test_dream_loop_starts_and_stops(self, mock_config, mock_storage, daemon_module):
        """Run the daemon in a background task, stop it, and check its state flags."""
        SubconsciousDaemon = daemon_module.SubconsciousDaemon
        daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config)

        # Before run() is scheduled: not running, no stop requested.
        assert daemon.running is False
        assert daemon._should_stop() is False

        run_task = asyncio.create_task(daemon.run())
        try:
            # Give the background task a moment to start.
            await asyncio.sleep(0.2)
            assert daemon.running is True
        finally:
            # Always stop and reap the task so a failed assertion above does
            # not leave the dream loop running in the background.
            await daemon.request_stop()
            await asyncio.wait_for(run_task, timeout=2.0)

        # After the task has finished: stopped, and the stop flag is set.
        assert daemon.running is False
        assert daemon._should_stop() is True

    @pytest.mark.asyncio
    async def test_dream_loop_respects_disabled_config(self, mock_config_disabled, mock_storage, daemon_module):
        """Await run() directly with a disabled config and check it never reports running."""
        SubconsciousDaemon = daemon_module.SubconsciousDaemon
        daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config_disabled)

        # Awaited inline (no background task): the test expects run() to return
        # on its own when the loop is disabled.
        await daemon.run()

        assert daemon.running is False
class TestDreamLoopFrequency:
    """Checks around how often the dream loop runs a cycle."""

    @pytest.mark.asyncio
    async def test_dream_respects_frequency(self, mock_config, mock_storage, daemon_module):
        """Record when cycles start and check that at least one cycle ran.

        NOTE(review): only the cycle count is asserted; the spacing between
        cycles is recorded but not checked against ``frequency_seconds``.
        """
        SubconsciousDaemon = daemon_module.SubconsciousDaemon
        daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config)

        cycle_times = []
        original_run_cycle = daemon.run_cycle

        async def tracked_run_cycle():
            # Monotonic clock: these timestamps are for measuring intervals and
            # must not be affected by wall-clock adjustments.
            cycle_times.append(time.monotonic())
            # Pass the wrapped coroutine's result through so the wrapper is
            # transparent to whatever run() does with it.
            return await original_run_cycle()

        daemon.run_cycle = tracked_run_cycle

        run_task = asyncio.create_task(daemon.run())

        # Observe the daemon for 0.3 s (the configured frequency is 1 s).
        await asyncio.sleep(0.3)
        await daemon.request_stop()
        await asyncio.wait_for(run_task, timeout=2.0)

        assert len(cycle_times) >= 1
class TestDreamLoopNonBlocking:
    """Checks that other coroutines still make progress while the daemon runs."""

    @pytest.mark.asyncio
    async def test_dream_does_not_block_queries(self, mock_config, mock_storage, daemon_module):
        """Schedule a trivial coroutine next to the running daemon and expect it to finish promptly."""
        daemon = daemon_module.SubconsciousDaemon(storage=mock_storage, config=mock_config)

        # Set by the fake query as soon as it gets to run.
        query_done = asyncio.Event()

        async def fake_query():
            query_done.set()
            return {"result": "ok"}

        daemon_task = asyncio.create_task(daemon.run())

        # Let the daemon start before issuing the concurrent query.
        await asyncio.sleep(0.1)
        # Keep a reference to the task for the duration of the test.
        query_task = asyncio.create_task(fake_query())

        try:
            # The query must get scheduled within 0.5 s despite the daemon.
            await asyncio.wait_for(query_done.wait(), timeout=0.5)
            assert query_done.is_set()
        finally:
            await daemon.request_stop()
            await asyncio.wait_for(daemon_task, timeout=2.0)
class TestDreamLoopIdempotentRestart:
    """Checks that stop/restart sequences and repeated stop calls are tolerated."""

    @pytest.mark.asyncio
    async def test_dream_loop_idempotent_restart(self, mock_config, mock_storage, daemon_module):
        """Run and stop the same daemon instance twice in a row."""
        daemon = daemon_module.SubconsciousDaemon(storage=mock_storage, config=mock_config)

        async def run_then_stop():
            # One full start -> running -> stop -> finished sequence.
            task = asyncio.create_task(daemon.run())
            await asyncio.sleep(0.1)
            assert daemon.running is True
            await daemon.request_stop()
            await asyncio.wait_for(task, timeout=2.0)
            assert daemon.running is False

        # First run; afterwards the stop flag is expected to be set.
        await run_then_stop()
        assert daemon._should_stop() is True

        # Second run on the same instance (restart).
        await run_then_stop()

    @pytest.mark.asyncio
    async def test_dream_loop_multiple_stop_calls(self, mock_config, mock_storage, daemon_module):
        """Call stop()/request_stop() repeatedly on a daemon that was never started."""
        daemon = daemon_module.SubconsciousDaemon(storage=mock_storage, config=mock_config)

        # None of these calls may raise.
        for _ in range(2):
            daemon.stop()
        await daemon.request_stop()
        daemon.stop()

        assert daemon._should_stop() is True
class TestDreamLoopMetrics:
    """Runs the daemon briefly as a smoke test for the metrics code path."""

    @pytest.mark.asyncio
    async def test_dream_loop_metrics_recorded(self, mock_config, mock_storage, daemon_module):
        """Run the daemon for 0.3 s, stop it, and check that it reports stopped.

        NOTE(review): no metric object is inspected here; the only assertion is
        on ``daemon.running``.
        """
        daemon = daemon_module.SubconsciousDaemon(storage=mock_storage, config=mock_config)

        background = asyncio.create_task(daemon.run())
        await asyncio.sleep(0.3)
        await daemon.request_stop()
        await asyncio.wait_for(background, timeout=2.0)

        assert daemon.running is False
class TestDreamLoopMaxIterations:
    """Checks that the dream loop ends by itself when max_iterations is set."""

    @pytest.mark.asyncio
    async def test_dream_loop_respects_max_iterations(self, mock_config_limited_iterations, mock_storage, daemon_module):
        """Start the daemon with max_iterations=2 and expect run() to finish without a stop request."""
        SubconsciousDaemon = daemon_module.SubconsciousDaemon
        daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config_limited_iterations)

        # Monotonic clock: elapsed-time measurement must not be skewed by
        # wall-clock adjustments during the test.
        start_time = time.monotonic()
        run_task = asyncio.create_task(daemon.run())

        # No request_stop() here: the task has to complete on its own.
        await asyncio.wait_for(run_task, timeout=5.0)
        elapsed = time.monotonic() - start_time

        assert daemon.running is False
        # Two iterations at a one-second frequency should fit well inside 5 s.
        assert elapsed < 5.0
class TestDreamLoopConfiguration:
    """Checks on the real DreamLoopConfig from mnemocore.core.config."""

    def test_dream_loop_config_from_yaml(self):
        """Check that load_config() returns a config with a complete dream_loop section."""
        from mnemocore.core.config import load_config, DreamLoopConfig

        loaded = load_config()

        assert hasattr(loaded, 'dream_loop')
        assert isinstance(loaded.dream_loop, DreamLoopConfig)

        # Every attribute the daemon tests rely on must be present.
        required_attributes = (
            'enabled',
            'frequency_seconds',
            'batch_size',
            'max_iterations',
            'ollama_url',
            'model',
        )
        for attribute in required_attributes:
            assert hasattr(loaded.dream_loop, attribute)

    def test_dream_loop_config_defaults(self):
        """Check the default values of a DreamLoopConfig built with no arguments."""
        from mnemocore.core.config import DreamLoopConfig

        defaults = DreamLoopConfig()

        # Identity check: the default must be the bool True, not merely truthy.
        assert defaults.enabled is True

        expected_values = {
            'frequency_seconds': 60,
            'batch_size': 10,
            'max_iterations': 0,
            'ollama_url': "http://localhost:11434/api/generate",
            'model': "gemma3:1b",
        }
        for attribute, expected in expected_values.items():
            assert getattr(defaults, attribute) == expected