import time
from typing import List, Dict, Any, Optional
from datetime import datetime
import json
import logging

# Vector database and embedding imports
from pinecone import Pinecone
# from sentence_transformers import SentenceTransformer  # only needed if embeddings are generated locally

# Local imports
from .chunker import CodeChunk
from config import PINECONE_API_KEY, PINECONE_INDEX_NAME, PINECONE_EMBEDDING_MODEL

logger = logging.getLogger("code_compass")


class PineconeVectorStore:
    """
    Pinecone vector database integration with built-in embedding generation
    """

    def __init__(self, namespace: str):
        """
        Initialize the Pinecone vector store, using Pinecone's inference API for embeddings.

        The API key, index name, and embedding model are taken from the config module
        (PINECONE_API_KEY, PINECONE_INDEX_NAME, PINECONE_EMBEDDING_MODEL).

        Args:
            namespace: Pinecone namespace in which this repository's vectors are stored
        """
        # Setup API key (read from config rather than passed in)
        self.api_key = PINECONE_API_KEY
        self.namespace = namespace

        if not self.api_key:
            raise ValueError("Pinecone API key is required. Set the PINECONE_API_KEY env var or config value.")

        self.index_name = PINECONE_INDEX_NAME

        # Initialize Pinecone client
        self.pc = Pinecone(api_key=self.api_key)

        # Initialize index
        self._initialize_index()
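
    # Example (a sketch; assumes PINECONE_API_KEY, PINECONE_INDEX_NAME, and
    # PINECONE_EMBEDDING_MODEL are set in config.py and the account can create
    # the index if it does not exist yet):
    #
    #     store = PineconeVectorStore(namespace="my-repo")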

    def _initialize_index(self):
        """Initialize the Pinecone index, creating it with the integrated inference API if needed"""
        try:
            logger.info("Initializing Pinecone connection...")

            # Check if the index exists
            existing_indexes = [index.name for index in self.pc.list_indexes()]

            if self.index_name not in existing_indexes:
                logger.info(f"Creating new Pinecone index: {self.index_name}")

                # Create an index with the integrated embedding model
                if not self.pc.has_index(self.index_name):
                    self.pc.create_index_for_model(
                        name=self.index_name,
                        cloud="aws",
                        region="us-east-1",
                        embed={
                            "model": PINECONE_EMBEDDING_MODEL,
                            "field_map": {"text": "chunk_text", "metadata": "metadata", "id": "_id"}
                        }
                    )

                # Wait for the index to be ready
                logger.info("Waiting for index to be ready...")
                while not self.pc.describe_index(self.index_name).status['ready']:
                    time.sleep(1)

            # Connect to the index
            self.index = self.pc.Index(self.index_name)
            logger.info(f"Connected to Pinecone index: {self.index_name}")

            # Get index stats
            stats = self.index.describe_index_stats()
            logger.info(f"Index stats: {stats.get('total_vector_count', 0)} vectors stored")

            if self.namespace in stats.get('namespaces', {}):
                logger.info(f"Namespace '{self.namespace}' already exists. Clearing it before re-indexing...")
                # Delete all vectors in the namespace
                self.index.delete_namespace(namespace=self.namespace)
                logger.info(f"Successfully deleted all vectors in namespace '{self.namespace}'.")
            else:
                logger.info(f"Namespace '{self.namespace}' does not exist. No action needed.")

        except Exception as e:
            logger.error(f"Error initializing Pinecone: {str(e)}")
            raise

    def upsert_chunks(self, chunks: List[CodeChunk], batch_size: int = 96) -> Dict[str, Any]:
        """
        Upsert code chunks to Pinecone using the inference API for embeddings

        Args:
            chunks: List of code chunks (embeddings are generated by Pinecone)
            batch_size: Batch size for upsert operations

        Returns:
            Dictionary with upsert results
        """
        logger.info(f"Upserting {len(chunks)} chunks to Pinecone with automatic embedding generation...")

        if not chunks:
            return {"status": "error", "message": "No chunks provided"}

        # Prepare records for Pinecone's inference API
        data_to_upsert = []
        for chunk in chunks:
            # Prepare metadata (Pinecone has limitations on metadata size and types)
            metadata = self._prepare_metadata_for_pinecone(chunk.metadata)

            # For the inference API, we send the text content directly
            data_to_upsert.append({
                "_id": chunk.id,
                "chunk_text": chunk.content,  # Pinecone generates the embedding from this field
                "metadata": metadata
            })

        if not data_to_upsert:
            return {"status": "error", "message": "No valid data to upsert"}

        # Upsert in batches using Pinecone's inference API
        successful_upserts = 0
        failed_upserts = 0
        total_batches = (len(data_to_upsert) - 1) // batch_size + 1

        for i in range(0, len(data_to_upsert), batch_size):
            batch = data_to_upsert[i:i + batch_size]
            batch_number = i // batch_size + 1
            try:
                logger.info(f"Upserting batch {batch_number}/{total_batches} ({len(batch)} items)")

                # Debug: log the first item's structure on the first batch
                if i == 0 and len(batch) > 0:
                    sample_item = batch[0]
                    logger.debug("Sample item structure:")
                    logger.debug(f"  ID: {sample_item['_id']}")
                    logger.debug(f"  Text length: {len(sample_item['chunk_text'])}")
                    logger.debug(f"  Metadata: {sample_item['metadata']}")

                # Use Pinecone's inference API
                self.index.upsert_records(self.namespace, batch)
                time.sleep(1)  # Slight delay to ease write pressure

                successful_upserts += len(batch)
                logger.info(f"Batch {batch_number} upserted successfully")

            except Exception as e:
                logger.error(f"Error upserting batch {batch_number}: {str(e)}")

                # Retry the same batch once after a longer backoff
                try:
                    logger.info("Retrying batch after backoff...")
                    time.sleep(10)
                    self.index.upsert_records(self.namespace, batch)
                    successful_upserts += len(batch)
                    logger.info(f"Retry succeeded for batch {batch_number}")
                except Exception as e2:
                    logger.error(f"Retry also failed: {str(e2)}")
                    failed_upserts += len(batch)
                    continue

        # Final results
        result = {
            "status": "success" if successful_upserts > 0 else "error",
            "successful_upserts": successful_upserts,
            "failed_upserts": failed_upserts,
            "total_chunks": len(chunks),
            "timestamp": datetime.now().isoformat()
        }

        logger.info(f"Upsert complete! {successful_upserts} successful, {failed_upserts} failed")
        return result
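
    # Example (a sketch; assumes `chunks` is a list of CodeChunk objects from the chunker):
    #
    #     outcome = store.upsert_chunks(chunks, batch_size=96)
    #     print(outcome["successful_upserts"], "of", outcome["total_chunks"], "chunks indexed")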

    def safe_json_store(self, final_metadata) -> str:
        """Serialize metadata to a JSON string, falling back to a stringified form if it is not serializable."""
        try:
            return json.dumps(final_metadata, ensure_ascii=False)
        except (TypeError, ValueError):
            # Fallback: force conversion to string and JSON-escape it
            return json.dumps(str(final_metadata), ensure_ascii=False)

    def _prepare_metadata_for_pinecone(self, metadata: Dict[str, Any]) -> str:
        """
        Prepare metadata for Pinecone storage (handles size and type limitations)
        and return it as a JSON string for the record's `metadata` field.
        """
        # Pinecone metadata limitations:
        # - Max 40KB of metadata per vector
        # - Only strings, numbers, booleans, and lists of strings are supported
        # - No nested objects or other complex data types
        cleaned_metadata = {}

        for key, value in metadata.items():
            if value is None:
                continue

            # Convert values to Pinecone-compatible formats
            if isinstance(value, (str, int, float, bool)):
                # Limit string length to avoid size issues
                if isinstance(value, str) and len(value) > 500:
                    cleaned_metadata[key] = value[:500] + "..."
                else:
                    cleaned_metadata[key] = value

            elif isinstance(value, list):
                # Convert to a list of strings (Pinecone requirement),
                # limiting list size and string length
                cleaned_metadata[key] = [str(item)[:100] for item in value[:5]]  # Max 5 items

            elif isinstance(value, dict):
                # Pinecone doesn't support nested objects, so flatten the dict
                for sub_key, sub_value in value.items():
                    flattened_key = f"{key}_{sub_key}"
                    if isinstance(sub_value, (str, int, float, bool)):
                        if isinstance(sub_value, str) and len(sub_value) > 200:
                            cleaned_metadata[flattened_key] = sub_value[:200] + "..."
                        else:
                            cleaned_metadata[flattened_key] = sub_value
                    else:
                        cleaned_metadata[flattened_key] = str(sub_value)[:200]

            else:
                # Convert any other type to a string
                cleaned_metadata[key] = str(value)[:200]

        # Double-check that no complex types slipped through
        final_metadata = {}
        for key, value in cleaned_metadata.items():
            if isinstance(value, (str, int, float, bool)):
                final_metadata[key] = value
            elif isinstance(value, list) and all(isinstance(item, str) for item in value):
                final_metadata[key] = value
            else:
                # Last resort: convert to string
                final_metadata[key] = str(value)[:200]

        return self.safe_json_store(final_metadata)  # Stored as a JSON string in the record's metadata field
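
    # Example of the flattening performed above (illustrative values, not from the codebase):
    #
    #     {"repo_name": "demo", "stats": {"lines": 120}, "tags": ["api", 3]}
    #       -> '{"repo_name": "demo", "stats_lines": 120, "tags": ["api", "3"]}'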

    def query_similar_chunks(self,
                             query_text: str,
                             top_k: int = 10,
                             filter_dict: Optional[Dict[str, Any]] = None,
                             include_metadata: bool = True) -> List[Dict[str, Any]]:
        """
        Query for similar chunks using Pinecone's inference API

        Args:
            query_text: Text to search for (Pinecone generates the query embedding)
            top_k: Number of similar chunks to return
            filter_dict: Optional metadata filters
            include_metadata: Whether to include metadata in results

        Returns:
            List of similar chunks with scores
        """
        try:
            logger.info(f"Searching for similar chunks to: '{query_text[:50]}...'")

            # NOTE: filter_dict is accepted for API compatibility but is not applied
            # server-side here, because chunk metadata is stored as a single JSON
            # string field rather than as individual filterable record fields.
            search_results = self.index.search(
                namespace=self.namespace,
                query={"inputs": {"text": query_text}, "top_k": top_k},
            )

            results = []
            if 'result' not in search_results or 'hits' not in search_results['result']:
                logger.warning("No results found in search response")
                return []

            for match in search_results['result']['hits']:
                results.append({
                    'id': match['_id'],
                    'chunk_text': match['fields']['chunk_text'],
                    'score': float(match['_score']),
                    'metadata': match['fields']['metadata'] if include_metadata else None
                })

            logger.info(f"Found {len(results)} similar chunks")
            logger.debug(f"Results: {results}")
            return results

        except Exception as e:
            logger.error(f"Error querying similar chunks: {str(e)}")
            # A fallback would require generating embeddings locally; for now, return empty results
            return []
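
    # Example (a sketch):
    #
    #     hits = store.query_similar_chunks("where is the auth token validated?", top_k=5)
    #     for hit in hits:
    #         print(round(hit["score"], 3), hit["id"])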

    def query_by_metadata(self,
                          filter_dict: Dict[str, Any],
                          top_k: int = 100) -> List[Dict[str, Any]]:
        """
        Query chunks by metadata filters only

        Args:
            filter_dict: Metadata filters (expects a 'repo_name' key)
            top_k: Maximum number of results

        Returns:
            List of matching chunks
        """
        try:
            logger.info(f"Querying by metadata: {filter_dict}")

            # Metadata is stored as a JSON string, so a true server-side metadata filter
            # is not available; approximate it by searching on the repository name text.
            search_results = self.index.search(
                namespace=self.namespace,
                query={"inputs": {"text": filter_dict['repo_name']}, "top_k": top_k},
            )

            results = []
            if 'result' not in search_results or 'hits' not in search_results['result']:
                logger.warning("No results found in search response")
                return []

            for match in search_results['result']['hits']:
                results.append({
                    'id': match['_id'],
                    'chunk_text': match['fields']['chunk_text'],
                    'score': float(match['_score']),
                    'metadata': json.loads(match['fields']['metadata'])
                })

            logger.info(f"Found {len(results)} chunks matching metadata filters")
            return results

        except Exception as e:
            logger.error(f"Error querying by metadata: {str(e)}")
            return []

    def get_chunk_by_id(self, chunk_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve a specific chunk by its ID

        Args:
            chunk_id: Unique chunk identifier

        Returns:
            Chunk data or None if not found
        """
        try:
            result = self.index.fetch(ids=[chunk_id], namespace=self.namespace)

            if chunk_id in result.vectors:
                vector_data = result.vectors[chunk_id]
                return {
                    'id': chunk_id,
                    'values': vector_data.values,
                    'metadata': vector_data.metadata
                }
            else:
                logger.warning(f"Chunk {chunk_id} not found")
                return None

        except Exception as e:
            logger.error(f"Error fetching chunk {chunk_id}: {str(e)}")
            return None

    def delete_chunks_by_repo(self, repo_name: str) -> Dict[str, Any]:
        """
        Delete all chunks belonging to a specific repository

        Args:
            repo_name: Name of the repository to delete

        Returns:
            Deletion results
        """
        try:
            logger.info(f"Deleting all chunks for repository: {repo_name}")

            # Query for all chunks from this repo
            chunks_to_delete = self.query_by_metadata(
                filter_dict={"repo_name": repo_name},
                top_k=10000  # High number to get all chunks
            )

            if not chunks_to_delete:
                return {"status": "success", "message": "No chunks found for this repository"}

            # Extract IDs
            chunk_ids = [chunk['id'] for chunk in chunks_to_delete]

            # Delete in batches
            batch_size = 96
            deleted_count = 0

            for i in range(0, len(chunk_ids), batch_size):
                batch_ids = chunk_ids[i:i + batch_size]
                try:
                    self.index.delete(ids=batch_ids, namespace=self.namespace)
                    deleted_count += len(batch_ids)
                    logger.info(f"Deleted batch {i//batch_size + 1} ({len(batch_ids)} chunks)")
                except Exception as e:
                    logger.error(f"Error deleting batch: {str(e)}")

            result = {
                "status": "success",
                "deleted_count": deleted_count,
                "repo_name": repo_name,
                "timestamp": datetime.now().isoformat()
            }

            logger.info(f"Deleted {deleted_count} chunks for repository {repo_name}")
            return result

        except Exception as e:
            logger.error(f"Error deleting chunks for repo {repo_name}: {str(e)}")
            return {"status": "error", "message": str(e)}
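
    # Example (a sketch):
    #
    #     outcome = store.delete_chunks_by_repo("my-repo")
    #     print(outcome.get("deleted_count", 0), "chunks removed")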

    def get_index_stats(self) -> Dict[str, Any]:
        """Get statistics about the Pinecone index"""
        try:
            stats = self.index.describe_index_stats()
            return {
                "total_vectors": stats.get('total_vector_count', 0),
                "index_fullness": stats.get('index_fullness', 0),
                "dimension": stats.get('dimension', 0),
                "namespaces": stats.get('namespaces', {}),
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Error getting index stats: {str(e)}")
            return {"error": str(e)}

    def hybrid_search(self,
                      query_text: str,
                      chunk_types: Optional[List[str]] = None,
                      repo_names: Optional[List[str]] = None,
                      file_paths: Optional[List[str]] = None,
                      top_k: int = 20) -> List[Dict[str, Any]]:
        """
        Perform hybrid search using Pinecone's inference API with metadata filters

        Args:
            query_text: Text query for semantic search
            chunk_types: Filter by chunk types (file, class, function, block)
            repo_names: Filter by repository names
            file_paths: Filter by specific file paths
            top_k: Maximum number of results

        Returns:
            List of relevant chunks ranked by similarity and filtered by metadata
        """
        try:
            logger.info(f"Performing hybrid search for: '{query_text[:50]}...'")

            # Build metadata filter
            filter_conditions = {}
            if chunk_types:
                filter_conditions["chunk_type"] = {"$in": chunk_types}
            if repo_names:
                filter_conditions["repo_name"] = {"$in": repo_names}
            if file_paths:
                filter_conditions["file_path"] = {"$in": file_paths}

            # Perform semantic search; the filters are passed through, but note that
            # query_similar_chunks does not currently apply them server-side
            results = self.query_similar_chunks(
                query_text=query_text,
                top_k=top_k,
                filter_dict=filter_conditions if filter_conditions else None,
                include_metadata=True
            )

            # Post-process results to add relevance context
            for result in results:
                result['search_type'] = 'hybrid'
                result['query'] = query_text[:100]
                logger.debug(f"Result metadata: {result.get('metadata', {})}")
                result['metadata'] = json.loads(result.get('metadata') or '{}')

                # Add a relevance explanation based on chunk type
                chunk_type = result['metadata'].get('chunk_type', 'unknown')
                if chunk_type == 'file':
                    result['relevance_context'] = 'File-level overview'
                elif chunk_type == 'class':
                    result['relevance_context'] = 'Class definition and structure'
                elif chunk_type == 'function':
                    result['relevance_context'] = 'Function implementation'
                elif chunk_type == 'block':
                    result['relevance_context'] = 'Code block logic'

            logger.info(f"Hybrid search completed: {len(results)} relevant chunks found")
            return results

        except Exception as e:
            logger.error(f"Error in hybrid search: {str(e)}")
            return []
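
    # Example (a sketch; filter values are illustrative, and see the note above
    # about server-side filtering):
    #
    #     hits = store.hybrid_search("database connection pooling",
    #                                chunk_types=["function", "class"],
    #                                repo_names=["my-repo"],
    #                                top_k=10)
    #     for hit in hits:
    #         print(hit["score"], hit["relevance_context"], hit["metadata"].get("file_path"))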

    def get_repository_overview(self, repo_name: str) -> Dict[str, Any]:
        """
        Get a comprehensive overview of a repository's structure and content

        Args:
            repo_name: Name of the repository

        Returns:
            Repository overview with statistics and structure
        """
        try:
            logger.info(f"Getting overview for repository: {repo_name}")

            # Get all chunks for this repository
            all_chunks = self.query_by_metadata(
                filter_dict={"repo_name": repo_name},
                top_k=10000
            )

            if not all_chunks:
                return {"error": f"No chunks found for repository {repo_name}"}

            # Analyze chunks by type
            chunk_stats = {}
            files = set()
            classes = set()
            functions = set()
            languages = set()

            for chunk in all_chunks:
                metadata = chunk.get('metadata', {})
                chunk_type = metadata.get('chunk_type', 'unknown')
                chunk_stats[chunk_type] = chunk_stats.get(chunk_type, 0) + 1

                if 'file_path' in metadata:
                    files.add(metadata['file_path'])
                if 'language' in metadata:
                    languages.add(metadata['language'])
                if 'class_name' in metadata and metadata['class_name']:
                    classes.add(metadata['class_name'])
                if 'function_name' in metadata and metadata['function_name']:
                    functions.add(metadata['function_name'])

            overview = {
                "repo_name": repo_name,
                "total_chunks": len(all_chunks),
                "chunk_distribution": chunk_stats,
                "files_count": len(files),
                "classes_count": len(classes),
                "functions_count": len(functions),
                "languages": list(languages),
                "sample_files": list(files)[:10],      # Show first 10 files
                "sample_classes": list(classes)[:10],  # Show first 10 classes
                "timestamp": datetime.now().isoformat()
            }

            logger.info(f"Repository overview generated for {repo_name}")
            return overview

        except Exception as e:
            logger.error(f"Error getting repository overview: {str(e)}")
            return {"error": str(e)}

    def cleanup_old_chunks(self, days_old: int = 30) -> Dict[str, Any]:
        """
        Clean up old chunks based on timestamp

        Args:
            days_old: Delete chunks older than this many days

        Returns:
            Cleanup results
        """
        # This would require storing timestamps in metadata and querying by date.
        # Implementation depends on your specific cleanup needs.
        logger.info("Cleanup functionality not implemented yet")
        return {"status": "not_implemented"}
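

# Minimal end-to-end sketch (hedged: assumes the config values are set and that
# `build_chunks()` is a hypothetical helper producing CodeChunk objects; it is
# not defined in this module).
#
# if __name__ == "__main__":
#     store = PineconeVectorStore(namespace="my-repo")
#     chunks = build_chunks("path/to/repo")
#     print(store.upsert_chunks(chunks))
#     print(store.hybrid_search("how is authentication handled?", top_k=5))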