warbler-cda / compress_packs.py
#!/usr/bin/env python3
"""
Pack Compression Script using Evaporation Engine
This script compresses warbler packs by replacing document content with
compressed proto-thoughts generated by the evaporation engine.
"""

import json
import sys
from pathlib import Path
from typing import Any, Dict, List

# Add the project root to the Python path so warbler_cda resolves when this
# script is run directly.
sys.path.insert(0, str(Path(__file__).parent))

from warbler_cda.melt_layer import MeltLayer, MagmaStore
from warbler_cda.evaporation import EvaporationEngine, CloudStore
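

# Pack documents are one JSON object per line. The "content" and "content_id"
# fields referenced below follow compress_pack's usage of each document, e.g.
# {"content_id": "doc-001", "content": "original text ..."}  (illustrative).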
def load_jsonl_file(filepath: str) -> List[Dict[str, Any]]:
    """Load a JSONL file and return its documents as a list."""
    documents = []
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                documents.append(json.loads(line))
    return documents


def save_jsonl_file(filepath: str, documents: List[Dict[str, Any]]) -> None:
    """Save a list of documents to a JSONL file, one JSON object per line."""
    with open(filepath, "w", encoding="utf-8") as f:
        for doc in documents:
            f.write(json.dumps(doc, ensure_ascii=False) + "\n")


def compress_pack(pack_path: str, output_suffix: str = "_compressed") -> None:
    """Compress a single pack using the evaporation engine."""
    pack_dir = Path(pack_path)
    if not pack_dir.exists():
        raise FileNotFoundError(f"Pack path {pack_dir} does not exist")

    # Find all JSONL files in the pack
    jsonl_files = list(pack_dir.glob("*.jsonl"))
    if not jsonl_files:
        print(f"No JSONL files found in {pack_dir}")
        return
    print(f"Found {len(jsonl_files)} JSONL files in {pack_dir}")

    # Initialize evaporation components
    magma_store = MagmaStore()
    cloud_store = CloudStore()
    melt_layer = MeltLayer(magma_store)
    evaporation_engine = EvaporationEngine(magma_store, cloud_store)
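
    # Pipeline, as the calls below suggest: MeltLayer.retire_cluster turns a
    # fragment cluster into a glyph in the MagmaStore, and
    # EvaporationEngine.evaporate condenses retired glyphs into proto-thought
    # "mist" lines.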

    total_docs = 0
    compressed_docs = 0
    for jsonl_file in jsonl_files:
        print(f"Processing {jsonl_file.name}...")
        # Load documents
        documents = load_jsonl_file(str(jsonl_file))
        total_docs += len(documents)
        compressed_documents = []
        for doc in documents:
            doc_id = doc.get("content_id", "unknown")
            if "content" not in doc:
                print(f"Warning: Document {doc_id} missing 'content' field, skipping")
                continue
            content = doc["content"]
            if not content or not isinstance(content, str):
                print(f"Warning: Document {doc_id} has empty or invalid content, skipping")
                continue
            try:
                # Create a fragment from the document content
                fragment = {
                    "id": doc.get("content_id", f"doc_{compressed_docs}"),
                    "text": content,
                }
                # Create a glyph from the single fragment
                melt_layer.retire_cluster({"fragments": [fragment]})
                # Evaporate to get a proto-thought
                mist_lines = evaporation_engine.evaporate(limit=1)
                if mist_lines:
                    proto_thought = mist_lines[0]["proto_thought"]
                    # Replace content with the compressed proto-thought
                    compressed_doc = doc.copy()
                    compressed_doc["content"] = proto_thought
                    compressed_doc["original_content_length"] = len(content)
                    compressed_doc["compressed_content_length"] = len(proto_thought)
                    compressed_documents.append(compressed_doc)
                    compressed_docs += 1
                else:
                    print(f"Warning: Failed to evaporate glyph for document {doc_id}")
                    # Keep the original document if evaporation fails
                    compressed_documents.append(doc)
            except Exception as e:
                print(f"Error processing document {doc_id}: {e}")
                # Keep the original document on error
                compressed_documents.append(doc)
        # Save the compressed file alongside the original
        output_file = jsonl_file.parent / f"{jsonl_file.stem}{output_suffix}{jsonl_file.suffix}"
        save_jsonl_file(str(output_file), compressed_documents)
        print(f"Saved compressed file: {output_file}")

    print("Compression complete:")
    print(f"  Total documents processed: {total_docs}")
    print(f"  Documents compressed: {compressed_docs}")
    if total_docs > 0:
        # This is the share of documents that were compressed, not a size ratio.
        print(f"  Share of documents compressed: {compressed_docs / total_docs:.2%}")


def main() -> None:
    if len(sys.argv) != 2:
        print("Usage: python compress_packs.py <pack_path>")
        sys.exit(1)
    pack_path = sys.argv[1]
    compress_pack(pack_path)


if __name__ == "__main__":
    main()