"""
Machine Learning-based search utilities using TF-IDF and text similarity.
"""
import re
from typing import Any, List, Tuple, Union
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from django.db import connection
from django.db.models import Q, QuerySet, F
from django.contrib.postgres.search import SearchQuery, SearchRank
from .models import Synonym
def normalize_text(text: str) -> str:
"""Normalize Vietnamese text for search."""
if not text:
return ""
# Lowercase and remove extra spaces
text = text.lower().strip()
text = re.sub(r'\s+', ' ', text)
return text
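# Illustrative usage (comment only, not executed): repeated whitespace is
# collapsed and case is folded, e.g.
#   normalize_text("  Tra   Cứu   Mức Phạt ")  ->  "tra cứu mức phạt"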
def expand_query_with_synonyms(query: str) -> List[str]:
"""Expand query using synonyms from database."""
query_normalized = normalize_text(query)
expanded = [query_normalized]
try:
# Get all synonyms
synonyms = Synonym.objects.all()
for synonym in synonyms:
keyword = normalize_text(synonym.keyword)
alias = normalize_text(synonym.alias)
# If query contains keyword, add alias
if keyword in query_normalized:
expanded.append(query_normalized.replace(keyword, alias))
# If query contains alias, add keyword
if alias in query_normalized:
expanded.append(query_normalized.replace(alias, keyword))
except Exception:
pass # If Synonym table doesn't exist yet
    return list(dict.fromkeys(expanded))  # Remove duplicates, keep original query first
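# Illustrative usage, assuming the Synonym table holds the (hypothetical)
# pair keyword="ô tô" / alias="xe hơi":
#   expand_query_with_synonyms("mức phạt ô tô")
#   -> ["mức phạt ô tô", "mức phạt xe hơi"]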
def create_search_vector(text_fields: List[str]) -> str:
"""Create a searchable text vector from multiple fields."""
return " ".join(str(field) for field in text_fields if field)
def calculate_similarity_scores(
query: str,
documents: List[str],
top_k: int = 20
) -> List[Tuple[int, float]]:
"""
Calculate cosine similarity scores between query and documents.
Returns list of (index, score) tuples sorted by score descending.
"""
if not query or not documents:
return []
# Expand query with synonyms
expanded_queries = expand_query_with_synonyms(query)
# Combine all query variations
all_texts = expanded_queries + documents
try:
# Create TF-IDF vectorizer
vectorizer = TfidfVectorizer(
analyzer='word',
ngram_range=(1, 2), # Unigrams and bigrams
min_df=1,
max_df=0.95,
lowercase=True,
token_pattern=r'\b\w+\b'
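            # Note: Python's re module treats \w as Unicode-aware by default,
            # so Vietnamese diacritics survive this token pattern.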
)
# Fit and transform
tfidf_matrix = vectorizer.fit_transform(all_texts)
# Get query vector (average of expanded queries)
query_vectors = tfidf_matrix[:len(expanded_queries)]
query_vector = np.mean(query_vectors.toarray(), axis=0).reshape(1, -1)
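        # Averaging the expanded-query vectors weights every synonym variant
        # equally, so a match on any expansion lifts the similarity score.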
# Get document vectors
doc_vectors = tfidf_matrix[len(expanded_queries):]
# Calculate similarities
similarities = cosine_similarity(query_vector, doc_vectors)[0]
# Get top k results with scores
top_indices = np.argsort(similarities)[::-1][:top_k]
results = [(int(idx), float(similarities[idx])) for idx in top_indices if similarities[idx] > 0.0]
return results
    except Exception:
# Fallback to simple text matching if ML fails
query_lower = normalize_text(query)
results = []
for idx, doc in enumerate(documents):
doc_lower = normalize_text(doc)
if query_lower in doc_lower:
# Simple score based on position and length
score = 1.0 - (doc_lower.find(query_lower) / max(len(doc_lower), 1))
results.append((idx, score))
return sorted(results, key=lambda x: x[1], reverse=True)[:top_k]
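# Illustrative usage (scores depend on the fitted corpus; values are rough):
#   docs = ["đăng ký xe máy", "cấp hộ chiếu phổ thông", "đăng ký kết hôn"]
#   calculate_similarity_scores("đăng ký xe", docs, top_k=2)
#   -> [(0, ~0.8), (2, ~0.2)]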
def search_with_ml(
queryset: QuerySet,
query: str,
text_fields: List[str],
top_k: int = 20,
min_score: float = 0.1,
use_hybrid: bool = True
) -> Union[QuerySet, List[Any]]:
"""
Search queryset using ML-based similarity scoring.
Args:
queryset: Django QuerySet to search
query: Search query string
text_fields: List of field names to search in
top_k: Maximum number of results
        min_score: Minimum similarity score threshold
        use_hybrid: Try hybrid (BM25 + vector) search before the other strategies
    Returns:
        Filtered and ranked QuerySet (the PostgreSQL full-text branch returns
        a ranked list of model instances instead)
"""
if not query:
return queryset[:top_k]
# Try hybrid search if enabled
if use_hybrid:
try:
from .hybrid_search import search_with_hybrid
from .config.hybrid_search_config import get_config
# Determine content type from model
model_name = queryset.model.__name__.lower()
content_type = None
if 'procedure' in model_name:
content_type = 'procedure'
elif 'fine' in model_name:
content_type = 'fine'
elif 'office' in model_name:
content_type = 'office'
elif 'advisory' in model_name:
content_type = 'advisory'
elif 'legalsection' in model_name:
content_type = 'legal'
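            # The mapping relies on model-class naming; for unmatched models
            # content_type stays None, which get_config presumably treats as
            # the default profile (assumption about hybrid_search_config).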
config = get_config(content_type)
return search_with_hybrid(
queryset,
query,
text_fields,
top_k=top_k,
min_score=min_score,
use_hybrid=True,
bm25_weight=config.bm25_weight,
vector_weight=config.vector_weight
)
except Exception as e:
print(f"Hybrid search not available, using BM25/TF-IDF: {e}")
# Attempt PostgreSQL BM25 ranking first when available
if connection.vendor == "postgresql" and hasattr(queryset.model, "tsv_body"):
try:
expanded_queries = expand_query_with_synonyms(query)
combined_query = None
for q_variant in expanded_queries:
variant_query = SearchQuery(q_variant, config="simple")
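                # config="simple" skips language-specific stemming/stop words;
                # PostgreSQL has no built-in Vietnamese search configuration.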
combined_query = variant_query if combined_query is None else combined_query | variant_query
if combined_query is not None:
ranked_qs = (
queryset
.annotate(rank=SearchRank(F("tsv_body"), combined_query))
.filter(rank__gt=0)
.order_by("-rank")
)
results = list(ranked_qs[:top_k])
if results:
for obj in results:
obj._ml_score = getattr(obj, "rank", 0.0)
return results
except Exception:
# Fall through to ML-based search if any error occurs (e.g. missing extensions)
pass
# Get all objects and create search vectors
all_objects = list(queryset)
if not all_objects:
return queryset.none()
# Create search vectors for each object
documents = []
for obj in all_objects:
field_values = [getattr(obj, field, "") for field in text_fields]
search_vector = create_search_vector(field_values)
documents.append(search_vector)
# Calculate similarity scores
try:
scored_indices = calculate_similarity_scores(query, documents, top_k=top_k)
# Filter by minimum score and get object IDs
valid_indices = [idx for idx, score in scored_indices if score >= min_score]
# If ML search found results, use them
if valid_indices:
result_objects = [all_objects[idx] for idx in valid_indices]
result_ids = [obj.id for obj in result_objects]
if result_ids:
# Create a mapping of ID to order for sorting
id_to_order = {obj_id: idx for idx, obj_id in enumerate(result_ids)}
# Filter by IDs and sort by the order
filtered = queryset.filter(id__in=result_ids)
# Convert to list, sort by order, then convert back to queryset
result_list = list(filtered)
                result_list.sort(key=lambda x: id_to_order.get(x.id, len(result_list)))
# Return limited results - create a new queryset from IDs in order
ordered_ids = [obj.id for obj in result_list[:top_k]]
if ordered_ids:
# Use Case/When for ordering in PostgreSQL
from django.db.models import Case, When, IntegerField
preserved = Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(ordered_ids)], output_field=IntegerField())
return queryset.filter(id__in=ordered_ids).order_by(preserved)
    except Exception:
# If ML search fails, fall back to simple search
pass
# Fallback to simple icontains search with exact match prioritization
query_lower = normalize_text(query)
query_words = query_lower.split()
# Extract key phrases (2-3 words) for better matching
key_phrases = []
for i in range(len(query_words) - 1):
phrase = " ".join(query_words[i:i+2])
if len(phrase) > 3:
key_phrases.append(phrase)
for i in range(len(query_words) - 2):
phrase = " ".join(query_words[i:i+3])
if len(phrase) > 5:
key_phrases.append(phrase)
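    # e.g. "đăng ký xe máy" yields bigrams ["đăng ký", "ký xe", "xe máy"]
    # and trigrams ["đăng ký xe", "ký xe máy"] after the length thresholds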
# Try to find exact phrase matches first
exact_matches = []
primary_field = text_fields[0] if text_fields else None
if primary_field:
for phrase in key_phrases:
filter_kwargs = {f"{primary_field}__icontains": phrase}
matches = list(queryset.filter(**filter_kwargs)[:top_k])
exact_matches.extend(matches)
# If we found exact matches, prioritize them
if exact_matches:
# Remove duplicates while preserving order
seen = set()
unique_matches = []
for obj in exact_matches:
if obj.id not in seen:
seen.add(obj.id)
unique_matches.append(obj)
return unique_matches[:top_k]
    # Final fallback: OR of icontains filters across all text fields
q_objects = Q()
for field in text_fields:
q_objects |= Q(**{f"{field}__icontains": query})
return queryset.filter(q_objects)[:top_k]
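# Usage sketch (illustrative; "Procedure" and its "title"/"description"
# fields are assumptions, not guaranteed by this module):
#   results = search_with_ml(
#       Procedure.objects.all(),
#       query="đăng ký xe máy",
#       text_fields=["title", "description"],
#       top_k=10,
#       min_score=0.15,
#   )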