#!/usr/bin/env python3
"""
Automated Vector Store Builder for VedaMD
==========================================

This script automates the complete vector store creation process:
1. Scans directory for PDF documents
2. Extracts text using best available method (PyMuPDF → pdfplumber;
   OCR is not implemented)
3. Smart chunking with medical section awareness
4. Batch embedding generation
5. FAISS index creation
6. Metadata generation (source, section, chunk size, content hash,
   extraction method, page count, timestamp)
7. Automatic Hugging Face Hub upload
8. Configuration file generation

Usage:
    python scripts/build_vector_store.py \\
        --input-dir ./Obs \\
        --output-dir ./data/vector_store \\
        --repo-id sniro23/VedaMD-Vector-Store \\
        --upload

Author: VedaMD Team
Date: October 22, 2025
Version: 1.0.0
"""

import os
import sys
import re
import json
import hashlib
import logging
import argparse
from pathlib import Path
from typing import List, Dict, Tuple, Optional
from datetime import datetime
import warnings

# PDF processing
try:
    import fitz  # PyMuPDF
    HAS_PYMUPDF = True
except ImportError:
    HAS_PYMUPDF = False
    warnings.warn("PyMuPDF not available. Install with: pip install PyMuPDF")

try:
    import pdfplumber
    HAS_PDFPLUMBER = True
except ImportError:
    HAS_PDFPLUMBER = False
    warnings.warn("pdfplumber not available. Install with: pip install pdfplumber")

# Embeddings and vector store.
# The hard failure is deferred to VectorStoreBuilder.__init__ so that the
# extraction/chunking helpers and `--help` stay usable without these packages.
_EMBEDDINGS_INSTALL_HINT = (
    "Required packages not installed. "
    "Run: pip install sentence-transformers faiss-cpu numpy"
)
try:
    from sentence_transformers import SentenceTransformer
    import faiss
    import numpy as np
    HAS_EMBEDDINGS = True
except ImportError:
    HAS_EMBEDDINGS = False
    warnings.warn(_EMBEDDINGS_INSTALL_HINT)

# Hugging Face Hub
try:
    from huggingface_hub import HfApi, create_repo
    HAS_HF = True
except ImportError:
    HAS_HF = False
    warnings.warn("Hugging Face Hub not available. Install with: pip install huggingface-hub")


def _build_log_handlers() -> List[logging.Handler]:
    """Return the log handlers: stdout always, plus the log file when writable."""
    handlers: List[logging.Handler] = [logging.StreamHandler(sys.stdout)]
    try:
        # UTF-8 is required: log messages contain emoji that would fail to
        # encode under a non-UTF-8 locale default.
        handlers.append(logging.FileHandler('vector_store_build.log', encoding='utf-8'))
    except OSError as e:
        # A read-only working directory must not make the module unimportable.
        warnings.warn(f"Cannot open vector_store_build.log ({e}); logging to stdout only")
    return handlers


# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_build_log_handlers()
)
logger = logging.getLogger(__name__)

# Matches the per-page separators that the extractors inject into the text.
_PAGE_MARKER_RE = re.compile(r"\n--- Page \d+ ---\n")


def _format_page(page_num: int, page_text: str) -> str:
    """Return one page of text prefixed with its page separator."""
    return f"\n--- Page {page_num} ---\n{page_text}"


class PDFExtractor:
    """Handles PDF text extraction with multiple fallback methods"""

    @staticmethod
    def extract_with_pymupdf(pdf_path: str) -> Tuple[str, Dict]:
        """Extract text using PyMuPDF (fastest, most reliable)

        Returns:
            (text, metadata) where metadata has the keys "method", "pages",
            "title" and "author".

        Raises:
            ImportError: if PyMuPDF is not installed.
            Exception: whatever PyMuPDF raises while opening/reading the file.
        """
        if not HAS_PYMUPDF:
            raise ImportError("PyMuPDF not available")

        logger.info(f"📄 Extracting with PyMuPDF: {pdf_path}")
        metadata = {"method": "pymupdf", "pages": 0}

        try:
            # Context manager guarantees the document is closed even when a
            # page fails to extract.
            with fitz.open(pdf_path) as doc:
                metadata["pages"] = len(doc)
                # metadata may be None, and individual values may be None too.
                doc_info = doc.metadata or {}
                metadata["title"] = doc_info.get("title") or ""
                metadata["author"] = doc_info.get("author") or ""
                text = "".join(
                    _format_page(page_num, page.get_text())
                    for page_num, page in enumerate(doc, 1)
                )
        except Exception as e:
            logger.error(f"❌ PyMuPDF extraction failed: {e}")
            raise

        logger.info(f"✅ Extracted {len(text)} characters from {metadata['pages']} pages")
        return text, metadata

    @staticmethod
    def extract_with_pdfplumber(pdf_path: str) -> Tuple[str, Dict]:
        """Extract text using pdfplumber (better table handling)

        Returns:
            (text, metadata) where metadata has the keys "method" and "pages".

        Raises:
            ImportError: if pdfplumber is not installed.
            Exception: whatever pdfplumber raises while opening/reading the file.
        """
        if not HAS_PDFPLUMBER:
            raise ImportError("pdfplumber not available")

        logger.info(f"📄 Extracting with pdfplumber: {pdf_path}")
        metadata = {"method": "pdfplumber", "pages": 0}

        try:
            with pdfplumber.open(pdf_path) as pdf:
                metadata["pages"] = len(pdf.pages)
                # extract_text() returns None for pages without a text layer.
                text = "".join(
                    _format_page(page_num, page.extract_text() or "")
                    for page_num, page in enumerate(pdf.pages, 1)
                )
        except Exception as e:
            logger.error(f"❌ pdfplumber extraction failed: {e}")
            raise

        logger.info(f"✅ Extracted {len(text)} characters from {metadata['pages']} pages")
        return text, metadata

    @staticmethod
    def extract_text(pdf_path: str) -> Tuple[str, Dict]:
        """Extract text using best available method with fallbacks

        Tries PyMuPDF first, then pdfplumber.

        Raises:
            RuntimeError: if no extraction library is installed or every
                available method failed.
        """
        if not (HAS_PYMUPDF or HAS_PDFPLUMBER):
            raise RuntimeError(
                "No PDF extraction library installed. "
                "Install with: pip install PyMuPDF pdfplumber"
            )

        errors = []

        # Try PyMuPDF first (fastest)
        if HAS_PYMUPDF:
            try:
                return PDFExtractor.extract_with_pymupdf(pdf_path)
            except Exception as e:
                errors.append(f"PyMuPDF: {e}")
                logger.warning(f"⚠️  PyMuPDF failed, trying pdfplumber...")

        # Fallback to pdfplumber
        if HAS_PDFPLUMBER:
            try:
                return PDFExtractor.extract_with_pdfplumber(pdf_path)
            except Exception as e:
                errors.append(f"pdfplumber: {e}")
                logger.warning(f"⚠️  pdfplumber failed")

        # If all methods fail
        raise RuntimeError(f"All extraction methods failed: {'; '.join(errors)}")


class MedicalChunker:
    """Smart chunking with medical section awareness"""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 100):
        """Configure the chunker.

        Args:
            chunk_size: Maximum chunk length in characters (must be > 0).
            chunk_overlap: Maximum number of trailing characters carried over
                into the next chunk; values <= 0 disable overlap.

        Raises:
            ValueError: if chunk_size is not positive or chunk_overlap is not
                smaller than chunk_size.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Medical section headers to preserve
        self.section_markers = [
            "INTRODUCTION", "BACKGROUND", "DEFINITION", "EPIDEMIOLOGY",
            "PATHOPHYSIOLOGY", "CLINICAL FEATURES", "DIAGNOSIS", "MANAGEMENT",
            "TREATMENT", "PREVENTION", "COMPLICATIONS", "PROGNOSIS",
            "REFERENCES", "GUIDELINES", "PROTOCOL", "RECOMMENDATIONS"
        ]

    def _is_section_header(self, para: str) -> bool:
        """Return True for a short paragraph containing a section marker."""
        if len(para) >= 100:
            return False
        para_upper = para.upper()
        return any(marker in para_upper for marker in self.section_markers)

    def _split_oversized(self, para: str) -> List[str]:
        """Split a paragraph into pieces of at most chunk_size characters."""
        limit = self.chunk_size
        if len(para) <= limit:
            return [para]

        pieces = []
        remaining = para
        while len(remaining) > limit:
            # Prefer to break at the last whitespace that keeps the piece
            # within the limit; fall back to a hard cut for unbroken runs.
            window = remaining[:limit + 1]
            cut = max(window.rfind(" "), window.rfind("\n"))
            if cut <= 0:
                cut = limit
            piece = remaining[:cut].strip()
            if piece:
                pieces.append(piece)
            remaining = remaining[cut:].strip()
        if remaining:
            pieces.append(remaining)
        return pieces

    def _overlap_tail(self, chunk: str) -> str:
        """Return trailing context from chunk, at most chunk_overlap chars."""
        if self.chunk_overlap <= 0:
            return ""
        sentences = chunk.split('. ')
        if len(sentences) < 2:
            return ""
        tail = '. '.join(sentences[-2:])
        if len(tail) > self.chunk_overlap:
            # Cap the overlap, then drop the word that the cut likely split.
            tail = tail[-self.chunk_overlap:]
            parts = tail.split(None, 1)
            if len(parts) == 2:
                tail = parts[1]
        return tail.strip()

    @staticmethod
    def _make_chunk(content: str, source: str, section: str) -> Dict:
        """Build the chunk record; size always matches the stored content."""
        content = content.strip()
        return {
            "content": content,
            "source": source,
            "section": section,
            "size": len(content)
        }

    def chunk_text(self, text: str, source: str) -> List[Dict]:
        """Split text into chunks while preserving medical sections

        Paragraphs (separated by blank lines) are packed into chunks of at
        most chunk_size characters. Each chunk is labelled with the most
        recent section header seen up to and including its own paragraphs.

        Args:
            text: Full document text.
            source: Name recorded in every chunk's "source" field.

        Returns:
            List of dicts with keys "content", "source", "section", "size";
            empty when text is blank.
        """
        logger.info(f"📝 Chunking text from {source}")

        # Clean text
        text = text.strip()
        if not text:
            logger.warning(f"⚠️  Empty text from {source}")
            return []

        chunks = []
        current_chunk = ""
        current_section = "General"

        # Split by paragraphs
        for para in text.split('\n\n'):
            para = para.strip()
            if not para:
                continue

            is_header = self._is_section_header(para)

            for piece in self._split_oversized(para):
                if not current_chunk:
                    current_chunk = piece
                elif len(current_chunk) + len(piece) + 2 <= self.chunk_size:
                    current_chunk = f"{current_chunk}\n\n{piece}"
                else:
                    # Flush with the section that was active for the content
                    # already in the chunk, not the header that overflowed it.
                    chunks.append(self._make_chunk(current_chunk, source, current_section))

                    # Start new chunk with overlap, but never let the overlap
                    # push the chunk past chunk_size.
                    overlap_text = self._overlap_tail(current_chunk)
                    if overlap_text and len(overlap_text) + 2 + len(piece) <= self.chunk_size:
                        current_chunk = f"{overlap_text}\n\n{piece}"
                    else:
                        current_chunk = piece

            # Only switch sections once the header is part of a chunk.
            if is_header:
                current_section = para

        # Add final chunk
        if current_chunk.strip():
            chunks.append(self._make_chunk(current_chunk, source, current_section))

        logger.info(f"✅ Created {len(chunks)} chunks from {source}")
        return chunks


class VectorStoreBuilder:
    """Main vector store builder class"""

    def __init__(
        self,
        input_dir: str,
        output_dir: str,
        embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
        chunk_size: int = 1000,
        chunk_overlap: int = 100
    ):
        """Create the output directory and load the embedding model.

        Raises:
            ImportError: if sentence-transformers, faiss or numpy is missing.
            ValueError: if chunk_size/chunk_overlap are invalid.
        """
        if not HAS_EMBEDDINGS:
            raise ImportError(_EMBEDDINGS_INSTALL_HINT)

        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.embedding_model_name = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Validate chunk settings before any expensive work (model download).
        self.chunker = MedicalChunker(chunk_size, chunk_overlap)

        # Create output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Initialize components
        logger.info(f"🔧 Initializing vector store builder...")
        logger.info(f"📁 Input directory: {self.input_dir}")
        logger.info(f"📁 Output directory: {self.output_dir}")

        # Load embedding model
        logger.info(f"🤖 Loading embedding model: {self.embedding_model_name}")
        self.embedding_model = SentenceTransformer(self.embedding_model_name)
        self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension()
        logger.info(f"✅ Embedding dimension: {self.embedding_dim}")

        # Storage (documents, embeddings and metadata are index-aligned)
        self.documents = []
        self.embeddings = []
        self.metadata = []
        # Content hashes of PDFs already indexed, for duplicate detection.
        self._seen_hashes = set()

    def scan_pdfs(self) -> List[Path]:
        """Scan input directory (recursively) for PDF files

        The extension match is case-insensitive and the result is sorted so
        that builds are reproducible across filesystems.

        Raises:
            FileNotFoundError: if the input directory does not exist.
        """
        logger.info(f"🔍 Scanning for PDFs in {self.input_dir}")

        if not self.input_dir.exists():
            raise FileNotFoundError(f"Input directory not found: {self.input_dir}")

        pdf_files = sorted(
            path for path in self.input_dir.rglob("*")
            if path.is_file() and path.suffix.lower() == ".pdf"
        )
        logger.info(f"✅ Found {len(pdf_files)} PDF files")

        for pdf in pdf_files:
            logger.info(f"  📄 {pdf.name}")

        return pdf_files

    def process_pdf(self, pdf_path: Path) -> int:
        """Process a single PDF file

        Extracts, chunks and embeds the PDF, appending the results to the
        builder's storage. Failures are logged and reported as 0 so that one
        bad file does not abort the whole build.

        Returns:
            Number of chunks added (0 when skipped or failed).
        """
        logger.info(f"\n{'='*60}")
        logger.info(f"📄 Processing: {pdf_path.name}")
        logger.info(f"{'='*60}")

        try:
            # Extract text
            text, extraction_metadata = PDFExtractor.extract_text(str(pdf_path))

            # Measure real content: the injected page markers alone would
            # otherwise make an image-only PDF look long enough.
            content_chars = len(_PAGE_MARKER_RE.sub("", text).strip()) if text else 0
            if content_chars < 100:
                logger.warning(f"⚠️  Extracted text too short ({content_chars} chars), skipping")
                return 0

            # Generate content hash for duplicate detection
            file_hash = hashlib.md5(text.encode("utf-8")).hexdigest()
            if file_hash in self._seen_hashes:
                logger.warning(f"⚠️  Duplicate content detected, skipping {pdf_path.name}")
                return 0

            # Chunk text
            chunks = self.chunker.chunk_text(text, pdf_path.name)

            if not chunks:
                logger.warning(f"⚠️  No chunks created from {pdf_path.name}")
                return 0

            # Generate embeddings
            logger.info(f"🧮 Generating embeddings for {len(chunks)} chunks...")
            chunk_texts = [chunk["content"] for chunk in chunks]
            chunk_embeddings = self.embedding_model.encode(
                chunk_texts,
                show_progress_bar=True,
                batch_size=32
            )

            # Store documents and embeddings
            processed_at = datetime.now().isoformat()
            for i, (chunk, embedding) in enumerate(zip(chunks, chunk_embeddings)):
                self.documents.append(chunk["content"])
                self.embeddings.append(embedding)
                self.metadata.append({
                    "source": pdf_path.name,
                    "section": chunk["section"],
                    "chunk_id": i,
                    "chunk_size": chunk["size"],
                    "file_hash": file_hash,
                    "extraction_method": extraction_metadata["method"],
                    "total_pages": extraction_metadata["pages"],
                    "processed_at": processed_at
                })

            # Mark as seen only after the PDF was fully indexed.
            self._seen_hashes.add(file_hash)

            logger.info(f"✅ Processed {pdf_path.name}: {len(chunks)} chunks added")
            return len(chunks)

        except Exception as e:
            # Deliberate best-effort: log and continue with the next file.
            logger.error(f"❌ Error processing {pdf_path.name}: {e}")
            return 0

    def build_faiss_index(self):
        """Build FAISS index from embeddings

        Returns:
            A faiss.IndexFlatL2 containing every stored embedding.

        Raises:
            ValueError: if no embeddings have been collected.
        """
        logger.info(f"\n{'='*60}")
        logger.info(f"🏗️  Building FAISS index...")
        logger.info(f"{'='*60}")

        if not self.embeddings:
            raise ValueError("No embeddings to index")

        # Convert to numpy array (FAISS requires float32)
        embeddings_array = np.asarray(self.embeddings, dtype='float32')
        logger.info(f"📊 Embeddings shape: {embeddings_array.shape}")

        # Create FAISS index (L2 distance)
        index = faiss.IndexFlatL2(self.embedding_dim)

        # Add embeddings
        index.add(embeddings_array)

        logger.info(f"✅ FAISS index created with {index.ntotal} vectors")
        return index

    def save_vector_store(self, index):
        """Save vector store to disk

        Writes faiss_index.bin, documents.json, metadata.json, config.json
        and build_log.json into the output directory.
        """
        logger.info(f"\n{'='*60}")
        logger.info(f"💾 Saving vector store...")
        logger.info(f"{'='*60}")

        # Save FAISS index
        index_path = self.output_dir / "faiss_index.bin"
        faiss.write_index(index, str(index_path))
        logger.info(f"✅ Saved FAISS index: {index_path}")

        # Save documents
        docs_path = self.output_dir / "documents.json"
        with open(docs_path, 'w', encoding='utf-8') as f:
            json.dump(self.documents, f, ensure_ascii=False, indent=2)
        logger.info(f"✅ Saved documents: {docs_path}")

        # Save metadata
        metadata_path = self.output_dir / "metadata.json"
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f, ensure_ascii=False, indent=2)
        logger.info(f"✅ Saved metadata: {metadata_path}")

        # Save configuration
        # NOTE: "total_documents" counts chunk documents (entries of
        # documents.json), which is why it equals "total_chunks".
        config = {
            "embedding_model": self.embedding_model_name,
            "embedding_dim": self.embedding_dim,
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "total_documents": len(self.documents),
            "total_chunks": len(self.documents),
            "build_date": datetime.now().isoformat(),
            "version": "1.0.0"
        }

        config_path = self.output_dir / "config.json"
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2)
        logger.info(f"✅ Saved config: {config_path}")

        # Save build log (sources sorted so the file is reproducible)
        sources = sorted({m["source"] for m in self.metadata})
        log_data = {
            "build_date": datetime.now().isoformat(),
            "input_dir": str(self.input_dir),
            "output_dir": str(self.output_dir),
            "total_pdfs": len(sources),
            "total_chunks": len(self.documents),
            "sources": sources,
            "config": config
        }

        log_path = self.output_dir / "build_log.json"
        with open(log_path, 'w', encoding='utf-8') as f:
            json.dump(log_data, f, indent=2)
        logger.info(f"✅ Saved build log: {log_path}")

    def upload_to_hf(self, repo_id: str, token: Optional[str] = None):
        """Upload vector store to Hugging Face Hub

        Uploads the files written by save_vector_store to a dataset repo,
        creating the repo if needed. Does nothing (with a warning) when
        huggingface_hub is not installed.

        Raises:
            Exception: whatever huggingface_hub raises when an upload fails.
        """
        if not HAS_HF:
            logger.warning("⚠️  Hugging Face Hub not available, skipping upload")
            return

        logger.info(f"\n{'='*60}")
        logger.info(f"☁️  Uploading to Hugging Face Hub...")
        logger.info(f"📦 Repository: {repo_id}")
        logger.info(f"{'='*60}")

        try:
            api = HfApi(token=token)

            # Create repo if it doesn't exist. Best-effort: the repo may
            # already exist under permissions that forbid creation.
            try:
                create_repo(repo_id, repo_type="dataset", exist_ok=True, token=token)
                logger.info(f"✅ Repository ready: {repo_id}")
            except Exception as e:
                logger.warning(f"⚠️  Repo creation: {e}")

            # Upload all files
            files_to_upload = [
                "faiss_index.bin",
                "documents.json",
                "metadata.json",
                "config.json",
                "build_log.json"
            ]

            for filename in files_to_upload:
                file_path = self.output_dir / filename
                if not file_path.exists():
                    logger.warning(f"⚠️  {filename} not found in {self.output_dir}, skipping")
                    continue

                logger.info(f"📤 Uploading {filename}...")
                api.upload_file(
                    path_or_fileobj=str(file_path),
                    path_in_repo=filename,
                    repo_id=repo_id,
                    repo_type="dataset",
                    token=token
                )
                logger.info(f"✅ Uploaded {filename}")

            logger.info(f"🎉 Upload complete! View at: https://huggingface.co/datasets/{repo_id}")

        except Exception as e:
            logger.error(f"❌ Upload failed: {e}")
            raise

    def build(self, upload: bool = False, repo_id: Optional[str] = None, hf_token: Optional[str] = None):
        """Main build process

        Scans, processes, indexes, saves and optionally uploads.

        Returns:
            True on success.

        Raises:
            ValueError: if no PDFs are found or no chunks could be created.
            Exception: any error from indexing, saving or uploading.
        """
        start_time = datetime.now()

        logger.info(f"\n{'='*60}")
        logger.info(f"🚀 STARTING VECTOR STORE BUILD")
        logger.info(f"{'='*60}\n")

        try:
            # Scan for PDFs
            pdf_files = self.scan_pdfs()

            if not pdf_files:
                raise ValueError("No PDF files found in input directory")

            # Process each PDF
            total_chunks = 0
            processed_pdfs = 0
            for pdf_path in pdf_files:
                chunks_added = self.process_pdf(pdf_path)
                total_chunks += chunks_added
                if chunks_added:
                    processed_pdfs += 1

            if total_chunks == 0:
                raise ValueError("No chunks created from any PDF")

            # Build FAISS index
            index = self.build_faiss_index()

            # Save to disk
            self.save_vector_store(index)

            # Upload to HF if requested
            if upload and repo_id:
                self.upload_to_hf(repo_id, hf_token)
            elif upload:
                logger.warning("⚠️  Upload requested but no repo_id given, skipping upload")

            # Summary
            duration = (datetime.now() - start_time).total_seconds()
            logger.info(f"\n{'='*60}")
            logger.info(f"✅ BUILD COMPLETE!")
            logger.info(f"{'='*60}")
            logger.info(f"📊 Summary:")
            logger.info(f"  • PDFs processed: {processed_pdfs} of {len(pdf_files)} found")
            logger.info(f"  • Total chunks: {total_chunks}")
            logger.info(f"  • Embedding dimension: {self.embedding_dim}")
            logger.info(f"  • Output directory: {self.output_dir}")
            logger.info(f"  • Build time: {duration:.2f} seconds")
            logger.info(f"{'='*60}\n")

            return True

        except Exception as e:
            logger.error(f"\n{'='*60}")
            logger.error(f"❌ BUILD FAILED: {e}")
            logger.error(f"{'='*60}\n")
            raise


def main():
    """Parse command-line arguments and run the vector store build."""
    parser = argparse.ArgumentParser(
        description="Build VedaMD Vector Store from PDF documents",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Build locally
  python scripts/build_vector_store.py --input-dir ./Obs --output-dir ./data/vector_store

  # Build and upload to HF
  python scripts/build_vector_store.py \\
    --input-dir ./Obs \\
    --output-dir ./data/vector_store \\
    --repo-id sniro23/VedaMD-Vector-Store \\
    --upload
"""
    )

    parser.add_argument(
        "--input-dir",
        type=str,
        required=True,
        help="Directory containing PDF files"
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="./data/vector_store",
        help="Output directory for vector store files"
    )
    parser.add_argument(
        "--embedding-model",
        type=str,
        default="sentence-transformers/all-MiniLM-L6-v2",
        help="Sentence transformer model for embeddings"
    )
    parser.add_argument(
        "--chunk-size",
        type=int,
        default=1000,
        help="Maximum chunk size in characters"
    )
    parser.add_argument(
        "--chunk-overlap",
        type=int,
        default=100,
        help="Overlap between chunks in characters"
    )
    parser.add_argument(
        "--upload",
        action="store_true",
        help="Upload to Hugging Face Hub after building"
    )
    parser.add_argument(
        "--repo-id",
        type=str,
        help="Hugging Face repository ID (e.g., username/repo-name)"
    )
    parser.add_argument(
        "--hf-token",
        type=str,
        help="Hugging Face API token (or set HF_TOKEN env var)"
    )

    args = parser.parse_args()

    # Get HF token from env if not provided
    hf_token = args.hf_token or os.getenv("HF_TOKEN")

    # Validate upload arguments
    if args.upload and not args.repo_id:
        parser.error("--repo-id is required when --upload is specified")

    # Build vector store
    builder = VectorStoreBuilder(
        input_dir=args.input_dir,
        output_dir=args.output_dir,
        embedding_model=args.embedding_model,
        chunk_size=args.chunk_size,
        chunk_overlap=args.chunk_overlap
    )

    builder.build(
        upload=args.upload,
        repo_id=args.repo_id,
        hf_token=hf_token
    )


if __name__ == "__main__":
    main()