warbler-cda / app.py
Bellok
refactor(stats): replace separate successful retrievals and error rate with combined retrieval success rate
d5e328f
#!/usr/bin/env python3
"""
Gradio interface for Warbler CDA on HuggingFace Spaces.
Provides a web UI for the FractalStat RAG system with GPU acceleration.
"""
import gradio as gr
import time
# Import the HuggingFace Spaces GPU decorator
try:
from spaces import GPU
except ImportError:
# Fallback if not available
GPU = lambda func: func
# Import Warbler CDA components
from warbler_cda.retrieval_api import RetrievalAPI, RetrievalQuery, RetrievalMode
from warbler_cda.embeddings import EmbeddingProviderFactory
from warbler_cda.fractalstat_rag_bridge import FractalStatRAGBridge
from warbler_cda.semantic_anchors import SemanticAnchorGraph
from warbler_cda.pack_loader import PackLoader
from warbler_cda.conflict_detector import ConflictDetector # Add Bob the Skeptic
# Bring up the retrieval stack. Every stage announces itself on stdout so the
# startup log shows how far initialization got.
print("πŸš€ Initializing Warbler CDA...")

# Embedding provider. No device is pinned in the config; per the original
# notes, device selection is left to the provider's own auto-detection
# (expected to fall back to CPU under ZeroGPU - TODO confirm in the provider).
print("βš™οΈ Creating embedding provider with multi-worker GPU acceleration...")
embedding_provider = EmbeddingProviderFactory.create_provider(
    "sentence_transformer",
    dict(
        num_workers=4,  # multi-worker encoding for speed
        batch_size=64,  # larger batches for better throughput
        cache_dir=".embedding_cache",
    ),
)
print(f"βœ… Embedding provider: {embedding_provider.get_provider_info()['provider_id']}")

# Semantic anchor graph, sharing the embedding provider created above.
print("βš™οΈ Initializing semantic anchors...")
semantic_anchors = SemanticAnchorGraph(embedding_provider=embedding_provider)
print("βœ… Semantic anchors initialized")

# FractalStat bridge, constructed with its defaults.
print("βš™οΈ Initializing FractalStat bridge...")
fractalstat_bridge = FractalStatRAGBridge()
print("βœ… FractalStat bridge initialized")

# "Bob the Skeptic" is this app's name for the conflict detector.
print("πŸ•΅οΈ Initializing Bob the Skeptic (conflict detector)...")
bob_the_skeptic = ConflictDetector(embedding_provider=embedding_provider)
print("βœ… Bob the Skeptic initialized")

# The retrieval API ties the components together; hybrid FractalStat scoring
# is switched on through its config.
print("βš™οΈ Creating RetrievalAPI...")
api = RetrievalAPI(
    config={"enable_fractalstat_hybrid": True},
    conflict_detector=bob_the_skeptic,
    fractalstat_bridge=fractalstat_bridge,
    embedding_provider=embedding_provider,
    semantic_anchors=semantic_anchors,
)
print("βœ… RetrievalAPI initialized with Bob the Skeptic")
# Load packs
print("πŸ“š Loading Warbler packs...")
pack_loader = PackLoader()
documents = pack_loader.discover_documents()

# No local packs: try to fetch them from HuggingFace, bounded by ONE overall
# deadline so a slow download cannot stall startup indefinitely. Any failure
# leaves `documents` empty, which the next stage treats as "use sample docs".
if not documents:
    print("⚠️ No packs found locally. Attempting to download from HuggingFace...")
    try:
        # Suppress HF datasets progress bars for cleaner output
        import os
        os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"
        from warbler_cda.utils.hf_warbler_ingest import HFWarblerIngestor
        import signal

        class DownloadDeadlineExceeded(TimeoutError):
            """Raised by the SIGALRM handler when the download budget is spent.

            A dedicated subclass lets the per-dataset error handler tell the
            global deadline apart from an ordinary TimeoutError raised by
            network code for a single dataset.
            """

        def timeout_handler(signum, frame):
            """SIGALRM handler: abort the whole download phase."""
            raise DownloadDeadlineExceeded("HF download timed out")

        # A single 3-minute alarm covers all datasets together (it is not
        # re-armed per dataset).
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(180)  # 3 minutes
        try:
            ingestor = HFWarblerIngestor(packs_dir=pack_loader.packs_dir, verbose=True)
            # Enable all available HF dataset packs for maximum knowledge diversity
            datasets_to_download = [
                "arxiv",          # Physics and mathematics papers
                "edustories",     # Educational narratives and stories
                "novels",         # Fiction literature
                "manuals",        # Technical documentation
                "enterprise",     # Business and corporate content
                "prompt-report",  # AI prompt engineering reports
            ]
            total_docs = 0
            successful_downloads = 0
            for dataset in datasets_to_download:
                try:
                    print(f"πŸ“¦ Downloading {dataset} (timeout: 3 minutes)...")
                    # Cap arxiv at 50k papers; the other packs are ingested in full.
                    arxiv_limit = 50000 if dataset == "arxiv" else None
                    success = ingestor.ingest_dataset(dataset, arxiv_limit=arxiv_limit)
                    if success:
                        successful_downloads += 1
                        # Count how many documents this pack added.
                        pack_docs = pack_loader.discover_documents()
                        new_count = len(pack_docs) - total_docs
                        total_docs = len(pack_docs)
                        print(f"βœ… {dataset}: {new_count} documents successfully loaded")
                    else:
                        print(f"❌ {dataset} download failed (no documents retrieved)")
                except DownloadDeadlineExceeded:
                    # The global deadline fired: do NOT treat it as a
                    # per-dataset error, or the loop would keep downloading
                    # with no alarm armed any more.
                    raise
                except Exception as e:
                    # Best effort: one bad dataset must not sink the others.
                    print(f"❌ {dataset} download error: {e}")
                    continue
            print(f"πŸ“Š Total: {total_docs} documents from {successful_downloads}/{len(datasets_to_download)} packs")
            if successful_downloads > 0:
                # Reload after download
                documents = pack_loader.discover_documents()
                print(f"βœ… Downloaded {len(documents)} documents")
            else:
                print("❌ No HF datasets downloaded successfully, using sample documents...")
                documents = []
        except TimeoutError:
            print("⏰ HF download timed out - proceeding with sample documents")
            documents = []
        finally:
            # Always disarm the alarm, including when an unexpected exception
            # escapes to the outer handler; a stray SIGALRM would otherwise
            # raise inside whatever runs 3 minutes after it was armed.
            signal.alarm(0)
    except Exception as e:
        print(f"⚠️ Could not download packs: {e}")
        print("Using sample documents instead...")
        documents = []
# Populate the retrieval store: either with built-in sample documents (nothing
# was found or downloaded) or with the discovered pack documents.
if not documents:
    # Fallback to comprehensive sample documents that match common test queries
    sample_docs = [
        {"id": "sample1", "content": "FractalStat is an 8-dimensional addressing system for intelligent retrieval.", "metadata": {}},
        {"id": "sample2", "content": "Semantic search finds documents by meaning, not just keywords.", "metadata": {}},
        {"id": "sample3", "content": "Bob the Skeptic validates results to prevent bias and hallucinations.", "metadata": {}},
        {"id": "sample4", "content": "Hello world! This is a sample document for testing search functionality.", "metadata": {}},
        {"id": "sample5", "content": "This document contains information about rotation dynamics of Saturn's moons.", "metadata": {}},
        {"id": "sample6", "content": "Machine learning and artificial intelligence are transforming technology.", "metadata": {}},
        {"id": "sample7", "content": "Ancient library keepers preserved wisdom through generations.", "metadata": {}},
        {"id": "sample8", "content": "Wisdom about courage comes from facing fears directly.", "metadata": {}},
    ]
    for doc in sample_docs:
        api.add_document(doc["id"], doc["content"], doc["metadata"])
    print(f"βœ… Loaded {len(sample_docs)} comprehensive sample documents")
else:
    print(f"βœ… Found {len(documents)} documents")

    # Embed every document in one batch call, then ingest with the
    # pre-computed vectors instead of embedding one document at a time.
    content_list = [doc["content"] for doc in documents]
    print(f"βš™οΈ Batch embedding {len(content_list)} documents with {embedding_provider.num_workers} workers...")
    embeddings_list = embedding_provider.embed_batch(content_list, show_progress=True)
    print(f"βœ… Batch embeddings completed: {len(embeddings_list)} embeddings")

    # Loop invariants, evaluated once.
    embedding_count = len(embeddings_list)
    can_compute_coords = hasattr(embedding_provider, "compute_fractalstat_from_embedding")

    for i, doc in enumerate(documents):
        # Tolerate a short embeddings list: the remaining documents are
        # ingested without a pre-computed embedding.
        embedding = embeddings_list[i] if i < embedding_count else None

        # Explicit None/empty test rather than bare truthiness, which raises
        # for multi-element array-like embeddings.
        # NOTE(review): the element type returned by embed_batch is not
        # visible here - confirm it supports len().
        has_embedding = embedding is not None and len(embedding) > 0

        # Create FractalStat coordinates when the provider supports it.
        fractalstat_coords = None
        if has_embedding and can_compute_coords:
            fractalstat_coords = embedding_provider.compute_fractalstat_from_embedding(embedding)

        api.add_document(
            doc_id=doc["id"],
            content=doc["content"],
            metadata=doc.get("metadata", {}),
            embedding=embedding,
            fractalstat_coordinates=fractalstat_coords,
        )

# Report the real store size on both paths (sample docs or packs).
print(f"πŸŽ‰ Warbler CDA ready with {api.get_context_store_size()} documents!")
@GPU
def query_warbler(query_text: str, max_results: int = 5, use_hybrid: bool = True) -> str:
    """Run a retrieval query and return the results as a Markdown report.

    When hybrid mode is requested and yields no results, the same query object
    is re-executed once with hybrid scoring disabled and a lower confidence
    threshold (0.2 instead of 0.3).

    Args:
        query_text: Free-text query. Empty, whitespace-only or None input
            short-circuits with a prompt message.
        max_results: Maximum number of results to request and display.
            Coerced to int because it is used as a slice bound.
        use_hybrid: Use the hybrid semantic + FractalStat mode instead of
            pure semantic similarity.

    Returns:
        Markdown text with the query, result count, timing, quality score and
        the top results (or "No results found.").
    """
    if not query_text or not query_text.strip():
        return "Please enter a query."

    max_results = int(max_results)
    start_time = time.time()

    # Create query - use hybrid mode when requested
    query_mode = (
        RetrievalMode.HYBRID_SEMANTIC_FRACTALSTAT
        if use_hybrid
        else RetrievalMode.SEMANTIC_SIMILARITY
    )
    query = RetrievalQuery(
        query_id=f"gradio_{int(time.time())}",
        mode=query_mode,
        semantic_query=query_text,
        max_results=max_results,
        fractalstat_hybrid=use_hybrid,
        confidence_threshold=0.3,
    )
    print(f"DEBUG: Executing query '{query_text}' with mode={query_mode}, max_results={max_results}, hybrid={use_hybrid}")

    assembly = api.retrieve_context(query)
    elapsed_ms = (time.time() - start_time) * 1000

    print(f"DEBUG: Query completed in {elapsed_ms:.0f}ms, found {len(assembly.results)} results")
    if assembly.results:
        print(f"DEBUG: Top 3 relevance scores: {[r.relevance_score for r in assembly.results[:3]]}")
        print(f"DEBUG: Confidence threshold was: {query.confidence_threshold}")
    else:
        print(f"DEBUG: No results above threshold: {query.confidence_threshold}")

    # Hybrid fallback: if hybrid mode found nothing, retry without hybrid scoring.
    if use_hybrid and not assembly.results:
        print("DEBUG: Hybrid returned 0 results, falling back to pure semantic search")
        fallback_start = time.time()
        query.confidence_threshold = 0.2  # Lower threshold for semantic fallback
        query.fractalstat_hybrid = False  # Disable hybrid for this query
        # NOTE(review): query.mode and query.query_id are left unchanged for
        # the retry - confirm the API keys on fractalstat_hybrid and does not
        # serve a cached result for the repeated query_id.
        assembly = api.retrieve_context(query)
        fallback_ms = (time.time() - fallback_start) * 1000
        elapsed_ms = (time.time() - start_time) * 1000  # total, including the retry
        print(f"DEBUG: Semantic fallback completed in {fallback_ms:.0f}ms, found {len(assembly.results)} results")
        if assembly.results:
            print(f"DEBUG: Top 3 relevance scores from semantic: {[r.relevance_score for r in assembly.results[:3]]}")
            print("DEBUG: Hybrid fallback successful - results returned via semantic search")

    # Format results
    parts = [
        "## Query Results\n\n",
        f"**Query:** {query_text}\n\n",
        f"**Found:** {len(assembly.results)} results in {elapsed_ms:.0f}ms\n\n",
        f"**Quality Score:** {assembly.assembly_quality:.3f}\n\n",
    ]
    if assembly.results:
        parts.append("### Top Results\n\n")
        for i, result in enumerate(assembly.results[:max_results], 1):
            parts.append(f"**{i}. Score: {result.relevance_score:.3f}**\n\n")
            parts.append(f"{result.content[:300]}...\n\n")
            # Results without a FractalStat score cannot be formatted with
            # :.3f, so skip the breakdown for them (same guard as
            # query_warbler_with_quarantine).
            if use_hybrid and result.fractalstat_resonance is not None:
                parts.append(f"- Semantic: {result.semantic_similarity:.3f}\n")
                parts.append(f"- FractalStat: {result.fractalstat_resonance:.3f}\n\n")
            parts.append("---\n\n")
    else:
        parts.append("No results found.\n")
    return "".join(parts)
def get_system_stats() -> str:
    """Build the Markdown shown on the "System Stats" tab.

    Reads retrieval metrics and the store size from the module-level `api`,
    plus an optional conflict-detector summary, and renders them as Markdown
    sections. A failure while reading the conflict summary is reported inline
    rather than raised.

    Returns:
        Markdown text covering document store, query performance, conflict
        detection, FractalStat status, component health and recent activity.
    """
    metrics = api.get_retrieval_metrics()
    retrieval_metrics = metrics['retrieval_metrics']
    system_health = metrics['system_health']

    # Timestamp so the reader can tell how fresh the numbers are.
    current_time = time.strftime("%H:%M:%S UTC", time.gmtime())

    lines = [
        "## System Statistics\n\n",
        f"**Last Updated:** {current_time}\n\n",
        # Document Store
        "### πŸ“š Document Store\n\n",
        f"**Total Documents:** {api.get_context_store_size():,}\n\n",
        "**Document Types:** Scientific papers, novels, education, fiction, technical docs\n\n",
        # Query Performance
        "### ⚑ Query Performance\n\n",
        f"**Total Queries:** {retrieval_metrics['total_queries']}\n\n",
        f"**Cache Hit Rate:** {metrics['cache_performance']['hit_rate']:.1%}\n\n",
        f"**Average Response Time:** {retrieval_metrics['average_retrieval_time_ms']:.0f}ms\n\n",
        f"**Average Quality Score:** {system_health['average_quality']:.3f}\n\n",
        # Conflict Detection (Bob the Skeptic)
        "### πŸ•΅οΈ Bob the Skeptic - Conflict Detection\n\n",
    ]

    # The detector is only looked up when the API exposes a non-empty config.
    conflict_detector = None
    if getattr(api, 'config', None):
        conflict_detector = getattr(api, 'conflict_detector', None)

    if conflict_detector and hasattr(conflict_detector, 'get_global_conflict_summary'):
        try:
            conflict_summary = conflict_detector.get_global_conflict_summary()
            lines.append(f"**Total Conflicts Detected:** {conflict_summary['total_conflicts']}\n\n")
            # Show confidence breakdown
            conf_dist = conflict_summary['confidence_distribution']
            lines.append("**Conflict Confidence Levels:**\n")
            lines.append(f"- High Confidence (80%+): {conf_dist['high']}\n")
            lines.append(f"- Medium Confidence (60-79%): {conf_dist['medium']}\n")
            lines.append(f"- Low Confidence (<60%): {conf_dist['low']}\n\n")
            lines.append(f"**Recent Conflicts:** {conflict_summary['recent_conflicts_1h']} in last hour\n\n")
            lines.append(f"**System Health Score:** {conflict_summary['system_health_score']:.2f}\n\n")
        except Exception as e:
            # Display boundary: show the problem in the panel instead of
            # failing the whole stats view.
            lines.append(f"**Status:** Error accessing conflict detector: {str(e)}\n\n")
    else:
        lines.append("**Status:** Conflict detection not configured or unavailable\n\n")

    # FractalStat Intelligence
    lines.append("### πŸ”„ FractalStat Intelligence\n\n")
    if api.fractalstat_bridge:
        lines.append("**Status:** Active - 8D multi-dimensional addressing enabled\n\n")
        available_dimensions = [
            "Realm (semantic domains)", "Lineage (generation)", "Adjacency (connectivity)",
            "Horizon (lifecycle stages)", "Luminosity (semantic brightness)",
            "Polarity (tension/resonance)", "Dimensionality (complexity)",
            "Alignment (social coordination)",
        ]
        lines.append("**Active Dimensions:**\n- " + "\n- ".join(available_dimensions) + "\n\n")
        # Entanglement status
        if hasattr(api.fractalstat_bridge, 'entanglement_resonance'):
            lines.append("**Entanglement Engine:** βœ… ACTIVE - Cross-coordinate conceptual connections\n\n")
        else:
            lines.append("**Entanglement Engine:** ❌ NOT YET INTEGRATED\n\n")
    else:
        lines.append("**Status:** Not configured\n\n")

    # System Health
    lines.append("### πŸ₯ System Health\n\n")
    lines.append("**Overall Status:** 🟒 Operational\n\n")
    lines.append("**Components:**\n")
    lines.append(f"- Semantic Anchors: {'βœ…' if system_health.get('semantic_anchors_available', False) else '❌'}\n")
    lines.append(f"- Embedding Provider: {'βœ…' if system_health.get('embedding_provider_available', False) else '❌'}\n")
    lines.append(f"- FractalStat Bridge: {'βœ…' if system_health.get('fractalstat_bridge_available', False) else '❌'}\n")
    lines.append(f"- Conflict Detector: {'βœ…' if conflict_detector else '❌'}\n\n")

    # Recent Activity
    # The "N/A" choice must be made OUTSIDE the replacement field: inside it,
    # everything after the first ':' is taken as the format spec, so an inline
    # conditional there is an invalid specifier and raises ValueError.
    success_rate = system_health.get('retrieval_success_rate')
    success_rate_text = f"{success_rate:.1%}" if success_rate is not None else "N/A"
    lines.append("### πŸ“ˆ Recent Activity\n\n")
    lines.append(f"**Retrieval Success Rate:** {success_rate_text}\n\n")
    return "".join(lines)
# Create Gradio interface
# The Blocks app is bound to `demo`, which the entry point launches. Gradio
# lays components out in creation order inside the active `with` contexts, so
# statement order and nesting here define the page layout.
# NOTE(review): indentation is not visible in this view, so the exact nesting
# of the Row/Column contexts below should be confirmed against the real file.
with gr.Blocks(title="Warbler CDA - FractalStat RAG") as demo:
gr.Markdown("""
# 🦜 Warbler CDA - FractalStat RAG System
Semantic retrieval with 8D FractalStat multi-dimensional addressing and intelligent fallback.
**Features:**
- 165,000+ documents from arXiv, novels, education, and fiction
- Hybrid semantic + FractalStat scoring with automatic fallback
- Smart scoring: semantic search works for plain English, hybrid excels at technical queries
- Bob the Skeptic bias detection
- ZeroGPU compatible for reliable HuggingFace Spaces deployment
""")
# "Query" tab: input controls and the results pane.
with gr.Tab("Query"):
with gr.Row():
with gr.Column():
# Free-text query box (two visible lines).
query_input = gr.Textbox(
label="Query",
placeholder="Enter your search query...",
lines=2
)
# Number of results to show: 1-20 in steps of 1, default 5.
max_results = gr.Slider(
minimum=1,
maximum=20,
value=5,
step=1,
label="Max Results"
)
# Toggle between hybrid (semantic + FractalStat) and semantic-only scoring.
use_hybrid = gr.Checkbox(
label="Enable FractalStat Hybrid Scoring",
value=True # Enable by default - users want the 8D system
)
query_btn = gr.Button("Search", variant="primary")
with gr.Column():
# Markdown pane that receives the formatted query report.
results_output = gr.Markdown(label="Results")
# Add Bob quarantine info to results
def format_results_with_quarantine(output: str, results_count: int, quarantined_count: int) -> str:
    """Insert a Bob-the-Skeptic status line under the "## Query Results" header.

    Args:
        output: Markdown report. The status line is inserted after every
            occurrence of the exact header "## Query Results" followed by a
            blank line; text without that header is returned unchanged.
        results_count: Number of results retained for display.
        quarantined_count: Number of results flagged as conflicting, or None
            when conflict detection is unavailable.

    Returns:
        The report with the status line inserted.
    """
    # Test for None first: comparing None with an int raises TypeError, so the
    # "unavailable" case has to be decided before any numeric comparison.
    # Negative counts keep their previous meaning of "unavailable".
    if quarantined_count is None or quarantined_count < 0:
        status_line = "**Bob the Skeptic**: Conflict detection unavailable\n\n"
    elif quarantined_count > 0:
        status_line = f"**Bob the Skeptic**: {quarantined_count} conflicting results quarantined, {results_count} results retained\n\n"
    else:
        status_line = "**Bob the Skeptic**: No conflicting results detected\n\n"
    header = "## Query Results\n\n"
    return output.replace(header, header + status_line)
def query_warbler_with_quarantine(query_text: str, max_results: int = 5, use_hybrid: bool = True) -> str:
    """Run a retrieval query and report results together with quarantine info.

    Twice as many results as requested are fetched; results carrying conflict
    flags are treated as quarantined and dropped, and the remainder is cut to
    `max_results`. If hybrid mode leaves nothing to show while some results
    were quarantined, a semantic-only query is run as a fallback.

    Args:
        query_text: Free-text query. Empty, whitespace-only or None input
            short-circuits with a prompt message.
        max_results: Maximum number of results to display. Coerced to int
            because it is used in arithmetic and as a slice bound.
        use_hybrid: Use the hybrid semantic + FractalStat mode instead of
            pure semantic similarity.

    Returns:
        Markdown text with the query, result count, timing, risk assessment,
        quality score and the displayed results.
    """
    if not query_text or not query_text.strip():
        return "Please enter a query."

    max_results = int(max_results)
    start_time = time.time()

    # Create query - use hybrid mode when requested
    query_mode = (
        RetrievalMode.HYBRID_SEMANTIC_FRACTALSTAT
        if use_hybrid
        else RetrievalMode.SEMANTIC_SIMILARITY
    )
    query = RetrievalQuery(
        query_id=f"gradio_{int(time.time())}",
        mode=query_mode,
        semantic_query=query_text,
        max_results=max_results * 2,  # over-fetch so quarantining can still leave enough
        fractalstat_hybrid=use_hybrid,
        confidence_threshold=0.3,
    )
    print(f"DEBUG: Executing query '{query_text}' with mode={query_mode}, max_results={max_results}, hybrid={use_hybrid}")

    assembly = api.retrieve_context(query)
    elapsed_ms = (time.time() - start_time) * 1000

    # Split raw results into quarantined (conflict-flagged) and retained.
    original_results = len(assembly.results)
    quarantined_results = [r for r in assembly.results if r.conflict_flags]
    retained_results = [r for r in assembly.results if not r.conflict_flags]
    final_results = retained_results[:max_results]
    # The assembly whose results end up on screen; its quality score is shown.
    displayed_assembly = assembly

    print(f"DEBUG: Query completed in {elapsed_ms:.0f}ms, "
          f"found {original_results} raw results, "
          f"Bob quarantined {len(quarantined_results)}, "
          f"retained {len(final_results)} for display")
    if assembly.results:
        print(f"DEBUG: Top 3 relevance scores: {[r.relevance_score for r in assembly.results[:3]]}")
        print(f"DEBUG: Confidence threshold was: {query.confidence_threshold}")

    # Fallback: hybrid mode left nothing to show, but only because every
    # result was quarantined - retry with a semantic-only query.
    fallback_triggered = False
    if use_hybrid and not final_results and quarantined_results:
        print("DEBUG: Hybrid retained 0 results, checking quarantined pool for semantic fallback")
        semantic_query = RetrievalQuery(
            query_id=f"quarantine_fallback_{int(time.time())}",
            mode=RetrievalMode.SEMANTIC_SIMILARITY,
            semantic_query=query_text,
            max_results=max_results,
            confidence_threshold=0.3,
            fractalstat_hybrid=False,
        )
        # Note: In a full implementation, we'd re-query just the quarantined IDs.
        # For now, trigger full semantic search as fallback.
        displayed_assembly = api.retrieve_context(semantic_query)
        fallback_triggered = True
        final_results = displayed_assembly.results[:max_results]
        elapsed_ms = (time.time() - start_time) * 1000  # total, including the fallback
        print(f"DEBUG: Quarantine fallback triggered - retrieved {len(final_results)} from semantic search")

    # Format results with Bob quarantine info
    parts = [
        "## Query Results\n\n",
        f"**Query:** {query_text}\n\n",
        f"**Found:** {len(final_results)} results in {elapsed_ms:.0f}ms\n\n",
    ]
    # Showing fewer results than were fetched is normal here (2x over-fetch,
    # then truncation), so it is not reported as a finding of its own.
    if quarantined_results:
        parts.append(f"**Risk Assessment:** {len(quarantined_results)} potentially conflicting results quarantined by Bob the Skeptic\n\n")
    else:
        parts.append("**Risk Assessment:** No conflicts detected by Bob the Skeptic\n\n")
    parts.append(f"**Quality Score:** {displayed_assembly.assembly_quality:.3f}\n\n")

    if final_results:
        parts.append("### Top Results\n\n")
        for i, result in enumerate(final_results[:max_results], 1):
            parts.append(f"**{i}. Score: {result.relevance_score:.3f}**\n\n")
            parts.append(f"{result.content[:300]}...\n\n")
            # Fallback results may carry no FractalStat score.
            if use_hybrid and result.fractalstat_resonance is not None:
                parts.append(f"- Semantic: {result.semantic_similarity:.3f}\n")
                parts.append(f"- FractalStat: {result.fractalstat_resonance:.3f}\n\n")
            parts.append("---\n\n")
    else:
        parts.append("No results found.\n")

    if fallback_triggered:
        parts.append("*\\*Note: Used semantic search because hybrid results were quarantined\\*\n")
    return "".join(parts)
# Clicking "Search" runs the quarantine-aware query with the three input
# controls as arguments and renders its Markdown return value in the results
# pane.
query_btn.click( # pylint: disable=E1101
fn=query_warbler_with_quarantine,
inputs=[query_input, max_results, use_hybrid],
outputs=results_output
)
# Example rows; each supplies values for the same three inputs, in the order
# [query text, max results, hybrid on/off].
gr.Examples(
examples=[
["hello world", 5, False], # Semantic search for plain English
["rotation dynamics of Saturn's moons", 5, True], # Hybrid for technical/scientific
["dancing under the moon", 5, False], # Semantic for casual queries
["interplanetary approach maneuvers", 5, True], # Hybrid for scientific terms
],
inputs=[query_input, max_results, use_hybrid]
)
# "System Stats" tab: a Markdown pane filled by get_system_stats().
with gr.Tab("System Stats"):
stats_output = gr.Markdown()
stats_btn = gr.Button("Refresh Stats")
# Refresh on demand via the button...
stats_btn.click(fn=get_system_stats, outputs=stats_output) # pylint: disable=E1101
# ...and also through the app's load event, so the pane gets an initial value.
demo.load(fn=get_system_stats, outputs=stats_output) # pylint: disable=E1101
# "About" tab: static Markdown only. The figures and links below are
# hard-coded text, not values computed by this module.
with gr.Tab("About"):
gr.Markdown("""
## About Warbler CDA
Warbler CDA is a production-ready RAG system featuring:
- **8D FractalStat Addressing**: Multi-dimensional intelligence for superior retrieval
- **Semantic Anchors**: Persistent memory with provenance tracking
- **Bob the Skeptic**: Automatic bias detection and validation
- **Narrative Coherence**: Quality analysis beyond simple similarity
### Performance
- 84% test coverage with 587 passing tests
- 9-28s query response time
- 0.88 average relevance score
- 75-83% narrative coherence
### Links
- [Source Code](https://gitlab.com/tiny-walnut-games/the-seed)
- [Documentation](https://gitlab.com/tiny-walnut-games/the-seed/-/tree/main/warbler-cda-package)
- [Performance Report](https://gitlab.com/tiny-walnut-games/the-seed/-/blob/main/warbler-cda-package/WARBLER_CDA_PERFORMANCE_REPORT.md)
""")
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_options)