Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Warbler CDA - HuggingFace Space Demo | |
| Interactive demo of the Cognitive Development Architecture RAG system | |
| """ | |
| import json | |
| import time | |
| import os | |
| import threading | |
| import gradio as gr | |
| import spaces | |
| from pathlib import Path | |
| from typing import Tuple, Optional, Dict | |
# Turn tokenizer parallelism off before anything else runs, to avoid warnings
# with SentenceTransformers.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Shared progress record for background pack ingestion. The ingestion worker
# writes to it and the statistics view reads from it.
ingestion_status = dict(
    running=False,
    total_docs=0,
    processed=0,
    failed=0,
    start_time=None,
    eta=0,
    rate=0,
)
def background_ingest_packs(api, pack_docs, pack_manager):
    """Ingest pack documents into ``api`` without blocking app startup.

    Meant to be used as a thread target. Progress is published through the
    module-level ``ingestion_status`` dict (``running``, ``total_docs``,
    ``processed``, ``failed``, ``start_time``, ``rate``, ``eta``).

    Args:
        api: Object exposing ``add_document(doc_id, content, metadata)`` whose
            return value is truthy on success.
        pack_docs: Sequence of mappings with ``"id"``, ``"content"`` and
            ``"metadata"`` keys.
        pack_manager: Object exposing ``mark_packs_ingested(pack_count, doc_count)``.

    A document whose ingestion raises is counted as failed instead of killing
    the worker, and ``ingestion_status["running"]`` is always reset on exit.
    """
    import gc
    import warnings

    batch_size = 1000
    total_docs = len(pack_docs)
    processed = 0
    failed = 0
    start_time = time.time()

    ingestion_status["running"] = True
    ingestion_status["total_docs"] = total_docs
    ingestion_status["processed"] = 0
    ingestion_status["failed"] = 0
    ingestion_status["start_time"] = start_time

    try:
        # Suppress numpy warnings during ingestion to avoid cluttering logs in HF Spaces.
        # NOTE: warnings.catch_warnings() swaps process-global filter state, so
        # other threads see this filter while ingestion is running.
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore", message="invalid value encountered", category=RuntimeWarning
            )
            print(f"[INFO] Ingesting {total_docs} documents from Warbler packs...")

            # Process in batches to bound memory use and report progress regularly.
            for batch_start in range(0, total_docs, batch_size):
                for doc in pack_docs[batch_start:batch_start + batch_size]:
                    doc_id = "<unknown>"
                    error = None
                    try:
                        doc_id = doc["id"]
                        success = api.add_document(doc_id, doc["content"], doc["metadata"])
                    except Exception as exc:
                        # One bad document must not abort the whole ingestion run.
                        success = False
                        error = exc

                    if not success:
                        failed += 1
                        if failed <= 5:  # Log only the first few failures
                            detail = f": {error}" if error is not None else ""
                            print(f"[WARN] Failed to add document {doc_id}{detail}")

                    processed += 1
                    ingestion_status["processed"] = processed
                    ingestion_status["failed"] = failed

                # Progress update after each batch
                elapsed = time.time() - start_time
                rate = processed / elapsed if elapsed > 0 else 0
                eta = (total_docs - processed) / rate if rate > 0 else 0
                ingestion_status["rate"] = rate
                ingestion_status["eta"] = eta
                print(
                    f"[PROGRESS] {processed}/{total_docs} documents ingested "
                    f"({processed/total_docs*100:.1f}%) - "
                    f"{rate:.1f} docs/sec - ETA: {eta/60:.1f} min"
                )

                # Force garbage collection after large batches to free memory
                if processed % 10000 == 0:
                    gc.collect()

            packs_loaded = processed
            # Record the same count that should_ingest_packs() is later called
            # with (len(pack_docs)); a constant 1 would never match it.
            pack_manager.mark_packs_ingested(total_docs, packs_loaded)
            total_time = time.time() - start_time
            print(
                f"[OK] Loaded {packs_loaded} documents from Warbler packs "
                f"({failed} failed) in {total_time:.1f} seconds"
            )
    finally:
        # Always mark ingestion as finished, even after an unexpected error,
        # so the stats view never reports a dead worker as active.
        ingestion_status["running"] = False
# Small built-in corpus, loaded in sample-only mode or when no Warbler packs
# are found. Columns: id, content, realm_type, realm_label, lifecycle_stage.
_SAMPLE_ROWS = (
    (
        "wisdom_1",
        "True wisdom comes from understanding both success and failure. Each setback teaches resilience.",
        "wisdom",
        "philosophy",
        "peak",
    ),
    (
        "wisdom_2",
        "Courage is not the absence of fear, but the determination to act despite it.",
        "wisdom",
        "virtue",
        "emergence",
    ),
    (
        "tech_1",
        "The Warbler CDA system uses STAT7 addressing for multi-dimensional retrieval.",
        "technical",
        "documentation",
        "peak",
    ),
    (
        "narrative_1",
        "In the ancient library, the keeper of memories preserved stories across generations.",
        "narrative",
        "lore",
        "crystallization",
    ),
    (
        "pattern_1",
        "Patterns emerge when we observe the connections between seemingly unrelated events.",
        "pattern",
        "insight",
        "emergence",
    ),
)

SAMPLE_DOCS = [
    {
        "id": sample_id,
        "content": text,
        "metadata": {
            "realm_type": realm_type,
            "realm_label": realm_label,
            "lifecycle_stage": stage,
        },
    }
    for sample_id, text, realm_type, realm_label, stage in _SAMPLE_ROWS
]
class PackManager:
    """Decide whether Warbler packs need (re-)ingestion and remember the last run.

    State is kept in ``~/.warbler_cda/cache/pack_metadata.json``. Behaviour is
    tuned by three environment variables, each enabled by the value ``"true"``
    (case-insensitive): ``WARBLER_SKIP_PACK_CACHE``, ``WARBLER_SAMPLE_ONLY``
    and ``WARBLER_INGEST_PACKS`` (the last one defaults to ``"true"``).
    """

    def __init__(self):
        self.cache_dir = Path.home() / ".warbler_cda" / "cache"
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.metadata_file = self.cache_dir / "pack_metadata.json"
        self.skip_cache = os.getenv("WARBLER_SKIP_PACK_CACHE", "").lower() == "true"
        self.sample_only = os.getenv("WARBLER_SAMPLE_ONLY", "").lower() == "true"
        self.ingest_packs = os.getenv("WARBLER_INGEST_PACKS", "true").lower() == "true"

    def _load_metadata(self) -> Optional[Dict]:
        """Return the stored metadata dict, or ``None`` if missing or unreadable."""
        try:
            with open(self.metadata_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # OSError: file missing or unreadable.
            # ValueError: invalid JSON or undecodable bytes.
            return None
        # Anything but a JSON object is treated as corrupt, since callers use .get().
        return data if isinstance(data, dict) else None

    def _save_metadata(self, metadata: Dict):
        """Write ``metadata`` to disk; on failure print a warning and continue."""
        try:
            with open(self.metadata_file, "w", encoding="utf-8") as f:
                json.dump(metadata, f, indent=2)
        except (OSError, TypeError, ValueError) as e:
            print(f"[WARN] Failed to save pack metadata: {e}")

    def health_check(self, api, expected_doc_count: Optional[int] = None) -> bool:
        """Report whether ``api`` holds documents.

        Returns ``False`` when ``api`` is falsy, when the store is empty, when
        it holds fewer than ``expected_doc_count`` documents (if given), or
        when querying the store size raises.
        """
        if not api:
            return False
        try:
            current_size = api.get_context_store_size()
            if expected_doc_count and current_size < expected_doc_count:
                return False
            return current_size > 0
        except Exception:
            # Deliberately not BaseException: KeyboardInterrupt and SystemExit
            # must still propagate.
            return False

    def should_ingest_packs(self, api, pack_count: int) -> bool:
        """Return ``True`` when packs should be ingested into ``api``.

        Ingestion is skipped when disabled through the environment flags. It is
        required when the store fails a health check (at least 10 documents
        expected) or when the stored ``pack_count`` is absent or differs from
        ``pack_count``.
        """
        if self.skip_cache or not self.ingest_packs or self.sample_only:
            return False
        if not self.health_check(api, expected_doc_count=10):
            return True
        metadata = self._load_metadata()
        return not metadata or metadata.get("pack_count") != pack_count

    def mark_packs_ingested(self, pack_count: int, doc_count: int):
        """Persist a record of a completed ingestion run."""
        metadata = {
            "ingested_at": time.time(),
            "pack_count": pack_count,
            "doc_count": doc_count,
            "status": "healthy",
        }
        self._save_metadata(metadata)
# Single shared manager used by the startup code and the ingestion worker.
pack_manager = PackManager()

# Warbler CDA is optional at import time: if the package is missing the demo
# still starts and every handler reports that the backend is unavailable.
try:
    from warbler_cda import (
        RetrievalAPI,
        SemanticAnchorGraph,
        EmbeddingProviderFactory,
        STAT7RAGBridge,
        RetrievalQuery,
        RetrievalMode,
    )
    from warbler_cda.pack_loader import PackLoader
except ImportError:
    WARBLER_AVAILABLE = False
    print("Warning: Warbler CDA not installed. Using mock mode.")
else:
    WARBLER_AVAILABLE = True
def _load_sample_docs(target_api) -> int:
    """Submit every SAMPLE_DOCS entry to ``target_api`` and return how many were submitted."""
    for sample in SAMPLE_DOCS:
        target_api.add_document(sample["id"], sample["content"], sample["metadata"])
    return len(SAMPLE_DOCS)


# Build the retrieval API at import time. Any failure leaves ``api`` as None
# so the UI handlers can degrade gracefully.
api = None
if WARBLER_AVAILABLE:
    try:
        embedding_provider = EmbeddingProviderFactory.get_default_provider()
        semantic_anchors = SemanticAnchorGraph(embedding_provider=embedding_provider)
        stat7_bridge = STAT7RAGBridge()
        api = RetrievalAPI(
            semantic_anchors=semantic_anchors,
            embedding_provider=embedding_provider,
            stat7_bridge=stat7_bridge,
            config={"enable_stat7_hybrid": True},
        )

        packs_loaded = 0
        if pack_manager.sample_only:
            print("[INFO] Loading sample documents only (WARBLER_SAMPLE_ONLY=true)")
            packs_loaded = _load_sample_docs(api)
            print(f"[OK] Loaded {packs_loaded} sample documents")
        elif pack_manager.ingest_packs:
            from warbler_cda.pack_sync import PackSync

            pack_sync = PackSync()
            sync_status = pack_sync.get_sync_status()
            print(f"[INFO] Pack Status: {sync_status}")

            pack_loader = PackLoader()
            pack_docs = pack_loader.discover_documents()

            if not pack_docs:
                # Nothing discovered: fall back to the built-in samples.
                print("[INFO] No Warbler packs found. Using sample documents instead.")
                packs_loaded = _load_sample_docs(api)
                print(f"[OK] Loaded {packs_loaded} sample documents")
            elif pack_manager.should_ingest_packs(api, len(pack_docs)):
                # Ingest on a daemon thread so the UI can come up immediately.
                ingestion_thread = threading.Thread(
                    target=background_ingest_packs,
                    args=(api, pack_docs, pack_manager),
                    daemon=True,
                )
                ingestion_thread.start()
                packs_loaded = 0  # Will be updated asynchronously
                print(f"[INFO] Started background ingestion of {len(pack_docs)} documents")
            else:
                packs_loaded = len(pack_docs)
                print(f"[INFO] Using cached pack data ({packs_loaded} documents)")

        context_size = api.get_context_store_size()
        print(f"[OK] Total documents in context store: {context_size}")
    except Exception as e:
        print(f"[ERROR] Failed to initialize Warbler CDA: {e}")
        api = None
        import traceback

        traceback.print_exc()
def query_warbler(
    query_text: str,
    max_results: int = 5,
    use_hybrid: bool = True,
    weight_semantic: float = 0.6,
    weight_stat7: float = 0.4,
) -> Tuple[str, str]:
    """Run a semantic (optionally STAT7-hybrid) query and format the outcome.

    Args:
        query_text: Free-text query. ``None`` or blank text is rejected with a
            prompt message.
        max_results: Maximum number of results; coerced to ``int`` because UI
            sliders may deliver a float.
        use_hybrid: Whether to request STAT7 hybrid scoring.
        weight_semantic: Weight passed through for the semantic score.
        weight_stat7: Weight passed through for the STAT7 score.

    Returns:
        Tuple of (results_markdown, metrics_json). On failure the first item
        is ``"Error: <message>"`` and the second a JSON object with an
        ``"error"`` key.
    """
    if not WARBLER_AVAILABLE or not api:
        return "Warbler CDA not available. Please install the package.", "{}"
    # This check runs outside the try block, so it must tolerate None itself
    # rather than raise AttributeError into the caller.
    if not query_text or not query_text.strip():
        return "Please enter a query.", "{}"

    try:
        start_time = time.time()
        print(f"DEBUG: Context store size: {api.get_context_store_size()}")

        # Create query
        query = RetrievalQuery(
            query_id=f"demo_{int(time.time())}",
            mode=RetrievalMode.SEMANTIC_SIMILARITY,
            semantic_query=query_text,
            max_results=int(max_results),
            confidence_threshold=0.3,
            stat7_hybrid=use_hybrid,
            weight_semantic=weight_semantic,
            weight_stat7=weight_stat7,
        )
        print(f"DEBUG: Query created - ID: {query.query_id}, Text: {query_text}")

        # Execute query
        assembly = api.retrieve_context(query)
        print(
            f"DEBUG: Retrieved {len(assembly.results)} results, Assembly ID: {assembly.assembly_id}"
        )
        elapsed_ms = (time.time() - start_time) * 1000

        # Collect fragments and join once instead of repeated string +=.
        parts = [
            "# Query Results\n\n",
            f"**Query:** {query_text}\n\n",
            f"**Mode:** {'Hybrid (Semantic + STAT7)' if use_hybrid else 'Semantic Only'}\n\n",
            f"**Results Found:** {len(assembly.results)}\n\n",
            f"**Assembly Quality:** {assembly.assembly_quality:.3f}\n\n",
            f"**Execution Time:** {elapsed_ms:.1f}ms\n\n",
            "---\n\n",
        ]

        if assembly.results:
            for i, result in enumerate(assembly.results, 1):
                parts.append(f"### Result {i}\n\n")
                parts.append(f"**Relevance Score:** {result.relevance_score:.3f}\n\n")
                if use_hybrid:
                    parts.append(f"- Semantic Similarity: {result.semantic_similarity:.3f}\n")
                    parts.append(f"- STAT7 Resonance: {result.stat7_resonance:.3f}\n\n")
                parts.append(f"**Content:** {result.content}\n\n")
                parts.append(f"**Type:** {result.content_type}\n\n")
                if result.metadata:
                    parts.append("**Metadata:**\n")
                    parts.extend(
                        f"- {key}: {value}\n"
                        for key, value in result.metadata.items()
                        if key != "stat7"  # Skip complex STAT7 object
                    )
                    parts.append("\n")
                parts.append("---\n\n")
        else:
            parts.append(
                "*No results found. Try adjusting your query or adding more documents.*\n"
            )
        results_text = "".join(parts)

        # Metrics
        metrics = {
            "query_id": assembly.assembly_id,
            "result_count": len(assembly.results),
            "total_relevance": assembly.total_relevance,
            "assembly_quality": assembly.assembly_quality,
            "temporal_span_hours": assembly.temporal_span_hours,
            "anchor_coverage": len(assembly.anchor_coverage),
            "execution_time_ms": elapsed_ms,
            "hybrid_mode": use_hybrid,
        }
        metrics_json = json.dumps(metrics, indent=2)
        return results_text, metrics_json
    except Exception as e:
        # UI boundary: report the failure to the user instead of raising.
        return f"Error: {str(e)}", json.dumps({"error": str(e)}, indent=2)
def add_document(doc_id: str, content: str, realm_type: str, realm_label: str) -> str:
    """Add a new document to the system.

    Args:
        doc_id: Identifier for the document; must be non-blank.
        content: Document text; must be non-blank.
        realm_type: Stored as ``metadata["realm_type"]``.
        realm_label: Stored as ``metadata["realm_label"]``.

    Returns:
        A human-readable status message. Failures are reported in the message
        instead of being raised.
    """
    if not WARBLER_AVAILABLE or not api:
        return "Warbler CDA not available."
    # This validation runs outside the try block, so it must tolerate None
    # itself rather than raise AttributeError into the caller.
    if not (doc_id and doc_id.strip()) or not (content and content.strip()):
        return "Please provide both document ID and content."

    try:
        metadata = {
            "realm_type": realm_type,
            "realm_label": realm_label,
            "lifecycle_stage": "emergence",
            "activity_level": 0.7,
        }
        success = api.add_document(doc_id, content, metadata)
        if success:
            return f"[OK] Document '{doc_id}' added successfully!\n\nTotal documents: {api.get_context_store_size()}"
        return f"[ERROR] Document '{doc_id}' already exists."
    except Exception as e:
        # UI boundary: report the failure to the user instead of raising.
        return f"Error: {str(e)}"
def get_system_stats() -> str:
    """Build a Markdown report of retrieval metrics and pack-ingestion progress.

    Returns:
        The report text; ``"Warbler CDA not available."`` when the backend is
        missing; or ``"Error: <message>"`` if gathering the data raises.
    """
    if not (WARBLER_AVAILABLE and api):
        return "Warbler CDA not available."

    try:
        metrics = api.get_retrieval_metrics()

        # Fragments are collected in display order and joined once at the end.
        report = [
            "# System Statistics\n\n",
            f"**Total Documents:** {metrics['context_store_size']}\n\n",
            f"**Total Queries:** {metrics['retrieval_metrics']['total_queries']}\n\n",
            f"**Cache Hit Rate:** {metrics['cache_performance']['hit_rate']:.1%}\n\n",
            f"**Average Results per Query:** {metrics['retrieval_metrics']['average_results_per_query']:.1f}\n\n",
            f"**Average Retrieval Time:** {metrics['retrieval_metrics']['average_retrieval_time_ms']:.1f}ms\n\n",
            f"**Hybrid Queries:** {metrics['retrieval_metrics']['hybrid_queries']}\n\n",
            "## Quality Distribution\n\n",
        ]
        for label, tally in metrics["retrieval_metrics"]["quality_distribution"].items():
            report.append(f"- {label.capitalize()}: {tally}\n")

        # Ingestion section: driven by the shared progress record.
        progress = ingestion_status
        report.append("\n## Background Pack Ingestion\n\n")

        if progress["running"]:
            # Ingestion in progress
            if progress["total_docs"] > 0:
                progress_percent = progress["processed"] / progress["total_docs"] * 100
            else:
                progress_percent = 0
            if progress["eta"] > 0:
                eta_minutes = progress["eta"] / 60
            else:
                eta_minutes = 0

            report.append("**Status:** 🟢 **ACTIVE** - Ingesting documents...\n\n")
            report.append("```\n")
            report.append(
                f"Progress: {progress['processed']}/{progress['total_docs']} documents\n"
            )
            report.append(f"Complete: {progress_percent:.1f}%\n")
            report.append(f"Rate: {progress['rate']:.1f} docs/sec\n")
            report.append(f"ETA: {eta_minutes:.1f} minutes\n")
            if progress["failed"] > 0:
                report.append(f"Failed: {progress['failed']} documents\n")
            report.append("```\n\n")
        elif progress["total_docs"] > 0:
            # A run has finished: totals are recorded but nothing is running.
            report.append("**Status:** ✅ **COMPLETE**\n\n")
            report.append(
                f"**Last Ingestion:** Processed {progress['processed']} documents"
            )
            if progress["failed"] > 0:
                report.append(f" ({progress['failed']} failed)")
            report.append("\n\n")
        else:
            # No ingestion has been recorded.
            report.append("**Status:** ⚪ **IDLE** - No background ingestion active\n\n")

        return "".join(report)
    except Exception as e:
        return f"Error: {str(e)}"
# Gradio UI definition. Components are created in layout order inside nested
# context managers, so statement order here determines what the page looks like.
# NOTE(review): the nesting below was reconstructed from a copy of the file
# that had lost its indentation - confirm against the original layout.
with gr.Blocks(title="Warbler CDA - RAG System Demo") as demo:
    gr.Markdown(
        """
        # Warbler CDA - Cognitive Development Architecture
        Interactive demo of a production-ready RAG system with **STAT7 multi-dimensional addressing**.
        ## Features
        - **Semantic Search**: Find relevant documents using natural language
        - **STAT7 Hybrid Scoring**: Combine semantic similarity with 7-dimensional resonance
        - **Real-time Retrieval**: Sub-second query performance
        - **Provenance Tracking**: Full lineage and metadata preservation
        """
    )
    # --- Query tab: inputs on the left, example queries on the right ---
    with gr.Tab("Query"):
        with gr.Row():
            with gr.Column(scale=2):
                query_input = gr.Textbox(
                    label="Query",
                    placeholder="Enter your search query (e.g., 'wisdom about courage')",
                    lines=2,
                )
                with gr.Row():
                    max_results = gr.Slider(
                        minimum=1, maximum=10, value=5, step=1, label="Max Results"
                    )
                    use_hybrid = gr.Checkbox(label="Enable STAT7 Hybrid Scoring", value=True)
                with gr.Row():
                    # The two weights are passed to query_warbler independently;
                    # nothing here forces them to sum to 1.
                    weight_semantic = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.6, step=0.1, label="Semantic Weight"
                    )
                    weight_stat7 = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.4, step=0.1, label="STAT7 Weight"
                    )
                query_btn = gr.Button("Search", variant="primary")
            with gr.Column(scale=1):
                gr.Markdown(
                    """
                    ### Example Queries
                    - "wisdom about courage"
                    - "technical documentation"
                    - "narrative patterns"
                    - "ancient knowledge"
                    - "system architecture"
                    """
                )
        with gr.Row():
            results_output = gr.Markdown(label="Results")
        with gr.Row():
            metrics_output = gr.JSON(label="Metrics")
        # Input order must match query_warbler's positional parameters.
        query_btn.click(
            fn=query_warbler,
            inputs=[query_input, max_results, use_hybrid, weight_semantic, weight_stat7],
            outputs=[results_output, metrics_output],
        )
    # --- Add Document tab ---
    with gr.Tab("Add Document"):
        with gr.Row():
            with gr.Column():
                doc_id_input = gr.Textbox(label="Document ID", placeholder="unique_doc_id")
                content_input = gr.Textbox(
                    label="Content", placeholder="Enter document content...", lines=5
                )
                with gr.Row():
                    realm_type_input = gr.Dropdown(
                        choices=["wisdom", "technical", "narrative", "pattern", "data"],
                        value="wisdom",
                        label="Realm Type",
                    )
                    realm_label_input = gr.Textbox(
                        label="Realm Label", placeholder="e.g., philosophy, documentation"
                    )
                add_btn = gr.Button("Add Document", variant="primary")
        add_output = gr.Textbox(label="Status", lines=3)
        # Input order must match add_document's positional parameters.
        add_btn.click(
            fn=add_document,
            inputs=[doc_id_input, content_input, realm_type_input, realm_label_input],
            outputs=add_output,
        )
    # --- System Stats tab ---
    with gr.Tab("System Stats"):
        stats_btn = gr.Button("Refresh Statistics", variant="primary")
        stats_output = gr.Markdown()
        stats_btn.click(fn=get_system_stats, outputs=stats_output)
        # Populate the stats once via the Blocks-level load event. This is
        # registered on `demo`, not on the tab, so it is not tied to the tab
        # being opened.
        demo.load(fn=get_system_stats, outputs=stats_output)

        # NOTE(review): auto_refresh_stats is defined but never called or
        # registered anywhere in the visible code, and its loop body only
        # sleeps. It appears to be a placeholder for a future auto-refresh.
        def auto_refresh_stats():
            while ingestion_status["running"]:
                time.sleep(10)
                # Note: In Gradio, we can't directly update from background thread
                # This would need a more complex setup with queues or websockets
                # For now, users can manually refresh

    # --- About tab: static project description ---
    with gr.Tab("About"):
        gr.Markdown(
            """
            ## About Warbler CDA
            Warbler CDA (Cognitive Development Architecture) is a production-ready RAG system featuring:
            ### STAT7 Multi-Dimensional Addressing
            Each document is addressed in 7 dimensions:
            1. **Realm**: Domain classification
            2. **Lineage**: Generation/version
            3. **Adjacency**: Connectivity score
            4. **Horizon**: Lifecycle stage
            5. **Luminosity**: Activity level
            6. **Polarity**: Resonance factor
            7. **Dimensionality**: Complexity level
            ### Hybrid Scoring
            Combines traditional semantic similarity with STAT7 resonance for superior retrieval:
            ```
            hybrid_score = (0.6 × semantic) + (0.4 × stat7_resonance)
            ```
            ### Validated Performance
            - **EXP-01**: 0% collision rate across 10K+ entities
            - **EXP-02**: Sub-millisecond retrieval at 100K scale
            - **EXP-03**: All 7 dimensions proven necessary
            - **EXP-10**: Narrative coherence preserved under concurrent load
            ### Links
            - [GitHub Repository](https://github.com/tiny-walnut-games/the-seed)
            - [Documentation](https://github.com/tiny-walnut-games/the-seed/blob/main/README.md)
            - [PyPI Package](https://pypi.org/project/warbler-cda/)
            ---
            Made with love by Tiny Walnut Games
            """
        )

# Launch the app only when run as a script, not when imported.
if __name__ == "__main__":
    demo.launch()