OmniMind-Orchestrator / core /knowledge_engine.py
mgbam's picture
Upload 2 files
8dc61b1 verified
"""
LlamaIndex Knowledge Engine - For $1,000 Prize
Enterprise RAG for connecting to company knowledge bases.
"""
import os
from typing import List, Dict, Any, Optional
from pathlib import Path
# Optional-dependency guard: the LlamaIndex/ChromaDB stack is imported as a
# unit, and LLAMAINDEX_AVAILABLE records whether every import succeeded so the
# rest of the module can degrade gracefully instead of failing at import time.
try:
    from llama_index.core import (
        VectorStoreIndex,
        SimpleDirectoryReader,
        StorageContext,
        load_index_from_storage,
        Settings,
    )
    from llama_index.embeddings.openai import OpenAIEmbedding
    from llama_index.llms.anthropic import Anthropic
    from llama_index.vector_stores.chroma import ChromaVectorStore
    import chromadb
except ImportError:
    # Any single missing package disables the whole knowledge engine.
    LLAMAINDEX_AVAILABLE = False
    print("[WARNING] LlamaIndex not installed")
else:
    LLAMAINDEX_AVAILABLE = True
class KnowledgeEngine:
    """
    Enterprise knowledge integration using LlamaIndex.

    Prize Integration: LlamaIndex Category Award ($1,000)
    - RAG for enterprise documents
    - Multi-source knowledge integration
    - Context-aware MCP generation

    Documents are embedded with OpenAI ``text-embedding-3-small`` and stored
    in a persistent ChromaDB collection named ``omnimind_knowledge``.
    Constructing an engine only creates the persistence directory; the
    LlamaIndex/ChromaDB setup is deferred until a method first needs it.
    """

    def __init__(self, persist_dir: str = "./chroma_db"):
        """
        Create the engine and make sure ``persist_dir`` exists.

        Args:
            persist_dir: Directory handed to ChromaDB's persistent client.
        """
        self.persist_dir = Path(persist_dir)
        self.persist_dir.mkdir(parents=True, exist_ok=True)
        self.index = None
        # Populated by _ensure_initialized(); defined here so attribute access
        # never raises AttributeError before (or after a failed) initialization.
        self.chroma_client = None
        self.chroma_collection = None
        self.vector_store = None
        self.storage_context = None
        self._initialized = False

    @staticmethod
    def _preview(text: str, limit: int = 200) -> str:
        """Return ``text`` cut to ``limit`` characters, adding "..." only if it was cut."""
        text = text or ""
        if len(text) <= limit:
            return text
        return text[:limit] + "..."

    def _load_existing_index(self):
        """
        Return an index over previously stored data, or None if there is none.

        Tries ``load_index_from_storage`` first.  The storage context is built
        around the Chroma vector store alone, so that call appears to find
        nothing to load on a fresh process; in that case the index is rebuilt
        directly on top of the non-empty Chroma collection.
        """
        try:
            return load_index_from_storage(self.storage_context)
        except Exception:
            # Deliberate best-effort: nothing loadable, try the vector store.
            pass
        try:
            if self.chroma_collection.count() > 0:
                return VectorStoreIndex.from_vector_store(self.vector_store)
        except Exception as e:
            # Do not fail initialization; add_documents can still build an index.
            print(f"[WARNING] Could not rebuild index from vector store: {e}")
        return None

    def _ensure_initialized(self):
        """
        Lazy initialization to avoid startup errors.

        Configures the embedding model, opens the ChromaDB collection and
        tries to attach to an existing index.  Any failure is reported on
        stdout and leaves ``_initialized`` False, so the next call retries.
        """
        if self._initialized or not LLAMAINDEX_AVAILABLE:
            return
        try:
            # Configure LlamaIndex settings
            Settings.embed_model = OpenAIEmbedding(
                api_key=os.getenv("OPENAI_API_KEY"),
                model="text-embedding-3-small",
            )
            # Anthropic LLM initialization is skipped to avoid a version conflict.

            # Initialize ChromaDB
            self.chroma_client = chromadb.PersistentClient(path=str(self.persist_dir))
            self.chroma_collection = self.chroma_client.get_or_create_collection(
                "omnimind_knowledge"
            )

            # Vector store
            self.vector_store = ChromaVectorStore(chroma_collection=self.chroma_collection)
            self.storage_context = StorageContext.from_defaults(
                vector_store=self.vector_store
            )

            # Try to load existing index
            self.index = self._load_existing_index()
            if self.index is None:
                print("[INFO] No existing knowledge base - will create on first document add")
            else:
                print("[OK] Loaded existing knowledge base")

            self._initialized = True
        except Exception as e:
            print(f"[WARNING] LlamaIndex initialization failed: {e}")
            self._initialized = False

    async def add_documents(self, documents_path: str) -> Dict[str, Any]:
        """
        Add documents to the knowledge base.

        Args:
            documents_path: Directory passed to ``SimpleDirectoryReader``.

        Returns:
            ``{"status": "unavailable", "message": ...}`` when LlamaIndex is
            missing or failed to initialize; otherwise a ``"success"`` dict
            with ``documents_added`` (documents read in this call) and
            ``total_documents`` (entries currently in the Chroma collection).

        Exceptions raised by the reader or the index are not caught here.
        """
        self._ensure_initialized()
        if not LLAMAINDEX_AVAILABLE:
            return {"status": "unavailable", "message": "LlamaIndex not installed"}
        if not self._initialized:
            # Installed but unusable (e.g. bad credentials); see the warning
            # printed by _ensure_initialized for the underlying error.
            return {"status": "unavailable", "message": "LlamaIndex initialization failed"}

        documents = SimpleDirectoryReader(documents_path).load_data()

        if self.index is None:
            self.index = VectorStoreIndex.from_documents(
                documents,
                storage_context=self.storage_context,
            )
        else:
            for doc in documents:
                self.index.insert(doc)

        # NOTE(review): persist() is called without a directory, so it writes
        # to LlamaIndex's default location - confirm this is intended rather
        # than self.persist_dir.
        self.index.storage_context.persist()

        return {
            "status": "success",
            "documents_added": len(documents),
            # count() avoids fetching every stored record just to count ids.
            "total_documents": self.chroma_collection.count(),
        }

    async def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        """
        Query the knowledge base.

        Args:
            question: Natural-language question handed to the query engine.
            top_k: Number of most similar nodes to retrieve.

        Returns:
            A dict with ``status``, ``answer`` and ``sources``.  Each source
            holds a ``text`` preview of at most 200 characters (suffixed with
            "..." only when truncated) and its similarity ``score``.  When no
            index is available the status is ``"unavailable"`` and ``sources``
            is empty.

        NOTE(review): the query engine call below is synchronous inside a
        coroutine - presumably it blocks the event loop; consider ``aquery``.
        """
        self._ensure_initialized()
        if self.index is None or not self._initialized or not LLAMAINDEX_AVAILABLE:
            return {
                "status": "unavailable",
                "answer": "Knowledge base not configured",
                "sources": [],
            }

        query_engine = self.index.as_query_engine(similarity_top_k=top_k)
        response = query_engine.query(question)

        return {
            "status": "success",
            "answer": str(response),
            "sources": [
                {
                    "text": self._preview(node.node.text),
                    "score": node.score,
                }
                for node in response.source_nodes
            ],
        }

    async def get_context_for_mcp_generation(
        self,
        task_description: str
    ) -> Optional[str]:
        """
        Get relevant context from knowledge base for MCP generation.
        This makes MCPs context-aware - they can use company-specific info.

        Args:
            task_description: Description of the task the MCP should perform.

        Returns:
            The answer followed by the source previews, separated by blank
            lines, or None when no knowledge base is available or the query
            did not succeed.
        """
        # Trigger lazy setup first; otherwise self.index is still None on a
        # fresh engine and an existing knowledge base would never be consulted.
        self._ensure_initialized()
        if self.index is None or not LLAMAINDEX_AVAILABLE:
            return None

        result = await self.query(
            f"Find information relevant to: {task_description}",
            top_k=3
        )
        if result["status"] != "success":
            return None

        context_parts = [result["answer"]]
        context_parts.extend(source["text"] for source in result["sources"])
        return "\n\n".join(context_parts)
# Global knowledge engine instance, created at import time.
# Construction only creates the default "./chroma_db" directory; the
# LlamaIndex/ChromaDB setup is deferred until a method first needs it.
knowledge = KnowledgeEngine()