""" Summarization Ladder - Hierarchical Memory Compression with Micro and Macro Distillation. Implements rolling N-window micro-summaries and pipeline macro distillation for the Cognitive Geo-Thermal Lore Engine v0.3. """ from typing import List, Dict, Any, Optional import time import hashlib from dataclasses import dataclass from collections import deque @dataclass class MicroSummary: """Rolling window micro-summary with provenance.""" summary_id: str window_fragments: List[str] # Original fragment IDs in this window compressed_text: str window_size: int creation_timestamp: float heat_aggregate: float semantic_centroid: Optional[List[float]] = None def get_age_seconds(self) -> float: """Get summary age in seconds.""" return time.time() - self.creation_timestamp @dataclass class MacroDistillation: """Macro distillation from N micro-summaries.""" distillation_id: str source_micro_summaries: List[str] # Micro-summary IDs distilled_essence: str consolidation_ratio: float # Original fragments / distilled size provenance_chain: List[Dict[str, Any]] creation_timestamp: float anchor_reinforcements: List[str] # Anchor IDs that were reinforced class SummarizationLadder: """ Hierarchical summarization system with micro-summaries and macro distillation. Architecture: - Micro-summaries: Rolling N-window summaries of recent fragments - Macro distillation: Pipeline processing after N micro-summaries accumulated - Recovery distillation: Anchor reinforcement during distillation process """ def __init__(self, config: Optional[Dict[str, Any]] = None, embedding_provider=None): """Initialize the summarization ladder.""" self.config = config or {} self.embedding_provider = embedding_provider # Configuration parameters self.micro_window_size = self.config.get("micro_window_size", 5) self.macro_trigger_count = self.config.get("macro_trigger_count", 3) self.max_micro_summaries = self.config.get("max_micro_summaries", 20) # Storage self.micro_summaries: deque = deque(maxlen=self.max_micro_summaries) self.macro_distillations: List[MacroDistillation] = [] self.fragment_buffer: deque = deque(maxlen=self.micro_window_size) # State tracking self.total_fragments_processed = 0 self.micro_summaries_created = 0 self.macro_distillations_created = 0 # Metrics self.metrics = { "total_fragments": 0, "micro_summaries_created": 0, "macro_distillations_created": 0, "compression_ratio": 0.0, "processing_time_ms": 0.0, } def process_fragments(self, fragments: List[Dict[str, Any]]) -> Dict[str, Any]: """ Process incoming fragments through the summarization ladder. Returns processing report with micro-summaries and any macro distillations. 
""" start_time = time.time() processing_report = { "fragments_processed": len(fragments), "micro_summaries_created": 0, "macro_distillations_created": 0, "new_micro_summaries": [], "new_macro_distillations": [], } for fragment in fragments: self.fragment_buffer.append(fragment) self.total_fragments_processed += 1 # Check if we should create a micro-summary if len(self.fragment_buffer) >= self.micro_window_size: micro_summary = self._create_micro_summary() if micro_summary: self.micro_summaries.append(micro_summary) processing_report["micro_summaries_created"] += 1 processing_report["new_micro_summaries"].append( { "summary_id": micro_summary.summary_id, "compressed_text": micro_summary.compressed_text[:100] + "...", "window_size": micro_summary.window_size, "heat_aggregate": micro_summary.heat_aggregate, } ) # Check if we should trigger macro distillation if len(self.micro_summaries) >= self.macro_trigger_count: macro_distillation = self._create_macro_distillation() if macro_distillation: self.macro_distillations.append(macro_distillation) processing_report["macro_distillations_created"] += 1 processing_report["new_macro_distillations"].append( { "distillation_id": macro_distillation.distillation_id, "distilled_essence": macro_distillation.distilled_essence[:100] + "...", "consolidation_ratio": macro_distillation.consolidation_ratio, "source_count": len(macro_distillation.source_micro_summaries), } ) # Update metrics elapsed_ms = (time.time() - start_time) * 1000 self.metrics["total_fragments"] = self.total_fragments_processed self.metrics["micro_summaries_created"] = self.micro_summaries_created self.metrics["macro_distillations_created"] = self.macro_distillations_created self.metrics["processing_time_ms"] += elapsed_ms # Calculate compression ratio if self.total_fragments_processed > 0: compressed_units = len(self.micro_summaries) + len(self.macro_distillations) self.metrics["compression_ratio"] = self.total_fragments_processed / max( compressed_units, 1 ) processing_report["elapsed_ms"] = elapsed_ms processing_report["total_micro_summaries"] = len(self.micro_summaries) processing_report["total_macro_distillations"] = len(self.macro_distillations) return processing_report def get_recovery_context(self, anchor_id: str, context_size: int = 3) -> Dict[str, Any]: """ Get recovery distillation context for anchor reinforcement. Returns relevant micro-summaries and macro distillations that relate to the anchor. 
""" recovery_context = { "anchor_id": anchor_id, "related_micro_summaries": [], "related_macro_distillations": [], "temporal_sequence": [], "consolidation_path": [], } # Find micro-summaries that might relate to the anchor # (In a full implementation, this would use semantic similarity) recent_micros = list(self.micro_summaries)[-context_size:] for micro in recent_micros: recovery_context["related_micro_summaries"].append( { "summary_id": micro.summary_id, "compressed_text": micro.compressed_text, "heat_aggregate": micro.heat_aggregate, "age_seconds": micro.get_age_seconds(), } ) # Find relevant macro distillations recent_macros = self.macro_distillations[-context_size:] if self.macro_distillations else [] for macro in recent_macros: if anchor_id in macro.anchor_reinforcements: recovery_context["related_macro_distillations"].append( { "distillation_id": macro.distillation_id, "distilled_essence": macro.distilled_essence, "consolidation_ratio": macro.consolidation_ratio, "anchor_reinforcements": macro.anchor_reinforcements, } ) # Build temporal sequence showing information flow all_items = [] for micro in recent_micros: all_items.append(("micro", micro.creation_timestamp, micro.summary_id)) for macro in recent_macros: all_items.append(("macro", macro.creation_timestamp, macro.distillation_id)) all_items.sort(key=lambda x: x[1]) recovery_context["temporal_sequence"] = [ {"type": item[0], "timestamp": item[1], "id": item[2]} for item in all_items ] return recovery_context def get_compression_metrics(self) -> Dict[str, Any]: """Get comprehensive compression and performance metrics.""" return { "summarization_ladder_metrics": self.metrics.copy(), "current_state": { "micro_summaries_active": len(self.micro_summaries), "macro_distillations_total": len(self.macro_distillations), "fragment_buffer_size": len(self.fragment_buffer), "compression_ratio": self.metrics["compression_ratio"], }, "ladder_health": { "processing_efficiency": self._calculate_processing_efficiency(), "compression_effectiveness": self._calculate_compression_effectiveness(), "temporal_coverage_hours": self._calculate_temporal_coverage(), }, } def _create_micro_summary(self) -> Optional[MicroSummary]: """Create a micro-summary from the current fragment buffer.""" if len(self.fragment_buffer) < self.micro_window_size: return None fragments = list(self.fragment_buffer) # Extract fragment IDs and text fragment_ids = [f.get("id", f"frag_{i}") for i, f in enumerate(fragments)] fragment_texts = [f.get("text", "") for f in fragments] # Simple summarization (in production, would use more sophisticated methods) compressed_text = self._compress_fragment_texts(fragment_texts) # Calculate aggregate heat heat_aggregate = sum(f.get("heat", 0.1) for f in fragments) / len(fragments) # Generate semantic centroid if embedding provider available semantic_centroid = None if self.embedding_provider and fragment_texts: try: embeddings = [self.embedding_provider.embed_text(text) for text in fragment_texts] if embeddings: # Calculate centroid dim = len(embeddings[0]) semantic_centroid = [ sum(emb[i] for emb in embeddings) / len(embeddings) for i in range(dim) ] except Exception: # Fallback to None if embedding fails pass # Create micro-summary summary_id = self._generate_summary_id(compressed_text) micro_summary = MicroSummary( summary_id=summary_id, window_fragments=fragment_ids, compressed_text=compressed_text, window_size=len(fragments), creation_timestamp=time.time(), heat_aggregate=heat_aggregate, semantic_centroid=semantic_centroid, ) 

        self.micro_summaries_created += 1

        # Keep a small overlap so consecutive windows share context: with the
        # default window of 5, max(1, 5 // 3) == 1 fragment is retained
        overlap_size = max(1, self.micro_window_size // 3)
        for _ in range(len(self.fragment_buffer) - overlap_size):
            self.fragment_buffer.popleft()

        return micro_summary

    def _create_macro_distillation(self) -> Optional[MacroDistillation]:
        """Create a macro distillation from recent micro-summaries."""
        if len(self.micro_summaries) < self.macro_trigger_count:
            return None

        # Take the oldest micro-summaries for distillation
        source_summaries = []
        source_summary_ids = []
        for _ in range(self.macro_trigger_count):
            if self.micro_summaries:
                micro = self.micro_summaries.popleft()
                source_summaries.append(micro)
                source_summary_ids.append(micro.summary_id)

        if not source_summaries:
            return None

        # Distill the essence from the micro-summaries
        distilled_essence = self._distill_macro_essence(source_summaries)

        # Consolidation ratio: original fragments per (single) distillation
        total_original_fragments = sum(len(micro.window_fragments) for micro in source_summaries)
        consolidation_ratio = float(total_original_fragments)

        # Build provenance chain
        provenance_chain = [
            {
                "micro_summary_id": micro.summary_id,
                "original_fragments": len(micro.window_fragments),
                "heat_contribution": micro.heat_aggregate,
                "creation_timestamp": micro.creation_timestamp,
            }
            for micro in source_summaries
        ]

        # Mock anchor reinforcements (production would integrate with the
        # SemanticAnchorGraph)
        anchor_reinforcements = [f"anchor_reinforce_{i}" for i in range(len(source_summaries))]

        # Create macro distillation
        distillation_id = self._generate_distillation_id(distilled_essence)
        macro_distillation = MacroDistillation(
            distillation_id=distillation_id,
            source_micro_summaries=source_summary_ids,
            distilled_essence=distilled_essence,
            consolidation_ratio=consolidation_ratio,
            provenance_chain=provenance_chain,
            creation_timestamp=time.time(),
            anchor_reinforcements=anchor_reinforcements,
        )

        self.macro_distillations_created += 1
        return macro_distillation

    def _compress_fragment_texts(self, texts: List[str]) -> str:
        """Compress multiple fragment texts into a micro-summary."""
        if not texts:
            return "(empty window)"

        # Simple compression: take the leading phrase of each text
        key_phrases = []
        for text in texts:
            # Extract the first meaningful phrase (up to 30 chars)
            clean_text = text.strip()
            if clean_text:
                phrase = clean_text[:30]
                if len(clean_text) > 30:
                    phrase += "..."
                key_phrases.append(phrase)

        # Combine into a micro-summary
        if len(key_phrases) == 1:
            return f"[Micro] {key_phrases[0]}"
        return f"[Micro] {' • '.join(key_phrases[:3])}"  # Limit to 3 phrases

    def _distill_macro_essence(self, micro_summaries: List[MicroSummary]) -> str:
        """Distill a macro essence from multiple micro-summaries."""
        if not micro_summaries:
            return "(empty distillation)"

        # Extract key themes from the micro-summaries
        themes = []
        for micro in micro_summaries:
            content = micro.compressed_text.replace("[Micro]", "").strip()
            if content:
                themes.append(content)

        # Create the macro distillation
        if len(themes) == 1:
            return f"[Macro] {themes[0]}"
        # Combine themes into a higher-level abstraction showing progression
        combined = " ⟶ ".join(themes[:2])
        return f"[Macro] {combined}"

    def _generate_summary_id(self, content: str) -> str:
        """Generate a unique ID for a micro-summary."""
        timestamp = str(int(time.time() * 1000))
        # MD5 is used purely for short, deterministic IDs, not for security
        content_hash = hashlib.md5(content.encode()).hexdigest()[:8]
        return f"micro_{timestamp}_{content_hash}"

    def _generate_distillation_id(self, essence: str) -> str:
        """Generate a unique ID for a macro distillation."""
        timestamp = str(int(time.time() * 1000))
        essence_hash = hashlib.md5(essence.encode()).hexdigest()[:8]
        return f"macro_{timestamp}_{essence_hash}"

    def _calculate_processing_efficiency(self) -> float:
        """Calculate processing efficiency (fragments per second)."""
        if self.metrics["total_fragments"] == 0:
            return 1.0
        total_time_seconds = self.metrics["processing_time_ms"] / 1000.0
        if total_time_seconds == 0:
            return 1.0
        return self.metrics["total_fragments"] / total_time_seconds

    def _calculate_compression_effectiveness(self) -> float:
        """Calculate how effectively information is being compressed."""
        return min(self.metrics["compression_ratio"] / 10.0, 1.0)  # Normalize to 0-1

    def _calculate_temporal_coverage(self) -> float:
        """Calculate temporal coverage in hours."""
        if not self.micro_summaries and not self.macro_distillations:
            return 0.0

        oldest_time = time.time()
        newest_time = 0.0
        for micro in self.micro_summaries:
            oldest_time = min(oldest_time, micro.creation_timestamp)
            newest_time = max(newest_time, micro.creation_timestamp)
        for macro in self.macro_distillations:
            oldest_time = min(oldest_time, macro.creation_timestamp)
            newest_time = max(newest_time, macro.creation_timestamp)

        if newest_time > oldest_time:
            return (newest_time - oldest_time) / 3600.0  # Convert to hours
        return 0.0
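

# ---------------------------------------------------------------------------
# Minimal smoke-test sketch (illustrative only). The synthetic fragments and
# config values below are assumptions chosen for demonstration; they are not
# part of the engine contract.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    ladder = SummarizationLadder(config={"micro_window_size": 3, "macro_trigger_count": 2})
    synthetic_fragments = [
        {"id": f"frag_{i}", "text": f"geothermal reading {i}: vent activity nominal", "heat": round(0.1 * (i % 10), 2)}
        for i in range(12)
    ]
    report = ladder.process_fragments(synthetic_fragments)
    print("micro-summaries created:", report["micro_summaries_created"])
    print("macro distillations created:", report["macro_distillations_created"])
    print("current state:", ladder.get_compression_metrics()["current_state"])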