| """ | |
| Hybrid Retrieval System | |
| Combines semantic search (HNSW) with keyword search (BM25) for optimal retrieval | |
| """ | |
import json
import pickle
import time
from dataclasses import dataclass
from typing import List, Dict, Tuple

import numpy as np
import hnswlib
from sentence_transformers import SentenceTransformer
from rank_bm25 import BM25Okapi
@dataclass
class RetrievalResult:
    """Represents a retrieval result with metadata"""
    chunk_id: str
    text: str
    source_title: str
    source_url: str
    semantic_score: float
    keyword_score: float
    combined_score: float
    community_id: int
    rank: int
class HybridRetriever:
    """Hybrid retrieval combining semantic and keyword search"""

    def __init__(
        self,
        chunks_file: str,
        graphrag_index_file: str,
        embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
        embedding_dim: int = 384
    ):
        self.chunks_file = chunks_file
        self.graphrag_index_file = graphrag_index_file
        self.embedding_dim = embedding_dim

        # Load components
        print("[INFO] Loading hybrid retriever components...")
        self.embedding_model = SentenceTransformer(embedding_model)
        self.chunks = self._load_chunks()
        self.graphrag_index = self._load_graphrag_index()

        # Indexes are created later via build_*() or load_indexes()
        self.hnsw_index = None
        self.bm25 = None
        self.chunk_embeddings = None
        print("[SUCCESS] Hybrid retriever initialized")
    def _load_chunks(self) -> List[Dict]:
        """Load chunks from file"""
        with open(self.chunks_file, 'r', encoding='utf-8') as f:
            chunks = json.load(f)
        print(f"[INFO] Loaded {len(chunks)} chunks")
        return chunks
    def _load_graphrag_index(self) -> Dict:
        """Load GraphRAG index"""
        with open(self.graphrag_index_file, 'r', encoding='utf-8') as f:
            index = json.load(f)
        print(f"[INFO] Loaded GraphRAG index with {index['metadata']['total_communities']} communities")
        return index
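    # Expected shape of graphrag_index.json, inferred only from how it is accessed
    # in this file (the real index may contain additional fields):
    #     {
    #         "metadata": {"total_communities": <int>, ...},
    #         "node_to_community": {<chunk_id>: <community_id>, ...},
    #         "communities": {"<community_id>": {<community context>}, ...}
    #     }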
    def build_semantic_index(self):
        """Build HNSW semantic search index"""
        print("[INFO] Building semantic index with HNSW...")

        # Generate embeddings for all chunks
        chunk_texts = [chunk['text'] for chunk in self.chunks]
        print(f"[INFO] Generating embeddings for {len(chunk_texts)} chunks...")
        self.chunk_embeddings = self.embedding_model.encode(
            chunk_texts,
            show_progress_bar=True,
            convert_to_numpy=True,
            normalize_embeddings=True  # L2 normalization for cosine similarity
        )

        # Build the HNSW index
        n_chunks = len(self.chunks)
        print(f"[INFO] Building HNSW index for {n_chunks} chunks...")
        start_build = time.time()

        # Initialize HNSW index
        # ef_construction: build-time accuracy/speed tradeoff (higher = more accurate, slower build)
        # M: number of bi-directional links per element (higher = better recall, more memory)
        self.hnsw_index = hnswlib.Index(space='cosine', dim=self.embedding_dim)

        # For a corpus of this size (~86K vectors), these parameters favor recall:
        # M=64 trades extra memory for better recall; ef_construction=200 balances
        # build time and index quality.
        self.hnsw_index.init_index(
            max_elements=n_chunks,
            ef_construction=200,  # Higher = better quality, slower build
            M=64,                 # Higher = better recall, more memory
            random_seed=42
        )

        # Set number of threads for parallel insertion
        self.hnsw_index.set_num_threads(8)

        # Add all vectors to the index
        print(f"[INFO] Adding {n_chunks} vectors to index (using 8 threads)...")
        self.hnsw_index.add_items(self.chunk_embeddings, np.arange(n_chunks))
        build_time = time.time() - start_build
        print(f"[SUCCESS] HNSW index built in {build_time:.1f} seconds ({build_time/60:.2f} minutes)")
        print(f"[SUCCESS] Index contains {self.hnsw_index.get_current_count()} vectors")
    def build_keyword_index(self):
        """Build BM25 keyword search index"""
        print("[INFO] Building BM25 keyword index...")

        # Tokenize chunks for BM25 (simple lowercase whitespace split)
        tokenized_chunks = [chunk['text'].lower().split() for chunk in self.chunks]

        # Build BM25 index
        self.bm25 = BM25Okapi(tokenized_chunks)
        print(f"[SUCCESS] BM25 index built for {len(tokenized_chunks)} chunks")
    def semantic_search(self, query: str, top_k: int = 10) -> List[Tuple[int, float]]:
        """Semantic search using HNSW"""
        # Encode query
        query_embedding = self.embedding_model.encode(
            [query],
            convert_to_numpy=True,
            normalize_embeddings=True
        )

        # Set ef (search-time exploration factor): higher = more accurate but slower.
        # ef must be at least top_k; we use at least 100 for good recall.
        self.hnsw_index.set_ef(max(top_k * 2, 100))

        # Search in HNSW index (returns labels and cosine distances)
        indices, distances = self.hnsw_index.knn_query(query_embedding, k=top_k)

        # Convert cosine distances to similarity scores (similarity = 1 - distance)
        scores = 1 - distances[0]

        # Return (chunk index, similarity score) tuples
        results = [(int(idx), float(score)) for idx, score in zip(indices[0], scores)]
        return results
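    # Illustrative call (hypothetical query; assumes the HNSW index is already built
    # or loaded):
    #     retriever.semantic_search("history of Dublin", top_k=3)
    # returns [(chunk_index, cosine_similarity), ...] with the most similar chunk first.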
    def keyword_search(self, query: str, top_k: int = 10) -> List[Tuple[int, float]]:
        """Keyword search using BM25"""
        # Tokenize query the same way the chunks were tokenized
        query_tokens = query.lower().split()

        # Get BM25 scores for every chunk
        scores = self.bm25.get_scores(query_tokens)

        # Get top-k indices by descending score
        top_indices = np.argsort(scores)[::-1][:top_k]

        # Return (chunk index, BM25 score) tuples
        results = [(int(idx), float(scores[idx])) for idx in top_indices]
        return results
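    # Unlike the cosine similarities above, raw BM25 scores are unbounded and
    # query-dependent, which is why hybrid_search() min-max normalizes both score
    # lists before combining them.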
    def hybrid_search(
        self,
        query: str,
        top_k: int = 10,
        semantic_weight: float = 0.7,
        keyword_weight: float = 0.3,
        rerank: bool = True
    ) -> List[RetrievalResult]:
        """
        Hybrid search combining semantic and keyword search

        Args:
            query: Search query
            top_k: Number of results to return
            semantic_weight: Weight for semantic scores (0-1)
            keyword_weight: Weight for keyword scores (0-1)
            rerank: Whether to rerank by community relevance (accepted but not
                currently applied in this method)
        """
        # Get results from both search methods
        semantic_results = self.semantic_search(query, top_k * 2)  # Get more candidates for fusion
        keyword_results = self.keyword_search(query, top_k * 2)

        # Normalize scores to the [0, 1] range (min-max normalization)
        def normalize_scores(results):
            if not results:
                return []
            scores = [score for _, score in results]
            min_score, max_score = min(scores), max(scores)
            if max_score == min_score:
                return [(idx, 1.0) for idx, _ in results]
            return [(idx, (score - min_score) / (max_score - min_score))
                    for idx, score in results]

        semantic_results = normalize_scores(semantic_results)
        keyword_results = normalize_scores(keyword_results)
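        # Illustrative arithmetic: raw scores [2.0, 5.0, 8.0] normalize to
        # [0.0, 0.5, 1.0]; if all candidates share the same score, each gets 1.0.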
        # Combine scores with a weighted sum of the normalized scores
        combined_scores = {}
        for idx, score in semantic_results:
            combined_scores[idx] = {
                'semantic': score * semantic_weight,
                'keyword': 0.0,
                'combined': score * semantic_weight
            }
        for idx, score in keyword_results:
            if idx in combined_scores:
                combined_scores[idx]['keyword'] = score * keyword_weight
                combined_scores[idx]['combined'] += score * keyword_weight
            else:
                combined_scores[idx] = {
                    'semantic': 0.0,
                    'keyword': score * keyword_weight,
                    'combined': score * keyword_weight
                }
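        # Illustrative example with the default weights (0.7 / 0.3): a chunk with
        # normalized semantic score 0.9 and keyword score 0.5 gets
        # combined = 0.7 * 0.9 + 0.3 * 0.5 = 0.78; a chunk found only by BM25
        # with normalized score 1.0 gets combined = 0.3.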
        # Sort by combined score
        sorted_indices = sorted(
            combined_scores.items(),
            key=lambda x: x[1]['combined'],
            reverse=True
        )[:top_k]

        # Build retrieval results
        results = []
        for rank, (idx, scores) in enumerate(sorted_indices):
            chunk = self.chunks[idx]
            community_id = self.graphrag_index['node_to_community'].get(chunk['chunk_id'], -1)
            result = RetrievalResult(
                chunk_id=chunk['chunk_id'],
                text=chunk['text'],
                source_title=chunk['source_title'],
                source_url=chunk['source_url'],
                semantic_score=scores['semantic'],
                keyword_score=scores['keyword'],
                combined_score=scores['combined'],
                community_id=community_id,
                rank=rank + 1
            )
            results.append(result)
        return results
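    # Illustrative call (hypothetical query and weights, distinct from the demo at
    # the bottom of this file):
    #     retriever.hybrid_search("Irish famine causes", top_k=5,
    #                             semantic_weight=0.5, keyword_weight=0.5)
    # returns up to top_k RetrievalResult objects ordered by combined_score.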
    def get_community_context(self, community_id: int) -> Dict:
        """Get context from a community"""
        if str(community_id) in self.graphrag_index['communities']:
            return self.graphrag_index['communities'][str(community_id)]
        return {}
    def save_indexes(self, output_dir: str = "dataset/wikipedia_ireland"):
        """Save indexes for fast loading"""
        print("[INFO] Saving indexes...")

        # Save HNSW index
        self.hnsw_index.save_index(f"{output_dir}/hybrid_hnsw_index.bin")

        # Save BM25 index and chunk embeddings
        with open(f"{output_dir}/hybrid_indexes.pkl", 'wb') as f:
            pickle.dump({
                'bm25': self.bm25,
                'embeddings': self.chunk_embeddings
            }, f)
        print(f"[SUCCESS] Indexes saved to {output_dir}")
    def load_indexes(self, output_dir: str = "dataset/wikipedia_ireland"):
        """Load pre-built indexes"""
        print("[INFO] Loading pre-built indexes...")

        # Load HNSW index
        self.hnsw_index = hnswlib.Index(space='cosine', dim=self.embedding_dim)
        self.hnsw_index.load_index(f"{output_dir}/hybrid_hnsw_index.bin")
        self.hnsw_index.set_num_threads(8)  # Enable multi-threaded search

        # Load BM25 index and chunk embeddings
        with open(f"{output_dir}/hybrid_indexes.pkl", 'rb') as f:
            data = pickle.load(f)
            self.bm25 = data['bm25']
            self.chunk_embeddings = data['embeddings']
        print("[SUCCESS] Indexes loaded successfully")
if __name__ == "__main__":
    # Build and save indexes
    retriever = HybridRetriever(
        chunks_file="dataset/wikipedia_ireland/chunks.json",
        graphrag_index_file="dataset/wikipedia_ireland/graphrag_index.json"
    )
    retriever.build_semantic_index()
    retriever.build_keyword_index()
    retriever.save_indexes()

    # Test hybrid search
    query = "What is the capital of Ireland?"
    results = retriever.hybrid_search(query, top_k=5)
    print("\nHybrid Search Results:")
    for result in results:
        print(f"\nRank {result.rank}: {result.source_title}")
        print(f"Score: {result.combined_score:.3f} (semantic: {result.semantic_score:.3f}, keyword: {result.keyword_score:.3f})")
        print(f"Text: {result.text[:200]}...")