| """ | |
| Enhanced Anchor System with Semantic Grounding and Provenance | |
| """ | |
| from typing import List, Dict, Any, Optional, cast | |
| import time | |
| import hashlib | |
| from warbler_cda.embeddings import EmbeddingProvider, EmbeddingProviderFactory | |
| from warbler_cda.anchor_memory_pool import AnchorMemoryPool, get_global_anchor_pool | |
| from warbler_cda.anchor_data_classes import SemanticAnchor, AnchorProvenance | |
| # Privacy hooks for PII scrubbing before anchor injection | |
| PRIVACY_HOOKS_AVAILABLE = False | |


class SemanticAnchorGraph:
    """Enhanced CastleGraph with semantic grounding and provenance."""

    def __init__(
        self,
        embedding_provider: Optional[EmbeddingProvider] = None,
        config: Optional[Dict[str, Any]] = None,
        memory_pool: Optional[AnchorMemoryPool] = None,
        privacy_hooks: Optional[Any] = None,
    ):
        """Initialize the semantic anchor manager."""
        self.config = config or {}
        self.embedding_provider = (
            embedding_provider or EmbeddingProviderFactory.get_default_provider()
        )

        # Memory pool for performance optimization
        self.memory_pool = memory_pool or get_global_anchor_pool()

        # Privacy hooks for PII scrubbing (optional but recommended)
        self.privacy_hooks = privacy_hooks

        # Anchor storage
        self.anchors: Dict[str, SemanticAnchor] = {}
        self.clusters: Dict[str, List[str]] = {}  # cluster_id -> anchor_ids

        # Lifecycle configuration
        self.max_age_days = self.config.get("max_age_days", 30)
        self.consolidation_threshold = self.config.get("consolidation_threshold", 0.8)
        self.eviction_heat_threshold = self.config.get("eviction_heat_threshold", 0.1)

        # Performance configuration
        self.enable_memory_pooling = self.config.get("enable_memory_pooling", True)

        # Metrics
        self.metrics = {
            "total_anchors_created": 0,
            "total_updates": 0,
            "total_evictions": 0,
            "total_consolidations": 0,
        }
    def create_or_update_anchor(
        self, concept_text: str, utterance_id: str, context: Dict[str, Any]
    ) -> str:
        """Create new anchor or update existing one with PII scrubbing applied."""
        # 🔐 PRIVACY HOOK: Apply PII scrubbing before anchor injection
        original_concept_text = concept_text
        privacy_metadata = {}

        if self.privacy_hooks:
            concept_text, privacy_metadata = self.privacy_hooks.scrub_content_for_anchor_injection(
                concept_text, context, utterance_id
            )

            # Add privacy metadata to context for provenance tracking
            context = context.copy()
            context["privacy_scrubbing_applied"] = privacy_metadata.get(
                "privacy_hook_applied", False
            )
            context["original_content_length"] = len(original_concept_text)
            context["scrubbed_content_length"] = len(concept_text)

            # Validate privacy compliance if configured
            is_compliant, violations = self.privacy_hooks.validate_privacy_compliance(
                original_concept_text, context
            )
            if not is_compliant:
                context["privacy_violations"] = violations
                # Log the violation but continue with scrubbed content
                print(f"⚠️ Privacy violations detected for anchor injection: {violations}")

        # Generate embedding from scrubbed content
        embedding = self.embedding_provider.embed_text(concept_text)

        # Check for existing similar anchor
        existing_anchor_id = self._find_similar_anchor(embedding)

        if existing_anchor_id:
            # Update existing anchor
            anchor = self.anchors[existing_anchor_id]
            old_embedding = anchor.embedding.copy()

            # Update embedding (weighted average with recency bias)
            weight = 0.3  # Weight for new embedding
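            # Exponential moving average: each update blends 30% of the new
            # embedding with 70% of the current one, so older observations
            # decay geometrically while recent usage dominates.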
            anchor.embedding = [
                (1 - weight) * old + weight * new
                for old, new in zip(anchor.embedding, embedding)
            ]

            # Calculate semantic drift
            anchor.semantic_drift = self._calculate_drift(old_embedding, anchor.embedding)

            # Update provenance
            anchor.provenance.add_update(utterance_id, context)

            # Increase heat
            anchor.heat += 0.1

            self.metrics["total_updates"] += 1
            return existing_anchor_id
        else:
            # Create new anchor using memory pool
            anchor_id = self._generate_anchor_id(concept_text)

            if self.enable_memory_pooling:
                # Use memory pool for better performance
                anchor = self.memory_pool.acquire_anchor(
                    anchor_id=anchor_id,
                    concept_text=concept_text,
                    embedding=embedding,
                    heat=0.2,  # Initial heat
                    creation_context=context,
                )
                # Add initial utterance
                anchor.provenance.add_update(utterance_id, context)
            else:
                # Create anchor directly (fallback)
                provenance = AnchorProvenance(
                    first_seen=time.time(),
                    utterance_ids=[utterance_id],
                    update_count=1,
                    last_updated=time.time(),
                    creation_context=context,
                    update_history=[],
                )
                anchor = SemanticAnchor(
                    anchor_id=anchor_id,
                    concept_text=concept_text,
                    embedding=embedding,
                    heat=0.2,  # Initial heat
                    provenance=provenance,
                )

            self.anchors[anchor_id] = anchor
            self.metrics["total_anchors_created"] += 1
            return anchor_id
    def get_semantic_clusters(self, max_clusters: int = 5) -> List[Dict[str, Any]]:
        """Get semantic clusters of anchors."""
        if len(self.anchors) < 2:
            return []

        # Simple clustering based on embedding similarity
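        # Greedy single-pass scheme: each not-yet-clustered anchor seeds a
        # cluster and absorbs every remaining anchor whose similarity exceeds
        # consolidation_threshold. O(n^2) comparisons, order-dependent.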
        anchors_list = list(self.anchors.values())
        clusters = []
        used_anchors = set()

        for anchor in anchors_list:
            if anchor.anchor_id in used_anchors:
                continue

            cluster_anchors = [anchor]
            used_anchors.add(anchor.anchor_id)

            # Find similar anchors
            for other_anchor in anchors_list:
                if other_anchor.anchor_id in used_anchors:
                    continue
                similarity = self.embedding_provider.calculate_similarity(
                    anchor.embedding, other_anchor.embedding
                )
                if similarity > self.consolidation_threshold:
                    cluster_anchors.append(other_anchor)
                    used_anchors.add(other_anchor.anchor_id)

            if len(cluster_anchors) > 1:
                cluster_info = self._create_cluster_info(cluster_anchors)
                clusters.append(cluster_info)

            if len(clusters) >= max_clusters:
                break

        return clusters
    def get_anchor_diff(self, since_timestamp: float) -> Dict[str, Any]:
        """Get anchor changes since timestamp."""
        added = []
        updated = []
        decayed = []
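        # NOTE: "decayed" is returned for API symmetry but is not populated
        # here; heat decay is applied in apply_lifecycle_policies().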
        for anchor in self.anchors.values():
            if anchor.provenance.first_seen > since_timestamp:
                added.append(
                    {
                        "anchor_id": anchor.anchor_id,
                        "concept_text": anchor.concept_text,
                        "heat": anchor.heat,
                        "first_seen": anchor.provenance.first_seen,
                    }
                )
            elif anchor.provenance.last_updated > since_timestamp:
                # Check if reinforced or decayed
                recent_updates = [
                    update
                    for update in anchor.provenance.update_history
                    if update["timestamp"] > since_timestamp
                ]
                if recent_updates:
                    updated.append(
                        {
                            "anchor_id": anchor.anchor_id,
                            "concept_text": anchor.concept_text,
                            "heat": anchor.heat,
                            "updates": len(recent_updates),
                            "semantic_drift": anchor.semantic_drift,
                        }
                    )

        return {
            "since_timestamp": since_timestamp,
            "added": added,
            "updated": updated,
            "decayed": decayed,
            "total_anchors": len(self.anchors),
        }
    def apply_lifecycle_policies(self) -> Dict[str, Any]:
        """Apply aging, consolidation, and eviction policies."""
        actions = {"aged": 0, "consolidated": 0, "evicted": 0, "evicted_anchors": []}
        anchors_to_evict = []

        # Apply aging
        for anchor in self.anchors.values():
            age_days = anchor.calculate_age_days()

            # Heat decay based on age
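            # 0.95**age_days loses roughly 5% of heat per day of age, with
            # the multiplier floored at 0.1 so a single pass never strips
            # more than 90% of an anchor's heat.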
            decay_factor = max(0.95**age_days, 0.1)
            anchor.heat *= decay_factor
            actions["aged"] += 1

            # Mark for eviction if too old or too cold
            if age_days > self.max_age_days or anchor.heat < self.eviction_heat_threshold:
                anchors_to_evict.append(anchor.anchor_id)

        # Evict old/cold anchors and return to memory pool
        for anchor_id in anchors_to_evict:
            evicted_anchor = self.anchors.pop(anchor_id)

            # Return to memory pool if enabled
            if self.enable_memory_pooling:
                self.memory_pool.return_anchor(evicted_anchor)

            actions["evicted"] += 1
            actions["evicted_anchors"].append(
                {
                    "anchor_id": anchor_id,
                    "concept_text": evicted_anchor.concept_text,
                    "age_days": evicted_anchor.calculate_age_days(),
                    "final_heat": evicted_anchor.heat,
                }
            )

        self.metrics["total_evictions"] += actions["evicted"]

        # Trigger memory pool cleanup periodically
        if self.enable_memory_pooling and actions["evicted"] > 0:
            self.memory_pool.cleanup_pool()

        return actions
    def get_stability_metrics(self) -> Dict[str, Any]:
        """Calculate anchor churn, drift, and stability metrics."""
        if not self.anchors:
            return {
                "total_anchors": 0,
                "average_age_days": 0,
                "average_heat": 0,
                "average_drift": 0,
                "churn_rate": 0,
                "stability_score": 1.0,
            }

        anchors = list(self.anchors.values())

        # Basic metrics
        total_anchors = len(anchors)
        average_age = sum(a.calculate_age_days() for a in anchors) / total_anchors
        average_heat = sum(a.heat for a in anchors) / total_anchors
        average_drift = sum(a.semantic_drift for a in anchors) / total_anchors

        # Churn rate (evictions per day)
        days_active = max(average_age, 1)
        churn_rate = self.metrics["total_evictions"] / days_active

        # Overall stability score (lower drift = higher stability)
        stability_score = max(0, 1 - average_drift)

        # Get memory pool metrics if enabled
        memory_metrics = {}
        if self.enable_memory_pooling:
            memory_metrics = self.memory_pool.get_pool_metrics()

        return {
            "total_anchors": total_anchors,
            "average_age_days": average_age,
            "average_heat": average_heat,
            "average_drift": average_drift,
            "churn_rate": churn_rate,
            "stability_score": stability_score,
            "provider_info": self.embedding_provider.get_provider_info(),
            "memory_pool_metrics": memory_metrics,
        }
    def _find_similar_anchor(self, embedding: List[float]) -> Optional[str]:
        """Find existing anchor with similar embedding."""
        best_similarity = 0.0
        best_anchor_id = None

        for anchor_id, anchor in self.anchors.items():
            similarity = self.embedding_provider.calculate_similarity(embedding, anchor.embedding)
            if similarity > best_similarity and similarity > self.consolidation_threshold:
                best_similarity = similarity
                best_anchor_id = anchor_id

        return best_anchor_id
    def _calculate_drift(self, old_embedding: List[float], new_embedding: List[float]) -> float:
        """Calculate semantic drift between embeddings."""
        similarity = self.embedding_provider.calculate_similarity(old_embedding, new_embedding)
        return 1.0 - similarity  # Drift is inverse of similarity
    def _generate_anchor_id(self, concept_text: str) -> str:
        """Generate unique anchor ID."""
        timestamp = str(int(time.time() * 1000))
        text_hash = hashlib.md5(concept_text.encode()).hexdigest()[:8]
        return f"anchor_{timestamp}_{text_hash}"
    def _create_cluster_info(self, cluster_anchors: List[SemanticAnchor]) -> Dict[str, Any]:
        """Create cluster information summary."""
        cluster_id = f"cluster_{int(time.time())}_{len(cluster_anchors)}"

        # Calculate centroid
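        # Unweighted element-wise mean of the member embeddings; every anchor
        # contributes equally regardless of heat.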
        centroid = [0.0] * len(cluster_anchors[0].embedding)
        for anchor in cluster_anchors:
            for i, value in enumerate(anchor.embedding):
                centroid[i] += value
        centroid = [v / len(cluster_anchors) for v in centroid]
        # Summary text (currently just a member count; the concept texts are
        # collected but not ranked by frequency)
        concepts = [anchor.concept_text for anchor in cluster_anchors]
        summary = f"Cluster of {len(concepts)} related concepts"
        return {
            "cluster_id": cluster_id,
            "anchor_count": len(cluster_anchors),
            "anchor_ids": [a.anchor_id for a in cluster_anchors],
            "centroid": centroid,
            "summary": summary,
            "total_heat": sum(a.heat for a in cluster_anchors),
            "average_age": sum(a.calculate_age_days() for a in cluster_anchors)
            / len(cluster_anchors),
        }
    def get_privacy_metrics(self) -> Dict[str, Any]:
        """Get privacy enforcement metrics from privacy hooks."""
        if self.privacy_hooks:
            return cast(Dict[str, Any], self.privacy_hooks.get_privacy_metrics())
        else:
            return {
                "privacy_hooks_enabled": False,
                "privacy_note": "Privacy hooks not available or disabled",
            }
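

# --- Usage sketch (illustrative only) ---
# A minimal, hedged example of the intended flow, assuming the default
# embedding provider is available in this environment. The utterance IDs and
# context payload below are hypothetical placeholders, not values the module
# prescribes.
if __name__ == "__main__":
    graph = SemanticAnchorGraph(config={"consolidation_threshold": 0.8})

    # Repeated mentions of a similar concept should reinforce one anchor
    # rather than creating duplicates.
    for i, text in enumerate(["user prefers dark mode", "user likes dark themes"]):
        anchor_id = graph.create_or_update_anchor(
            text, utterance_id=f"utt_{i}", context={"source": "demo"}
        )
        print(f"{text!r} -> {anchor_id}")

    print(graph.get_stability_metrics())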