"""
Vector embeddings utilities for semantic search.
"""
import os
from typing import List, Optional, Union, Dict
import numpy as np
from pathlib import Path
try:
from sentence_transformers import SentenceTransformer
SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
SENTENCE_TRANSFORMERS_AVAILABLE = False
SentenceTransformer = None
# Available embedding models for Vietnamese, ordered from fastest to best quality
AVAILABLE_MODELS = {
    # Fast models (384 dim) - good for production
    "paraphrase-multilingual": "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",  # fast, 384 dim
    # High quality models (768 dim) - better accuracy
    "multilingual-mpnet": "sentence-transformers/paraphrase-multilingual-mpnet-base-v2",  # high quality, 768 dim, recommended
    "vietnamese-sbert": "keepitreal/vietnamese-sbert-v2",  # Vietnamese-specific (may require auth)
    # Very high quality models (1024+ dim) - best accuracy but slower
    "multilingual-e5-large": "intfloat/multilingual-e5-large",  # very high quality, 1024 dim, large model
    "multilingual-e5-base": "intfloat/multilingual-e5-base",  # high quality, 768 dim, balanced
    # Vietnamese-specific models (if available)
    "vietnamese-embedding": "dangvantuan/vietnamese-embedding",  # Vietnamese-specific (if available)
    "vietnamese-bi-encoder": "bkai-foundation-models/vietnamese-bi-encoder",  # Vietnamese bi-encoder (if available)
}

# Default embedding model for Vietnamese, overridable via the EMBEDDING_MODEL
# env var. multilingual-mpnet is the default: better quality than MiniLM at a
# still-reasonable size. Both short names and full model paths are supported:
#   - EMBEDDING_MODEL=multilingual-mpnet (short name)
#   - EMBEDDING_MODEL=sentence-transformers/paraphrase-multilingual-mpnet-base-v2 (full path)
#   - EMBEDDING_MODEL=/path/to/local/model (local model path)
#   - EMBEDDING_MODEL=username/private-model (private HF model, requires HF_TOKEN)
DEFAULT_MODEL_NAME = os.environ.get(
    "EMBEDDING_MODEL",
    AVAILABLE_MODELS.get("multilingual-mpnet", "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"),
)
FALLBACK_MODEL_NAME = AVAILABLE_MODELS.get(
    "paraphrase-multilingual", "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
)

# Module-level cache for the loaded model instance
_model_cache: Optional[SentenceTransformer] = None
_cached_model_name: Optional[str] = None

def get_embedding_model(model_name: Optional[str] = None, force_reload: bool = False) -> Optional[SentenceTransformer]:
    """
    Get or load an embedding model instance.

    Args:
        model_name: Name of the model to load. Can be:
            - Full model name (e.g., "keepitreal/vietnamese-sbert-v2")
            - Short name (e.g., "vietnamese-sbert")
            - None (uses DEFAULT_MODEL_NAME from env or the default)
        force_reload: Force reloading the model even if cached.

    Returns:
        SentenceTransformer instance, or None if not available.
    """
    global _model_cache, _cached_model_name
    if not SENTENCE_TRANSFORMERS_AVAILABLE:
        print("Warning: sentence-transformers not installed. Install with: pip install sentence-transformers")
        return None

    # Resolve the model name (it may be a short name from AVAILABLE_MODELS)
    resolved_model_name = model_name or DEFAULT_MODEL_NAME
    if resolved_model_name in AVAILABLE_MODELS:
        resolved_model_name = AVAILABLE_MODELS[resolved_model_name]

    # Return the cached model if it matches and we are not forcing a reload
    if _model_cache is not None and _cached_model_name == resolved_model_name and not force_reload:
        return _model_cache

    # Load a new model
    try:
        print(f"Loading embedding model: {resolved_model_name}")
        model_path = Path(resolved_model_name)
        if model_path.exists() and model_path.is_dir():
            # Local model directory
            print(f"Loading local model from: {resolved_model_name}")
            _model_cache = SentenceTransformer(str(model_path))
        else:
            # Hugging Face model (public, or private if a token is provided)
            hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN")
            model_kwargs = {}
            if hf_token:
                print(f"Using Hugging Face token for model: {resolved_model_name}")
                model_kwargs["token"] = hf_token
            _model_cache = SentenceTransformer(resolved_model_name, **model_kwargs)
        _cached_model_name = resolved_model_name

        # Report the embedding dimension for information
        try:
            test_embedding = _model_cache.encode("test", show_progress_bar=False)
            dim = len(test_embedding)
            print(f"✅ Successfully loaded model: {resolved_model_name} (dimension: {dim})")
        except Exception:
            print(f"✅ Successfully loaded model: {resolved_model_name}")
        return _model_cache
    except Exception as e:
        print(f"❌ Error loading model {resolved_model_name}: {e}")
        if resolved_model_name != FALLBACK_MODEL_NAME:
            print(f"Trying fallback model: {FALLBACK_MODEL_NAME}")
            try:
                _model_cache = SentenceTransformer(FALLBACK_MODEL_NAME)
                _cached_model_name = FALLBACK_MODEL_NAME
                test_embedding = _model_cache.encode("test", show_progress_bar=False)
                dim = len(test_embedding)
                print(f"✅ Successfully loaded fallback model: {FALLBACK_MODEL_NAME} (dimension: {dim})")
                return _model_cache
            except Exception as e2:
                print(f"❌ Error loading fallback model: {e2}")
        return None
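
# A minimal usage sketch for get_embedding_model (assumes sentence-transformers
# is installed; the first call downloads the model, later calls with the same
# name return the cached instance):
#
#     model = get_embedding_model("multilingual-mpnet")  # short name, resolved via AVAILABLE_MODELS
#     same = get_embedding_model("multilingual-mpnet")   # served from the module-level cache
#     fresh = get_embedding_model(force_reload=True)     # bypasses the cache
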
def list_available_models() -> Dict[str, str]:
    """
    List all available embedding models.

    Returns:
        Dictionary mapping short names to full model names.
    """
    return AVAILABLE_MODELS.copy()

def compare_models(texts: List[str], model_names: Optional[List[str]] = None) -> Dict[str, Dict[str, float]]:
    """
    Compare different embedding models on sample texts.

    Args:
        texts: List of sample texts to test.
        model_names: List of model names to compare. If None, compares all available models.

    Returns:
        Dictionary with comparison results per model, including:
            - dimension: Embedding dimension
            - encoding_time: Time to encode the texts (seconds)
            - avg_similarity: Average pairwise similarity between the texts
    """
    import time

    if model_names is None:
        model_names = list(AVAILABLE_MODELS.keys())

    results = {}
    for model_key in model_names:
        if model_key not in AVAILABLE_MODELS:
            continue
        model_name = AVAILABLE_MODELS[model_key]
        try:
            model = get_embedding_model(model_name, force_reload=True)
            if model is None:
                continue
            # Embedding dimension
            dim = get_embedding_dimension(model_name)
            # Measure encoding time
            start_time = time.time()
            embeddings = generate_embeddings_batch(texts, model=model)
            encoding_time = time.time() - start_time
            # Average pairwise cosine similarity
            similarities = []
            for i in range(len(embeddings)):
                for j in range(i + 1, len(embeddings)):
                    if embeddings[i] is not None and embeddings[j] is not None:
                        sim = cosine_similarity(embeddings[i], embeddings[j])
                        similarities.append(sim)
            avg_similarity = sum(similarities) / len(similarities) if similarities else 0.0
            results[model_key] = {
                "model_name": model_name,
                "dimension": dim,
                "encoding_time": encoding_time,
                "avg_similarity": avg_similarity,
            }
        except Exception as e:
            print(f"Error comparing model {model_key}: {e}")
            results[model_key] = {"error": str(e)}
    return results
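
# Sketch: comparing two of the bundled models on a few Vietnamese phrases
# (slow on first run, since each model is downloaded and force-reloaded):
#
#     samples = ["Xin chào", "Chào bạn", "Tạm biệt"]
#     report = compare_models(samples, model_names=["paraphrase-multilingual", "multilingual-mpnet"])
#     for key, stats in report.items():
#         print(key, stats)
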
def generate_embedding(text: str, model: Optional[SentenceTransformer] = None) -> Optional[np.ndarray]:
    """
    Generate an embedding vector for a single text.

    Args:
        text: Input text to embed.
        model: SentenceTransformer instance. If None, uses the default model.

    Returns:
        Numpy array with the embedding vector, or None on error.
    """
    if not text or not text.strip():
        return None
    if model is None:
        model = get_embedding_model()
    if model is None:
        return None
    try:
        embedding = model.encode(text, normalize_embeddings=True, show_progress_bar=False)
        return embedding
    except Exception as e:
        print(f"Error generating embedding: {e}")
        return None

def generate_embeddings_batch(texts: List[str], model: Optional[SentenceTransformer] = None, batch_size: int = 32) -> List[Optional[np.ndarray]]:
    """
    Generate embeddings for a batch of texts.

    Args:
        texts: List of input texts.
        model: SentenceTransformer instance. If None, uses the default model.
        batch_size: Batch size for processing.

    Returns:
        List of numpy arrays (embeddings), with None for failed texts.
    """
    if not texts:
        return []
    if model is None:
        model = get_embedding_model()
    if model is None:
        return [None] * len(texts)
    try:
        embeddings = model.encode(
            texts,
            batch_size=batch_size,
            normalize_embeddings=True,
            show_progress_bar=True,
            convert_to_numpy=True,
        )
        return list(embeddings)
    except Exception as e:
        print(f"Error generating batch embeddings: {e}")
        return [None] * len(texts)
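
# Sketch: batch versus single-text embedding (batching amortizes per-call
# overhead and is the better fit for indexing jobs):
#
#     titles = ["Tin tức hôm nay", "Giá vàng tăng", "Thời tiết Hà Nội"]
#     vecs = generate_embeddings_batch(titles, batch_size=16)
#     single = generate_embedding(titles[0])
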
def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
    """
    Calculate the cosine similarity between two vectors.

    Args:
        vec1: First vector.
        vec2: Second vector.

    Returns:
        Cosine similarity score in [-1, 1] (0.0 if either vector is None or zero).
    """
    if vec1 is None or vec2 is None:
        return 0.0
    dot_product = np.dot(vec1, vec2)
    norm1 = np.linalg.norm(vec1)
    norm2 = np.linalg.norm(vec2)
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return float(dot_product / (norm1 * norm2))
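
# Putting the pieces together: a minimal semantic-search sketch (the document
# texts are illustrative placeholders). Since embeddings are normalized at
# encode time, cosine similarity reduces to a dot product here, but the
# function also handles unnormalized vectors:
#
#     docs = ["Hà Nội là thủ đô của Việt Nam", "Phở là món ăn nổi tiếng"]
#     doc_vecs = generate_embeddings_batch(docs)
#     query_vec = generate_embedding("Thủ đô Việt Nam ở đâu?")
#     scores = [cosine_similarity(query_vec, v) for v in doc_vecs]
#     best_doc = docs[int(np.argmax(scores))]
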
def get_embedding_dimension(model_name: Optional[str] = None) -> int:
    """
    Get the embedding dimension for a model.

    Args:
        model_name: Model name. If None, uses the default.

    Returns:
        Embedding dimension, or 0 if unknown.
    """
    model = get_embedding_model(model_name)
    if model is None:
        return 0
    # Determine the dimension by encoding a dummy text
    try:
        dummy_embedding = model.encode("test", show_progress_bar=False)
        return len(dummy_embedding)
    except Exception:
        return 0
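

if __name__ == "__main__":
    # Lightweight smoke test (a sketch, not part of the module's public API):
    # load the default model, embed two phrases, and print their similarity.
    # Requires network access on first run to download the model.
    m = get_embedding_model()
    if m is None:
        print("No embedding model available; install sentence-transformers first.")
    else:
        a = generate_embedding("Việt Nam", model=m)
        b = generate_embedding("Vietnam", model=m)
        print(f"Dimension: {get_embedding_dimension()}")
        print(f"Similarity('Việt Nam', 'Vietnam'): {cosine_similarity(a, b):.4f}")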