#!/usr/bin/env python3
"""
Incremental Document Addition for VedaMD Vector Store
======================================================

This script allows you to add single documents to an existing vector store
without rebuilding the entire index.

Features:
- Process single PDF file
- Detect duplicates (hash-based)
- Add to existing FAISS index
- Update metadata
- Incremental upload to HF Hub
- No full rebuild required

Usage:
    python scripts/add_document.py \\
        --file ./new_guideline.pdf \\
        --citation "SLCOG Hypertension Guidelines 2025" \\
        --vector-store-dir ./data/vector_store \\
        --upload

Author: VedaMD Team
Date: October 22, 2025
Version: 1.0.0
"""
import os
import sys
import json
import shutil
import hashlib
import logging
import argparse
from pathlib import Path
from typing import Dict, Optional, List
from datetime import datetime
import warnings
# Make the script's own directory importable (build_vector_store.py lives alongside this file)
sys.path.insert(0, str(Path(__file__).parent))

# Import from build_vector_store
try:
    from build_vector_store import PDFExtractor, MedicalChunker
except ImportError:
    # The shared extractor/chunker are required; exit with a clear message
    logger = logging.getLogger(__name__)
    logger.error("Cannot import from build_vector_store.py. Make sure it's in the same directory.")
    sys.exit(1)
# Embeddings and vector store
try:
    from sentence_transformers import SentenceTransformer
    import faiss
    import numpy as np
    HAS_EMBEDDINGS = True
except ImportError:
    HAS_EMBEDDINGS = False
    raise ImportError("Required packages not installed. Run: pip install sentence-transformers faiss-cpu numpy")
# Hugging Face Hub
try:
    from huggingface_hub import HfApi
    HAS_HF = True
except ImportError:
    HAS_HF = False
    warnings.warn("Hugging Face Hub not available. Install with: pip install huggingface-hub")
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('add_document.log')
    ]
)
logger = logging.getLogger(__name__)
class DocumentAdder:
    """Add documents incrementally to existing vector store"""

    def __init__(self, vector_store_dir: str):
        self.vector_store_dir = Path(vector_store_dir)
        if not self.vector_store_dir.exists():
            raise FileNotFoundError(f"Vector store directory not found: {self.vector_store_dir}")
        logger.info(f"📁 Vector store directory: {self.vector_store_dir}")

        # Load existing vector store
        self.load_vector_store()

    def load_vector_store(self):
        """Load existing vector store from disk"""
        logger.info("📥 Loading existing vector store...")

        # Load config
        config_path = self.vector_store_dir / "config.json"
        if not config_path.exists():
            raise FileNotFoundError(f"Config file not found: {config_path}")
        with open(config_path, 'r') as f:
            self.config = json.load(f)
        logger.info(f"✅ Loaded config: {self.config['embedding_model']}")

        # Load FAISS index
        index_path = self.vector_store_dir / "faiss_index.bin"
        if not index_path.exists():
            raise FileNotFoundError(f"FAISS index not found: {index_path}")
        self.index = faiss.read_index(str(index_path))
        logger.info(f"✅ Loaded FAISS index: {self.index.ntotal} vectors")

        # Load documents
        docs_path = self.vector_store_dir / "documents.json"
        if not docs_path.exists():
            raise FileNotFoundError(f"Documents file not found: {docs_path}")
        with open(docs_path, 'r', encoding='utf-8') as f:
            self.documents = json.load(f)
        logger.info(f"✅ Loaded {len(self.documents)} documents")

        # Load metadata
        metadata_path = self.vector_store_dir / "metadata.json"
        if not metadata_path.exists():
            raise FileNotFoundError(f"Metadata file not found: {metadata_path}")
        with open(metadata_path, 'r', encoding='utf-8') as f:
            self.metadata = json.load(f)
        logger.info(f"✅ Loaded {len(self.metadata)} metadata entries")

        # Load embedding model
        logger.info(f"🤖 Loading embedding model: {self.config['embedding_model']}")
        self.embedding_model = SentenceTransformer(self.config['embedding_model'])
        self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension()
        if self.embedding_dim != self.config['embedding_dim']:
            raise ValueError(
                f"Embedding dimension mismatch! "
                f"Expected {self.config['embedding_dim']}, got {self.embedding_dim}"
            )
        logger.info(f"✅ Embedding model loaded (dim={self.embedding_dim})")

        # Initialize chunker
        self.chunker = MedicalChunker(
            chunk_size=self.config.get('chunk_size', 1000),
            chunk_overlap=self.config.get('chunk_overlap', 100)
        )
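
    # Duplicate detection keys on an MD5 hash of the extracted text stored in each
    # chunk's metadata; a matching filename alone is treated as a possible updated
    # revision and only logged, not rejected.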
    def check_duplicate(self, file_hash: str, filename: str) -> bool:
        """Check if document already exists in vector store"""
        logger.info(f"🔍 Checking for duplicates...")
        for meta in self.metadata:
            if meta.get('file_hash') == file_hash:
                logger.warning(f"⚠️ Duplicate detected: {meta['source']} (hash: {file_hash[:8]}...)")
                return True
            # Also check by filename
            if meta.get('source') == filename:
                logger.warning(f"⚠️ File with same name exists: {filename}")
                # Don't return True here - might be updated version
                logger.info(f" Continuing anyway (different content)")
        logger.info(f"✅ No duplicates found")
        return False
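
    # add_document appends chunk texts, metadata rows, and embedding vectors in the
    # same order, so each new chunk_id matches its position in documents, metadata,
    # and (for a consistent existing store) the FAISS index.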
    def add_document(
        self,
        pdf_path: str,
        citation: Optional[str] = None,
        category: Optional[str] = None,
        skip_duplicates: bool = True
    ) -> int:
        """Add a single document to the vector store"""
        pdf_path = Path(pdf_path)
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        logger.info(f"\n{'='*60}")
        logger.info(f"📄 Adding document: {pdf_path.name}")
        logger.info(f"{'='*60}")

        try:
            # Extract text
            text, extraction_metadata = PDFExtractor.extract_text(str(pdf_path))
            if not text or len(text) < 100:
                logger.warning(f"⚠️ Extracted text too short ({len(text) if text else 0} chars), skipping")
                return 0

            # Generate file hash
            file_hash = hashlib.md5(text.encode()).hexdigest()
            logger.info(f"🔑 File hash: {file_hash[:16]}...")

            # Check for duplicates
            if skip_duplicates and self.check_duplicate(file_hash, pdf_path.name):
                logger.warning(f"⚠️ Skipping duplicate document")
                return 0

            # Chunk text
            chunks = self.chunker.chunk_text(text, pdf_path.name)
            if not chunks:
                logger.warning(f"⚠️ No chunks created from {pdf_path.name}")
                return 0
            logger.info(f"📝 Created {len(chunks)} chunks")

            # Generate embeddings
            logger.info(f"🧮 Generating embeddings...")
            chunk_texts = [chunk["content"] for chunk in chunks]
            chunk_embeddings = self.embedding_model.encode(
                chunk_texts,
                show_progress_bar=True,
                batch_size=32
            )

            # Add to FAISS index
            logger.info(f"📊 Adding to FAISS index...")
            embeddings_array = np.array(chunk_embeddings).astype('float32')
            self.index.add(embeddings_array)

            # Add documents and metadata
            base_chunk_id = len(self.documents)
            for i, chunk in enumerate(chunks):
                self.documents.append(chunk["content"])
                self.metadata.append({
                    "source": pdf_path.name,
                    "section": chunk["section"],
                    "chunk_id": base_chunk_id + i,
                    "chunk_size": chunk["size"],
                    "file_hash": file_hash,
                    "extraction_method": extraction_metadata["method"],
                    "total_pages": extraction_metadata["pages"],
                    "citation": citation or pdf_path.name,
                    "category": category or "General",
                    "added_at": datetime.now().isoformat(),
                    "added_by": "add_document.py"
                })

            logger.info(f"✅ Added {len(chunks)} chunks to vector store")
            logger.info(f"📊 New total: {self.index.ntotal} vectors")
            return len(chunks)

        except Exception as e:
            logger.error(f"❌ Error adding document: {e}")
            raise
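
    # save_vector_store snapshots the current index and JSON files into a
    # timestamped backups/ subfolder before overwriting them, so an unwanted
    # update can be undone by restoring those copies.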
    def save_vector_store(self):
        """Save updated vector store to disk"""
        logger.info(f"\n{'='*60}")
        logger.info(f"💾 Saving updated vector store...")
        logger.info(f"{'='*60}")

        # Backup existing files first
        backup_dir = self.vector_store_dir / "backups" / datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_dir.mkdir(parents=True, exist_ok=True)
        for filename in ["faiss_index.bin", "documents.json", "metadata.json"]:
            src = self.vector_store_dir / filename
            if src.exists():
                dst = backup_dir / filename
                shutil.copy2(src, dst)
        logger.info(f"📦 Backup created: {backup_dir}")

        # Save FAISS index
        index_path = self.vector_store_dir / "faiss_index.bin"
        faiss.write_index(self.index, str(index_path))
        logger.info(f"✅ Saved FAISS index: {index_path}")

        # Save documents
        docs_path = self.vector_store_dir / "documents.json"
        with open(docs_path, 'w', encoding='utf-8') as f:
            json.dump(self.documents, f, ensure_ascii=False, indent=2)
        logger.info(f"✅ Saved documents: {docs_path}")

        # Save metadata
        metadata_path = self.vector_store_dir / "metadata.json"
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f, ensure_ascii=False, indent=2)
        logger.info(f"✅ Saved metadata: {metadata_path}")

        # Update config
        self.config["total_documents"] = len(self.documents)
        self.config["total_chunks"] = len(self.documents)
        self.config["last_updated"] = datetime.now().isoformat()
        config_path = self.vector_store_dir / "config.json"
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=2)
        logger.info(f"✅ Updated config: {config_path}")
    def upload_to_hf(self, repo_id: str, token: Optional[str] = None):
        """Upload updated vector store to Hugging Face Hub"""
        if not HAS_HF:
            logger.warning("⚠️ Hugging Face Hub not available, skipping upload")
            return

        logger.info(f"\n{'='*60}")
        logger.info(f"☁️ Uploading to Hugging Face Hub...")
        logger.info(f"📦 Repository: {repo_id}")
        logger.info(f"{'='*60}")

        try:
            api = HfApi(token=token)

            # Upload updated files
            files_to_upload = [
                "faiss_index.bin",
                "documents.json",
                "metadata.json",
                "config.json"
            ]
            for filename in files_to_upload:
                file_path = self.vector_store_dir / filename
                if file_path.exists():
                    logger.info(f"📤 Uploading {filename}...")
                    api.upload_file(
                        path_or_fileobj=str(file_path),
                        path_in_repo=filename,
                        repo_id=repo_id,
                        repo_type="dataset",
                        token=token
                    )
                    logger.info(f"✅ Uploaded {filename}")

            logger.info(f"🎉 Upload complete! View at: https://huggingface.co/datasets/{repo_id}")
        except Exception as e:
            logger.error(f"❌ Upload failed: {e}")
            raise
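
# Programmatic use (a minimal sketch; the file path and citation below are
# placeholders, not files shipped with the repo):
#
#     adder = DocumentAdder("./data/vector_store")
#     if adder.add_document("./new_guideline.pdf", citation="SLCOG 2025") > 0:
#         adder.save_vector_store()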
def main():
    parser = argparse.ArgumentParser(
        description="Add a document to existing VedaMD Vector Store",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Add document locally
  python scripts/add_document.py \\
      --file ./guidelines/new_protocol.pdf \\
      --citation "SLCOG Hypertension Guidelines 2025" \\
      --vector-store-dir ./data/vector_store

  # Add and upload to HF
  python scripts/add_document.py \\
      --file ./new_guideline.pdf \\
      --citation "WHO Clinical Guidelines 2025" \\
      --category "Obstetrics" \\
      --vector-store-dir ./data/vector_store \\
      --upload \\
      --repo-id sniro23/VedaMD-Vector-Store
"""
    )
    parser.add_argument(
        "--file",
        type=str,
        required=True,
        help="PDF file to add"
    )
    parser.add_argument(
        "--citation",
        type=str,
        help="Citation for the document"
    )
    parser.add_argument(
        "--category",
        type=str,
        help="Category/specialty (e.g., Obstetrics, Cardiology)"
    )
    parser.add_argument(
        "--vector-store-dir",
        type=str,
        default="./data/vector_store",
        help="Vector store directory"
    )
    parser.add_argument(
        "--no-duplicate-check",
        action="store_true",
        help="Skip duplicate detection"
    )
    parser.add_argument(
        "--upload",
        action="store_true",
        help="Upload to Hugging Face Hub after adding"
    )
    parser.add_argument(
        "--repo-id",
        type=str,
        help="Hugging Face repository ID"
    )
    parser.add_argument(
        "--hf-token",
        type=str,
        help="Hugging Face API token"
    )
    args = parser.parse_args()

    # Get HF token
    hf_token = args.hf_token or os.getenv("HF_TOKEN")

    # Validate upload arguments
    if args.upload and not args.repo_id:
        parser.error("--repo-id is required when --upload is specified")

    # Add document
    start_time = datetime.now()
    adder = DocumentAdder(args.vector_store_dir)
    chunks_added = adder.add_document(
        pdf_path=args.file,
        citation=args.citation,
        category=args.category,
        skip_duplicates=not args.no_duplicate_check
    )

    if chunks_added > 0:
        # Save updated vector store
        adder.save_vector_store()

        # Upload if requested
        if args.upload and args.repo_id:
            adder.upload_to_hf(args.repo_id, hf_token)

        # Summary
        duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"\n{'='*60}")
        logger.info(f"✅ DOCUMENT ADDED SUCCESSFULLY!")
        logger.info(f"{'='*60}")
        logger.info(f"📊 Summary:")
        logger.info(f" • Chunks added: {chunks_added}")
        logger.info(f" • Total vectors: {adder.index.ntotal}")
        logger.info(f" • Time taken: {duration:.2f} seconds")
        logger.info(f"{'='*60}\n")
    else:
        logger.warning(f"\n⚠️ No chunks were added (possibly duplicate or invalid)")


if __name__ == "__main__":
    main()