import gradio as gr
import asyncio
import os
import zipfile
import requests
from pathlib import Path
import numpy as np
from typing import List

# Try different LightRAG imports based on version
try:
    from lightrag import LightRAG, QueryParam
    from lightrag.utils import EmbeddingFunc
    LIGHTRAG_AVAILABLE = True
except ImportError:
    try:
        from lightrag.lightrag import LightRAG
        from lightrag.query import QueryParam
        from lightrag.utils import EmbeddingFunc
        LIGHTRAG_AVAILABLE = True
    except ImportError:
        try:
            from lightrag.core import LightRAG
            from lightrag.core import QueryParam
            from lightrag.utils import EmbeddingFunc
            LIGHTRAG_AVAILABLE = True
        except ImportError:
            print("❌ LightRAG import failed - using fallback mode")
            LIGHTRAG_AVAILABLE = False


# Cloudflare Workers AI client, used for both LLM completions and embeddings
class CloudflareWorker:
    def __init__(self, cloudflare_api_key: str, api_base_url: str,
                 llm_model_name: str, embedding_model_name: str):
        self.cloudflare_api_key = cloudflare_api_key
        self.api_base_url = api_base_url
        self.llm_model_name = llm_model_name
        self.embedding_model_name = embedding_model_name
        self.max_tokens = 4080
        self.max_response_tokens = 4080

    async def _send_request(self, model_name: str, input_: dict, debug_log: str = ""):
        headers = {"Authorization": f"Bearer {self.cloudflare_api_key}"}
        try:
            response_raw = requests.post(
                f"{self.api_base_url}{model_name}",
                headers=headers,
                json=input_,
                timeout=30,
            ).json()
            result = response_raw.get("result", {})
            if "data" in result:
                # Embedding responses carry vectors under "data"
                return np.array(result["data"]) if LIGHTRAG_AVAILABLE else result["data"]
            if "response" in result:
                # Chat completions carry text under "response"
                return result["response"]
            raise ValueError(f"Unexpected response format: {response_raw}")
        except Exception as e:
            print(f"Cloudflare API Error: {e}")
            return None

    async def query(self, prompt: str, system_prompt: str = '', **kwargs) -> str:
        kwargs.pop("hashing_kv", None)
        message = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt},
        ]
        input_ = {
            "messages": message,
            "max_tokens": self.max_tokens,
            "response_token_limit": self.max_response_tokens,
        }
        result = await self._send_request(self.llm_model_name, input_)
        return result if result is not None else "Error: Failed to get response"

    async def embedding_chunk(self, texts: List[str]):
        input_ = {
            "text": texts,
            "max_tokens": self.max_tokens,
            "response_token_limit": self.max_response_tokens,
        }
        result = await self._send_request(self.embedding_model_name, input_)
        if result is None:
            # Fall back to placeholder vectors so the pipeline keeps running
            if LIGHTRAG_AVAILABLE:
                return np.random.rand(len(texts), 1024).astype(np.float32)
            else:
                return [[0.0] * 1024 for _ in texts]
        return result
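
# Illustrative sketch (not called by the app): how CloudflareWorker can be
# exercised on its own. It reuses the configuration constants defined further
# below, which Python resolves at call time, and assumes CLOUDFLARE_API_KEY
# is set in the environment.
async def _demo_cloudflare_worker():
    worker = CloudflareWorker(
        cloudflare_api_key=CLOUDFLARE_API_KEY,
        api_base_url=API_BASE_URL,
        llm_model_name=LLM_MODEL,
        embedding_model_name=EMBEDDING_MODEL,
    )
    answer = await worker.query("What is required of an emergency exit?",
                                system_prompt="Answer in one sentence.")
    vectors = await worker.embedding_chunk(["fire door rating"])
    print(answer)
    print(getattr(vectors, "shape", None) or len(vectors))  # embedding shape/length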

# Simple fallback knowledge store if LightRAG fails
class SimpleKnowledgeStore:
    def __init__(self, data_dir: str):
        self.data_dir = data_dir
        self.chunks = []
        self.entities = []
        self.load_data()

    def load_data(self):
        try:
            import json
            chunks_file = Path(self.data_dir) / "kv_store_text_chunks.json"
            if chunks_file.exists():
                with open(chunks_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self.chunks = list(data.values()) if data else []
            entities_file = Path(self.data_dir) / "vdb_entities.json"
            if entities_file.exists():
                with open(entities_file, 'r', encoding='utf-8') as f:
                    entities_data = json.load(f)
                if isinstance(entities_data, dict) and 'data' in entities_data:
                    self.entities = entities_data['data']
                elif isinstance(entities_data, list):
                    self.entities = entities_data
                else:
                    self.entities = []
            print(f"✅ Loaded {len(self.chunks)} chunks and {len(self.entities)} entities")
        except Exception as e:
            print(f"⚠️ Error loading data: {e}")
            self.chunks = []
            self.entities = []

    def search(self, query: str, limit: int = 5) -> List[str]:
        # Naive keyword matching over chunk contents and entity records
        query_lower = query.lower()
        results = []
        for chunk in self.chunks:
            if isinstance(chunk, dict) and 'content' in chunk:
                content = chunk['content']
                if any(word in content.lower() for word in query_lower.split()):
                    results.append(content)
        for entity in self.entities:
            if isinstance(entity, dict):
                entity_text = str(entity)
                if any(word in entity_text.lower() for word in query_lower.split()):
                    results.append(entity_text)
        return results[:limit]


# Configuration
# NOTE: the API key must come from the environment; never hard-code secrets.
CLOUDFLARE_API_KEY = os.getenv('CLOUDFLARE_API_KEY', '')
API_BASE_URL = "https://api.cloudflare.com/client/v4/accounts/07c4bcfbc1891c3e528e1c439fee68bd/ai/run/"
EMBEDDING_MODEL = '@cf/baai/bge-m3'
LLM_MODEL = "@cf/meta/llama-3.2-3b-instruct"
WORKING_DIR = "./dickens"

# Global instances
rag_instance = None
knowledge_store = None
cloudflare_worker = None


async def initialize_system():
    global rag_instance, knowledge_store, cloudflare_worker
    print("🔄 Initializing system...")

    # Download data if needed
    dickens_path = Path(WORKING_DIR)
    has_data = dickens_path.exists() and len(list(dickens_path.glob("*.json"))) > 0
    if not has_data:
        print("📥 Downloading RAG database...")
        try:
            # REPLACE YOUR_USERNAME with your actual GitHub username
            data_url = "https://github.com/YOUR_USERNAME/fire-safety-ai/releases/download/v1.0-data/dickens.zip"
            response = requests.get(data_url, timeout=60)
            response.raise_for_status()
            with open("dickens.zip", "wb") as f:
                f.write(response.content)
            with zipfile.ZipFile("dickens.zip", 'r') as zip_ref:
                zip_ref.extractall(".")
            os.remove("dickens.zip")
            print("✅ Data downloaded!")
        except Exception as e:
            print(f"⚠️ Download failed: {e}")
            os.makedirs(WORKING_DIR, exist_ok=True)

    # Initialize Cloudflare worker
    cloudflare_worker = CloudflareWorker(
        cloudflare_api_key=CLOUDFLARE_API_KEY,
        api_base_url=API_BASE_URL,
        embedding_model_name=EMBEDDING_MODEL,
        llm_model_name=LLM_MODEL,
    )

    # Try to initialize LightRAG, fall back to the simple store on failure
    if LIGHTRAG_AVAILABLE:
        try:
            rag_instance = LightRAG(
                working_dir=WORKING_DIR,
                max_parallel_insert=2,
                llm_model_func=cloudflare_worker.query,
                llm_model_name=LLM_MODEL,
                llm_model_max_token_size=4080,
                embedding_func=EmbeddingFunc(
                    embedding_dim=1024,
                    max_token_size=2048,
                    func=lambda texts: cloudflare_worker.embedding_chunk(texts),
                ),
            )
            await rag_instance.initialize_storages()
            print("✅ LightRAG system initialized!")
        except Exception as e:
            print(f"⚠️ LightRAG failed, using fallback: {e}")
            knowledge_store = SimpleKnowledgeStore(WORKING_DIR)
    else:
        print("🔄 Using simple knowledge store...")
        knowledge_store = SimpleKnowledgeStore(WORKING_DIR)

    print("✅ System ready!")


# Initialize on startup
asyncio.run(initialize_system())
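
# Illustrative sketch (not called by the app): exercising the fallback retrieval
# path directly. Assumes ./dickens already contains the exported LightRAG JSON
# stores (kv_store_text_chunks.json, vdb_entities.json) read by load_data above.
def _demo_fallback_search():
    store = SimpleKnowledgeStore(WORKING_DIR)
    for hit in store.search("emergency exit width", limit=3):
        print(hit[:120])  # print a short preview of each matching chunk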

async def ask_question(question, mode="hybrid"):
    if not question.strip():
        return "❌ Please enter a question."
    try:
        print(f"🔍 Processing question: {question}")
        # Use LightRAG if available, otherwise fall back to simple search
        if rag_instance and LIGHTRAG_AVAILABLE:
            response = await rag_instance.aquery(
                question,
                param=QueryParam(mode=mode)
            )
            return response
        elif knowledge_store and cloudflare_worker:
            # Fallback: simple search + Cloudflare AI
            relevant_chunks = knowledge_store.search(question, limit=3)
            context = "\n".join(relevant_chunks) if relevant_chunks else "No specific context found."
            system_prompt = """You are a Fire Safety AI Assistant specializing in Vietnamese fire safety regulations.
Use the provided context to answer questions about building codes, emergency exits, and fire safety requirements."""
            user_prompt = f"""Context: {context}

Question: {question}

Please provide a helpful answer based on the context about Vietnamese fire safety regulations."""
            response = await cloudflare_worker.query(user_prompt, system_prompt)
            return response
        else:
            return "❌ System not initialized yet. Please wait..."
    except Exception as e:
        return f"❌ Error: {str(e)}"


def sync_ask_question(question, mode):
    # Gradio handlers here are synchronous, so bridge into the async QA path
    return asyncio.run(ask_question(question, mode))
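
# Illustrative smoke test (not called by the app): assumes initialize_system()
# above succeeded, then queries the QA path once without the UI. "hybrid" is
# one of the search modes exposed in the dropdown below.
def _demo_smoke_test():
    answer = sync_ask_question("What are the requirements for emergency exits?", "hybrid")
    print(answer)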

# Create Gradio interface
with gr.Blocks(title="🔥 Fire Safety AI Assistant", theme=gr.themes.Soft()) as demo:
    gr.HTML("<p>Ask questions about Vietnamese fire safety regulations</p>")

    with gr.Row():
        with gr.Column(scale=1):
            question_input = gr.Textbox(
                label="Your Question",
                placeholder="What are the requirements for emergency exits?",
                lines=3
            )
            mode_dropdown = gr.Dropdown(
                choices=["hybrid", "local", "global", "naive"],
                value="hybrid",
                label="Search Mode",
                info="Hybrid is recommended for best results"
            )
            submit_btn = gr.Button("🔍 Ask Question", variant="primary", size="lg")
        with gr.Column(scale=2):
            answer_output = gr.Textbox(
                label="Answer",
                lines=15,
                show_copy_button=True
            )

    # System status
    status_text = "✅ LightRAG System" if LIGHTRAG_AVAILABLE else "⚠️ Fallback Mode"
    gr.HTML(f"Status: {status_text}")

    # Example questions
    gr.HTML("