import os
import time
from typing import List, Dict, Any, Optional, Tuple
import hashlib
from datetime import datetime
import json

# Vector database and embedding imports
from pinecone import Pinecone
# from sentence_transformers import SentenceTransformer
import numpy as np
import logging

# Local imports
from .chunker import CodeChunk
from config import PINECONE_API_KEY, PINECONE_INDEX_NAME, PINECONE_EMBEDDING_MODEL

logger = logging.getLogger("code_compass")


class PineconeVectorStore:
    """
    Pinecone vector database integration with built-in embedding generation.

    Records are written with ``upsert_records`` as
    ``{"_id", "chunk_text", "metadata"}`` where ``metadata`` is a single JSON
    *string* (see ``_prepare_metadata_for_pinecone``).  Because the metadata is
    stored as one opaque string field, metadata filters are evaluated
    client-side on the hits returned by the text search.

    NOTE: constructing an instance deletes every record already stored in
    ``namespace`` (see ``_initialize_index``).
    """

    # Pause after each successful batch upsert (original comment: "slight
    # delay to ensure consistency").
    _UPSERT_DELAY_SECONDS = 1
    # Pause after a batch that only succeeded on the retry.
    # NOTE(review): presumably a back-off after a transient failure - confirm.
    _RETRY_DELAY_SECONDS = 10

    # Human-readable explanation attached to hybrid-search hits, by chunk type.
    _RELEVANCE_CONTEXT = {
        "file": "File-level overview",
        "class": "Class definition and structure",
        "function": "Function implementation",
        "block": "Code block logic",
    }

    def __init__(self, namespace):
        """
        Initialize the Pinecone client and connect to the configured index.

        The API key, index name and embedding model are read from the
        ``config`` module (``PINECONE_API_KEY``, ``PINECONE_INDEX_NAME``,
        ``PINECONE_EMBEDDING_MODEL``).

        Args:
            namespace: Pinecone namespace used for every upsert, search, fetch
                and delete issued by this instance.  Any records already in
                this namespace are deleted during initialization.

        Raises:
            ValueError: If ``PINECONE_API_KEY`` is empty.
            Exception: Any error raised while creating or connecting to the
                index is logged and re-raised.
        """
        # Setup API key
        self.api_key = PINECONE_API_KEY
        self.namespace = namespace

        if not self.api_key:
            raise ValueError(
                "Pinecone API key is required. Set PINECONE_API_KEY in the config module"
            )

        self.index_name = PINECONE_INDEX_NAME

        # Initialize Pinecone client
        self.pc = Pinecone(api_key=self.api_key)

        # Initialize index
        self._initialize_index()

    def _initialize_index(self):
        """
        Create the index if needed, connect to it and reset the namespace.

        Steps:
            1. Create an integrated-embedding index when none exists.
            2. Poll until the index reports ready.
            3. Connect and log the index statistics.
            4. Delete ``self.namespace`` if it already holds records, so each
               instance starts from an empty namespace.

        Raises:
            Exception: Re-raises any error from the Pinecone client after
                logging it.
        """
        try:
            logger.info("🔄 Initializing Pinecone connection...")

            if not self.pc.has_index(self.index_name):
                logger.info(f"🔄 Creating new Pinecone index: {self.index_name}")

                # Create index with inference API enabled
                self.pc.create_index_for_model(
                    name=self.index_name,
                    cloud="aws",
                    region="us-east-1",
                    embed={
                        "model": PINECONE_EMBEDDING_MODEL,
                        "field_map": {"text": "chunk_text", "metadata": "metadata", "id": "_id"}
                    }
                )
                logger.info("⏳ Waiting for index to be ready...")

            # Polled unconditionally: for an index that is already ready this
            # costs one describe call and returns immediately.
            while not self.pc.describe_index(self.index_name).status['ready']:
                time.sleep(1)

            # Connect to index
            self.index = self.pc.Index(self.index_name)
            logger.info(f"✅ Connected to Pinecone index: {self.index_name}")

            # Get index stats
            stats = self.index.describe_index_stats()
            logger.info(f"📊 Index stats: {stats.get('total_vector_count', 0)} vectors stored")

            if self.namespace in stats.get('namespaces', {}):
                logger.info(f"Namespace '{self.namespace}' exists. Proceeding with deletion...")
                # Delete all vectors in the namespace
                self.index.delete_namespace(namespace=self.namespace)
                logger.info(f"Successfully deleted all vectors in namespace '{self.namespace}'.")
            else:
                logger.info(f"Namespace '{self.namespace}' does not exist. No action needed.")

        except Exception as e:
            logger.error(f"❌ Error initializing Pinecone: {str(e)}")
            raise

    def upsert_chunks(self, chunks: List[CodeChunk], batch_size: int = 96) -> Dict[str, Any]:
        """
        Upsert code chunks to Pinecone using inference API for embeddings.

        Each chunk becomes one record ``{"_id", "chunk_text", "metadata"}``;
        Pinecone generates the embedding from ``chunk_text``.  Records are sent
        in batches.  A batch that fails is retried once with the same payload
        before being counted as failed.

        Args:
            chunks: List of code chunks (embeddings will be generated by Pinecone)
            batch_size: Number of records per upsert call; must be >= 1.

        Returns:
            Dictionary with ``status`` (``"success"`` when at least one record
            was upserted, otherwise ``"error"``), ``successful_upserts``,
            ``failed_upserts``, ``total_chunks`` and ``timestamp``.  For empty
            input or an invalid ``batch_size`` only ``status`` and ``message``
            are present.
        """
        logger.info(f"🔄 Upserting {len(chunks)} chunks to Pinecone with automatic embedding generation...")

        if not chunks:
            return {"status": "error", "message": "No chunks provided"}
        if batch_size < 1:
            return {"status": "error", "message": "batch_size must be a positive integer"}

        # For the inference API the text content is sent directly; metadata is
        # serialized to a JSON string first.
        records = [
            {
                "_id": chunk.id,
                "chunk_text": chunk.content,  # Pinecone will generate embeddings from this
                "metadata": self._prepare_metadata_for_pinecone(chunk.metadata),
            }
            for chunk in chunks
        ]

        total_batches = (len(records) - 1) // batch_size + 1
        successful_upserts = 0
        failed_upserts = 0

        for batch_number, start in enumerate(range(0, len(records), batch_size), start=1):
            batch = records[start:start + batch_size]
            logger.info(f"📊 Upserting batch {batch_number}/{total_batches} ({len(batch)} items)")

            # Debug: show the first record's structure once
            if start == 0:
                sample_item = batch[0]
                logger.debug("🔍 Sample item structure:")
                logger.debug(f" ID: {sample_item['_id']}")
                logger.debug(f" Text length: {len(sample_item['chunk_text'])}")
                logger.debug(f" Metadata keys: {sample_item['metadata']}")

            try:
                self.index.upsert_records(self.namespace, batch)
                time.sleep(self._UPSERT_DELAY_SECONDS)
                successful_upserts += len(batch)
                logger.info(f"✅ Batch {batch_number} upserted successfully")
                continue
            except Exception as e:
                logger.error(f"❌ Error upserting batch {batch_number}: {str(e)}")

            # Second attempt: same call, same payload.
            try:
                logger.info("🔄 Trying alternative upsert method...")
                self.index.upsert_records(self.namespace, batch)
                time.sleep(self._RETRY_DELAY_SECONDS)
                successful_upserts += len(batch)
                logger.info(f"✅ Alternative upsert method succeeded for batch {batch_number}")
            except Exception as e2:
                logger.error(f"❌ Alternative upsert method also failed: {str(e2)}")
                failed_upserts += len(batch)

        # Final results
        result = {
            "status": "success" if successful_upserts > 0 else "error",
            "successful_upserts": successful_upserts,
            "failed_upserts": failed_upserts,
            "total_chunks": len(chunks),
            "timestamp": datetime.now().isoformat()
        }

        logger.info(f"✅ Upsert complete! {successful_upserts} successful, {failed_upserts} failed")
        return result

    def safe_json_store(self, final_metadata):
        """
        Serialize ``final_metadata`` to a JSON string.

        Falls back to JSON-encoding ``str(final_metadata)`` when the value is
        not JSON-serializable, so a string is always returned.
        """
        try:
            return json.dumps(final_metadata, ensure_ascii=False)
        except (TypeError, ValueError):
            # fallback: force conversion to string and JSON-escape it
            return json.dumps(str(final_metadata), ensure_ascii=False)

    def _prepare_metadata_for_pinecone(self, metadata: Dict[str, Any]) -> str:
        """
        Flatten, truncate and serialize chunk metadata for Pinecone storage.

        Conversion rules, applied per top-level key:
            * ``None`` values are dropped.
            * ``str`` longer than 500 chars is cut to 500 chars plus ``"..."``;
              other ``str``/``int``/``float``/``bool`` values pass through.
            * ``list`` keeps the first 5 items, each as ``str`` cut to 100 chars.
            * ``dict`` is flattened one level into ``"<key>_<sub_key>"``
              entries; primitive sub-values pass through (strings cut to 200
              chars plus ``"..."``), anything else becomes ``str`` cut to 200.
            * any other type becomes ``str`` cut to 200 chars.

        Args:
            metadata: Raw chunk metadata.

        Returns:
            The cleaned metadata serialized as a JSON string (not a dict).
        """
        # Limits noted by the original author: max 40KB metadata per vector;
        # only string, number, boolean and list-of-strings; no nested objects.
        primitives = (str, int, float, bool)
        cleaned_metadata: Dict[str, Any] = {}

        for key, value in metadata.items():
            if value is None:
                continue

            if isinstance(value, primitives):
                # Limit string length to avoid size issues
                if isinstance(value, str) and len(value) > 500:
                    cleaned_metadata[key] = value[:500] + "..."
                else:
                    cleaned_metadata[key] = value
            elif isinstance(value, list):
                # At most 5 items, each stringified and capped at 100 chars
                cleaned_metadata[key] = [str(item)[:100] for item in value[:5]]
            elif isinstance(value, dict):
                # Nested objects are not supported - flatten one level
                for sub_key, sub_value in value.items():
                    flattened_key = f"{key}_{sub_key}"
                    if not isinstance(sub_value, primitives):
                        cleaned_metadata[flattened_key] = str(sub_value)[:200]
                    elif isinstance(sub_value, str) and len(sub_value) > 200:
                        cleaned_metadata[flattened_key] = sub_value[:200] + "..."
                    else:
                        cleaned_metadata[flattened_key] = sub_value
            else:
                # Convert other types to string
                cleaned_metadata[key] = str(value)[:200]

        # Every value is now a primitive or a list of strings.
        return self.safe_json_store(cleaned_metadata)  # Store as JSON string

    @staticmethod
    def _parse_metadata(raw: Any) -> Dict[str, Any]:
        """Return stored metadata as a dict; ``{}`` when it cannot be decoded."""
        if isinstance(raw, dict):
            return raw
        if isinstance(raw, (str, bytes, bytearray)):
            try:
                parsed = json.loads(raw)
            except ValueError:
                return {}
            # safe_json_store's fallback stores a JSON *string*, not an object
            return parsed if isinstance(parsed, dict) else {}
        return {}

    @staticmethod
    def _metadata_matches(metadata: Dict[str, Any], conditions: Dict[str, Any]) -> bool:
        """
        Check ``metadata`` against every entry of ``conditions``.

        A condition is either a plain value (equality) or a dict using the
        ``$in`` or ``$eq`` operator.  A key missing from ``metadata`` never
        matches.

        Raises:
            ValueError: If a condition dict uses an unsupported operator.
        """
        for key, expected in conditions.items():
            if key not in metadata:
                return False
            actual = metadata[key]
            if isinstance(expected, dict):
                if "$in" in expected:
                    if actual not in expected["$in"]:
                        return False
                elif "$eq" in expected:
                    if actual != expected["$eq"]:
                        return False
                else:
                    raise ValueError(f"Unsupported metadata filter for '{key}': {expected}")
            elif actual != expected:
                return False
        return True

    def _search_hits(self, query_text: str, top_k: int) -> Optional[List[Any]]:
        """
        Run a text search in ``self.namespace`` and return the raw hits.

        Returns ``None`` (after logging) when the response carries no
        ``result``/``hits`` section.
        """
        search_results = self.index.search(
            namespace=self.namespace,
            query={"inputs": {"text": query_text}, "top_k": top_k},
        )
        if 'result' not in search_results or 'hits' not in search_results['result']:
            logger.info("⚠️ No results found in search response")
            return None
        return search_results['result']['hits']

    def query_similar_chunks(self,
                             query_text: str,
                             top_k: int = 10,
                             filter_dict: Optional[Dict[str, Any]] = None,
                             include_metadata: bool = True) -> List[Dict[str, Any]]:
        """
        Query for similar chunks using Pinecone's inference API.

        Args:
            query_text: Text to search for (Pinecone will generate embeddings)
            top_k: Number of hits requested from Pinecone
            filter_dict: Optional metadata filters (plain values, ``$in`` or
                ``$eq``).  Applied client-side to the returned hits, so fewer
                than ``top_k`` results may come back.
            include_metadata: Whether to include metadata in results

        Returns:
            List of dicts with ``id``, ``chunk_text``, ``score`` and
            ``metadata`` (the stored JSON string, or ``None`` when
            ``include_metadata`` is false).  Empty list on any error.
        """
        try:
            logger.info(f"🔍 Searching for similar chunks to: '{query_text[:50]}...'")

            hits = self._search_hits(query_text, top_k)
            if hits is None:
                return []

            results = []
            for match in hits:
                raw_metadata = match['fields']['metadata']
                if filter_dict and not self._metadata_matches(
                        self._parse_metadata(raw_metadata), filter_dict):
                    continue
                results.append({
                    'id': match['_id'],
                    'chunk_text': match['fields']['chunk_text'],
                    'score': float(match['_score']),
                    'metadata': raw_metadata if include_metadata else None
                })

            logger.info(f"✅ Found {len(results)} similar chunks")
            logger.debug(f"Results: {results}")
            return results

        except Exception as e:
            # No fallback exists (it would need manual embedding generation),
            # so a failed search yields no results.
            logger.error(f"❌ Error querying similar chunks: {str(e)}")
            return []

    def query_by_metadata(self,
                          filter_dict: Dict[str, Any],
                          top_k: int = 100) -> List[Dict[str, Any]]:
        """
        Query chunks by metadata filters.

        The index is only searchable by text, so a text search is issued
        (using ``filter_dict['repo_name']`` when it is a non-empty string,
        otherwise the filter values joined by spaces) and the hits are then
        filtered client-side against ``filter_dict``.  Results are therefore
        limited to the ``top_k`` hits of that text search.

        Args:
            filter_dict: Metadata filters (plain values, ``$in`` or ``$eq``)
            top_k: Maximum number of hits requested from Pinecone

        Returns:
            List of matching chunks with ``id``, ``chunk_text``, ``score`` and
            ``metadata`` (decoded to a dict).  Empty list on error or when
            ``filter_dict`` is empty.
        """
        try:
            logger.info(f"🔍 Querying by metadata: {filter_dict}")

            if not filter_dict:
                logger.warning("⚠️ No results found in search response")
                return []

            query_text = filter_dict.get('repo_name')
            if not isinstance(query_text, str) or not query_text:
                query_text = " ".join(str(value) for value in filter_dict.values())

            hits = self._search_hits(query_text, top_k)
            if hits is None:
                return []

            results = []
            for match in hits:
                metadata = self._parse_metadata(match['fields']['metadata'])
                if not self._metadata_matches(metadata, filter_dict):
                    continue
                results.append({
                    'id': match['_id'],
                    'chunk_text': match['fields']['chunk_text'],
                    'score': float(match['_score']),
                    'metadata': metadata
                })

            logger.info(f"✅ Found {len(results)} chunks matching metadata filters")
            return results

        except Exception as e:
            logger.error(f"❌ Error querying by metadata: {str(e)}")
            return []

    def get_chunk_by_id(self, chunk_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve a specific chunk by its ID from ``self.namespace``.

        Args:
            chunk_id: Unique chunk identifier

        Returns:
            Dict with ``id``, ``values`` and ``metadata``, or ``None`` if the
            chunk is not found or the fetch fails.
        """
        try:
            # Records are written to self.namespace, so fetch from it too.
            result = self.index.fetch(ids=[chunk_id], namespace=self.namespace)

            if chunk_id not in result.vectors:
                logger.warning(f"⚠️ Chunk {chunk_id} not found")
                return None

            vector_data = result.vectors[chunk_id]
            return {
                'id': chunk_id,
                'values': vector_data.values,
                'metadata': vector_data.metadata
            }

        except Exception as e:
            logger.error(f"❌ Error fetching chunk {chunk_id}: {str(e)}")
            return None

    def delete_chunks_by_repo(self, repo_name: str) -> Dict[str, Any]:
        """
        Delete chunks belonging to a specific repository.

        Chunks are located with ``query_by_metadata`` (so only chunks whose
        metadata ``repo_name`` equals ``repo_name`` and that are among the
        search hits are found) and deleted by ID from ``self.namespace`` in
        batches of 96.

        Args:
            repo_name: Name of the repository to delete

        Returns:
            Dict with ``status``, ``deleted_count``, ``failed_count``,
            ``repo_name`` and ``timestamp``.  ``status`` is ``"error"`` only
            when every delete batch failed or an unexpected error occurred.
        """
        try:
            logger.info(f"🗑️ Deleting all chunks for repository: {repo_name}")

            # Query for all chunks from this repo
            chunks_to_delete = self.query_by_metadata(
                filter_dict={"repo_name": repo_name},
                top_k=10000  # High number to get all chunks
            )

            if not chunks_to_delete:
                return {"status": "success", "message": "No chunks found for this repository"}

            chunk_ids = [chunk['id'] for chunk in chunks_to_delete]

            # Delete in batches
            batch_size = 96
            deleted_count = 0
            failed_count = 0

            for batch_number, start in enumerate(range(0, len(chunk_ids), batch_size), start=1):
                batch_ids = chunk_ids[start:start + batch_size]
                try:
                    # Records live in self.namespace, so delete from it.
                    self.index.delete(ids=batch_ids, namespace=self.namespace)
                    deleted_count += len(batch_ids)
                    logger.info(f"🗑️ Deleted batch {batch_number} ({len(batch_ids)} chunks)")
                except Exception as e:
                    failed_count += len(batch_ids)
                    logger.error(f"❌ Error deleting batch: {str(e)}")

            result = {
                "status": "success" if deleted_count > 0 or failed_count == 0 else "error",
                "deleted_count": deleted_count,
                "failed_count": failed_count,
                "repo_name": repo_name,
                "timestamp": datetime.now().isoformat()
            }

            logger.info(f"✅ Deleted {deleted_count} chunks for repository {repo_name}")
            return result

        except Exception as e:
            logger.error(f"❌ Error deleting chunks for repo {repo_name}: {str(e)}")
            return {"status": "error", "message": str(e)}

    def get_index_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the Pinecone index.

        Returns:
            Dict with ``total_vectors``, ``index_fullness``, ``dimension``
            (``None`` when the stats do not report it), ``namespaces`` and
            ``timestamp``; or ``{"error": ...}`` when the call fails.
        """
        try:
            stats = self.index.describe_index_stats()

            return {
                "total_vectors": stats.get('total_vector_count', 0),
                "index_fullness": stats.get('index_fullness', 0),
                # No local dimension is tracked; report only what the index says.
                "dimension": stats.get('dimension'),
                "namespaces": stats.get('namespaces', {}),
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Error getting index stats: {str(e)}")
            return {"error": str(e)}

    def hybrid_search(self,
                      query_text: str,
                      chunk_types: Optional[List[str]] = None,
                      repo_names: Optional[List[str]] = None,
                      file_paths: Optional[List[str]] = None,
                      top_k: int = 20) -> List[Dict[str, Any]]:
        """
        Perform semantic search combined with metadata filters.

        Args:
            query_text: Text query for semantic search
            chunk_types: Filter by chunk types (file, class, function, block)
            repo_names: Filter by repository names
            file_paths: Filter by specific file paths
            top_k: Number of hits requested before filtering

        Returns:
            Hits from ``query_similar_chunks`` that satisfy the filters, each
            extended with ``search_type``, ``query`` (first 100 chars),
            ``metadata`` decoded to a dict and, for known chunk types,
            ``relevance_context``.  Empty list on any error.
        """
        try:
            logger.info(f"🔍 Performing hybrid search for: '{query_text[:50]}...'")

            # Build metadata filter
            filter_conditions = {}
            if chunk_types:
                filter_conditions["chunk_type"] = {"$in": chunk_types}
            if repo_names:
                filter_conditions["repo_name"] = {"$in": repo_names}
            if file_paths:
                filter_conditions["file_path"] = {"$in": file_paths}

            # Semantic search; the filter is applied by query_similar_chunks
            results = self.query_similar_chunks(
                query_text=query_text,
                top_k=top_k,
                filter_dict=filter_conditions if filter_conditions else None,
                include_metadata=True
            )

            # Post-process results to add relevance context
            for result in results:
                result['search_type'] = 'hybrid'
                result['query'] = query_text[:100]
                logger.debug(f"Result metadata: {result.get('metadata', {})}")
                result['metadata'] = self._parse_metadata(result.get('metadata'))

                # Add relevance explanation based on chunk type
                chunk_type = result["metadata"].get("chunk_type", "unknown")
                if chunk_type in self._RELEVANCE_CONTEXT:
                    result['relevance_context'] = self._RELEVANCE_CONTEXT[chunk_type]

            logger.info(f"✅ Hybrid search completed: {len(results)} relevant chunks found")
            return results

        except Exception as e:
            logger.error(f"❌ Error in hybrid search: {str(e)}")
            return []

    def get_repository_overview(self, repo_name: str) -> Dict[str, Any]:
        """
        Summarize the chunks stored for a repository.

        Args:
            repo_name: Name of the repository

        Returns:
            Dict with ``repo_name``, ``total_chunks``, ``chunk_distribution``
            (count per ``chunk_type``), ``files_count``, ``classes_count``,
            ``functions_count``, ``languages``, ``sample_files`` and
            ``sample_classes`` (up to 10 each, sorted) and ``timestamp``; or
            ``{"error": ...}`` when no chunks are found or the query fails.
        """
        try:
            logger.info(f"📊 Getting overview for repository: {repo_name}")

            # Get all chunks for this repository
            all_chunks = self.query_by_metadata(
                filter_dict={"repo_name": repo_name},
                top_k=10000
            )

            if not all_chunks:
                return {"error": f"No chunks found for repository {repo_name}"}

            # Analyze chunks by type
            chunk_stats: Dict[str, int] = {}
            files = set()
            classes = set()
            functions = set()
            languages = set()

            for chunk in all_chunks:
                metadata = chunk.get('metadata', {})

                chunk_type = metadata.get('chunk_type', 'unknown')
                chunk_stats[chunk_type] = chunk_stats.get(chunk_type, 0) + 1

                if 'file_path' in metadata:
                    files.add(metadata['file_path'])
                if 'language' in metadata:
                    languages.add(metadata['language'])
                if metadata.get('class_name'):
                    classes.add(metadata['class_name'])
                if metadata.get('function_name'):
                    functions.add(metadata['function_name'])

            overview = {
                "repo_name": repo_name,
                "total_chunks": len(all_chunks),
                "chunk_distribution": chunk_stats,
                "files_count": len(files),
                "classes_count": len(classes),
                "functions_count": len(functions),
                # Sorted so the output is deterministic across runs
                "languages": sorted(languages, key=str),
                "sample_files": sorted(files, key=str)[:10],  # Show first 10 files
                "sample_classes": sorted(classes, key=str)[:10],  # Show first 10 classes
                "timestamp": datetime.now().isoformat()
            }

            logger.info(f"✅ Repository overview generated for {repo_name}")
            return overview

        except Exception as e:
            logger.error(f"❌ Error getting repository overview: {str(e)}")
            return {"error": str(e)}

    def cleanup_old_chunks(self, days_old: int = 30) -> Dict[str, Any]:
        """
        Placeholder for age-based cleanup; currently deletes nothing.

        Args:
            days_old: Intended age threshold in days (unused for now)

        Returns:
            ``{"status": "not_implemented"}``
        """
        # This would require storing timestamps in metadata and querying by date
        logger.info("🧹 Cleanup functionality not implemented yet")
        return {"status": "not_implemented"}