#!/usr/bin/env python3
"""
Incremental Document Addition for VedaMD Vector Store
======================================================

This script allows you to add single documents to an existing vector store
without rebuilding the entire index.

Features:
- Process a single PDF file
- Detect duplicates (hash-based)
- Add to the existing FAISS index
- Update metadata
- Incremental upload to HF Hub
- No full rebuild required

Usage:
    python scripts/add_document.py \\
        --file ./new_guideline.pdf \\
        --citation "SLCOG Hypertension Guidelines 2025" \\
        --vector-store-dir ./data/vector_store \\
        --upload

Author: VedaMD Team
Date: October 22, 2025
Version: 1.0.0
"""
import os
import sys
import json
import shutil
import hashlib
import logging
import argparse
from pathlib import Path
from typing import Dict, Optional, List
from datetime import datetime
import warnings

# Add this script's directory to the import path (build_vector_store.py lives alongside it)
sys.path.insert(0, str(Path(__file__).parent))
# Import shared PDF extraction and chunking helpers from build_vector_store
try:
    from build_vector_store import PDFExtractor, MedicalChunker
except ImportError:
    # build_vector_store.py is required; there is no standalone fallback
    logger = logging.getLogger(__name__)
    logger.error("Cannot import from build_vector_store.py. Make sure it's in the same directory.")
    sys.exit(1)
# Embeddings and vector store
try:
    from sentence_transformers import SentenceTransformer
    import faiss
    import numpy as np
    HAS_EMBEDDINGS = True
except ImportError:
    HAS_EMBEDDINGS = False
    raise ImportError("Required packages not installed. Run: pip install sentence-transformers faiss-cpu numpy")
# Hugging Face Hub
try:
    from huggingface_hub import HfApi
    HAS_HF = True
except ImportError:
    HAS_HF = False
    warnings.warn("Hugging Face Hub not available. Install with: pip install huggingface-hub")
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('add_document.log')
    ]
)
logger = logging.getLogger(__name__)
class DocumentAdder:
    """Add documents incrementally to an existing vector store"""

    def __init__(self, vector_store_dir: str):
        self.vector_store_dir = Path(vector_store_dir)

        if not self.vector_store_dir.exists():
            raise FileNotFoundError(f"Vector store directory not found: {self.vector_store_dir}")

        logger.info(f"📁 Vector store directory: {self.vector_store_dir}")

        # Load existing vector store
        self.load_vector_store()
    def load_vector_store(self):
        """Load existing vector store from disk"""
        logger.info("📥 Loading existing vector store...")

        # Load config
        config_path = self.vector_store_dir / "config.json"
        if not config_path.exists():
            raise FileNotFoundError(f"Config file not found: {config_path}")

        with open(config_path, 'r') as f:
            self.config = json.load(f)
        logger.info(f"✅ Loaded config: {self.config['embedding_model']}")

        # Load FAISS index
        index_path = self.vector_store_dir / "faiss_index.bin"
        if not index_path.exists():
            raise FileNotFoundError(f"FAISS index not found: {index_path}")

        self.index = faiss.read_index(str(index_path))
        logger.info(f"✅ Loaded FAISS index: {self.index.ntotal} vectors")

        # Load documents
        docs_path = self.vector_store_dir / "documents.json"
        if not docs_path.exists():
            raise FileNotFoundError(f"Documents file not found: {docs_path}")

        with open(docs_path, 'r', encoding='utf-8') as f:
            self.documents = json.load(f)
        logger.info(f"✅ Loaded {len(self.documents)} documents")

        # Load metadata
        metadata_path = self.vector_store_dir / "metadata.json"
        if not metadata_path.exists():
            raise FileNotFoundError(f"Metadata file not found: {metadata_path}")

        with open(metadata_path, 'r', encoding='utf-8') as f:
            self.metadata = json.load(f)
        logger.info(f"✅ Loaded {len(self.metadata)} metadata entries")

        # Load embedding model
        logger.info(f"🤖 Loading embedding model: {self.config['embedding_model']}")
        self.embedding_model = SentenceTransformer(self.config['embedding_model'])
        self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension()

        if self.embedding_dim != self.config['embedding_dim']:
            raise ValueError(
                f"Embedding dimension mismatch! "
                f"Expected {self.config['embedding_dim']}, got {self.embedding_dim}"
            )
        logger.info(f"✅ Embedding model loaded (dim={self.embedding_dim})")

        # Initialize chunker
        self.chunker = MedicalChunker(
            chunk_size=self.config.get('chunk_size', 1000),
            chunk_overlap=self.config.get('chunk_overlap', 100)
        )
    def check_duplicate(self, file_hash: str, filename: str) -> bool:
        """Check if the document already exists in the vector store"""
        logger.info("🔍 Checking for duplicates...")

        for meta in self.metadata:
            if meta.get('file_hash') == file_hash:
                logger.warning(f"⚠️ Duplicate detected: {meta['source']} (hash: {file_hash[:8]}...)")
                return True

            # Also check by filename
            if meta.get('source') == filename:
                logger.warning(f"⚠️ File with same name exists: {filename}")
                # Don't return True here - might be an updated version
                logger.info("   Continuing anyway (different content)")

        logger.info("✅ No duplicates found")
        return False
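    # Note: file_hash is an MD5 of the *extracted text* (see add_document), so a
    # re-exported PDF with identical content is still flagged as a duplicate even
    # if the raw file bytes differ.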
    def add_document(
        self,
        pdf_path: str,
        citation: Optional[str] = None,
        category: Optional[str] = None,
        skip_duplicates: bool = True
    ) -> int:
        """Add a single document to the vector store. Returns the number of chunks added."""
        pdf_path = Path(pdf_path)

        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        logger.info(f"\n{'='*60}")
        logger.info(f"📄 Adding document: {pdf_path.name}")
        logger.info(f"{'='*60}")

        try:
            # Extract text
            text, extraction_metadata = PDFExtractor.extract_text(str(pdf_path))

            if not text or len(text) < 100:
                logger.warning(f"⚠️ Extracted text too short ({len(text) if text else 0} chars), skipping")
                return 0

            # Generate file hash
            file_hash = hashlib.md5(text.encode()).hexdigest()
            logger.info(f"🔑 File hash: {file_hash[:16]}...")

            # Check for duplicates
            if skip_duplicates and self.check_duplicate(file_hash, pdf_path.name):
                logger.warning("⚠️ Skipping duplicate document")
                return 0

            # Chunk text
            chunks = self.chunker.chunk_text(text, pdf_path.name)

            if not chunks:
                logger.warning(f"⚠️ No chunks created from {pdf_path.name}")
                return 0

            logger.info(f"📝 Created {len(chunks)} chunks")

            # Generate embeddings
            logger.info("🧮 Generating embeddings...")
            chunk_texts = [chunk["content"] for chunk in chunks]
            chunk_embeddings = self.embedding_model.encode(
                chunk_texts,
                show_progress_bar=True,
                batch_size=32
            )

            # Add to FAISS index
            logger.info("📊 Adding to FAISS index...")
            embeddings_array = np.array(chunk_embeddings).astype('float32')
            self.index.add(embeddings_array)

            # Add documents and metadata
            base_chunk_id = len(self.documents)
            for i, chunk in enumerate(chunks):
                self.documents.append(chunk["content"])
                self.metadata.append({
                    "source": pdf_path.name,
                    "section": chunk["section"],
                    "chunk_id": base_chunk_id + i,
                    "chunk_size": chunk["size"],
                    "file_hash": file_hash,
                    "extraction_method": extraction_metadata["method"],
                    "total_pages": extraction_metadata["pages"],
                    "citation": citation or pdf_path.name,
                    "category": category or "General",
                    "added_at": datetime.now().isoformat(),
                    "added_by": "add_document.py"
                })

            logger.info(f"✅ Added {len(chunks)} chunks to vector store")
            logger.info(f"📊 New total: {self.index.ntotal} vectors")

            return len(chunks)

        except Exception as e:
            logger.error(f"❌ Error adding document: {e}")
            raise
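    # Optional post-add sanity check (a sketch, shown as a comment; the query string
    # is illustrative only):
    #
    #     query = self.embedding_model.encode(["sample clinical query"]).astype("float32")
    #     distances, ids = self.index.search(query, k=3)
    #     logger.info([self.metadata[i]["source"] for i in ids[0]])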
    def save_vector_store(self):
        """Save the updated vector store to disk"""
        logger.info(f"\n{'='*60}")
        logger.info("💾 Saving updated vector store...")
        logger.info(f"{'='*60}")

        # Back up existing files first
        backup_dir = self.vector_store_dir / "backups" / datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_dir.mkdir(parents=True, exist_ok=True)

        for filename in ["faiss_index.bin", "documents.json", "metadata.json"]:
            src = self.vector_store_dir / filename
            if src.exists():
                dst = backup_dir / filename
                shutil.copy2(src, dst)

        logger.info(f"📦 Backup created: {backup_dir}")

        # Save FAISS index
        index_path = self.vector_store_dir / "faiss_index.bin"
        faiss.write_index(self.index, str(index_path))
        logger.info(f"✅ Saved FAISS index: {index_path}")

        # Save documents
        docs_path = self.vector_store_dir / "documents.json"
        with open(docs_path, 'w', encoding='utf-8') as f:
            json.dump(self.documents, f, ensure_ascii=False, indent=2)
        logger.info(f"✅ Saved documents: {docs_path}")

        # Save metadata
        metadata_path = self.vector_store_dir / "metadata.json"
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f, ensure_ascii=False, indent=2)
        logger.info(f"✅ Saved metadata: {metadata_path}")

        # Update config (total_documents counts distinct source files, not chunks)
        self.config["total_documents"] = len({m["source"] for m in self.metadata})
        self.config["total_chunks"] = len(self.documents)
        self.config["last_updated"] = datetime.now().isoformat()

        config_path = self.vector_store_dir / "config.json"
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=2)
        logger.info(f"✅ Updated config: {config_path}")
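    # Restoring from a backup (a sketch; the timestamp below is illustrative): copy
    # the backed-up files from backups/<YYYYMMDD_HHMMSS>/ back into the vector store
    # directory, e.g.
    #
    #     cp data/vector_store/backups/20251022_120000/* data/vector_store/
    #
    # config.json is not included in the backup, so its counts reflect the latest save.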
    def upload_to_hf(self, repo_id: str, token: Optional[str] = None):
        """Upload the updated vector store to the Hugging Face Hub"""
        if not HAS_HF:
            logger.warning("⚠️ Hugging Face Hub not available, skipping upload")
            return

        logger.info(f"\n{'='*60}")
        logger.info("☁️ Uploading to Hugging Face Hub...")
        logger.info(f"📦 Repository: {repo_id}")
        logger.info(f"{'='*60}")

        try:
            api = HfApi(token=token)

            # Upload updated files
            files_to_upload = [
                "faiss_index.bin",
                "documents.json",
                "metadata.json",
                "config.json"
            ]

            for filename in files_to_upload:
                file_path = self.vector_store_dir / filename
                if file_path.exists():
                    logger.info(f"📤 Uploading {filename}...")
                    api.upload_file(
                        path_or_fileobj=str(file_path),
                        path_in_repo=filename,
                        repo_id=repo_id,
                        repo_type="dataset",
                        token=token
                    )
                    logger.info(f"✅ Uploaded {filename}")

            logger.info(f"🎉 Upload complete! View at: https://huggingface.co/datasets/{repo_id}")

        except Exception as e:
            logger.error(f"❌ Upload failed: {e}")
            raise
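# Consumers can pull the refreshed files back down with huggingface_hub (a sketch;
# the repo id matches the --repo-id example below and may differ in your deployment):
#
#     from huggingface_hub import hf_hub_download
#     index_path = hf_hub_download(repo_id="sniro23/VedaMD-Vector-Store",
#                                  filename="faiss_index.bin", repo_type="dataset")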
def main():
    parser = argparse.ArgumentParser(
        description="Add a document to an existing VedaMD Vector Store",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Add document locally
  python scripts/add_document.py \\
      --file ./guidelines/new_protocol.pdf \\
      --citation "SLCOG Hypertension Guidelines 2025" \\
      --vector-store-dir ./data/vector_store

  # Add and upload to HF
  python scripts/add_document.py \\
      --file ./new_guideline.pdf \\
      --citation "WHO Clinical Guidelines 2025" \\
      --category "Obstetrics" \\
      --vector-store-dir ./data/vector_store \\
      --upload \\
      --repo-id sniro23/VedaMD-Vector-Store
"""
    )
    parser.add_argument(
        "--file",
        type=str,
        required=True,
        help="PDF file to add"
    )
    parser.add_argument(
        "--citation",
        type=str,
        help="Citation for the document"
    )
    parser.add_argument(
        "--category",
        type=str,
        help="Category/specialty (e.g., Obstetrics, Cardiology)"
    )
    parser.add_argument(
        "--vector-store-dir",
        type=str,
        default="./data/vector_store",
        help="Vector store directory"
    )
    parser.add_argument(
        "--no-duplicate-check",
        action="store_true",
        help="Skip duplicate detection"
    )
    parser.add_argument(
        "--upload",
        action="store_true",
        help="Upload to Hugging Face Hub after adding"
    )
    parser.add_argument(
        "--repo-id",
        type=str,
        help="Hugging Face repository ID"
    )
    parser.add_argument(
        "--hf-token",
        type=str,
        help="Hugging Face API token (falls back to the HF_TOKEN environment variable)"
    )
    args = parser.parse_args()

    # Get HF token
    hf_token = args.hf_token or os.getenv("HF_TOKEN")

    # Validate upload arguments
    if args.upload and not args.repo_id:
        parser.error("--repo-id is required when --upload is specified")

    # Add document
    start_time = datetime.now()

    adder = DocumentAdder(args.vector_store_dir)
    chunks_added = adder.add_document(
        pdf_path=args.file,
        citation=args.citation,
        category=args.category,
        skip_duplicates=not args.no_duplicate_check
    )

    if chunks_added > 0:
        # Save updated vector store
        adder.save_vector_store()

        # Upload if requested
        if args.upload and args.repo_id:
            adder.upload_to_hf(args.repo_id, hf_token)

        # Summary
        duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"\n{'='*60}")
        logger.info("✅ DOCUMENT ADDED SUCCESSFULLY!")
        logger.info(f"{'='*60}")
        logger.info("📊 Summary:")
        logger.info(f"   • Chunks added: {chunks_added}")
        logger.info(f"   • Total vectors: {adder.index.ntotal}")
        logger.info(f"   • Time taken: {duration:.2f} seconds")
        logger.info(f"{'='*60}\n")
    else:
        logger.warning("\n⚠️ No chunks were added (possibly a duplicate or invalid document)")


if __name__ == "__main__":
    main()