import os
import shutil
import tempfile
from typing import Dict, Any, List
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

from config import Config
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain_community.vectorstores import FAISS
from langchain_community.vectorstores.utils import DistanceStrategy
from langchain_community.retrievers import BM25Retriever
from langchain_huggingface import HuggingFaceEmbeddings, HuggingFaceEndpoint


class KnowledgeManager:
    def __init__(self):
        self.temp_dir = tempfile.mkdtemp()  # Use a temp directory for HF Spaces
        self.setup_temp_dirs()
        self.embeddings = self._init_embeddings()
        self.vector_db = None
        self.bm25_retriever = None
        self.qa_chain = None
        self.llm = None
        self.knowledge_texts = []  # Store texts in memory

        # Initialize with default knowledge
        self._create_default_knowledge()
        self._init_system()

    def setup_temp_dirs(self):
        """Set up temporary directories for HF Spaces compatibility."""
        self.knowledge_dir = os.path.join(self.temp_dir, "knowledge")
        self.vector_store_path = os.path.join(self.temp_dir, "vector_store")
        self.bm25_store_path = os.path.join(self.temp_dir, "bm25_store.pkl")
        os.makedirs(self.knowledge_dir, exist_ok=True)
        os.makedirs(self.vector_store_path, exist_ok=True)

    def _init_embeddings(self):
        """Initialize embeddings with error handling."""
        try:
            print("[i] Initializing Hugging Face embeddings...")
            return HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-mpnet-base-v2",
                model_kwargs={"device": "cpu"},
                encode_kwargs={"normalize_embeddings": True},
            )
        except Exception as e:
            print(f"[!] Error initializing embeddings: {e}")
            # Fall back to a smaller model
            try:
                return HuggingFaceEmbeddings(
                    model_name="sentence-transformers/all-MiniLM-L6-v2",
                    model_kwargs={"device": "cpu"},
                    encode_kwargs={"normalize_embeddings": True},
                )
            except Exception as e2:
                print(f"[!] Fallback embeddings also failed: {e2}")
                return None

    def _init_llm(self):
        """Initialize the LLM with proper error handling and fallbacks."""
        if self.llm is not None:
            return self.llm

        hf_token = os.getenv("HUGGINGFACEHUB_API_TOKEN") or os.getenv("HF_TOKEN")
        if not hf_token:
            print("[!] No Hugging Face API token found. Set the HUGGINGFACEHUB_API_TOKEN or HF_TOKEN environment variable.")
            return None

        try:
            print("[i] Initializing HuggingFace LLM...")
            self.llm = HuggingFaceEndpoint(
                repo_id="mistralai/Mistral-7B-Instruct-v0.1",
                temperature=0.1,
                max_new_tokens=512,
                huggingfacehub_api_token=hf_token,
                timeout=60,  # Add timeout
            )
            # Smoke-test the LLM with a simple query
            self.llm.invoke("Hello")
            print("[i] LLM initialized successfully")
            return self.llm
        except Exception as e:
            print(f"[!] Error with Mistral model: {e}")
            # Try alternative models
            fallback_models = [
                "microsoft/DialoGPT-medium",
                "google/flan-t5-base",
                "huggingface/CodeBERTa-small-v1",
            ]
            for model in fallback_models:
                try:
                    print(f"[i] Trying fallback model: {model}")
                    self.llm = HuggingFaceEndpoint(
                        repo_id=model,
                        temperature=0.1,
                        max_new_tokens=256,
                        huggingfacehub_api_token=hf_token,
                        timeout=30,
                    )
                    self.llm.invoke("Hello")
                    print(f"[i] Successfully initialized with {model}")
                    return self.llm
                except Exception as e2:
                    print(f"[!] {model} also failed: {e2}")
                    continue

            print("[!] All LLM models failed. Using mock responses.")
            return None
Using mock responses.") return None def _init_system(self): """Initialize the retrieval system""" try: self.vector_db, self.bm25_retriever = self._build_retrievers_from_texts() self.qa_chain = self._create_qa_chain() except Exception as e: print(f"[!] Error initializing system: {e}") def _create_default_knowledge(self): """Create default knowledge base""" default_texts = [ { "filename": "sirraya_xbrain.txt", "content": """Sirraya xBrain - Advanced AI Platform Created by Amir Hameed. Sirraya xBrain is an intelligent AI platform that combines multiple retrieval methods for enhanced question answering capabilities. Key Features: - Hybrid Retrieval System: Combines Vector Search (FAISS) with BM25 keyword search - LISA Assistant: An AI assistant powered by language models - Document Processing: Automatic text chunking and embedding generation - Multi-Modal Retrieval: Both semantic and keyword-based search - Real-time Query Processing: Fast response times with parallel retrieval Technical Components: - FAISS (Facebook AI Similarity Search) for vector-based semantic search - BM25 (Best Matching 25) for traditional keyword-based information retrieval - HuggingFace Transformers for language model integration - LangChain for building the question-answering pipeline The platform is designed to provide accurate and contextually relevant answers by leveraging both semantic understanding and keyword matching techniques.""" }, { "filename": "technical_details.txt", "content": """Technical Architecture of Sirraya xBrain Vector Database: - Uses FAISS for efficient similarity search - Embeddings generated using sentence-transformers/all-mpnet-base-v2 - Cosine similarity for measuring document relevance - Configurable similarity thresholds BM25 Retriever: - Traditional keyword-based search algorithm - Complements vector search for better recall - Effective for exact keyword matches Text Processing: - Recursive character text splitter for document chunking - Configurable chunk size and overlap - Supports multiple text formats Query Processing Pipeline: 1. Parallel retrieval from both vector and BM25 systems 2. Document scoring and ranking 3. Context preparation for language model 4. Answer generation using prompt templates 5. Source document citation Performance Optimizations: - ThreadPoolExecutor for parallel processing - Configurable retrieval parameters - Fallback mechanisms for failed retrievals""" } ] self.knowledge_texts = default_texts # Also save to temp files for compatibility for text_data in default_texts: filepath = os.path.join(self.knowledge_dir, text_data["filename"]) with open(filepath, "w", encoding="utf-8") as f: f.write(text_data["content"]) def _build_retrievers_from_texts(self): """Build retrievers from in-memory texts""" if not self.embeddings: print("[!] No embeddings available") return None, None try: # Create documents from stored texts documents = [] for text_data in self.knowledge_texts: doc = Document( page_content=text_data["content"], metadata={"source": text_data["filename"]} ) documents.append(doc) # Split documents into chunks splitter = RecursiveCharacterTextSplitter( chunk_size=getattr(Config, 'CHUNK_SIZE', 1000), chunk_overlap=getattr(Config, 'CHUNK_OVERLAP', 200), separators=["\n\n", "\n", ". ", "! ", "? ", "; ", " ", ""] ) chunks = splitter.split_documents(documents) if not chunks: print("[!] 
No chunks created") return None, None print(f"[i] Created {len(chunks)} chunks") # Create vector database vector_db = FAISS.from_documents( chunks, self.embeddings, distance_strategy="COSINE" ) # Create BM25 retriever bm25_retriever = BM25Retriever.from_documents(chunks) bm25_retriever.k = getattr(Config, 'MAX_CONTEXT_CHUNKS', 5) print("[i] Successfully created retrievers") return vector_db, bm25_retriever except Exception as e: print(f"[!] Error building retrievers: {e}") return None, None def add_text_content(self, filename: str, content: str) -> bool: """Add text content to knowledge base""" try: # Add to in-memory storage self.knowledge_texts.append({ "filename": filename, "content": content }) # Save to temp file filepath = os.path.join(self.knowledge_dir, filename) with open(filepath, "w", encoding="utf-8") as f: f.write(content) # Rebuild retrievers self.vector_db, self.bm25_retriever = self._build_retrievers_from_texts() self.qa_chain = self._create_qa_chain() print(f"[i] Added {filename} to knowledge base") return True except Exception as e: print(f"[!] Error adding text content: {e}") return False def add_uploaded_file(self, file_content: bytes, filename: str) -> bool: """Add uploaded file content to knowledge base""" try: # Decode file content content = file_content.decode('utf-8') return self.add_text_content(filename, content) except UnicodeDecodeError: print(f"[!] Could not decode {filename} as UTF-8") return False except Exception as e: print(f"[!] Error processing uploaded file: {e}") return False def _parallel_retrieve(self, question: str) -> List[Document]: """Retrieve documents using both vector and BM25 search""" if not self.vector_db or not self.bm25_retriever: return [] def retrieve_with_bm25(): try: return self.bm25_retriever.invoke(question) except Exception as e: print(f"[!] BM25 retrieval error: {e}") return [] def retrieve_with_vector(): try: retriever = self.vector_db.as_retriever( search_type="similarity_score_threshold", search_kwargs={ "k": getattr(Config, 'MAX_CONTEXT_CHUNKS', 5), "score_threshold": 0.3 } ) return retriever.invoke(question) except Exception as e: print(f"[!] Vector retrieval error: {e}") # Fallback to simple similarity search try: docs = self.vector_db.similarity_search(question, k=3) return docs except Exception as e2: print(f"[!] Fallback vector search also failed: {e2}") return [] try: with ThreadPoolExecutor(max_workers=2) as executor: bm25_future = executor.submit(retrieve_with_bm25) vector_future = executor.submit(retrieve_with_vector) bm25_results = bm25_future.result() vector_results = vector_future.result() # Combine and deduplicate results all_docs = vector_results + bm25_results seen_content = set() unique_docs = [] for doc in all_docs: content_hash = hash(doc.page_content) if content_hash not in seen_content: seen_content.add(content_hash) unique_docs.append(doc) return unique_docs[:getattr(Config, 'MAX_CONTEXT_CHUNKS', 5)] except Exception as e: print(f"[!] Parallel retrieval error: {e}") return [] def _create_qa_chain(self): """Create the QA chain""" if not self.vector_db or not self.bm25_retriever: return None llm = self._init_llm() if not llm: return None prompt_template = """You are LISA, an AI assistant for Sirraya xBrain platform created by Amir Hameed. 

        prompt_template = """You are LISA, an AI assistant for the Sirraya xBrain platform created by Amir Hameed.
Use the following context to answer the question accurately and helpfully.

Context: {context}

Question: {question}

Instructions:
- Provide accurate answers based on the context
- If the information is not in the context, say "I don't have that information in my knowledge base"
- Be concise but comprehensive
- Cite relevant sources when possible

Answer:"""

        try:
            return RetrievalQA.from_chain_type(
                llm=llm,
                chain_type="stuff",
                retriever=self.vector_db.as_retriever(
                    search_kwargs={"k": getattr(Config, "MAX_CONTEXT_CHUNKS", 5)}
                ),
                chain_type_kwargs={
                    "prompt": PromptTemplate(
                        template=prompt_template,
                        input_variables=["context", "question"],
                    )
                },
                return_source_documents=True,
            )
        except Exception as e:
            print(f"[!] Error creating QA chain: {e}")
            return None

    def query(self, question: str) -> Dict[str, Any]:
        """Process a query and return the results."""
        start_time = datetime.now()

        # Fallback for when the LLM is not available
        if not self.qa_chain:
            docs = self._parallel_retrieve(question)
            if docs:
                # Simple fallback response using the retrieved context
                context = "\n\n".join(doc.page_content for doc in docs[:2])
                answer = f"Based on the available information: {context[:500]}..."
            else:
                answer = "I don't have information about that topic in my knowledge base."
            return {
                "answer": answer,
                "processing_time": (datetime.now() - start_time).total_seconds() * 1000,
                "source_chunks": docs[:3] if docs else [],
            }

        try:
            # Check that the hybrid retrievers find anything before invoking
            # the full QA chain (the chain performs its own vector retrieval
            # internally, so the docs here serve only as a relevance guard)
            docs = self._parallel_retrieve(question)
            if not docs:
                return {
                    "answer": "I couldn't find relevant information in my knowledge base for your question.",
                    "processing_time": (datetime.now() - start_time).total_seconds() * 1000,
                    "source_chunks": [],
                }

            result = self.qa_chain.invoke({"query": question})
            return {
                "answer": result.get("result", "No answer could be generated"),
                "processing_time": (datetime.now() - start_time).total_seconds() * 1000,
                "source_chunks": result.get("source_documents", [])[:3],
            }
        except Exception as e:
            print(f"[!] Query error: {e}")
            # Fall back to a simple context-based response
            docs = self._parallel_retrieve(question)
            if docs:
                context = docs[0].page_content[:300] + "..."
                answer = f"Based on available information: {context}"
            else:
                answer = "I encountered an error processing your query. Please try rephrasing your question."
            return {
                "answer": answer,
                "processing_time": (datetime.now() - start_time).total_seconds() * 1000,
                "source_chunks": docs[:2] if docs else [],
            }

    def get_knowledge_files_count(self) -> int:
        """Get the number of knowledge files."""
        return len(self.knowledge_texts)

    def get_knowledge_summary(self) -> str:
        """Get a summary of the knowledge base."""
        total_files = len(self.knowledge_texts)
        total_chars = sum(len(text["content"]) for text in self.knowledge_texts)
        return f"Knowledge Base: {total_files} files, ~{total_chars:,} characters"

    def cleanup(self):
        """Clean up temporary files."""
        try:
            shutil.rmtree(self.temp_dir)
        except Exception as e:
            print(f"[!] Cleanup error: {e}")

    def __del__(self):
        """Destructor to clean up resources."""
        self.cleanup()
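

# Minimal usage sketch, assuming config.py provides the Config class imported
# above and, optionally, a HUGGINGFACEHUB_API_TOKEN / HF_TOKEN in the
# environment. Without a token the manager falls back to context-only answers,
# so the demo below still runs.
if __name__ == "__main__":
    manager = KnowledgeManager()
    print(manager.get_knowledge_summary())

    # Add a document, then query through the hybrid retrieval pipeline
    manager.add_text_content(
        "example.txt",
        "Sirraya xBrain exposes a query() method that returns the answer, "
        "the processing time in milliseconds, and the source chunks used.",
    )
    result = manager.query("What does the query() method return?")
    print(f"Answer: {result['answer']}")
    print(f"Processing time: {result['processing_time']:.0f} ms")
    for doc in result["source_chunks"]:
        print(f"Source: {doc.metadata.get('source', 'unknown')}")

    manager.cleanup()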