import json
import logging
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

# Vector database client
from pinecone import Pinecone

# Local imports
from .chunker import CodeChunk
from config import PINECONE_API_KEY, PINECONE_INDEX_NAME, PINECONE_EMBEDDING_MODEL

logger = logging.getLogger("code_compass")

class PineconeVectorStore:
    """
    Pinecone vector database integration with built-in embedding generation
    """
    
    def __init__(self, namespace: str):
        """
        Initialize the Pinecone vector store. Embeddings are generated
        server-side via Pinecone's inference API.

        Args:
            namespace: Pinecone namespace for this repository's vectors.
                Any existing vectors in the namespace are deleted on init.
        """
        self.api_key = PINECONE_API_KEY
        self.namespace = namespace
        if not self.api_key:
            raise ValueError("Pinecone API key is required. Set PINECONE_API_KEY in config.")

        self.index_name = PINECONE_INDEX_NAME
        
        # Initialize Pinecone client
        self.pc = Pinecone(api_key=self.api_key)
        
        # Initialize index
        self._initialize_index()
        
    def _initialize_index(self):
        """Initialize Pinecone index with inference API"""
        try:
            logger.info("πŸ”„ Initializing Pinecone connection...")
            
            # Check if index exists
            existing_indexes = [index.name for index in self.pc.list_indexes()]
            
            if self.index_name not in existing_indexes:
                logger.info(f"πŸ”„ Creating new Pinecone index: {self.index_name}")
                
                # Create index with inference API enabled
                if not self.pc.has_index(self.index_name):
                    self.pc.create_index_for_model(
                        name=self.index_name,
                        cloud="aws",
                        region="us-east-1",
                        embed={
                            "model": PINECONE_EMBEDDING_MODEL,
                            "field_map":{"text": "chunk_text", "metadata": "metadata", "id": "_id"}
                        }
                    )
                
                # Wait for index to be ready
                logger.info("⏳ Waiting for index to be ready...")
                while not self.pc.describe_index(self.index_name).status['ready']:
                    time.sleep(1)
            
            # Connect to index
            self.index = self.pc.Index(self.index_name)
            logger.info(f"βœ… Connected to Pinecone index: {self.index_name}")
            
            # Get index stats
            stats = self.index.describe_index_stats()
            logger.info(f"πŸ“Š Index stats: {stats.get('total_vector_count', 0)} vectors stored")
            # Reset the namespace: drop any vectors left over from a previous run
            if self.namespace in stats.get('namespaces', {}):
                logger.info(f"Namespace '{self.namespace}' exists. Deleting its vectors...")
                self.index.delete_namespace(namespace=self.namespace)
                logger.info(f"Successfully deleted all vectors in namespace '{self.namespace}'.")
            else:
                logger.info(f"Namespace '{self.namespace}' does not exist. No action needed.")

        except Exception as e:
            logger.error(f"❌ Error initializing Pinecone: {str(e)}")
            raise
    
    def upsert_chunks(self, chunks: List[CodeChunk], batch_size: int = 96) -> Dict[str, Any]:
        """
        Upsert code chunks to Pinecone using inference API for embeddings
        
        Args:
            chunks: List of code chunks (embeddings will be generated by Pinecone)
            batch_size: Batch size for upsert operations
            
        Returns:
            Dictionary with upsert results
        """
        logger.info(f"🔄 Upserting {len(chunks)} chunks to Pinecone with automatic embedding generation...")
        
        if not chunks:
            return {"status": "error", "message": "No chunks provided"}
        
        # Prepare data for Pinecone inference API
        data_to_upsert = []
        
        for chunk in chunks:
            # Prepare metadata (Pinecone has limitations on metadata size)
            metadata = self._prepare_metadata_for_pinecone(chunk.metadata)
            
            # For Pinecone inference API, we send the text content directly
            data_to_upsert.append({
                "_id": chunk.id,
                "chunk_text": chunk.content,  # Pinecone will generate embeddings from this
                "metadata": metadata
            })
        
        if not data_to_upsert:
            return {"status": "error", "message": "No valid data to upsert"}
        
        # Upsert in batches using Pinecone's inference API
        successful_upserts = 0
        failed_upserts = 0
        
        for i in range(0, len(data_to_upsert), batch_size):
            batch = data_to_upsert[i:i + batch_size]
            
            try:
                logger.info(f"📊 Upserting batch {i//batch_size + 1}/{(len(data_to_upsert)-1)//batch_size + 1} ({len(batch)} items)")

                # Debug: log the first item's structure on the first batch
                if i == 0 and batch:
                    sample_item = batch[0]
                    logger.debug("🔍 Sample item structure:")
                    logger.debug(f"  ID: {sample_item['_id']}")
                    logger.debug(f"  Text length: {len(sample_item['chunk_text'])}")
                    logger.debug(f"  Metadata: {sample_item['metadata']}")

                # Pinecone generates the embeddings server-side from 'chunk_text'
                self.index.upsert_records(self.namespace, batch)
                time.sleep(1)  # Brief pause between batches to avoid rate limits
                successful_upserts += len(batch)
                logger.info(f"✅ Batch {i//batch_size + 1} upserted successfully")
            except Exception as e:
                logger.error(f"❌ Error upserting batch {i//batch_size + 1}: {str(e)}")

                # Retry once after backing off; transient failures such as
                # rate limits often clear within a few seconds
                try:
                    logger.info("🔄 Retrying batch after backoff...")
                    time.sleep(10)
                    self.index.upsert_records(self.namespace, batch)
                    successful_upserts += len(batch)
                    logger.info(f"✅ Retry succeeded for batch {i//batch_size + 1}")
                except Exception as e2:
                    logger.error(f"❌ Retry also failed: {str(e2)}")
                    failed_upserts += len(batch)
                    continue
        
        # Final results
        result = {
            "status": "success" if successful_upserts > 0 else "error",
            "successful_upserts": successful_upserts,
            "failed_upserts": failed_upserts,
            "total_chunks": len(chunks),
            "timestamp": datetime.now().isoformat()
        }
        
        logger.info(f"✅ Upsert complete! {successful_upserts} successful, {failed_upserts} failed")
        return result
    
    def safe_json_store(self, final_metadata: Dict[str, Any]) -> str:
        """Serialize metadata to a JSON string, falling back to str() on failure."""
        try:
            return json.dumps(final_metadata, ensure_ascii=False)
        except (TypeError, ValueError):
            # Fallback: force conversion to string and JSON-escape it
            return json.dumps(str(final_metadata), ensure_ascii=False)
        
    def _prepare_metadata_for_pinecone(self, metadata: Dict[str, Any]) -> str:
        """
        Prepare metadata for Pinecone storage (handles size and type
        limitations) and return it serialized as a JSON string.
        """
        # Pinecone metadata limitations:
        # - Max 40KB of metadata per vector
        # - Only strings, numbers, booleans, and lists of strings are supported
        # - No nested objects or other complex data types
        
        cleaned_metadata = {}
        
        for key, value in metadata.items():
            if value is None:
                continue
                
            # Convert different types to Pinecone-compatible formats
            if isinstance(value, (str, int, float, bool)):
                # Limit string length to avoid size issues
                if isinstance(value, str) and len(value) > 500:
                    cleaned_metadata[key] = value[:500] + "..."
                else:
                    cleaned_metadata[key] = value
                    
            elif isinstance(value, list):
                # Pinecone only supports lists of strings; cap list size and
                # string length, converting non-string items along the way
                cleaned_metadata[key] = [str(item)[:100] for item in value[:5]]  # Max 5 items
                    
            elif isinstance(value, dict):
                # Pinecone doesn't support nested objects - flatten or convert to string
                # Option 1: Flatten the dict
                for sub_key, sub_value in value.items():
                    flattened_key = f"{key}_{sub_key}"
                    if isinstance(sub_value, (str, int, float, bool)):
                        if isinstance(sub_value, str) and len(sub_value) > 200:
                            cleaned_metadata[flattened_key] = str(sub_value)[:200] + "..."
                        else:
                            cleaned_metadata[flattened_key] = sub_value
                    else:
                        cleaned_metadata[flattened_key] = str(sub_value)[:200]
                        
            else:
                # Convert other types to string
                cleaned_metadata[key] = str(value)[:200]
        
        # Double-check that we don't have any complex types
        final_metadata = {}
        for key, value in cleaned_metadata.items():
            if isinstance(value, (str, int, float, bool)):
                final_metadata[key] = value
            elif isinstance(value, list) and all(isinstance(item, str) for item in value):
                final_metadata[key] = value
            else:
                # Last resort - convert to string
                final_metadata[key] = str(value)[:200]
        
        # Store as a JSON string (Pinecone record fields must be flat)
        return self.safe_json_store(final_metadata)
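
    # Example (illustrative): a nested input such as
    #     {"repo_name": "demo", "stats": {"lines": 42, "lang": "python"}}
    # is flattened and serialized to the JSON string
    #     '{"repo_name": "demo", "stats_lines": 42, "stats_lang": "python"}'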
    
    

    def query_similar_chunks(self, 
                           query_text: str, 
                           top_k: int = 10,
                           filter_dict: Optional[Dict[str, Any]] = None,
                           include_metadata: bool = True) -> List[Dict[str, Any]]:
        """
        Query for similar chunks using Pinecone's inference API
        
        Args:
            query_text: Text to search for (Pinecone will generate embeddings)
            top_k: Number of similar chunks to return
            filter_dict: Optional metadata filters
            include_metadata: Whether to include metadata in results
            
        Returns:
            List of similar chunks with scores
        """
        try:
            logger.info(f"🔍 Searching for similar chunks to: '{query_text[:50]}...'")

            # Use Pinecone's inference API for the query. Note: filter_dict is
            # not applied server-side, because chunk metadata is stored as a
            # single JSON string field; callers such as hybrid_search filter
            # client-side after parsing the metadata.
            search_results = self.index.search(
                namespace=self.namespace,
                query={"inputs": {"text": query_text}, "top_k": top_k},
            )

            results = []
            if 'result' not in search_results or 'hits' not in search_results['result']:
                logger.warning("⚠️  No results found in search response")
                return []
            for match in search_results['result']['hits']:
                results.append({
                    'id': match['_id'],
                    'chunk_text': match['fields']['chunk_text'],
                    'score': float(match['_score']),
                    'metadata': match['fields']['metadata'] if include_metadata else None,
                })

            logger.info(f"✅ Found {len(results)} similar chunks")
            logger.debug(f"Results: {results}")
            return results

        except Exception as e:
            logger.error(f"❌ Error querying similar chunks: {str(e)}")
            # No real fallback exists: generating embeddings locally would
            # require a separate model, so a failure returns no results.
            return []
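
    # Expected search response shape (as consumed above):
    #     {"result": {"hits": [
    #         {"_id": "...", "_score": 0.87,
    #          "fields": {"chunk_text": "...", "metadata": "{...}"}},
    #         ...]}}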
    
    def query_by_metadata(self, 
                         filter_dict: Dict[str, Any], 
                         top_k: int = 100) -> List[Dict[str, Any]]:
        """
        Query chunks by metadata filters only
        
        Args:
            filter_dict: Metadata filters; must include 'repo_name', which is
                used as the query text (see the note in the method body)
            top_k: Maximum number of results
            
        Returns:
            List of matching chunks
        """
        try:
            logger.info(f"🔍 Querying by metadata: {filter_dict}")

            # Metadata is stored as a single JSON string field, so true
            # server-side metadata filtering is not possible. As an
            # approximation, run a semantic search using the repository
            # name as the query text.
            search_results = self.index.search(
                namespace=self.namespace,
                query={"inputs": {"text": filter_dict['repo_name']}, "top_k": top_k},
            )

            results = []
            if 'result' not in search_results or 'hits' not in search_results['result']:
                logger.warning("⚠️  No results found in search response")
                return []
            for match in search_results['result']['hits']:
                results.append({
                    'id': match['_id'],
                    'chunk_text': match['fields']['chunk_text'],
                    'score': float(match['_score']),
                    'metadata': json.loads(match['fields']['metadata']),
                })

            logger.info(f"✅ Found {len(results)} chunks matching metadata filters")
            return results

        except Exception as e:
            logger.error(f"❌ Error querying by metadata: {str(e)}")
            return []
    
    def get_chunk_by_id(self, chunk_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve a specific chunk by its ID
        
        Args:
            chunk_id: Unique chunk identifier
            
        Returns:
            Chunk data or None if not found
        """
        try:
            # Fetch from this store's namespace (vectors live per-namespace)
            result = self.index.fetch(ids=[chunk_id], namespace=self.namespace)

            if chunk_id in result.vectors:
                vector_data = result.vectors[chunk_id]
                return {
                    'id': chunk_id,
                    'values': vector_data.values,
                    'metadata': vector_data.metadata
                }
            else:
                logger.info(f"⚠️  Chunk {chunk_id} not found")
                return None

        except Exception as e:
            logger.error(f"❌ Error fetching chunk {chunk_id}: {str(e)}")
            return None
    
    def delete_chunks_by_repo(self, repo_name: str) -> Dict[str, Any]:
        """
        Delete all chunks belonging to a specific repository
        
        Args:
            repo_name: Name of the repository to delete
            
        Returns:
            Deletion results
        """
        try:
            logger.info(f"🗑️  Deleting all chunks for repository: {repo_name}")
            
            # Query for all chunks from this repo
            chunks_to_delete = self.query_by_metadata(
                filter_dict={"repo_name": repo_name},
                top_k=10000  # High number to get all chunks
            )
            
            if not chunks_to_delete:
                return {"status": "success", "message": "No chunks found for this repository"}
            
            # Extract IDs
            chunk_ids = [chunk['id'] for chunk in chunks_to_delete]
            
            # Delete in batches
            batch_size = 96
            deleted_count = 0
            
            for i in range(0, len(chunk_ids), batch_size):
                batch_ids = chunk_ids[i:i + batch_size]
                
                try:
                    self.index.delete(ids=batch_ids, namespace=self.namespace)
                    deleted_count += len(batch_ids)
                    logger.info(f"🗑️  Deleted batch {i//batch_size + 1} ({len(batch_ids)} chunks)")

                except Exception as e:
                    logger.error(f"❌ Error deleting batch: {str(e)}")
            
            result = {
                "status": "success",
                "deleted_count": deleted_count,
                "repo_name": repo_name,
                "timestamp": datetime.now().isoformat()
            }
            
            logger.info(f"✅ Deleted {deleted_count} chunks for repository {repo_name}")
            return result
            
        except Exception as e:
            logger.error(f"❌ Error deleting chunks for repo {repo_name}: {str(e)}")
            return {"status": "error", "message": str(e)}
    
    def get_index_stats(self) -> Dict[str, Any]:
        """Get statistics about the Pinecone index"""
        try:
            stats = self.index.describe_index_stats()
            return {
                "total_vectors": stats.get('total_vector_count', 0),
                "index_fullness": stats.get('index_fullness', 0),
                "dimension": stats.get('dimension', 0),
                "namespaces": stats.get('namespaces', {}),
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"❌ Error getting index stats: {str(e)}")
            return {"error": str(e)}
    
    def hybrid_search(self, 
                     query_text: str,
                     chunk_types: Optional[List[str]] = None,
                     repo_names: Optional[List[str]] = None,
                     file_paths: Optional[List[str]] = None,
                     top_k: int = 20) -> List[Dict[str, Any]]:
        """
        Perform hybrid search: semantic search via Pinecone's inference API
        combined with client-side metadata filtering
        
        Args:
            query_text: Text query for semantic search
            chunk_types: Filter by chunk types (file, class, function, block)
            repo_names: Filter by repository names
            file_paths: Filter by specific file paths
            top_k: Maximum number of results
            
        Returns:
            List of relevant chunks ranked by similarity and filtered by metadata
        """
        try:
            logger.info(f"🔍 Performing hybrid search for: '{query_text[:50]}...'")

            # Semantic search via the inference API. Metadata filters are
            # applied client-side below, because metadata is stored as a
            # JSON string and cannot be filtered server-side.
            results = self.query_similar_chunks(
                query_text=query_text,
                top_k=top_k,
                include_metadata=True
            )

            # Post-process results: parse metadata, apply filters, and add
            # relevance context
            filtered_results = []
            for result in results:
                result['search_type'] = 'hybrid'
                result['query'] = query_text[:100]
                result['metadata'] = json.loads(result.get('metadata') or '{}')
                metadata = result['metadata']

                # Client-side metadata filtering
                if chunk_types and metadata.get('chunk_type') not in chunk_types:
                    continue
                if repo_names and metadata.get('repo_name') not in repo_names:
                    continue
                if file_paths and metadata.get('file_path') not in file_paths:
                    continue

                # Add a relevance explanation based on the chunk type
                chunk_type = metadata.get("chunk_type", "unknown")
                if chunk_type == "file":
                    result['relevance_context'] = 'File-level overview'
                elif chunk_type == 'class':
                    result['relevance_context'] = 'Class definition and structure'
                elif chunk_type == 'function':
                    result['relevance_context'] = 'Function implementation'
                elif chunk_type == 'block':
                    result['relevance_context'] = 'Code block logic'
                filtered_results.append(result)

            logger.info(f"✅ Hybrid search completed: {len(filtered_results)} relevant chunks found")
            return filtered_results

        except Exception as e:
            logger.error(f"❌ Error in hybrid search: {str(e)}")
            return []
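
    # Illustrative call (hypothetical repository name): restrict the semantic
    # search to function-level chunks of a single repository:
    #
    #     store.hybrid_search(
    #         query_text="parse the configuration file",
    #         chunk_types=["function"],
    #         repo_names=["my-repo"],
    #         top_k=5,
    #     )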
    
    def get_repository_overview(self, repo_name: str) -> Dict[str, Any]:
        """
        Get comprehensive overview of a repository's structure and content
        
        Args:
            repo_name: Name of the repository
            
        Returns:
            Repository overview with statistics and structure
        """
        try:
            logger.info(f"📊 Getting overview for repository: {repo_name}")
            
            # Get all chunks for this repository
            all_chunks = self.query_by_metadata(
                filter_dict={"repo_name": repo_name},
                top_k=10000
            )
            
            if not all_chunks:
                return {"error": f"No chunks found for repository {repo_name}"}
            
            # Analyze chunks by type
            chunk_stats = {}
            files = set()
            classes = set()
            functions = set()
            languages = set()
            
            for chunk in all_chunks:
                metadata = chunk.get('metadata', {})
                chunk_type = metadata.get('chunk_type', 'unknown')
                
                chunk_stats[chunk_type] = chunk_stats.get(chunk_type, 0) + 1
                
                if 'file_path' in metadata:
                    files.add(metadata['file_path'])
                if 'language' in metadata:
                    languages.add(metadata['language'])
                if 'class_name' in metadata and metadata['class_name']:
                    classes.add(metadata['class_name'])
                if 'function_name' in metadata and metadata['function_name']:
                    functions.add(metadata['function_name'])
            
            overview = {
                "repo_name": repo_name,
                "total_chunks": len(all_chunks),
                "chunk_distribution": chunk_stats,
                "files_count": len(files),
                "classes_count": len(classes),
                "functions_count": len(functions),
                "languages": list(languages),
                "sample_files": list(files)[:10],  # Show first 10 files
                "sample_classes": list(classes)[:10],  # Show first 10 classes
                "timestamp": datetime.now().isoformat()
            }
            
            logger.info(f"✅ Repository overview generated for {repo_name}")
            return overview
            
        except Exception as e:
            logger.error(f"❌ Error getting repository overview: {str(e)}")
            return {"error": str(e)}
    
    def cleanup_old_chunks(self, days_old: int = 30) -> Dict[str, Any]:
        """
        Clean up old chunks based on timestamp
        
        Args:
            days_old: Delete chunks older than this many days
            
        Returns:
            Cleanup results
        """
        # This would require storing timestamps in metadata and querying by
        # date; the implementation depends on specific cleanup needs.
        logger.info("🧹 Cleanup functionality not implemented yet")
        return {"status": "not_implemented"}