Spaces:
Running
on
Zero
Running
on
Zero
| #!/usr/bin/env python3 | |
| """ | |
| Pack Compression Script using Evaporation Engine | |
| This script compresses warbler packs by replacing document content with | |
| compressed proto-thoughts generated by the evaporation engine. | |
| """ | |
| import json | |
| import sys | |
| from pathlib import Path | |
| from typing import Dict, Any, List | |
| # Add the project root to Python path | |
| sys.path.insert(0, str(Path(__file__).parent)) | |
| from warbler_cda.melt_layer import MeltLayer, MagmaStore | |
| from warbler_cda.evaporation import EvaporationEngine, CloudStore | |
| def load_jsonl_file(filepath: str) -> List[Dict[str, Any]]: | |
| """Load a JSONL file and return list of documents.""" | |
| documents = [] | |
| with open(filepath, "r", encoding="utf-8") as f: | |
| for line in f: | |
| line = line.strip() | |
| if line: | |
| documents.append(json.loads(line)) | |
| return documents | |
| def save_jsonl_file(filepath: str, documents: List[Dict[str, Any]]) -> None: | |
| """Save list of documents to a JSONL file.""" | |
| with open(filepath, "w", encoding="utf-8") as f: | |
| for doc in documents: | |
| f.write(json.dumps(doc, ensure_ascii=False) + "\n") | |
| def compress_pack(pack_path: str, output_suffix: str = "_compressed") -> None: | |
| """Compress a single pack using evaporation engine.""" | |
| pack_path = Path(pack_path) | |
| if not pack_path.exists(): | |
| raise FileNotFoundError(f"Pack path {pack_path} does not exist") | |
| # Find all JSONL files in the pack | |
| jsonl_files = list(pack_path.glob("*.jsonl")) | |
| if not jsonl_files: | |
| print(f"No JSONL files found in {pack_path}") | |
| return | |
| print(f"Found {len(jsonl_files)} JSONL files in {pack_path}") | |
| # Initialize evaporation components | |
| magma_store = MagmaStore() | |
| cloud_store = CloudStore() | |
| melt_layer = MeltLayer(magma_store) | |
| evaporation_engine = EvaporationEngine(magma_store, cloud_store) | |
| total_docs = 0 | |
| compressed_docs = 0 | |
| for jsonl_file in jsonl_files: | |
| print(f"Processing {jsonl_file.name}...") | |
| # Load documents | |
| documents = load_jsonl_file(str(jsonl_file)) | |
| total_docs += len(documents) | |
| compressed_documents = [] | |
| for doc in documents: | |
| if "content" not in doc: | |
| print("Warning: Document missing 'content' field, skipping") | |
| continue | |
| content = doc["content"] | |
| if not content or not isinstance(content, str): | |
| print("Warning: Empty or invalid content, skipping") | |
| continue | |
| try: | |
| # Create a fragment from the document content | |
| fragment = {"id": doc.get("content_id", f"doc_{compressed_docs}"), "text": content} | |
| # Create glyph from the single fragment | |
| melt_layer.retire_cluster({"fragments": [fragment]}) | |
| # Evaporate to get proto-thought | |
| mist_lines = evaporation_engine.evaporate(limit=1) | |
| if mist_lines: | |
| proto_thought = mist_lines[0]["proto_thought"] | |
| # Replace content with compressed proto-thought | |
| compressed_doc = doc.copy() | |
| compressed_doc["content"] = proto_thought | |
| compressed_doc["original_content_length"] = len(content) | |
| compressed_doc["compressed_content_length"] = len(proto_thought) | |
| compressed_documents.append(compressed_doc) | |
| compressed_docs += 1 | |
| else: | |
| print( | |
| f"Warning: Failed to evaporate glyph for document {doc.get('content_id', 'unknown')}" | |
| ) | |
| # Keep original document if evaporation fails | |
| compressed_documents.append(doc) | |
| except Exception as e: | |
| print(f"Error processing document {doc.get('content_id', 'unknown')}: {e}") | |
| # Keep original document on error | |
| compressed_documents.append(doc) | |
| # Save compressed file | |
| output_file = jsonl_file.parent / f"{jsonl_file.stem}{output_suffix}{jsonl_file.suffix}" | |
| save_jsonl_file(str(output_file), compressed_documents) | |
| print(f"Saved compressed file: {output_file}") | |
| print("Compression complete:") | |
| print(f" Total documents processed: {total_docs}") | |
| print(f" Documents compressed: {compressed_docs}") | |
| if total_docs > 0: | |
| print(f" Compression ratio: {compressed_docs/total_docs:.2%}") | |
| def main(): | |
| if len(sys.argv) != 2: | |
| print("Usage: python compress_packs.py <pack_path>") | |
| sys.exit(1) | |
| pack_path = sys.argv[1] | |
| compress_pack(pack_path) | |
| if __name__ == "__main__": | |
| main() | |