warbler-cda / warbler_cda /anchor_memory_pool.py
Bellok's picture
Upload folder using huggingface_hub
0ccf2f0 verified
raw
history blame
11.9 kB
#!/usr/bin/env python3
"""
Anchor Memory Pool - Performance Optimization for v0.6.
Implements object pooling for semantic anchors to reduce GC churn
and improve memory management during high-throughput ingestion.
🧙‍♂️ "Memory is the castle's foundation - manage it wisely,
lest the whole structure crumble under its own weight." - Bootstrap Sentinel
"""
from typing import List, Dict, Any, Optional, Deque
import time
from collections import deque
from dataclasses import dataclass
from threading import Lock
from warbler_cda.anchor_data_classes import SemanticAnchor, AnchorProvenance
@dataclass
class PoolMetrics:
    """Memory pool performance metrics.

    A plain counter record that AnchorMemoryPool updates as anchors are
    acquired, returned and cleaned up. All fields default to zero.
    """

    # Anchor/provenance pairs built because the pool had nothing to hand out.
    total_created: int = 0
    # Acquisitions served from the pool.
    total_reused: int = 0
    # Anchors accepted back into the pool.
    total_returned: int = 0
    # Number of anchors sitting in the pool at the last update.
    current_pool_size: int = 0
    # Largest pool size recorded so far.
    peak_pool_size: int = 0
    # Incremented once per pooled reuse.
    gc_collections_avoided: int = 0
    # Returns rejected because the pool was already at capacity.
    memory_pressure_events: int = 0
    # time.time() value of the last cleanup pass (0.0 = never).
    last_cleanup_timestamp: float = 0.0

    def get_reuse_rate(self) -> float:
        """Calculate object reuse rate percentage.

        Returns:
            ``total_reused`` as a percentage of all acquisitions
            (``total_created + total_reused``), or ``0.0`` when nothing has
            been acquired yet.
        """
        total_requested = self.total_created + self.total_reused
        if total_requested <= 0:
            # Nothing acquired yet; avoid dividing by zero.
            return 0.0
        return self.total_reused / total_requested * 100
class AnchorMemoryPool:
    """
    Memory pool for semantic anchors with adaptive sizing and cleanup.

    Features:
    - Object reuse to reduce GC pressure
    - Adaptive pool sizing based on usage patterns
    - Memory pressure detection and cleanup
    - Thread-safe operations for concurrent access

    Anchors and provenances are stored in two parallel deques that are
    always pushed to and popped from in pairs, so both have the same length.
    All pool mutations happen while holding ``self._lock``.

    The pool does not detect an anchor being returned twice; callers must
    stop using an anchor once it has been handed to ``return_anchor``.
    """

    def __init__(
        self,
        initial_size: int = 50,
        max_size: int = 500,
        cleanup_interval: float = 300.0,
        memory_pressure_threshold: int = 1000,
    ):
        """
        Create the pool and pre-allocate ``initial_size`` clean pairs.

        Args:
            initial_size: Number of anchor/provenance pairs created up front;
                also the baseline for adaptive sizing.
            max_size: Upper bound on pooled anchors; returns beyond it are
                dropped.
            cleanup_interval: Minimum seconds between non-forced cleanups.
            memory_pressure_threshold: Stored on the instance; no method of
                this class reads it.
        """
        self.initial_size = initial_size
        self.max_size = max_size
        self.cleanup_interval = cleanup_interval
        self.memory_pressure_threshold = memory_pressure_threshold

        # Pool storage (kept pairwise in sync)
        self.available_anchors: "Deque[SemanticAnchor]" = deque()
        self.available_provenances: "Deque[AnchorProvenance]" = deque()

        # Thread safety (plain Lock: not re-entrant, so helpers that run
        # while it is held must not try to take it again)
        self._lock = Lock()

        # Metrics
        self.metrics = PoolMetrics()

        # Initialize pool with clean objects
        self._preallocate_pool()

    def _preallocate_pool(self) -> None:
        """Pre-allocate pool with initial objects."""
        for _ in range(self.initial_size):
            self.available_anchors.append(self._create_clean_anchor())
            self.available_provenances.append(self._create_clean_provenance())

        # Record what was actually allocated rather than the requested
        # number, so a non-positive initial_size cannot skew the metrics.
        pooled = len(self.available_anchors)
        self.metrics.current_pool_size = pooled
        self.metrics.peak_pool_size = pooled

    def _create_clean_anchor(self) -> "SemanticAnchor":
        """Create a clean anchor object for pooling."""
        return SemanticAnchor(
            anchor_id="",
            concept_text="",
            embedding=[],
            heat=0.0,
            provenance=None,  # Will be set when acquired
            cluster_id=None,
            semantic_drift=0.0,
            stability_score=1.0,
        )

    def _create_clean_provenance(self) -> "AnchorProvenance":
        """Create a clean provenance object for pooling."""
        return AnchorProvenance(
            first_seen=0.0,
            utterance_ids=[],
            update_count=0,
            last_updated=0.0,
            creation_context={},
            update_history=[],
        )

    def acquire_anchor(
        self,
        anchor_id: str,
        concept_text: str,
        embedding: List[float],
        heat: float,
        creation_context: Dict[str, Any],
    ) -> "SemanticAnchor":
        """
        Acquire an anchor from the pool or create new one.

        Args:
            anchor_id: Identifier assigned to the anchor.
            concept_text: Concept text assigned to the anchor.
            embedding: Embedding vector; copied, the caller's object is not
                retained.
            heat: Initial heat value.
            creation_context: Context stored on the provenance; copied.
                ``None`` is treated as an empty context.

        Returns:
            A configured anchor ready for use, with a fresh provenance whose
            ``first_seen`` and ``last_updated`` are the current time.
        """
        # Copy caller data before touching the pool: if an argument is
        # unusable the exception is raised here, and no pooled pair is lost
        # or metric bumped.
        embedding_copy = embedding.copy()  # Defensive copy
        context_copy = creation_context.copy() if creation_context is not None else {}

        with self._lock:
            if self.available_anchors and self.available_provenances:
                # Try to reuse from pool
                anchor = self.available_anchors.popleft()
                provenance = self.available_provenances.popleft()
                self.metrics.total_reused += 1
                self.metrics.gc_collections_avoided += 1
            else:
                # Create new objects if pool is empty
                anchor = self._create_clean_anchor()
                provenance = self._create_clean_provenance()
                self.metrics.total_created += 1
            self.metrics.current_pool_size = len(self.available_anchors)

        # From here on the pair is owned exclusively by this caller, so it
        # can be configured without holding the lock.
        anchor.anchor_id = anchor_id
        anchor.concept_text = concept_text
        anchor.embedding = embedding_copy
        anchor.heat = heat
        anchor.cluster_id = None
        anchor.semantic_drift = 0.0
        anchor.stability_score = 1.0

        # Configure provenance
        current_time = time.time()
        provenance.first_seen = current_time
        provenance.utterance_ids = []
        provenance.update_count = 0
        provenance.last_updated = current_time
        provenance.creation_context = context_copy
        provenance.update_history = []
        anchor.provenance = provenance

        return anchor

    def return_anchor(self, anchor: "SemanticAnchor") -> None:
        """
        Return an anchor to the pool for reuse.

        Cleans the anchor state before returning to pool. ``None`` is
        ignored. When the pool already holds ``max_size`` anchors the object
        is not pooled and a memory-pressure event is counted instead.
        """
        if anchor is None:
            return

        with self._lock:
            # Check if we're at capacity
            if len(self.available_anchors) >= self.max_size:
                # Pool is full, let object be garbage collected
                self.metrics.memory_pressure_events += 1
                return

            # Grab the provenance first: cleaning the anchor detaches it.
            provenance = anchor.provenance
            self._clean_anchor_for_reuse(anchor)

            if provenance is None:
                # Keep both deques the same length even when the caller
                # stripped the provenance off the anchor.
                provenance = self._create_clean_provenance()
            else:
                self._clean_provenance_for_reuse(provenance)

            # Return to pool
            self.available_anchors.append(anchor)
            self.available_provenances.append(provenance)

            self.metrics.total_returned += 1
            self.metrics.current_pool_size = len(self.available_anchors)
            self.metrics.peak_pool_size = max(
                self.metrics.peak_pool_size, self.metrics.current_pool_size
            )

    def _clean_anchor_for_reuse(self, anchor: "SemanticAnchor") -> None:
        """Reset an anchor to the state produced by ``_create_clean_anchor``."""
        anchor.anchor_id = ""
        anchor.concept_text = ""
        # Rebind instead of clearing in place: the old list may still be
        # referenced by the caller and must not be emptied under them.
        anchor.embedding = []
        anchor.heat = 0.0
        anchor.cluster_id = None
        anchor.semantic_drift = 0.0
        anchor.stability_score = 1.0
        # A pooled anchor must not keep pointing at a provenance that may be
        # handed to a different anchor.
        anchor.provenance = None

    def _clean_provenance_for_reuse(self, provenance: "AnchorProvenance") -> None:
        """Reset a provenance to the state produced by ``_create_clean_provenance``."""
        provenance.first_seen = 0.0
        provenance.update_count = 0
        provenance.last_updated = 0.0
        # Fresh containers rather than .clear(), for the same aliasing reason
        # as the anchor embedding.
        provenance.utterance_ids = []
        provenance.creation_context = {}
        provenance.update_history = []

    def cleanup_pool(self, force: bool = False) -> None:
        """
        Clean up pool based on usage patterns and memory pressure.

        Shrinks or grows the pool towards the size computed by
        ``_calculate_optimal_pool_size`` and records the cleanup time.

        Args:
            force: Force cleanup regardless of interval
        """
        current_time = time.time()

        with self._lock:
            # The interval check reads shared metrics, so it is done under
            # the lock as well.
            elapsed = current_time - self.metrics.last_cleanup_timestamp
            if not force and elapsed < self.cleanup_interval:
                return

            # Adaptive pool sizing based on usage patterns
            target_size = self._calculate_optimal_pool_size()
            current_size = len(self.available_anchors)

            if current_size > target_size:
                # Reduce pool size
                for _ in range(current_size - target_size):
                    if self.available_anchors:
                        self.available_anchors.popleft()
                    if self.available_provenances:
                        self.available_provenances.popleft()
            elif current_size < target_size and current_size < self.max_size:
                # Grow pool if needed, never past max_size
                needed = min(target_size - current_size, self.max_size - current_size)
                for _ in range(needed):
                    self.available_anchors.append(self._create_clean_anchor())
                    self.available_provenances.append(self._create_clean_provenance())

            self.metrics.last_cleanup_timestamp = current_time
            self.metrics.current_pool_size = len(self.available_anchors)

    def _calculate_optimal_pool_size(self) -> int:
        """
        Calculate optimal pool size based on usage patterns.

        Returns:
            ``initial_size`` scaled by 2 (reuse rate above 80%), by 1.5
            (above 50%) or unscaled, and never more than ``max_size``.
        """
        reuse_rate = self.metrics.get_reuse_rate()

        # High reuse rate = keep larger pool
        if reuse_rate > 80:
            target = int(self.initial_size * 2)
        elif reuse_rate > 50:
            target = int(self.initial_size * 1.5)
        else:
            target = self.initial_size

        # Clamp every branch, not only the highest tier.
        return min(self.max_size, target)

    def get_pool_metrics(self) -> Dict[str, Any]:
        """
        Get comprehensive pool performance metrics.

        Returns:
            A dict with ``pool_status``, ``performance_metrics`` and
            ``memory_management`` sections, read under the pool lock.
        """
        with self._lock:
            current_size = self.metrics.current_pool_size
            # A pool configured with max_size == 0 has no capacity to
            # measure against; report 0% instead of dividing by zero.
            if self.max_size > 0:
                utilization_pct = (current_size / self.max_size) * 100
            else:
                utilization_pct = 0.0

            return {
                "pool_status": {
                    "current_size": current_size,
                    "peak_size": self.metrics.peak_pool_size,
                    "max_size": self.max_size,
                    "utilization_pct": utilization_pct,
                },
                "performance_metrics": {
                    "total_created": self.metrics.total_created,
                    "total_reused": self.metrics.total_reused,
                    "total_returned": self.metrics.total_returned,
                    "reuse_rate_pct": self.metrics.get_reuse_rate(),
                    "gc_collections_avoided": self.metrics.gc_collections_avoided,
                },
                "memory_management": {
                    "memory_pressure_events": self.metrics.memory_pressure_events,
                    "last_cleanup": self.metrics.last_cleanup_timestamp,
                    "cleanup_interval": self.cleanup_interval,
                },
            }

    def get_memory_savings_estimate(self) -> Dict[str, Any]:
        """
        Estimate memory savings from pooling.

        The figures are rough: they multiply the number of reused
        anchor/provenance pairs by fixed per-object size guesses.
        """
        # Rough estimates based on object sizes
        anchor_size_bytes = 1024  # Approximate size of SemanticAnchor
        provenance_size_bytes = 512  # Approximate size of AnchorProvenance

        # Each reuse avoids allocating one anchor AND one provenance, i.e.
        # one pair. The pair size already covers both objects, so the count
        # must not be doubled again.
        pairs_reused = self.metrics.gc_collections_avoided
        memory_saved_bytes = pairs_reused * (anchor_size_bytes + provenance_size_bytes)

        return {
            "objects_reused": self.metrics.total_reused,
            "gc_collections_avoided": self.metrics.gc_collections_avoided,
            "estimated_memory_saved_bytes": memory_saved_bytes,
            "estimated_memory_saved_mb": memory_saved_bytes / (1024 * 1024),
            "efficiency_score": self.metrics.get_reuse_rate() / 100.0,
        }
# Global pool instance for shared use.
# Starts as None; get_global_anchor_pool() creates it on first use and
# configure_global_pool() replaces it with a newly built pool.
_global_anchor_pool: Optional[AnchorMemoryPool] = None
def get_global_anchor_pool() -> AnchorMemoryPool:
    """Get or create global anchor pool instance.

    Returns:
        The module-wide pool. It is built with the default
        ``AnchorMemoryPool()`` settings the first time this is called while
        no global pool exists.
    """
    global _global_anchor_pool  # pylint: disable=global-statement
    if _global_anchor_pool is None:
        # Lazy creation: the pool pre-allocates objects, so it is only
        # built once something actually asks for it.
        _global_anchor_pool = AnchorMemoryPool()
    return _global_anchor_pool
def configure_global_pool(
    initial_size: int = 50,
    max_size: int = 500,
    cleanup_interval: float = 300.0,
    memory_pressure_threshold: int = 1000,
) -> AnchorMemoryPool:
    """Configure global anchor pool with custom settings.

    Builds a new pool and installs it as the global instance, replacing any
    pool that was there before (objects pooled in the old instance are not
    carried over).

    Args:
        initial_size: Number of anchor/provenance pairs to pre-allocate.
        max_size: Maximum number of anchors the pool will hold.
        cleanup_interval: Minimum seconds between non-forced cleanups.
        memory_pressure_threshold: Passed through to the pool constructor.

    Returns:
        The newly installed global pool.
    """
    global _global_anchor_pool  # pylint: disable=global-statement
    _global_anchor_pool = AnchorMemoryPool(
        initial_size=initial_size,
        max_size=max_size,
        cleanup_interval=cleanup_interval,
        memory_pressure_threshold=memory_pressure_threshold,
    )
    return _global_anchor_pool