#!/usr/bin/env python3
"""
Gradio interface for Warbler CDA on HuggingFace Spaces.

Provides a web UI for the FractalStat RAG system with GPU acceleration.

Importing this module has side effects: it builds the retrieval stack, loads
(or downloads) the document packs, ingests them, and constructs the Gradio
``demo`` object. The server is only launched when run as a script.
"""

import contextlib
import os
import signal
import time
from typing import Optional

import gradio as gr

# Import the HuggingFace Spaces GPU decorator
try:
    from spaces import GPU
except ImportError:
    # Fallback if not available: a no-op decorator so ``@GPU`` still works.
    def GPU(func):
        """Return *func* unchanged (stand-in for ``spaces.GPU``)."""
        return func

# Import Warbler CDA components
from warbler_cda.retrieval_api import RetrievalAPI, RetrievalQuery, RetrievalMode
from warbler_cda.embeddings import EmbeddingProviderFactory
from warbler_cda.fractalstat_rag_bridge import FractalStatRAGBridge
from warbler_cda.semantic_anchors import SemanticAnchorGraph
from warbler_cda.pack_loader import PackLoader
from warbler_cda.conflict_detector import ConflictDetector  # Add Bob the Skeptic


# Overall wall-clock budget for the HuggingFace pack download (3 minutes).
DOWNLOAD_TIMEOUT_SECONDS = 180

# Cap on arXiv papers: balances coverage against deployment time.
ARXIV_DOCUMENT_LIMIT = 50000

# All available HF dataset packs, enabled for maximum knowledge diversity.
HF_DATASETS = [
    "arxiv",  # Physics and mathematics papers
    "edustories",  # Educational narratives and stories
    "novels",  # Fiction literature
    "manuals",  # Technical documentation
    "enterprise",  # Business and corporate content
    "prompt-report",  # AI prompt engineering reports
]

# Fallback documents that match common test queries; used when no pack is
# available locally and the download fails or times out.
SAMPLE_DOCUMENTS = [
    {"id": "sample1", "content": "FractalStat is an 8-dimensional addressing system for intelligent retrieval.", "metadata": {}},
    {"id": "sample2", "content": "Semantic search finds documents by meaning, not just keywords.", "metadata": {}},
    {"id": "sample3", "content": "Bob the Skeptic validates results to prevent bias and hallucinations.", "metadata": {}},
    {"id": "sample4", "content": "Hello world! This is a sample document for testing search functionality.", "metadata": {}},
    {"id": "sample5", "content": "This document contains information about rotation dynamics of Saturn's moons.", "metadata": {}},
    {"id": "sample6", "content": "Machine learning and artificial intelligence are transforming technology.", "metadata": {}},
    {"id": "sample7", "content": "Ancient library keepers preserved wisdom through generations.", "metadata": {}},
    {"id": "sample8", "content": "Wisdom about courage comes from facing fears directly.", "metadata": {}},
]


class _DownloadDeadlineExceeded(TimeoutError):
    """Raised by the alarm handler when the overall download budget is spent.

    A dedicated subclass lets the per-dataset error handler tell "the global
    deadline fired" apart from an ordinary network ``TimeoutError`` that should
    only skip one dataset.
    """


@contextlib.contextmanager
def _time_limit(seconds: int):
    """Abort the enclosed block with ``_DownloadDeadlineExceeded`` after *seconds*.

    Uses ``SIGALRM``. On platforms without it the block runs with no time
    limit. The alarm is always cancelled and the previous handler restored on
    exit, so a stale alarm can never fire later while the app is serving.
    """
    if not hasattr(signal, "SIGALRM"):
        yield
        return

    def _on_alarm(signum, frame):
        raise _DownloadDeadlineExceeded("HF download timed out")

    previous_handler = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)  # Cancel timeout
        # signal.signal() returns None when the old handler was not installed
        # from Python; None is not a valid handler, so fall back to default.
        signal.signal(
            signal.SIGALRM,
            previous_handler if previous_handler is not None else signal.SIG_DFL,
        )


def _download_packs(loader):
    """Download the HF dataset packs into ``loader.packs_dir``.

    Each dataset is attempted independently; a failure in one does not stop
    the others. The whole operation is bounded by ``DOWNLOAD_TIMEOUT_SECONDS``.

    Returns:
        The documents discovered after downloading, or an empty list when
        nothing was downloaded or the time budget ran out.

    Raises:
        Exception: anything raised outside the per-dataset attempts (for
            example importing or constructing the ingestor); the caller is
            expected to fall back to sample documents.
    """
    # Suppress HF datasets progress bars for cleaner output
    os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"

    # Imported lazily: only needed when packs are missing, and an import
    # failure must be recoverable by the caller.
    from warbler_cda.utils.hf_warbler_ingest import HFWarblerIngestor

    total_docs = 0
    successful_downloads = 0
    try:
        with _time_limit(DOWNLOAD_TIMEOUT_SECONDS):
            ingestor = HFWarblerIngestor(packs_dir=loader.packs_dir, verbose=True)

            for dataset in HF_DATASETS:
                try:
                    print(f"📦 Downloading {dataset} (timeout: 3 minutes)...")
                    arxiv_limit = ARXIV_DOCUMENT_LIMIT if dataset == "arxiv" else None
                    success = ingestor.ingest_dataset(dataset, arxiv_limit=arxiv_limit)

                    if success:
                        successful_downloads += 1
                        # Count documents in this pack
                        pack_docs = loader.discover_documents()
                        new_count = len(pack_docs) - total_docs
                        total_docs = len(pack_docs)
                        print(f"✅ {dataset}: {new_count} documents successfully loaded")
                    else:
                        print(f"❌ {dataset} download failed (no documents retrieved)")
                except _DownloadDeadlineExceeded:
                    # The global deadline must abort the whole download, not
                    # be reported as a single-dataset error.
                    raise
                except Exception as e:
                    # Continue to next dataset instead of failing completely
                    print(f"❌ {dataset} download error: {e}")

            print(f"📊 Total: {total_docs} documents from {successful_downloads}/{len(HF_DATASETS)} packs")

            if successful_downloads > 0:
                # Reload after download
                downloaded = loader.discover_documents()
                print(f"✅ Downloaded {len(downloaded)} documents")
                return downloaded

            print("❌ No HF datasets downloaded successfully, using sample documents...")
            return []
    except TimeoutError:
        print("⏰ HF download timed out - proceeding with sample documents")
        return []


def _render_results(results, limit: int, show_components: bool) -> str:
    """Render up to *limit* retrieval results as a Markdown fragment.

    Args:
        results: Sequence of result objects exposing ``relevance_score``,
            ``content``, ``semantic_similarity`` and ``fractalstat_resonance``.
        limit: Maximum number of results to render.
        show_components: Whether to add the semantic / FractalStat breakdown
            (skipped for results whose ``fractalstat_resonance`` is ``None``).

    Returns:
        The Markdown text, or ``"No results found.\\n"`` for an empty input.
    """
    if not results:
        return "No results found.\n"

    parts = ["### Top Results\n\n"]
    for rank, result in enumerate(results[:limit], 1):
        parts.append(f"**{rank}. Score: {result.relevance_score:.3f}**\n\n")
        parts.append(f"{result.content[:300]}...\n\n")

        if show_components and result.fractalstat_resonance is not None:
            parts.append(f"- Semantic: {result.semantic_similarity:.3f}\n")
            parts.append(f"- FractalStat: {result.fractalstat_resonance:.3f}\n\n")

        parts.append("---\n\n")
    return "".join(parts)


# Initialize the system
print("🚀 Initializing Warbler CDA...")

# Initialize the system components
print("⚙️ Creating embedding provider with multi-worker GPU acceleration...")
embedding_provider = EmbeddingProviderFactory.create_provider("sentence_transformer", {
    "num_workers": 4,  # Restore multi-worker for speed (device test will handle ZeroGPU)
    "batch_size": 64,  # Larger batches for better throughput
    "cache_dir": ".embedding_cache"
    # Let device auto-detection handle ZeroGPU (will fall back to CPU if needed)
})
print(f"✅ Embedding provider: {embedding_provider.get_provider_info()['provider_id']}")

print("⚙️ Initializing semantic anchors...")
semantic_anchors = SemanticAnchorGraph(embedding_provider=embedding_provider)
print("✅ Semantic anchors initialized")

print("⚙️ Initializing FractalStat bridge...")
fractalstat_bridge = FractalStatRAGBridge()
print("✅ FractalStat bridge initialized")

print("🕵️ Initializing Bob the Skeptic (conflict detector)...")
bob_the_skeptic = ConflictDetector(embedding_provider=embedding_provider)
print("✅ Bob the Skeptic initialized")

print("⚙️ Creating RetrievalAPI...")
api = RetrievalAPI(
    semantic_anchors=semantic_anchors,
    embedding_provider=embedding_provider,
    fractalstat_bridge=fractalstat_bridge,
    conflict_detector=bob_the_skeptic,
    config={"enable_fractalstat_hybrid": True}
)
print("✅ RetrievalAPI initialized with Bob the Skeptic")

# Load packs
print("📚 Loading Warbler packs...")
pack_loader = PackLoader()
documents = pack_loader.discover_documents()

# If no packs found, try to download them with timeout protection
if not documents:
    print("⚠️ No packs found locally. Attempting to download from HuggingFace...")
    try:
        documents = _download_packs(pack_loader)
    except Exception as e:
        # Best effort: any failure here degrades to the sample documents.
        print(f"⚠️ Could not download packs: {e}")
        print("Using sample documents instead...")
        documents = []

if not documents:
    # Fallback to comprehensive sample documents that match common test queries
    for doc in SAMPLE_DOCUMENTS:
        api.add_document(doc["id"], doc["content"], doc["metadata"])
    print(f"✅ Loaded {len(SAMPLE_DOCUMENTS)} comprehensive sample documents")
else:
    print(f"✅ Found {len(documents)} documents")

    # Batch process embeddings for much faster ingestion
    content_list = [doc["content"] for doc in documents]
    # num_workers is only used for logging, so do not let a provider without
    # that attribute abort start-up.
    worker_count = getattr(embedding_provider, "num_workers", "unknown")
    print(f"⚙️ Batch embedding {len(content_list)} documents with {worker_count} workers...")

    # Get embeddings in parallel batches
    embeddings_list = embedding_provider.embed_batch(content_list, show_progress=True)
    print(f"✅ Batch embeddings completed: {len(embeddings_list)} embeddings")

    can_compute_coords = hasattr(embedding_provider, "compute_fractalstat_from_embedding")

    # Ingest documents with pre-computed embeddings
    for i, doc in enumerate(documents):
        embedding = embeddings_list[i] if i < len(embeddings_list) else None

        # Create FractalStat coordinates. Explicit None/length checks instead
        # of truthiness, which is ambiguous for array-like embeddings.
        fractalstat_coords = None
        if can_compute_coords and embedding is not None and len(embedding) > 0:
            fractalstat_coords = embedding_provider.compute_fractalstat_from_embedding(embedding)

        api.add_document(
            doc_id=doc["id"],
            content=doc["content"],
            metadata=doc.get("metadata", {}),
            embedding=embedding,
            fractalstat_coordinates=fractalstat_coords
        )

# Report the real store size in every case (sample documents included).
print(f"🎉 Warbler CDA ready with {api.get_context_store_size()} documents!")


@GPU
def query_warbler(query_text: str, max_results: int = 5, use_hybrid: bool = True) -> str:
    """Query the Warbler CDA system.

    Args:
        query_text: Free-text query; blank input short-circuits with a prompt.
        max_results: Maximum number of results to request and display.
        use_hybrid: Use hybrid semantic + FractalStat retrieval. When hybrid
            returns nothing, the query is re-run with hybrid scoring disabled
            and a lower confidence threshold.

    Returns:
        A Markdown report of the results.
    """
    if not query_text or not query_text.strip():
        return "Please enter a query."

    # The UI slider may deliver a float; slicing requires an int.
    max_results = int(max_results)
    start_time = time.time()

    # Create query - use hybrid mode when requested
    query_mode = RetrievalMode.HYBRID_SEMANTIC_FRACTALSTAT if use_hybrid else RetrievalMode.SEMANTIC_SIMILARITY

    query = RetrievalQuery(
        query_id=f"gradio_{int(time.time())}",
        mode=query_mode,
        semantic_query=query_text,
        max_results=max_results,
        fractalstat_hybrid=use_hybrid,  # Full hybrid mode when user enables it
        confidence_threshold=0.3  # Restore normal threshold now that we might have real HF data
    )

    # DEBUG: Log query details
    print(f"DEBUG: Executing query '{query_text}' with mode={query_mode}, max_results={max_results}, hybrid={use_hybrid}")

    # Execute query
    assembly = api.retrieve_context(query)
    elapsed_ms = (time.time() - start_time) * 1000

    # DEBUG: Log results summary with details
    print(f"DEBUG: Query completed in {elapsed_ms:.0f}ms, found {len(assembly.results)} results")
    if assembly.results:
        print(f"DEBUG: Top 3 relevance scores: {[r.relevance_score for r in assembly.results[:3]]}")
        print(f"DEBUG: Confidence threshold was: {query.confidence_threshold}")
    else:
        print(f"DEBUG: No results above threshold: {query.confidence_threshold}")

    # Hybrid Fallback: If hybrid mode and no results, fall back to pure semantic search
    if use_hybrid and not assembly.results:
        print("DEBUG: Hybrid returned 0 results, falling back to pure semantic search")

        fallback_start = time.time()
        query.confidence_threshold = 0.2  # Lower threshold for semantic fallback
        query.fractalstat_hybrid = False  # Disable hybrid for this query
        # NOTE(review): query.mode is still the hybrid mode here; only the
        # fractalstat_hybrid flag is cleared. Confirm the API honours the flag.

        assembly = api.retrieve_context(query)
        fallback_ms = (time.time() - fallback_start) * 1000
        elapsed_ms = (time.time() - start_time) * 1000  # Update total time

        print(f"DEBUG: Semantic fallback completed in {fallback_ms:.0f}ms, found {len(assembly.results)} results")
        if assembly.results:
            print(f"DEBUG: Top 3 relevance scores from semantic: {[r.relevance_score for r in assembly.results[:3]]}")
            print("DEBUG: Hybrid fallback successful - results returned via semantic search")

    # Format results
    output = "## Query Results\n\n"
    output += f"**Query:** {query_text}\n\n"
    output += f"**Found:** {len(assembly.results)} results in {elapsed_ms:.0f}ms\n\n"
    output += f"**Quality Score:** {assembly.assembly_quality:.3f}\n\n"
    output += _render_results(assembly.results, max_results, use_hybrid)

    return output


def get_system_stats() -> str:
    """Get comprehensive system statistics with real-time updates.

    Returns:
        A Markdown report covering the document store, query performance,
        conflict detection, FractalStat status and component health.
    """
    metrics = api.get_retrieval_metrics()
    retrieval = metrics['retrieval_metrics']
    health = metrics['system_health']

    # Get current time for freshness indicator
    current_time = time.strftime("%H:%M:%S UTC", time.gmtime())

    parts = [
        "## System Statistics\n\n",
        f"**Last Updated:** {current_time}\n\n",
        # Document Store
        "### 📚 Document Store\n\n",
        f"**Total Documents:** {api.get_context_store_size():,}\n\n",
        "**Document Types:** Scientific papers, novels, education, fiction, technical docs\n\n",
        # Query Performance
        "### ⚡ Query Performance\n\n",
        f"**Total Queries:** {retrieval['total_queries']}\n\n",
        f"**Cache Hit Rate:** {metrics['cache_performance']['hit_rate']:.1%}\n\n",
        f"**Average Response Time:** {retrieval['average_retrieval_time_ms']:.0f}ms\n\n",
        f"**Average Quality Score:** {health['average_quality']:.3f}\n\n",
        # Conflict Detection (Bob the Skeptic)
        "### 🕵️ Bob the Skeptic - Conflict Detection\n\n",
    ]

    # Access conflict detector if available
    conflict_detector = getattr(api, 'conflict_detector', None) if hasattr(api, 'config') and api.config else None

    if conflict_detector and hasattr(conflict_detector, 'get_global_conflict_summary'):
        try:
            conflict_summary = conflict_detector.get_global_conflict_summary()
            conf_dist = conflict_summary['confidence_distribution']
            # Build the section separately so a failure part-way through
            # yields only the error line, not a half-rendered section.
            section = [
                f"**Total Conflicts Detected:** {conflict_summary['total_conflicts']}\n\n",
                "**Conflict Confidence Levels:**\n",
                f"- High Confidence (80%+): {conf_dist['high']}\n",
                f"- Medium Confidence (60-79%): {conf_dist['medium']}\n",
                f"- Low Confidence (<60%): {conf_dist['low']}\n\n",
                f"**Recent Conflicts:** {conflict_summary['recent_conflicts_1h']} in last hour\n\n",
                f"**System Health Score:** {conflict_summary['system_health_score']:.2f}\n\n",
            ]
        except Exception as e:
            section = [f"**Status:** Error accessing conflict detector: {str(e)}\n\n"]
        parts.extend(section)
    else:
        parts.append("**Status:** Conflict detection not configured or unavailable\n\n")

    # FractalStat Intelligence
    parts.append("### 🔄 FractalStat Intelligence\n\n")

    # Check if fractalstat bridge is available
    if api.fractalstat_bridge:
        parts.append("**Status:** Active - 8D multi-dimensional addressing enabled\n\n")

        available_dimensions = [
            "Realm (semantic domains)",
            "Lineage (generation)",
            "Adjacency (connectivity)",
            "Horizon (lifecycle stages)",
            "Luminosity (semantic brightness)",
            "Polarity (tension/resonance)",
            "Dimensionality (complexity)",
            "Alignment (social coordination)"
        ]
        parts.append("**Active Dimensions:**\n- " + "\n- ".join(available_dimensions) + "\n\n")

        # Entanglement status
        if hasattr(api.fractalstat_bridge, 'entanglement_resonance'):
            parts.append("**Entanglement Engine:** ✅ ACTIVE - Cross-coordinate conceptual connections\n\n")
        else:
            parts.append("**Entanglement Engine:** ❌ NOT YET INTEGRATED\n\n")
    else:
        parts.append("**Status:** Not configured\n\n")

    def _mark(flag) -> str:
        return '✅' if flag else '❌'

    # System Health
    parts.append("### 🏥 System Health\n\n")
    parts.append("**Overall Status:** 🟢 Operational\n\n")
    parts.append("**Components:**\n")
    parts.append(f"- Semantic Anchors: {_mark(health.get('semantic_anchors_available', False))}\n")
    parts.append(f"- Embedding Provider: {_mark(health.get('embedding_provider_available', False))}\n")
    parts.append(f"- FractalStat Bridge: {_mark(health.get('fractalstat_bridge_available', False))}\n")
    parts.append(f"- Conflict Detector: {_mark(conflict_detector)}\n\n")

    # Recent Activity. The conditional must sit outside the format spec:
    # "{x:.1% if ... else ...}" is parsed as one (invalid) format specifier.
    success_rate = health.get('retrieval_success_rate')
    success_text = f"{success_rate:.1%}" if success_rate is not None else 'N/A'
    parts.append("### 📈 Recent Activity\n\n")
    parts.append(f"**Retrieval Success Rate:** {success_text}\n\n")

    return "".join(parts)


# Add Bob quarantine info to results
def format_results_with_quarantine(output: str, results_count: int, quarantined_count: Optional[int]) -> str:
    """Format results with Bob's quarantine information.

    Args:
        output: Markdown report starting with the ``## Query Results`` header.
        results_count: Number of results retained for display.
        quarantined_count: Number of quarantined results, or ``None`` when the
            conflict detector is not available.

    Returns:
        *output* with a Bob the Skeptic status line inserted after the header.
    """
    # None must be tested first: ``None > 0`` raises TypeError.
    if quarantined_count is None:
        status_line = "**Bob the Skeptic**: Conflict detection unavailable\n\n"
    elif quarantined_count > 0:
        status_line = f"**Bob the Skeptic**: {quarantined_count} conflicting results quarantined, {results_count} results retained\n\n"
    else:
        status_line = "**Bob the Skeptic**: No conflicting results detected\n\n"

    # Only the report header is targeted; result content may repeat the text.
    return output.replace("## Query Results\n\n", "## Query Results\n\n" + status_line, 1)


# NOTE(review): this is the callback wired to the Search button, yet only
# query_warbler carries @GPU - confirm whether that is intentional.
def query_warbler_with_quarantine(query_text: str, max_results: int = 5, use_hybrid: bool = True) -> str:
    """Query with Bob's quarantine reporting.

    Requests twice *max_results* so that results carrying conflict flags can
    be set aside while still filling the requested count. If hybrid retrieval
    leaves nothing but quarantined results, a semantic-only query is run as a
    fallback.

    Args:
        query_text: Free-text query; blank input short-circuits with a prompt.
        max_results: Maximum number of results to display.
        use_hybrid: Use hybrid semantic + FractalStat retrieval.

    Returns:
        A Markdown report of the retained results and the risk assessment.
    """
    if not query_text or not query_text.strip():
        return "Please enter a query."

    # The UI slider may deliver a float; slicing requires an int.
    max_results = int(max_results)
    start_time = time.time()

    # Create query - use hybrid mode when requested
    query_mode = RetrievalMode.HYBRID_SEMANTIC_FRACTALSTAT if use_hybrid else RetrievalMode.SEMANTIC_SIMILARITY

    query = RetrievalQuery(
        query_id=f"gradio_{int(time.time())}",
        mode=query_mode,
        semantic_query=query_text,
        max_results=max_results * 2,  # Get extra results for quarantine prioritizing
        fractalstat_hybrid=use_hybrid,  # Full hybrid mode when user enables it
        confidence_threshold=0.3  # Restore normal threshold now that we might have real HF data
    )

    # DEBUG: Log query details
    print(f"DEBUG: Executing query '{query_text}' with mode={query_mode}, max_results={max_results}, hybrid={use_hybrid}")

    # Execute query
    assembly = api.retrieve_context(query)
    elapsed_ms = (time.time() - start_time) * 1000

    # Find quarantined results (those with conflict flags)
    original_results = len(assembly.results)
    quarantined_results = [r for r in assembly.results if r.conflict_flags]
    retained_results = [r for r in assembly.results if not r.conflict_flags]

    # Limit to requested max_results from retained results
    final_results = retained_results[:max_results]

    # DEBUG: Log results summary with Bob information
    print(f"DEBUG: Query completed in {elapsed_ms:.0f}ms, "
          f"found {original_results} raw results, "
          f"Bob quarantined {len(quarantined_results)}, "
          f"retained {len(final_results)} for display")

    if assembly.results:
        print(f"DEBUG: Top 3 relevance scores: {[r.relevance_score for r in assembly.results[:3]]}")
        print(f"DEBUG: Confidence threshold was: {query.confidence_threshold}")

    # Hybrid Fallback: If hybrid mode and no final results from retained, try semantic on quarantine pool
    fallback_triggered = False
    displayed_assembly = assembly
    if use_hybrid and not final_results and quarantined_results:
        print("DEBUG: Hybrid retained 0 results, checking quarantined pool for semantic fallback")

        # Create semantic-only query for quarantined content
        semantic_query = RetrievalQuery(
            query_id=f"quarantine_fallback_{int(time.time())}",
            mode=RetrievalMode.SEMANTIC_SIMILARITY,
            semantic_query=query_text,
            max_results=max_results,
            confidence_threshold=0.3,
            fractalstat_hybrid=False
        )

        # Note: In a full implementation, we'd re-query just the quarantined IDs
        # For now, trigger full semantic search as fallback
        displayed_assembly = api.retrieve_context(semantic_query)
        fallback_triggered = True
        final_results = displayed_assembly.results[:max_results]
        elapsed_ms = (time.time() - start_time) * 1000  # Total time incl. fallback

        print(f"DEBUG: Quarantine fallback triggered - retrieved {len(final_results)} from semantic search")

    # Format results with Bob quarantine info
    output = "## Query Results\n\n"
    output += f"**Query:** {query_text}\n\n"
    output += f"**Found:** {len(final_results)} results in {elapsed_ms:.0f}ms\n\n"

    # A shorter final list with nothing quarantined only means the
    # over-fetched results were truncated to max_results, so it is reported
    # as "no conflicts" rather than as removed duplicates.
    if quarantined_results:
        output += f"**Risk Assessment:** {len(quarantined_results)} potentially conflicting results quarantined by Bob the Skeptic\n\n"
    else:
        output += "**Risk Assessment:** No conflicts detected by Bob the Skeptic\n\n"

    # Quality of the assembly that actually produced the displayed results.
    output += f"**Quality Score:** {displayed_assembly.assembly_quality:.3f}\n\n"
    output += _render_results(final_results, max_results, use_hybrid)

    if fallback_triggered:
        output += "*\\*Note: Used semantic search because hybrid results were quarantined\\*\n"

    return output


# Create Gradio interface
with gr.Blocks(title="Warbler CDA - FractalStat RAG") as demo:
    gr.Markdown("""
    # 🦜 Warbler CDA - FractalStat RAG System

    Semantic retrieval with 8D FractalStat multi-dimensional addressing and intelligent fallback.

    **Features:**
    - 165,000+ documents from arXiv, novels, education, and fiction
    - Hybrid semantic + FractalStat scoring with automatic fallback
    - Smart scoring: semantic search works for plain English, hybrid excels at technical queries
    - Bob the Skeptic bias detection
    - ZeroGPU compatible for reliable HuggingFace Spaces deployment
    """)

    with gr.Tab("Query"):
        with gr.Row():
            with gr.Column():
                query_input = gr.Textbox(
                    label="Query",
                    placeholder="Enter your search query...",
                    lines=2
                )
                max_results = gr.Slider(
                    minimum=1,
                    maximum=20,
                    value=5,
                    step=1,
                    label="Max Results"
                )
                use_hybrid = gr.Checkbox(
                    label="Enable FractalStat Hybrid Scoring",
                    value=True  # Enable by default - users want the 8D system
                )
                query_btn = gr.Button("Search", variant="primary")

            with gr.Column():
                results_output = gr.Markdown(label="Results")

        query_btn.click(  # pylint: disable=E1101
            fn=query_warbler_with_quarantine,
            inputs=[query_input, max_results, use_hybrid],
            outputs=results_output
        )

        gr.Examples(
            examples=[
                ["hello world", 5, False],  # Semantic search for plain English
                ["rotation dynamics of Saturn's moons", 5, True],  # Hybrid for technical/scientific
                ["dancing under the moon", 5, False],  # Semantic for casual queries
                ["interplanetary approach maneuvers", 5, True],  # Hybrid for scientific terms
            ],
            inputs=[query_input, max_results, use_hybrid]
        )

    with gr.Tab("System Stats"):
        stats_output = gr.Markdown()
        stats_btn = gr.Button("Refresh Stats")
        stats_btn.click(fn=get_system_stats, outputs=stats_output)  # pylint: disable=E1101
        demo.load(fn=get_system_stats, outputs=stats_output)  # pylint: disable=E1101

    with gr.Tab("About"):
        gr.Markdown("""
        ## About Warbler CDA

        Warbler CDA is a production-ready RAG system featuring:

        - **8D FractalStat Addressing**: Multi-dimensional intelligence for superior retrieval
        - **Semantic Anchors**: Persistent memory with provenance tracking
        - **Bob the Skeptic**: Automatic bias detection and validation
        - **Narrative Coherence**: Quality analysis beyond simple similarity

        ### Performance

        - 84% test coverage with 587 passing tests
        - 9-28s query response time
        - 0.88 average relevance score
        - 75-83% narrative coherence

        ### Links

        - [Source Code](https://gitlab.com/tiny-walnut-games/the-seed)
        - [Documentation](https://gitlab.com/tiny-walnut-games/the-seed/-/tree/main/warbler-cda-package)
        - [Performance Report](https://gitlab.com/tiny-walnut-games/the-seed/-/blob/main/warbler-cda-package/WARBLER_CDA_PERFORMANCE_REPORT.md)
        """)


if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)