"""
Summarization Ladder - Hierarchical Memory Compression with Micro and Macro Distillation.

Implements rolling N-window micro-summaries and pipeline macro distillation
for the Cognitive Geo-Thermal Lore Engine v0.3.
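
Example (illustrative sketch; the config keys shown are the ones read by
SummarizationLadder.__init__)::

    ladder = SummarizationLadder(config={"micro_window_size": 5, "macro_trigger_count": 3})
    report = ladder.process_fragments(
        [{"id": "f1", "text": "vent temperature spiked", "heat": 0.4}]
    )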
"""

from typing import List, Dict, Any, Optional
import time
import hashlib
from dataclasses import dataclass
from collections import deque


@dataclass
class MicroSummary:
    """Rolling window micro-summary with provenance."""

    summary_id: str
    window_fragments: List[str]  # Original fragment IDs in this window
    compressed_text: str
    window_size: int
    creation_timestamp: float
    heat_aggregate: float
    semantic_centroid: Optional[List[float]] = None

    def get_age_seconds(self) -> float:
        """Get summary age in seconds."""
        return time.time() - self.creation_timestamp


@dataclass
class MacroDistillation:
    """Macro distillation from N micro-summaries."""

    distillation_id: str
    source_micro_summaries: List[str]  # Micro-summary IDs
    distilled_essence: str
    consolidation_ratio: float  # Original fragments / distilled size
    provenance_chain: List[Dict[str, Any]]
    creation_timestamp: float
    anchor_reinforcements: List[str]  # Anchor IDs that were reinforced


class SummarizationLadder:
    """
    Hierarchical summarization system with micro-summaries and macro distillation.

    Architecture:
    - Micro-summaries: Rolling summaries over a window of micro_window_size recent fragments
    - Macro distillation: Pipeline processing once macro_trigger_count micro-summaries have accumulated
    - Recovery distillation: Anchor reinforcement during the distillation process
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None, embedding_provider=None):
        """Initialize the summarization ladder."""
        self.config = config or {}
        self.embedding_provider = embedding_provider

        # Configuration parameters
        self.micro_window_size = self.config.get("micro_window_size", 5)
        self.macro_trigger_count = self.config.get("macro_trigger_count", 3)
        self.max_micro_summaries = self.config.get("max_micro_summaries", 20)

        # Storage
        self.micro_summaries: deque = deque(maxlen=self.max_micro_summaries)
        self.macro_distillations: List[MacroDistillation] = []
        self.fragment_buffer: deque = deque(maxlen=self.micro_window_size)

        # State tracking
        self.total_fragments_processed = 0
        self.micro_summaries_created = 0
        self.macro_distillations_created = 0

        # Metrics
        self.metrics = {
            "total_fragments": 0,
            "micro_summaries_created": 0,
            "macro_distillations_created": 0,
            "compression_ratio": 0.0,
            "processing_time_ms": 0.0,
        }

    def process_fragments(self, fragments: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Process incoming fragments through the summarization ladder.

        Returns processing report with micro-summaries and any macro distillations.
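
        Illustrative report shape (values vary per call)::

            {
                "fragments_processed": 5,
                "micro_summaries_created": 1,
                "macro_distillations_created": 0,
                "new_micro_summaries": [...],
                "new_macro_distillations": [],
                "elapsed_ms": 0.4,
                "total_micro_summaries": 1,
                "total_macro_distillations": 0,
            }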
        """
        start_time = time.time()
        processing_report = {
            "fragments_processed": len(fragments),
            "micro_summaries_created": 0,
            "macro_distillations_created": 0,
            "new_micro_summaries": [],
            "new_macro_distillations": [],
        }

        for fragment in fragments:
            self.fragment_buffer.append(fragment)
            self.total_fragments_processed += 1

            # Check if we should create a micro-summary
            if len(self.fragment_buffer) >= self.micro_window_size:
                micro_summary = self._create_micro_summary()
                if micro_summary:
                    self.micro_summaries.append(micro_summary)
                    processing_report["micro_summaries_created"] += 1
                    processing_report["new_micro_summaries"].append(
                        {
                            "summary_id": micro_summary.summary_id,
                            "compressed_text": micro_summary.compressed_text[:100] + "...",
                            "window_size": micro_summary.window_size,
                            "heat_aggregate": micro_summary.heat_aggregate,
                        }
                    )

                    # Check if we should trigger macro distillation
                    if len(self.micro_summaries) >= self.macro_trigger_count:
                        macro_distillation = self._create_macro_distillation()
                        if macro_distillation:
                            self.macro_distillations.append(macro_distillation)
                            processing_report["macro_distillations_created"] += 1
                            processing_report["new_macro_distillations"].append(
                                {
                                    "distillation_id": macro_distillation.distillation_id,
                                    "distilled_essence": macro_distillation.distilled_essence[:100]
                                    + "...",
                                    "consolidation_ratio": macro_distillation.consolidation_ratio,
                                    "source_count": len(macro_distillation.source_micro_summaries),
                                }
                            )

        # Update metrics
        elapsed_ms = (time.time() - start_time) * 1000
        self.metrics["total_fragments"] = self.total_fragments_processed
        self.metrics["micro_summaries_created"] = self.micro_summaries_created
        self.metrics["macro_distillations_created"] = self.macro_distillations_created
        self.metrics["processing_time_ms"] += elapsed_ms

        # Calculate compression ratio
        if self.total_fragments_processed > 0:
            compressed_units = len(self.micro_summaries) + len(self.macro_distillations)
            self.metrics["compression_ratio"] = self.total_fragments_processed / max(
                compressed_units, 1
            )

        processing_report["elapsed_ms"] = elapsed_ms
        processing_report["total_micro_summaries"] = len(self.micro_summaries)
        processing_report["total_macro_distillations"] = len(self.macro_distillations)

        return processing_report

    def get_recovery_context(self, anchor_id: str, context_size: int = 3) -> Dict[str, Any]:
        """
        Get recovery distillation context for anchor reinforcement.

        Returns relevant micro-summaries and macro distillations that relate to the anchor.
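
        Illustrative return shape (the anchor_id value is an example)::

            {
                "anchor_id": "anchor_42",
                "related_micro_summaries": [...],      # recent micro-summaries
                "related_macro_distillations": [...],  # macros referencing the anchor
                "temporal_sequence": [...],            # time-ordered {type, timestamp, id}
                "consolidation_path": [],              # reserved; not populated here
            }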
        """
        recovery_context = {
            "anchor_id": anchor_id,
            "related_micro_summaries": [],
            "related_macro_distillations": [],
            "temporal_sequence": [],
            "consolidation_path": [],
        }

        # Find micro-summaries that might relate to the anchor
        # (In a full implementation, this would use semantic similarity)
        recent_micros = list(self.micro_summaries)[-context_size:]
        for micro in recent_micros:
            recovery_context["related_micro_summaries"].append(
                {
                    "summary_id": micro.summary_id,
                    "compressed_text": micro.compressed_text,
                    "heat_aggregate": micro.heat_aggregate,
                    "age_seconds": micro.get_age_seconds(),
                }
            )

        # Find relevant macro distillations
        recent_macros = self.macro_distillations[-context_size:]
        for macro in recent_macros:
            if anchor_id in macro.anchor_reinforcements:
                recovery_context["related_macro_distillations"].append(
                    {
                        "distillation_id": macro.distillation_id,
                        "distilled_essence": macro.distilled_essence,
                        "consolidation_ratio": macro.consolidation_ratio,
                        "anchor_reinforcements": macro.anchor_reinforcements,
                    }
                )

        # Build temporal sequence showing information flow
        all_items = []
        for micro in recent_micros:
            all_items.append(("micro", micro.creation_timestamp, micro.summary_id))
        for macro in recent_macros:
            all_items.append(("macro", macro.creation_timestamp, macro.distillation_id))

        all_items.sort(key=lambda x: x[1])
        recovery_context["temporal_sequence"] = [
            {"type": item[0], "timestamp": item[1], "id": item[2]} for item in all_items
        ]

        return recovery_context

    def get_compression_metrics(self) -> Dict[str, Any]:
        """Get comprehensive compression and performance metrics."""
        return {
            "summarization_ladder_metrics": self.metrics.copy(),
            "current_state": {
                "micro_summaries_active": len(self.micro_summaries),
                "macro_distillations_total": len(self.macro_distillations),
                "fragment_buffer_size": len(self.fragment_buffer),
                "compression_ratio": self.metrics["compression_ratio"],
            },
            "ladder_health": {
                "processing_efficiency": self._calculate_processing_efficiency(),
                "compression_effectiveness": self._calculate_compression_effectiveness(),
                "temporal_coverage_hours": self._calculate_temporal_coverage(),
            },
        }

    def _create_micro_summary(self) -> Optional[MicroSummary]:
        """Create a micro-summary from the current fragment buffer."""
        if len(self.fragment_buffer) < self.micro_window_size:
            return None

        fragments = list(self.fragment_buffer)

        # Extract fragment IDs and text; fall back to window-local indices when a
        # fragment carries no explicit ID (these fallback IDs are not unique across windows)
        fragment_ids = [f.get("id", f"frag_{i}") for i, f in enumerate(fragments)]
        fragment_texts = [f.get("text", "") for f in fragments]

        # Simple summarization (in production, would use more sophisticated methods)
        compressed_text = self._compress_fragment_texts(fragment_texts)

        # Aggregate heat: mean heat over the window's fragments
        heat_aggregate = sum(f.get("heat", 0.1) for f in fragments) / len(fragments)

        # Generate semantic centroid if embedding provider available
        semantic_centroid = None
        if self.embedding_provider and fragment_texts:
            try:
                embeddings = [self.embedding_provider.embed_text(text) for text in fragment_texts]
                if embeddings:
                    # Calculate centroid
                    dim = len(embeddings[0])
                    semantic_centroid = [
                        sum(emb[i] for emb in embeddings) / len(embeddings) for i in range(dim)
                    ]
            except Exception:
                # Fallback to None if embedding fails
                pass

        # Create micro-summary
        summary_id = self._generate_summary_id(compressed_text)
        micro_summary = MicroSummary(
            summary_id=summary_id,
            window_fragments=fragment_ids,
            compressed_text=compressed_text,
            window_size=len(fragments),
            creation_timestamp=time.time(),
            heat_aggregate=heat_aggregate,
            semantic_centroid=semantic_centroid,
        )

        self.micro_summaries_created += 1

        # Clear part of buffer to allow for overlap
        overlap_size = max(1, self.micro_window_size // 3)
        for _ in range(len(self.fragment_buffer) - overlap_size):
            self.fragment_buffer.popleft()

        return micro_summary

    def _create_macro_distillation(self) -> Optional[MacroDistillation]:
        """Create a macro distillation from recent micro-summaries."""
        if len(self.micro_summaries) < self.macro_trigger_count:
            return None

        # Take the oldest micro-summaries for distillation
        source_summaries = []
        source_summary_ids = []

        for _ in range(self.macro_trigger_count):
            if self.micro_summaries:
                micro = self.micro_summaries.popleft()
                source_summaries.append(micro)
                source_summary_ids.append(micro.summary_id)

        if not source_summaries:
            return None

        # Distill the essence from micro-summaries
        distilled_essence = self._distill_macro_essence(source_summaries)

        # Calculate consolidation ratio
        total_original_fragments = sum(len(micro.window_fragments) for micro in source_summaries)
        consolidation_ratio = float(total_original_fragments)  # N original fragments -> 1 distillation

        # Build provenance chain
        provenance_chain = [
            {
                "micro_summary_id": micro.summary_id,
                "original_fragments": len(micro.window_fragments),
                "heat_contribution": micro.heat_aggregate,
                "creation_timestamp": micro.creation_timestamp,
            }
            for micro in source_summaries
        ]

        # Mock anchor reinforcements (in production, would integrate with SemanticAnchorGraph)
        anchor_reinforcements = [f"anchor_reinforce_{i}" for i in range(len(source_summaries))]

        # Create macro distillation
        distillation_id = self._generate_distillation_id(distilled_essence)
        macro_distillation = MacroDistillation(
            distillation_id=distillation_id,
            source_micro_summaries=source_summary_ids,
            distilled_essence=distilled_essence,
            consolidation_ratio=consolidation_ratio,
            provenance_chain=provenance_chain,
            creation_timestamp=time.time(),
            anchor_reinforcements=anchor_reinforcements,
        )

        self.macro_distillations_created += 1
        return macro_distillation

    def _compress_fragment_texts(self, texts: List[str]) -> str:
        """Compress multiple fragment texts into a micro-summary."""
        if not texts:
            return "(empty window)"

        # Simple compression: take key phrases from each text
        key_phrases = []
        for text in texts:
            # Extract first meaningful phrase (up to 30 chars)
            clean_text = text.strip()
            if clean_text:
                phrase = clean_text[:30]
                if len(clean_text) > 30:
                    phrase += "..."
                key_phrases.append(phrase)

        # Combine into micro-summary
        if len(key_phrases) == 1:
            return f"[Micro] {key_phrases[0]}"
        else:
            return f"[Micro] {' • '.join(key_phrases[:3])}"  # Limit to 3 phrases

    def _distill_macro_essence(self, micro_summaries: List[MicroSummary]) -> str:
        """Distill macro essence from multiple micro-summaries."""
        if not micro_summaries:
            return "(empty distillation)"

        # Extract key themes from micro-summaries
        themes = []
        for micro in micro_summaries:
            # Extract meaningful content from micro-summary
            content = micro.compressed_text.replace("[Micro]", "").strip()
            if content:
                themes.append(content)

        # Create macro distillation
        if len(themes) == 1:
            return f"[Macro] {themes[0]}"
        else:
            # Combine themes into higher-level abstraction
            combined = " ⟶ ".join(themes[:2])  # Show progression
            return f"[Macro] {combined}"

    def _generate_summary_id(self, content: str) -> str:
        """Generate unique ID for micro-summary."""
        timestamp = str(int(time.time() * 1000))
        content_hash = hashlib.md5(content.encode()).hexdigest()[:8]
        return f"micro_{timestamp}_{content_hash}"

    def _generate_distillation_id(self, essence: str) -> str:
        """Generate unique ID for macro distillation."""
        timestamp = str(int(time.time() * 1000))
        essence_hash = hashlib.md5(essence.encode()).hexdigest()[:8]
        return f"macro_{timestamp}_{essence_hash}"

    def _calculate_processing_efficiency(self) -> float:
        """Calculate processing efficiency metric."""
        if self.metrics["total_fragments"] == 0:
            return 1.0
        total_time_seconds = self.metrics["processing_time_ms"] / 1000.0
        if total_time_seconds == 0:
            return 1.0
        return self.metrics["total_fragments"] / total_time_seconds

    def _calculate_compression_effectiveness(self) -> float:
        """Calculate how effectively we're compressing information."""
        # Treat a 10:1 compression ratio (or better) as fully effective
        return min(self.metrics["compression_ratio"] / 10.0, 1.0)  # Normalize to 0-1

    def _calculate_temporal_coverage(self) -> float:
        """Calculate temporal coverage in hours."""
        if not self.micro_summaries and not self.macro_distillations:
            return 0.0

        oldest_time = time.time()
        newest_time = 0.0

        for micro in self.micro_summaries:
            oldest_time = min(oldest_time, micro.creation_timestamp)
            newest_time = max(newest_time, micro.creation_timestamp)

        for macro in self.macro_distillations:
            oldest_time = min(oldest_time, macro.creation_timestamp)
            newest_time = max(newest_time, macro.creation_timestamp)

        if newest_time > oldest_time:
            return (newest_time - oldest_time) / 3600.0  # Convert to hours
        return 0.0
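

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative; not part of the engine API). The
# fragment dicts use the "id", "text", and "heat" keys that
# process_fragments() reads, and the config keys match those read in
# __init__. Output values will vary from run to run.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    ladder = SummarizationLadder(
        config={
            "micro_window_size": 3,
            "macro_trigger_count": 2,
            "max_micro_summaries": 10,
        }
    )

    # Synthetic fragments; in practice these would come from the lore engine.
    fragments = [
        {
            "id": f"frag_{n}",
            "text": f"Observation {n}: thermal vent activity rising.",
            "heat": 0.2 + 0.05 * n,
        }
        for n in range(9)
    ]

    report = ladder.process_fragments(fragments)
    print(f"Fragments processed: {report['fragments_processed']}")
    print(f"Micro-summaries:     {report['micro_summaries_created']}")
    print(f"Macro distillations: {report['macro_distillations_created']}")

    state = ladder.get_compression_metrics()["current_state"]
    print(f"Compression ratio:   {state['compression_ratio']:.2f}")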