"""
LlamaIndex Knowledge Engine - For $1,000 Prize
Enterprise RAG for connecting to company knowledge bases.
"""
import os
from typing import List, Dict, Any, Optional
from pathlib import Path

try:
    from llama_index.core import (
        VectorStoreIndex,
        SimpleDirectoryReader,
        StorageContext,
        load_index_from_storage,
        Settings
    )
    from llama_index.embeddings.openai import OpenAIEmbedding
    from llama_index.llms.anthropic import Anthropic
    from llama_index.vector_stores.chroma import ChromaVectorStore
    import chromadb
    LLAMAINDEX_AVAILABLE = True
except ImportError:
    LLAMAINDEX_AVAILABLE = False
    print("[WARNING] LlamaIndex not installed")


class KnowledgeEngine:
    """
    Enterprise knowledge integration using LlamaIndex.

    Prize Integration: LlamaIndex Category Award ($1,000)
    - RAG for enterprise documents
    - Multi-source knowledge integration
    - Context-aware MCP generation
    """

    def __init__(self, persist_dir: str = "./chroma_db"):
        self.persist_dir = Path(persist_dir)
        self.persist_dir.mkdir(parents=True, exist_ok=True)
        self.index = None
        self._initialized = False

    def _ensure_initialized(self):
        """Lazy initialization to avoid startup errors."""
        if self._initialized or not LLAMAINDEX_AVAILABLE:
            return
        try:
            # Configure LlamaIndex settings
            Settings.embed_model = OpenAIEmbedding(
                api_key=os.getenv("OPENAI_API_KEY"),
                model="text-embedding-3-small"
            )
            # Skip Anthropic LLM initialization to avoid a version conflict
            # Settings.llm = Anthropic(...)

            # Initialize ChromaDB and the backing vector store
            self.chroma_client = chromadb.PersistentClient(path=str(self.persist_dir))
            self.chroma_collection = self.chroma_client.get_or_create_collection("omnimind_knowledge")
            self.vector_store = ChromaVectorStore(chroma_collection=self.chroma_collection)
            self.storage_context = StorageContext.from_defaults(vector_store=self.vector_store)

            # Try to load an existing index; fall back to building one lazily
            try:
                self.index = load_index_from_storage(self.storage_context)
                print("[OK] Loaded existing knowledge base")
            except Exception:
                self.index = None
                print("[INFO] No existing knowledge base - will create on first document add")
            self._initialized = True
        except Exception as e:
            print(f"[WARNING] LlamaIndex initialization failed: {e}")
            self._initialized = False

    async def add_documents(self, documents_path: str) -> Dict[str, Any]:
        """Add documents from a directory to the knowledge base."""
        self._ensure_initialized()
        if not LLAMAINDEX_AVAILABLE or not self._initialized:
            return {"status": "unavailable", "message": "LlamaIndex not installed or not initialized"}

        reader = SimpleDirectoryReader(documents_path)
        documents = reader.load_data()

        if self.index is None:
            # First ingest: build a fresh index on top of the Chroma store
            self.index = VectorStoreIndex.from_documents(
                documents,
                storage_context=self.storage_context
            )
        else:
            # Incremental ingest into the existing index
            for doc in documents:
                self.index.insert(doc)
        self.index.storage_context.persist()

        return {
            "status": "success",
            "documents_added": len(documents),
            "total_documents": len(self.chroma_collection.get()["ids"])
        }

    async def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        """Query the knowledge base."""
        self._ensure_initialized()
        if not LLAMAINDEX_AVAILABLE or not self._initialized or self.index is None:
            return {
                "status": "unavailable",
                "answer": "Knowledge base not configured",
                "sources": []
            }

        query_engine = self.index.as_query_engine(similarity_top_k=top_k)
        response = query_engine.query(question)

        return {
            "status": "success",
            "answer": str(response),
            "sources": [
                {
                    "text": node.node.text[:200] + "...",
                    "score": node.score
                }
                for node in response.source_nodes
            ]
        }

    async def get_context_for_mcp_generation(
        self,
        task_description: str
    ) -> Optional[str]:
        """
        Get relevant context from the knowledge base for MCP generation.
        This makes MCPs context-aware - they can use company-specific info.
        """
        # Initialize first, so an index persisted on disk can be picked up
        self._ensure_initialized()
        if not LLAMAINDEX_AVAILABLE or self.index is None:
            return None

        result = await self.query(
            f"Find information relevant to: {task_description}",
            top_k=3
        )
        if result["status"] == "success":
            context_parts = [result["answer"]]
            context_parts.extend([s["text"] for s in result["sources"]])
            return "\n\n".join(context_parts)
        return None


# Global knowledge engine instance
knowledge = KnowledgeEngine()
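

# ---------------------------------------------------------------------------
# Example usage - an illustrative sketch, not part of the engine itself.
# Assumes OPENAI_API_KEY is set in the environment; "./docs" is a hypothetical
# folder of company documents, and the sample question is a placeholder to
# adapt for your own setup.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo():
        # Ingest a directory of documents, then query against them
        ingest_result = await knowledge.add_documents("./docs")
        print(ingest_result)
        answer = await knowledge.query("What is our refund policy?")
        print(answer)

    asyncio.run(_demo())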