"""
LlamaIndex Knowledge Engine - For $1,000 Prize

Enterprise RAG for connecting to company knowledge bases.
"""

import os
from typing import List, Dict, Any, Optional
from pathlib import Path

try:
    from llama_index.core import (
        VectorStoreIndex,
        SimpleDirectoryReader,
        StorageContext,
        load_index_from_storage,
        Settings,
    )
    from llama_index.embeddings.openai import OpenAIEmbedding
    from llama_index.llms.anthropic import Anthropic
    from llama_index.vector_stores.chroma import ChromaVectorStore
    import chromadb

    LLAMAINDEX_AVAILABLE = True
except ImportError:
    LLAMAINDEX_AVAILABLE = False
    print("[WARNING] LlamaIndex not installed")


class KnowledgeEngine:
    """
    Enterprise knowledge integration using LlamaIndex.

    Prize Integration: LlamaIndex Category Award ($1,000)
    - RAG for enterprise documents
    - Multi-source knowledge integration
    - Context-aware MCP generation
    """

    def __init__(self, persist_dir: str = "./chroma_db"):
        self.persist_dir = Path(persist_dir)
        self.persist_dir.mkdir(parents=True, exist_ok=True)
        self.index = None
        self._initialized = False

        if not LLAMAINDEX_AVAILABLE:
            return

    def _ensure_initialized(self):
        """Lazily initialize LlamaIndex components to avoid startup errors."""
        if self._initialized or not LLAMAINDEX_AVAILABLE:
            return

        try:
            # Embed documents and queries with OpenAI's embedding model
            Settings.embed_model = OpenAIEmbedding(
                api_key=os.getenv("OPENAI_API_KEY"),
                model="text-embedding-3-small",
            )
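
            # NOTE: Anthropic is imported above but never wired in. Presumably the
            # query LLM was meant to be configured here; a guarded sketch (the model
            # name is an assumption, not part of the original module):
            if os.getenv("ANTHROPIC_API_KEY"):
                Settings.llm = Anthropic(
                    api_key=os.getenv("ANTHROPIC_API_KEY"),
                    model="claude-3-5-sonnet-20241022",
                )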

            # Persistent Chroma collection backs the vector store
            self.chroma_client = chromadb.PersistentClient(path=str(self.persist_dir))
            self.chroma_collection = self.chroma_client.get_or_create_collection("omnimind_knowledge")

            self.vector_store = ChromaVectorStore(chroma_collection=self.chroma_collection)
            self.storage_context = StorageContext.from_defaults(vector_store=self.vector_store)

            # Reuse a previously persisted index if one exists
            try:
                self.index = load_index_from_storage(self.storage_context)
                print("[OK] Loaded existing knowledge base")
            except Exception:
                self.index = None
                print("[INFO] No existing knowledge base - will create on first document add")

            self._initialized = True
        except Exception as e:
            print(f"[WARNING] LlamaIndex initialization failed: {e}")
            self._initialized = False

    async def add_documents(self, documents_path: str) -> Dict[str, Any]:
        """Add documents to the knowledge base."""
        self._ensure_initialized()
        if not LLAMAINDEX_AVAILABLE or not self._initialized:
            return {"status": "unavailable", "message": "LlamaIndex not installed"}

        reader = SimpleDirectoryReader(documents_path)
        documents = reader.load_data()

        # Build the index on first use; afterwards insert into the existing one
        if self.index is None:
            self.index = VectorStoreIndex.from_documents(
                documents,
                storage_context=self.storage_context,
            )
        else:
            for doc in documents:
                self.index.insert(doc)

        self.index.storage_context.persist()

        return {
            "status": "success",
            "documents_added": len(documents),
            "total_documents": len(self.chroma_collection.get()["ids"]),
        }

    async def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        """Query the knowledge base."""
        self._ensure_initialized()
        if not LLAMAINDEX_AVAILABLE or not self._initialized or self.index is None:
            return {
                "status": "unavailable",
                "answer": "Knowledge base not configured",
                "sources": [],
            }

        # Note: .query() is synchronous and will block the event loop;
        # query_engine.aquery() is the async equivalent if that matters here.
        query_engine = self.index.as_query_engine(similarity_top_k=top_k)
        response = query_engine.query(question)

        return {
            "status": "success",
            "answer": str(response),
            "sources": [
                {
                    "text": node.node.text[:200] + "...",
                    "score": node.score,
                }
                for node in response.source_nodes
            ],
        }

    async def get_context_for_mcp_generation(
        self,
        task_description: str
    ) -> Optional[str]:
        """
        Get relevant context from the knowledge base for MCP generation.

        This makes MCPs context-aware - they can use company-specific info.
        """
        if not LLAMAINDEX_AVAILABLE or self.index is None:
            return None

        result = await self.query(
            f"Find information relevant to: {task_description}",
            top_k=3,
        )

        if result["status"] == "success":
            context_parts = [result["answer"]]
            context_parts.extend([s["text"] for s in result["sources"]])
            return "\n\n".join(context_parts)

        return None


# Shared module-level instance
knowledge = KnowledgeEngine()
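
# Minimal usage sketch (not part of the original module); the "./docs" path and
# the example question are placeholders for illustration only.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        added = await knowledge.add_documents("./docs")
        print(added)
        result = await knowledge.query("What is our refund policy?")
        print(result["answer"])

    asyncio.run(_demo())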