import pandas as pd
import numpy as np
import torch
from sentence_transformers import SentenceTransformer
import json
import gzip
import struct
import re

# 1. Constants
MODEL_NAME = 'nomic-ai/nomic-embed-text-v1.5'
EMBEDDING_DIM = 256
INPUT_CSV_PATH = 'data/quotes.csv'
OUTPUT_BINARY_PATH = 'data/quotes_index.bin'
BATCH_SIZE = 64  # Batch size tuned for GPU processing


# 2. Load Data
def load_quotes(file_path):
    """Loads quotes from a CSV file using pandas, skipping malformed lines."""
    # Define column names explicitly
    column_names = ['quote', 'author', 'category']

    # Read CSV with explicit separator, quote character, and skip bad lines.
    # header=None because we are providing names explicitly.
    df = pd.read_csv(
        file_path,
        sep=',',
        quotechar='"',
        header=None,
        names=column_names,
        on_bad_lines='skip'  # Skip lines that pandas cannot parse into 3 columns
    )

    # Filter out rows where the category contains uppercase letters
    initial_rows = len(df)
    df = df[df['category'].apply(lambda x: isinstance(x, str) and not any(c.isupper() for c in x))]
    filtered_rows = len(df)
    if initial_rows - filtered_rows > 0:
        print(f"Ignored {initial_rows - filtered_rows} rows due to uppercase letters in category.")

    # Ensure author is a string for grouping (empty string for missing authors)
    df['author'] = df['author'].fillna('').astype(str)

    # Group by quote and author to deduplicate entries.
    # We intentionally ignore categories to reduce metadata size in the output index.
    grouped = {}
    for _, row in df.iterrows():
        quote = row['quote']
        author = row['author']
        # Build a case-insensitive key for deduplication
        quote_key = quote.lower().strip() if isinstance(quote, str) else ''
        author_key = author.lower().strip() if isinstance(author, str) else ''
        key = (quote_key, author_key)
        if key not in grouped:
            grouped[key] = {
                'quote': quote,
                'author': author
            }

    # Build records from grouped data; do NOT include categories
    records = []
    for key, data in grouped.items():
        orig_author = data['author'] if data['author'] != '' else None
        records.append({
            'quote': data['quote'],
            'author': orig_author
        })

    # Prepend the required prefix for retrieval tasks.
    # nomic-embed-text-v1.5 expects "search_document: " for corpus text being
    # indexed; "search_query: " is reserved for the queries issued at search time.
    for r in records:
        r['quote_for_embedding'] = "search_document: " + r['quote']

    return records


# 3. Generate Embeddings
def generate_embeddings(quotes, model_name, embedding_dim):
    """Generates and truncates embeddings for a list of quotes."""
    model = SentenceTransformer(model_name, trust_remote_code=True)
    # The model automatically uses the GPU if available
    embeddings = model.encode(
        [q['quote_for_embedding'] for q in quotes],
        convert_to_tensor=True,
        batch_size=BATCH_SIZE,
        show_progress_bar=True  # Display progress bar for embedding generation
    )
    # Truncate embeddings to the desired dimension
    truncated_embeddings = embeddings[:, :embedding_dim]
    return truncated_embeddings.cpu().numpy()
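
# Note on the truncation above: slicing off the tail dimensions means the
# truncated vectors are generally no longer unit-length, even if the full
# embeddings were normalized. The helper below is a minimal, optional sketch
# (the name `l2_normalize` is illustrative and not part of the original
# pipeline), assuming the downstream search scores vectors with cosine or
# dot-product similarity on unit vectors.
def l2_normalize(vectors):
    """Scale each row of a 2-D array to unit L2 norm; all-zero rows are left as-is."""
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms[norms == 0] = 1.0  # Avoid division by zero for all-zero rows
    return vectors / norms
# Example (not wired into main()): float_embeddings = l2_normalize(float_embeddings)
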
# 4. Quantize Embeddings
def quantize_embeddings(embeddings):
    """Quantizes float32 embeddings to int8."""
    # Calculate the scale factor
    abs_max = np.abs(embeddings).max()
    scale = 127.0 / abs_max if abs_max != 0 else 0
    # Quantize and clip
    quantized_embeddings = np.clip(embeddings * scale, -127, 127).astype(np.int8)
    return quantized_embeddings, scale


def main():
    """Main function to run the offline processing pipeline."""
    print("Starting offline processing...")

    # Load quotes
    quotes = load_quotes(INPUT_CSV_PATH)
    print(f"Loaded {len(quotes)} quotes.")

    # Generate embeddings
    print("Generating embeddings...")
    float_embeddings = generate_embeddings(quotes, MODEL_NAME, EMBEDDING_DIM)
    print(f"Generated float embeddings with shape: {float_embeddings.shape}")

    # Quantize embeddings
    print("Quantizing embeddings...")
    quantized_embeddings, scale = quantize_embeddings(float_embeddings)
    print(f"Quantized embeddings with shape: {quantized_embeddings.shape}")
    print(f"Quantization scale factor: {scale}")

    # Prepare metadata without categories to reduce index size
    metadata = [
        {"quote": q["quote"], "author": q["author"]}
        for q in quotes
    ]

    # Replace NaN values with None for JSON compatibility
    for item in metadata:
        for key, value in list(item.items()):
            if isinstance(value, float) and np.isnan(value):
                item[key] = None

    # After cleaning metadata, serialize and compress once
    metadata_json = json.dumps(metadata, separators=(",", ":"))
    metadata_bytes_uncompressed = metadata_json.encode('utf-8')
    # Compress metadata with gzip to reduce index size on disk
    metadata_bytes = gzip.compress(metadata_bytes_uncompressed)
    # metadata format: 0 = uncompressed JSON (legacy), 1 = gzip-compressed JSON
    metadata_format = 1

    # Pack data into a binary file
    print("Packaging data into binary file...")
    with open(OUTPUT_BINARY_PATH, 'wb') as f:
        # Header
        f.write(struct.pack('