# code-compass/scripts/vectorstore.py
import time
import json
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional

# Vector database client
from pinecone import Pinecone

# Local imports
from .chunker import CodeChunk
from config import PINECONE_API_KEY, PINECONE_INDEX_NAME, PINECONE_EMBEDDING_MODEL
logger = logging.getLogger("code_compass")
class PineconeVectorStore:
"""
Pinecone vector database integration with built-in embedding generation
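
    Example (illustrative; requires PINECONE_API_KEY and index settings in config):
        >>> store = PineconeVectorStore(namespace="my-repo")
        >>> store.get_index_stats()["total_vectors"]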
"""
    def __init__(self, namespace: str):
        """
        Initialize the Pinecone vector store, using Pinecone's inference API
        for embedding generation

        Args:
            namespace: Pinecone namespace under which this store's vectors
                       are upserted and queried
        """
        # Setup API key and namespace
        self.api_key = PINECONE_API_KEY
        self.namespace = namespace
        if not self.api_key:
            raise ValueError("Pinecone API key is required. Set PINECONE_API_KEY in config.")
        self.index_name = PINECONE_INDEX_NAME

        # Initialize Pinecone client and connect to the index
        self.pc = Pinecone(api_key=self.api_key)
        self._initialize_index()
    def _initialize_index(self):
        """Initialize the Pinecone index, creating it with integrated embeddings if needed"""
        try:
            logger.info("πŸ”„ Initializing Pinecone connection...")

            # Create the index with integrated embedding generation if it doesn't exist
            if not self.pc.has_index(self.index_name):
                logger.info(f"πŸ”„ Creating new Pinecone index: {self.index_name}")
                self.pc.create_index_for_model(
                    name=self.index_name,
                    cloud="aws",
                    region="us-east-1",
                    embed={
                        "model": PINECONE_EMBEDDING_MODEL,
                        # field_map tells Pinecone which record field holds the text to embed
                        "field_map": {"text": "chunk_text"}
                    }
                )

            # Wait for index to be ready
            logger.info("⏳ Waiting for index to be ready...")
            while not self.pc.describe_index(self.index_name).status['ready']:
                time.sleep(1)

            # Connect to index
            self.index = self.pc.Index(self.index_name)
            logger.info(f"βœ… Connected to Pinecone index: {self.index_name}")

            # Get index stats
            stats = self.index.describe_index_stats()
            logger.info(f"πŸ“Š Index stats: {stats.get('total_vector_count', 0)} vectors stored")

            # Start from a clean slate: delete any existing vectors in this namespace
            if self.namespace in stats.get('namespaces', {}):
                logger.info(f"Namespace '{self.namespace}' exists. Proceeding with deletion...")
                self.index.delete_namespace(namespace=self.namespace)
                logger.info(f"Successfully deleted all vectors in namespace '{self.namespace}'.")
            else:
                logger.info(f"Namespace '{self.namespace}' does not exist. No action needed.")
        except Exception as e:
            logger.error(f"❌ Error initializing Pinecone: {str(e)}")
            raise
def upsert_chunks(self, chunks: List[CodeChunk], batch_size: int = 96) -> Dict[str, Any]:
"""
Upsert code chunks to Pinecone using inference API for embeddings
Args:
chunks: List of code chunks (embeddings will be generated by Pinecone)
batch_size: Batch size for upsert operations
Returns:
Dictionary with upsert results
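
        Example (illustrative; assumes `chunks` came from the chunker):
            >>> result = store.upsert_chunks(chunks, batch_size=96)
            >>> result["status"]
            'success'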
"""
logger.info(f"πŸ”„ Upserting {len(chunks)} chunks to Pinecone with automatic embedding generation...")
if not chunks:
return {"status": "error", "message": "No chunks provided"}
# Prepare data for Pinecone inference API
data_to_upsert = []
for chunk in chunks:
# Prepare metadata (Pinecone has limitations on metadata size)
metadata = self._prepare_metadata_for_pinecone(chunk.metadata)
# For Pinecone inference API, we send the text content directly
data_to_upsert.append({
"_id": chunk.id,
"chunk_text": chunk.content, # Pinecone will generate embeddings from this
"metadata": metadata
})
if not data_to_upsert:
return {"status": "error", "message": "No valid data to upsert"}
# Upsert in batches using Pinecone's inference API
successful_upserts = 0
failed_upserts = 0
for i in range(0, len(data_to_upsert), batch_size):
batch = data_to_upsert[i:i + batch_size]
try:
logger.info(f"πŸ“Š Upserting batch {i//batch_size + 1}/{(len(data_to_upsert)-1)//batch_size + 1} ({len(batch)} items)")
                # Debug: log the first item's structure on the first batch
                if i == 0 and len(batch) > 0:
                    sample_item = batch[0]
                    logger.debug("πŸ” Sample item structure:")
                    logger.debug(f"  ID: {sample_item['_id']}")
                    logger.debug(f"  Text length: {len(sample_item['chunk_text'])}")
                    logger.debug(f"  Metadata (JSON string): {sample_item['metadata'][:200]}")
                # Upsert the batch; Pinecone generates embeddings server-side
                # from the chunk_text field
                self.index.upsert_records(self.namespace, batch)
                time.sleep(1)  # Slight delay to ensure consistency
                successful_upserts += len(batch)
                logger.info(f"βœ… Batch {i//batch_size + 1} upserted successfully")
            except Exception as e:
                logger.error(f"❌ Error upserting batch {i//batch_size + 1}: {str(e)}")
                # Retry the batch once after a longer pause (rate limits and other
                # transient errors are the most likely cause of a failed batch)
                try:
                    logger.info("πŸ”„ Retrying batch after a short delay...")
                    time.sleep(10)
                    self.index.upsert_records(self.namespace, batch)
                    successful_upserts += len(batch)
                    logger.info(f"βœ… Retry succeeded for batch {i//batch_size + 1}")
                except Exception as e2:
                    logger.error(f"❌ Retry also failed: {str(e2)}")
                    failed_upserts += len(batch)
                    continue
# Final results
result = {
"status": "success" if successful_upserts > 0 else "error",
"successful_upserts": successful_upserts,
"failed_upserts": failed_upserts,
"total_chunks": len(chunks),
"timestamp": datetime.now().isoformat()
}
logger.info(f"βœ… Upsert complete! {successful_upserts} successful, {failed_upserts} failed")
return result
    def safe_json_store(self, final_metadata):
        """Serialize metadata to a JSON string, falling back to str() for values json can't encode"""
        try:
            return json.dumps(final_metadata, ensure_ascii=False)
        except (TypeError, ValueError):
            # Fallback: force conversion to string and JSON-escape it
            return json.dumps(str(final_metadata), ensure_ascii=False)
    def _prepare_metadata_for_pinecone(self, metadata: Dict[str, Any]) -> str:
"""
        Prepare metadata for Pinecone storage (handles size and type limitations) and serialize it to a JSON string
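
        Example (illustrative; nested dicts are flattened, then everything is serialized):
            >>> store._prepare_metadata_for_pinecone({"repo_name": "demo", "stats": {"lines": 42}})
            '{"repo_name": "demo", "stats_lines": 42}'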
"""
# Pinecone metadata limitations:
# - Max 40KB per vector metadata
# - Only supports string, number, boolean, and list of strings
# - NO nested objects or complex data types
cleaned_metadata = {}
for key, value in metadata.items():
if value is None:
continue
# Convert different types to Pinecone-compatible formats
if isinstance(value, (str, int, float, bool)):
# Limit string length to avoid size issues
if isinstance(value, str) and len(value) > 500:
cleaned_metadata[key] = value[:500] + "..."
else:
cleaned_metadata[key] = value
            elif isinstance(value, list):
                # Pinecone only supports lists of strings; cap list size and item length
                cleaned_metadata[key] = [str(item)[:100] for item in value[:5]]  # Max 5 items
elif isinstance(value, dict):
# Pinecone doesn't support nested objects - flatten or convert to string
# Option 1: Flatten the dict
for sub_key, sub_value in value.items():
flattened_key = f"{key}_{sub_key}"
if isinstance(sub_value, (str, int, float, bool)):
if isinstance(sub_value, str) and len(sub_value) > 200:
cleaned_metadata[flattened_key] = str(sub_value)[:200] + "..."
else:
cleaned_metadata[flattened_key] = sub_value
else:
cleaned_metadata[flattened_key] = str(sub_value)[:200]
else:
# Convert other types to string
cleaned_metadata[key] = str(value)[:200]
# Double-check that we don't have any complex types
final_metadata = {}
for key, value in cleaned_metadata.items():
if isinstance(value, (str, int, float, bool)):
final_metadata[key] = value
elif isinstance(value, list) and all(isinstance(item, str) for item in value):
final_metadata[key] = value
else:
# Last resort - convert to string
final_metadata[key] = str(value)[:200]
        return self.safe_json_store(final_metadata)  # Store as a single JSON string
def query_similar_chunks(self,
query_text: str,
top_k: int = 10,
filter_dict: Optional[Dict[str, Any]] = None,
include_metadata: bool = True) -> List[Dict[str, Any]]:
"""
Query for similar chunks using Pinecone's inference API
Args:
query_text: Text to search for (Pinecone will generate embeddings)
top_k: Number of similar chunks to return
filter_dict: Optional metadata filters
include_metadata: Whether to include metadata in results
Returns:
List of similar chunks with scores
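
        Example (illustrative):
            >>> hits = store.query_similar_chunks("parse the config file", top_k=5)
            >>> [(h["id"], round(h["score"], 3)) for h in hits]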
"""
try:
logger.info(f"πŸ” Searching for similar chunks to: '{query_text[:50]}...'")
# Use Pinecone's inference API for query
search_results = self.index.search(
namespace=self.namespace,
query={"inputs": {"text": query_text}, "top_k": top_k},
)
results = []
if 'result' not in search_results or 'hits' not in search_results['result']:
logger.info("⚠️ No results found in search response")
return []
for match in search_results['result']['hits']:
result = {
'id': match['_id'],
'chunk_text': match['fields']['chunk_text'],
'score': float(match['_score']),
'metadata': match['fields']['metadata'] if include_metadata else None
}
results.append(result)
logger.info(f"βœ… Found {len(results)} similar chunks")
logger.debug(f"Results: {results}")
return results
        except Exception as e:
            logger.error(f"❌ Error querying similar chunks: {str(e)}")
            # No local embedding model is available as a fallback, so fail soft
            return []
def query_by_metadata(self,
filter_dict: Dict[str, Any],
top_k: int = 100) -> List[Dict[str, Any]]:
"""
        Query chunks by metadata filters (currently approximated by a semantic search on the repo_name filter value)
Args:
filter_dict: Metadata filters
top_k: Maximum number of results
Returns:
List of matching chunks
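
        Example (illustrative; a repo_name filter is currently required):
            >>> chunks = store.query_by_metadata({"repo_name": "code-compass"}, top_k=50)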
"""
        try:
            logger.info(f"πŸ” Querying by metadata: {filter_dict}")
            if 'repo_name' not in filter_dict:
                logger.warning("⚠️ query_by_metadata currently requires a 'repo_name' filter")
                return []
            # Metadata lives in a JSON-string field, so a true server-side metadata
            # filter isn't available; a semantic search on the repo name stands in
            search_results = self.index.search(
                namespace=self.namespace,
                query={"inputs": {"text": filter_dict['repo_name']}, "top_k": top_k},
            )
results = []
if 'result' not in search_results or 'hits' not in search_results['result']:
logger.info("⚠️ No results found in search response")
return []
for match in search_results['result']['hits']:
result = {
'id': match['_id'],
'chunk_text': match['fields']['chunk_text'],
'score': float(match['_score']),
                    'metadata': json.loads(match['fields']['metadata'])
}
results.append(result)
logger.info(f"βœ… Found {len(results)} chunks matching metadata filters")
return results
except Exception as e:
logger.info(f"❌ Error querying by metadata: {str(e)}")
return []
def get_chunk_by_id(self, chunk_id: str) -> Optional[Dict[str, Any]]:
"""
Retrieve a specific chunk by its ID
Args:
chunk_id: Unique chunk identifier
Returns:
Chunk data or None if not found
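
        Example (illustrative; the chunk ID format is hypothetical):
            >>> chunk = store.get_chunk_by_id("demo/hello.py::greet")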
"""
        try:
            # Fetch from the same namespace the records were upserted to
            result = self.index.fetch(ids=[chunk_id], namespace=self.namespace)
            if chunk_id in result.vectors:
                vector_data = result.vectors[chunk_id]
return {
'id': chunk_id,
'values': vector_data.values,
'metadata': vector_data.metadata
}
else:
logger.info(f"⚠️ Chunk {chunk_id} not found")
return None
except Exception as e:
logger.info(f"❌ Error fetching chunk {chunk_id}: {str(e)}")
return None
def delete_chunks_by_repo(self, repo_name: str) -> Dict[str, Any]:
"""
Delete all chunks belonging to a specific repository
Args:
repo_name: Name of the repository to delete
Returns:
Deletion results
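
        Example (illustrative):
            >>> result = store.delete_chunks_by_repo("code-compass")
            >>> result["status"]
            'success'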
"""
try:
logger.info(f"πŸ—‘οΈ Deleting all chunks for repository: {repo_name}")
# Query for all chunks from this repo
chunks_to_delete = self.query_by_metadata(
filter_dict={"repo_name": repo_name},
top_k=10000 # High number to get all chunks
)
if not chunks_to_delete:
return {"status": "success", "message": "No chunks found for this repository"}
# Extract IDs
chunk_ids = [chunk['id'] for chunk in chunks_to_delete]
# Delete in batches
batch_size = 96
deleted_count = 0
for i in range(0, len(chunk_ids), batch_size):
batch_ids = chunk_ids[i:i + batch_size]
                try:
                    # Delete from this store's namespace, where the chunks live
                    self.index.delete(ids=batch_ids, namespace=self.namespace)
                    deleted_count += len(batch_ids)
                    logger.info(f"πŸ—‘οΈ Deleted batch {i//batch_size + 1} ({len(batch_ids)} chunks)")
                except Exception as e:
                    logger.error(f"❌ Error deleting batch: {str(e)}")
result = {
"status": "success",
"deleted_count": deleted_count,
"repo_name": repo_name,
"timestamp": datetime.now().isoformat()
}
logger.info(f"βœ… Deleted {deleted_count} chunks for repository {repo_name}")
return result
except Exception as e:
logger.info(f"❌ Error deleting chunks for repo {repo_name}: {str(e)}")
return {"status": "error", "message": str(e)}
def get_index_stats(self) -> Dict[str, Any]:
"""Get statistics about the Pinecone index"""
try:
stats = self.index.describe_index_stats()
return {
"total_vectors": stats.get('total_vector_count', 0),
"index_fullness": stats.get('index_fullness', 0),
"dimension": stats.get('dimension', self.dimension),
"namespaces": stats.get('namespaces', {}),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.info(f"❌ Error getting index stats: {str(e)}")
return {"error": str(e)}
def hybrid_search(self,
query_text: str,
chunk_types: Optional[List[str]] = None,
repo_names: Optional[List[str]] = None,
file_paths: Optional[List[str]] = None,
top_k: int = 20) -> List[Dict[str, Any]]:
"""
Perform hybrid search using Pinecone's inference API with metadata filters
Args:
query_text: Text query for semantic search
chunk_types: Filter by chunk types (file, class, function, block)
repo_names: Filter by repository names
file_paths: Filter by specific file paths
top_k: Maximum number of results
Returns:
List of relevant chunks ranked by similarity and filtered by metadata
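
        Example (illustrative):
            >>> hits = store.hybrid_search(
            ...     "database connection pooling",
            ...     chunk_types=["function", "class"],
            ...     repo_names=["code-compass"],
            ...     top_k=10,
            ... )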
"""
try:
logger.info(f"πŸ” Performing hybrid search for: '{query_text[:50]}...'")
# Build metadata filter
filter_conditions = {}
if chunk_types:
filter_conditions["chunk_type"] = {"$in": chunk_types}
if repo_names:
filter_conditions["repo_name"] = {"$in": repo_names}
if file_paths:
filter_conditions["file_path"] = {"$in": file_paths}
# Perform semantic search with filters using inference API
results = self.query_similar_chunks(
query_text=query_text,
top_k=top_k,
filter_dict=filter_conditions if filter_conditions else None,
include_metadata=True
)
            # Post-process results to add relevance context
            for result in results:
                result['search_type'] = 'hybrid'
                result['query'] = query_text[:100]
                logger.debug(f"Result metadata: {result.get('metadata', {})}")
                # Metadata comes back as a JSON string; parse it (guard against None)
                result['metadata'] = json.loads(result.get('metadata') or '{}')
                # Add a relevance explanation based on chunk type
                chunk_type = result["metadata"].get("chunk_type", "unknown")
                if chunk_type == "file":
                    result['relevance_context'] = 'File-level overview'
                elif chunk_type == 'class':
                    result['relevance_context'] = 'Class definition and structure'
                elif chunk_type == 'function':
                    result['relevance_context'] = 'Function implementation'
                elif chunk_type == 'block':
                    result['relevance_context'] = 'Code block logic'
                else:
                    result['relevance_context'] = 'Code snippet'
logger.info(f"βœ… Hybrid search completed: {len(results)} relevant chunks found")
return results
except Exception as e:
logger.info(f"❌ Error in hybrid search: {str(e)}")
return []
def get_repository_overview(self, repo_name: str) -> Dict[str, Any]:
"""
Get comprehensive overview of a repository's structure and content
Args:
repo_name: Name of the repository
Returns:
Repository overview with statistics and structure
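
        Example (illustrative):
            >>> overview = store.get_repository_overview("code-compass")
            >>> overview["files_count"], overview["languages"]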
"""
try:
logger.info(f"πŸ“Š Getting overview for repository: {repo_name}")
# Get all chunks for this repository
all_chunks = self.query_by_metadata(
filter_dict={"repo_name": repo_name},
top_k=10000
)
if not all_chunks:
return {"error": f"No chunks found for repository {repo_name}"}
# Analyze chunks by type
chunk_stats = {}
files = set()
classes = set()
functions = set()
languages = set()
for chunk in all_chunks:
metadata = chunk.get('metadata', {})
chunk_type = metadata.get('chunk_type', 'unknown')
chunk_stats[chunk_type] = chunk_stats.get(chunk_type, 0) + 1
if 'file_path' in metadata:
files.add(metadata['file_path'])
if 'language' in metadata:
languages.add(metadata['language'])
if 'class_name' in metadata and metadata['class_name']:
classes.add(metadata['class_name'])
if 'function_name' in metadata and metadata['function_name']:
functions.add(metadata['function_name'])
overview = {
"repo_name": repo_name,
"total_chunks": len(all_chunks),
"chunk_distribution": chunk_stats,
"files_count": len(files),
"classes_count": len(classes),
"functions_count": len(functions),
"languages": list(languages),
"sample_files": list(files)[:10], # Show first 10 files
"sample_classes": list(classes)[:10], # Show first 10 classes
"timestamp": datetime.now().isoformat()
}
logger.info(f"βœ… Repository overview generated for {repo_name}")
return overview
except Exception as e:
logger.info(f"❌ Error getting repository overview: {str(e)}")
return {"error": str(e)}
def cleanup_old_chunks(self, days_old: int = 30) -> Dict[str, Any]:
"""
Clean up old chunks based on timestamp
Args:
days_old: Delete chunks older than this many days
Returns:
Cleanup results
"""
# This would require storing timestamps in metadata and querying by date
# Implementation depends on your specific cleanup needs
logger.info(f"🧹 Cleanup functionality not implemented yet")
return {"status": "not_implemented"}