#!/usr/bin/env python3
"""Pack Compression Script using Evaporation Engine.

This script compresses warbler packs by replacing document content with
compressed proto-thoughts generated by the evaporation engine.
"""

import json
import sys
from pathlib import Path
from typing import Any, Dict, List

# Add the project root to Python path so the local package is importable
# when this file is run directly as a script.
sys.path.insert(0, str(Path(__file__).parent))

from warbler_cda.melt_layer import MeltLayer, MagmaStore
from warbler_cda.evaporation import EvaporationEngine, CloudStore


def load_jsonl_file(filepath: str) -> List[Dict[str, Any]]:
    """Load a JSONL file and return its documents as a list of dicts.

    Blank lines are skipped; every other line must be valid JSON.
    """
    documents: List[Dict[str, Any]] = []
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                documents.append(json.loads(line))
    return documents


def save_jsonl_file(filepath: str, documents: List[Dict[str, Any]]) -> None:
    """Write *documents* to *filepath* as JSONL, one object per line."""
    with open(filepath, "w", encoding="utf-8") as f:
        for doc in documents:
            f.write(json.dumps(doc, ensure_ascii=False) + "\n")


def compress_pack(pack_path: str, output_suffix: str = "_compressed") -> None:
    """Compress every JSONL file in the pack directory at *pack_path*.

    Each document's ``content`` string is replaced with a proto-thought
    produced by the evaporation engine, and the original/compressed
    lengths are recorded on the document.  Output is written next to each
    input file with *output_suffix* inserted before the extension.
    Documents that fail to compress are kept unmodified.

    Raises:
        FileNotFoundError: if *pack_path* does not exist.
    """
    pack_dir = Path(pack_path)
    if not pack_dir.exists():
        raise FileNotFoundError(f"Pack path {pack_dir} does not exist")

    # Find all JSONL files in the pack
    jsonl_files = list(pack_dir.glob("*.jsonl"))
    if not jsonl_files:
        print(f"No JSONL files found in {pack_dir}")
        return

    print(f"Found {len(jsonl_files)} JSONL files in {pack_dir}")

    # One shared pipeline for the whole pack: the melt layer retires
    # fragment clusters into the magma store; the evaporation engine turns
    # them into proto-thoughts in the cloud store.
    magma_store = MagmaStore()
    cloud_store = CloudStore()
    melt_layer = MeltLayer(magma_store)
    evaporation_engine = EvaporationEngine(magma_store, cloud_store)

    total_docs = 0
    compressed_docs = 0

    for jsonl_file in jsonl_files:
        print(f"Processing {jsonl_file.name}...")

        documents = load_jsonl_file(str(jsonl_file))
        total_docs += len(documents)

        compressed_documents: List[Dict[str, Any]] = []
        for doc in documents:
            if "content" not in doc:
                print("Warning: Document missing 'content' field, skipping")
                continue

            content = doc["content"]
            if not content or not isinstance(content, str):
                print("Warning: Empty or invalid content, skipping")
                continue

            try:
                # Single-fragment cluster: the whole document body becomes
                # one fragment, so one glyph corresponds to one document.
                fragment = {
                    "id": doc.get("content_id", f"doc_{compressed_docs}"),
                    "text": content,
                }
                melt_layer.retire_cluster({"fragments": [fragment]})

                # Evaporate at most one glyph to get the proto-thought.
                mist_lines = evaporation_engine.evaporate(limit=1)
                if mist_lines:
                    proto_thought = mist_lines[0]["proto_thought"]
                    # Replace content with the compressed proto-thought and
                    # record before/after sizes for ratio reporting.
                    compressed_doc = doc.copy()
                    compressed_doc["content"] = proto_thought
                    compressed_doc["original_content_length"] = len(content)
                    compressed_doc["compressed_content_length"] = len(proto_thought)
                    compressed_documents.append(compressed_doc)
                    compressed_docs += 1
                else:
                    print(
                        f"Warning: Failed to evaporate glyph for document {doc.get('content_id', 'unknown')}"
                    )
                    # Keep original document if evaporation fails
                    compressed_documents.append(doc)
            except Exception as e:
                # Best-effort: log the failure and fall back to the
                # original, uncompressed document.
                print(f"Error processing document {doc.get('content_id', 'unknown')}: {e}")
                compressed_documents.append(doc)

        # Save compressed file alongside the input, e.g. foo_compressed.jsonl
        output_file = jsonl_file.parent / f"{jsonl_file.stem}{output_suffix}{jsonl_file.suffix}"
        save_jsonl_file(str(output_file), compressed_documents)
        print(f"Saved compressed file: {output_file}")

    print("Compression complete:")
    print(f"  Total documents processed: {total_docs}")
    print(f"  Documents compressed: {compressed_docs}")
    if total_docs > 0:
        print(f"  Compression ratio: {compressed_docs/total_docs:.2%}")


def main() -> None:
    """CLI entry point: compress the pack named by the sole argument."""
    if len(sys.argv) != 2:
        # Restored the <pack_path> placeholder that was missing from the
        # usage message.
        print("Usage: python compress_packs.py <pack_path>")
        sys.exit(1)

    compress_pack(sys.argv[1])


if __name__ == "__main__":
    main()