# Uploaded by: davidtran999
# Commit message: Push full code from hue-portal-backend folder
# Commit: 519b145
"""
Slow Path Handler - Full RAG pipeline for complex queries.
"""
import os
import time
import logging
import hashlib
from typing import Dict, Any, Optional, List, Set
import unicodedata
import re
from concurrent.futures import ThreadPoolExecutor, Future
import threading
from hue_portal.core.chatbot import get_chatbot, RESPONSE_TEMPLATES
from hue_portal.core.models import (
Fine,
Procedure,
Office,
Advisory,
LegalSection,
LegalDocument,
)
from hue_portal.core.search_ml import search_with_ml
from hue_portal.core.pure_semantic_search import pure_semantic_search
# Lazy import reranker to avoid blocking startup (FlagEmbedding may download model)
# from hue_portal.core.reranker import rerank_documents
from hue_portal.chatbot.llm_integration import get_llm_generator
from hue_portal.chatbot.structured_legal import format_structured_legal_answer
from hue_portal.chatbot.context_manager import ConversationContext
from hue_portal.chatbot.router import DOCUMENT_CODE_PATTERNS
from hue_portal.core.query_rewriter import get_query_rewriter
from hue_portal.core.pure_semantic_search import pure_semantic_search, parallel_vector_search
logger = logging.getLogger(__name__)
class SlowPathHandler:
"""Handle Slow Path queries with full RAG pipeline."""
def __init__(self):
    """Set up the chatbot, the LLM generator, the search pool and the prefetch caches.

    Environment:
        CACHE_PREFETCH_TTL: lifetime in seconds handed to the Redis cache
            when prefetch results are stored. Defaults to 1800. A value
            that is not an integer is logged and replaced by the default
            instead of aborting construction.
    """
    self.chatbot = get_chatbot()
    self.llm_generator = get_llm_generator()
    # Thread pool for parallel search (max 2 workers to avoid overwhelming DB)
    self._executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="parallel_search")
    # Prefetched results keyed by session_id (in-memory fallback next to Redis)
    self._prefetched_cache: Dict[str, Dict[str, Any]] = {}
    # Guards every read/write of _prefetched_cache (background tasks write to it)
    self._cache_lock = threading.Lock()
    # Redis cache for prefetch results
    # NOTE(review): get_redis_cache is not among the imports at the top of
    # this file as visible here - confirm where it is brought into scope.
    self.redis_cache = get_redis_cache()
    # Prefetch cache TTL (30 minutes default)
    default_ttl = 1800
    raw_ttl = os.environ.get("CACHE_PREFETCH_TTL", str(default_ttl))
    try:
        self.prefetch_cache_ttl = int(raw_ttl)
    except ValueError:
        logger.warning(
            "[PARALLEL_SEARCH] Invalid CACHE_PREFETCH_TTL=%r, using default %ds",
            raw_ttl,
            default_ttl,
        )
        self.prefetch_cache_ttl = default_ttl
def handle(
self,
query: str,
intent: str,
session_id: Optional[str] = None,
selected_document_code: Optional[str] = None,
) -> Dict[str, Any]:
"""
Full RAG pipeline:
1. Search (hybrid: BM25 + vector)
2. Retrieve top 20 documents
3. LLM generation with structured output (for legal queries)
4. Guardrails validation
5. Retry up to 3 times if needed
Args:
query: User query.
intent: Detected intent.
session_id: Optional session ID for context.
selected_document_code: Selected document code from wizard.
Returns:
Response dict with message, intent, results, etc.
"""
# NOTE(review): steps 2, 4 and 5 of the docstring above do not match the code
# visible in this method: it asks _search_by_intent for 15 results and contains
# no guardrail or retry step. Confirm whether those live elsewhere.
#
# Order of the exits below:
#   1. simple greeting            -> greeting template
#   2. document wizard            -> options payload (search_legal without a document code)
#   3. fast path                  -> _maybe_fast_path_response
#   4. raw template (LLM bypass)  -> top legal hit formatted directly
#   5. LLM answer / template fallback
query = query.strip()
selected_document_code_normalized = (
selected_document_code.strip().upper() if selected_document_code else None
)
# Handle greetings
# The greeting template is returned only when the text has at most 3 words,
# contains one of the greeting substrings and none of the domain keywords;
# any other "greeting" query continues through the rest of the method.
if intent == "greeting":
query_lower = query.lower().strip()
query_words = query_lower.split()
is_simple_greeting = (
len(query_words) <= 3 and
any(greeting in query_lower for greeting in ["xin chào", "chào", "hello", "hi"]) and
not any(kw in query_lower for kw in ["phạt", "mức phạt", "vi phạm", "thủ tục", "hồ sơ", "địa chỉ", "công an", "cảnh báo"])
)
if is_simple_greeting:
return {
"message": RESPONSE_TEMPLATES["greeting"],
"intent": "greeting",
"results": [],
"count": 0,
"_source": "slow_path"
}
# Wizard / option-first flow for every general legal question:
# If:
#   - the intent is search_legal
#   - no selected_document_code has been chosen in the session yet
#   - the question does not explicitly mention a document code
# then always return an options payload so the user picks a document first,
# without generating a detailed answer yet.
has_explicit_code = self._has_explicit_document_code_in_query(query)
logger.info(
"[WIZARD] Checking wizard conditions - intent=%s, selected_code=%s, has_explicit_code=%s, query='%s'",
intent,
selected_document_code_normalized,
has_explicit_code,
query[:50],
)
if (
intent == "search_legal"
and not selected_document_code_normalized
and not has_explicit_code
):
logger.info("[QUERY_REWRITE] ✅ Wizard conditions met, using Query Rewrite Strategy")
# Query Rewrite Strategy: Rewrite query into 3-5 optimized legal queries
query_rewriter = get_query_rewriter(self.llm_generator)
# Get conversation context for query rewriting
context = None
if session_id:
try:
recent_messages = ConversationContext.get_recent_messages(session_id, limit=5)
context = [
{"role": msg.role, "content": msg.content}
for msg in recent_messages
]
except Exception as exc:
# Context is optional: on failure the rewrite runs with context=None
logger.warning("[QUERY_REWRITE] Failed to load context: %s", exc)
# Rewrite query into 3-5 queries
rewritten_queries = query_rewriter.rewrite_query(
query,
context=context,
max_queries=5,
min_queries=3
)
if not rewritten_queries:
# Fallback to original query if rewrite fails
rewritten_queries = [query]
logger.info(
"[QUERY_REWRITE] Rewrote query into %d queries: %s",
len(rewritten_queries),
rewritten_queries[:3]
)
# Parallel vector search with multiple queries
try:
# NOTE(review): LegalSection is already imported at module level; this
# local import rebinds it as a local name inside handle().
from hue_portal.core.models import LegalSection
# Search all legal sections (no document filter yet)
qs = LegalSection.objects.all()
text_fields = ["section_title", "section_code", "content"]
# Use parallel vector search
search_results = parallel_vector_search(
rewritten_queries,
qs,
top_k_per_query=5,
final_top_k=7,
text_fields=text_fields
)
# Extract unique document codes from results
doc_codes_seen: Set[str] = set()
document_options: List[Dict[str, Any]] = []
for section, score in search_results:
doc = getattr(section, "document", None)
if not doc:
continue
# NOTE(review): .upper() raises AttributeError when doc.code is None; the
# enclosing except then switches to the canonical-document fallback.
doc_code = getattr(doc, "code", "").upper()
if not doc_code or doc_code in doc_codes_seen:
continue
doc_codes_seen.add(doc_code)
# Get document metadata
doc_title = getattr(doc, "title", "") or doc_code
doc_summary = getattr(doc, "summary", "") or ""
if not doc_summary:
# No summary attribute value: try the "summary" key of the metadata dict
metadata = getattr(doc, "metadata", {}) or {}
if isinstance(metadata, dict):
doc_summary = metadata.get("summary", "")
document_options.append({
"code": doc_code,
"title": doc_title,
"summary": doc_summary,
"score": float(score),
"doc_type": getattr(doc, "doc_type", "") or "",
})
# Limit to top 5 documents
if len(document_options) >= 5:
break
# If no documents found, use canonical fallback
if not document_options:
logger.warning("[QUERY_REWRITE] No documents found, using canonical fallback")
canonical_candidates = [
{
"code": "264-QD-TW",
"title": "Quyết định 264-QĐ/TW về kỷ luật đảng viên",
"summary": "",
"doc_type": "",
},
{
"code": "QD-69-TW",
"title": "Quy định 69-QĐ/TW về kỷ luật tổ chức đảng, đảng viên",
"summary": "",
"doc_type": "",
},
{
"code": "TT-02-CAND",
"title": "Thông tư 02/2021/TT-BCA về điều lệnh CAND",
"summary": "",
"doc_type": "",
},
]
clarification_payload = self._build_clarification_payload(
query, canonical_candidates
)
if clarification_payload:
# setdefault: only fill keys the payload builder did not set
clarification_payload.setdefault("intent", intent)
clarification_payload.setdefault("_source", "clarification")
clarification_payload.setdefault("routing", "clarification")
clarification_payload.setdefault("confidence", 0.3)
return clarification_payload
# Build options from search results
options = [
{
"code": opt["code"],
"title": opt["title"],
# Without a summary, show the relevance score as the reason
"reason": opt.get("summary") or f"Độ liên quan: {opt['score']:.2f}",
}
for opt in document_options
]
# Add the "Khác" (other) option
if not any(opt.get("code") == "__other__" for opt in options):
options.append({
"code": "__other__",
"title": "Khác",
"reason": "Tôi muốn hỏi văn bản hoặc chủ đề pháp luật khác.",
})
message = (
"Tôi đã tìm thấy các văn bản pháp luật liên quan đến câu hỏi của bạn.\n\n"
"Bạn hãy chọn văn bản muốn tra cứu để tôi trả lời chi tiết hơn:"
)
logger.info(
"[QUERY_REWRITE] ✅ Found %d documents using Query Rewrite Strategy",
len(document_options)
)
# The same message/options pair is exposed both at the top level and
# under "clarification".
return {
"type": "options",
"wizard_stage": "choose_document",
"message": message,
"options": options,
"clarification": {
"message": message,
"options": options,
},
"results": [],
"count": 0,
"intent": intent,
"_source": "query_rewrite",
"routing": "query_rewrite",
"confidence": 0.95, # High confidence with Query Rewrite Strategy
}
except Exception as exc:
# Any failure above is logged with its traceback; execution continues
# with the canonical-document clarification below.
logger.error(
"[QUERY_REWRITE] Error in Query Rewrite Strategy: %s, falling back to LLM suggestions",
exc,
exc_info=True
)
# Fallback to original LLM-based clarification
# Reached only when the try block above raised: every non-raising path returns.
canonical_candidates: List[Dict[str, Any]] = []
try:
canonical_docs = list(
LegalDocument.objects.filter(
code__in=["264-QD-TW", "QD-69-TW", "TT-02-CAND"]
)
)
for doc in canonical_docs:
summary = getattr(doc, "summary", "") or ""
metadata = getattr(doc, "metadata", {}) or {}
if not summary and isinstance(metadata, dict):
summary = metadata.get("summary", "")
canonical_candidates.append(
{
"code": doc.code,
"title": getattr(doc, "title", "") or doc.code,
"summary": summary,
"doc_type": getattr(doc, "doc_type", "") or "",
"section_title": "",
}
)
except Exception as e:
logger.warning("[CLARIFICATION] Canonical documents lookup failed: %s", e)
# Static list used when the lookup produced no candidates
if not canonical_candidates:
canonical_candidates = [
{
"code": "264-QD-TW",
"title": "Quyết định 264-QĐ/TW về kỷ luật đảng viên",
"summary": "",
"doc_type": "",
"section_title": "",
},
{
"code": "QD-69-TW",
"title": "Quy định 69-QĐ/TW về kỷ luật tổ chức đảng, đảng viên",
"summary": "",
"doc_type": "",
"section_title": "",
},
{
"code": "TT-02-CAND",
"title": "Thông tư 02/2021/TT-BCA về điều lệnh CAND",
"summary": "",
"doc_type": "",
"section_title": "",
},
]
clarification_payload = self._build_clarification_payload(
query, canonical_candidates
)
if clarification_payload:
clarification_payload.setdefault("intent", intent)
clarification_payload.setdefault("_source", "clarification_fallback")
clarification_payload.setdefault("routing", "clarification")
clarification_payload.setdefault("confidence", 0.3)
return clarification_payload
# Search based on intent - retrieve top-15 for reranking (balance speed and RAM)
search_result = self._search_by_intent(
intent,
query,
limit=15,
preferred_document_code=selected_document_code_normalized,
) # Balance: 15 for good recall, not too slow
# Fast path for high-confidence legal queries (skip for complex queries)
fast_path_response = None
if intent == "search_legal" and not self._is_complex_query(query):
fast_path_response = self._maybe_fast_path_response(search_result["results"], query)
if fast_path_response:
fast_path_response["intent"] = intent
fast_path_response["_source"] = "fast_path"
return fast_path_response
# Rerank results - DISABLED for speed (can enable via ENABLE_RERANKER env var)
# Reranker adds 1-3 seconds delay, skip for faster responses
enable_reranker = os.environ.get("ENABLE_RERANKER", "false").lower() == "true"
if intent == "search_legal" and enable_reranker:
try:
# Lazy import to avoid blocking startup (FlagEmbedding may download model)
from hue_portal.core.reranker import rerank_documents
legal_results = [r for r in search_result["results"] if r.get("type") == "legal"]
if len(legal_results) > 0:
# Rerank to top-4 (balance speed and context quality)
top_k = min(4, len(legal_results))
reranked = rerank_documents(query, legal_results, top_k=top_k)
# Update search_result with reranked results (keep non-legal results)
non_legal = [r for r in search_result["results"] if r.get("type") != "legal"]
search_result["results"] = reranked + non_legal
search_result["count"] = len(search_result["results"])
logger.info(
"[RERANKER] Reranked %d legal results to top-%d for query: %s",
len(legal_results),
top_k,
query[:50]
)
except Exception as e:
# Reranking is best-effort: on failure the unranked results are kept
logger.warning("[RERANKER] Reranking failed: %s, using original results", e)
elif intent == "search_legal":
# Skip reranking for speed - just use top results by score
logger.debug("[RERANKER] Skipped reranking for speed (ENABLE_RERANKER=false)")
# STEP 1: bypass the LLM when the results are good (per the original author:
# avoids context overflow and is 30-40% faster)
# Only applies to legal queries whose results have a high score
if intent == "search_legal" and search_result["count"] > 0:
top_result = search_result["results"][0]
top_score = top_result.get("score", 0.0) or 0.0
top_data = top_result.get("data", {})
doc_code = (top_data.get("document_code") or "").upper()
content = top_data.get("content", "") or top_data.get("excerpt", "")
# Bypass the LLM if:
# 1. there is a document code (TT-02-CAND, etc.) and the content is long enough
# 2. score >= 0.4 (threshold lowered so it triggers more easily)
# 3. or the query has one of the important keywords (see has_keywords) with score >= 0.3
should_bypass = False
query_lower = query.lower()
has_keywords = any(kw in query_lower for kw in ["%", "phần trăm", "tỷ lệ", "12%", "20%", "10%", "hạ bậc", "thi đua", "xếp loại", "vi phạm", "cán bộ"])
# Easier bypass condition: doc_code present + long enough content + reasonable score
if doc_code and len(content) > 100:
if top_score >= 0.4:
should_bypass = True
elif has_keywords and top_score >= 0.3:
should_bypass = True
# Or: important keywords + long enough content
elif has_keywords and len(content) > 100 and top_score >= 0.3:
should_bypass = True
if should_bypass:
# Direct-answer template for queries about violation ratios / emulation-rank demotion
if any(kw in query_lower for kw in ["12%", "tỷ lệ", "phần trăm", "hạ bậc", "thi đua"]):
# Query about violation ratios and emulation-rank demotion
section_code = top_data.get("section_code", "")
section_title = top_data.get("section_title", "")
doc_title = top_data.get("document_title", "văn bản pháp luật")
# Take the leading part of the content (at most 600 characters)
content_preview = content[:600] + "..." if len(content) > 600 else content
answer = (
f"Theo {doc_title} ({doc_code}):\n\n"
f"{section_code}: {section_title}\n\n"
f"{content_preview}\n\n"
f"Nguồn: {section_code}, {doc_title} ({doc_code})"
)
else:
# Generic template for legal queries
section_code = top_data.get("section_code", "Điều liên quan")
section_title = top_data.get("section_title", "")
doc_title = top_data.get("document_title", "văn bản pháp luật")
content_preview = content[:500] + "..." if len(content) > 500 else content
answer = (
f"Kết quả chính xác nhất:\n\n"
f"- Văn bản: {doc_title} ({doc_code})\n"
f"- Điều khoản: {section_code}" + (f" – {section_title}" if section_title else "") + "\n\n"
f"{content_preview}\n\n"
f"Nguồn: {section_code}, {doc_title} ({doc_code})"
)
logger.info(
"[BYPASS_LLM] Using raw template for legal query (score=%.3f, doc=%s, query='%s')",
top_score,
doc_code,
query[:50]
)
# Confidence is the top score plus 0.05, capped at 0.99; at most 3 results are returned
return {
"message": answer,
"intent": intent,
"confidence": min(0.99, top_score + 0.05),
"results": search_result["results"][:3],
"count": min(3, search_result["count"]),
"_source": "raw_template",
"routing": "raw_template"
}
# Get conversation context if available
context = None
context_summary = ""
if session_id:
try:
recent_messages = ConversationContext.get_recent_messages(session_id, limit=5)
context = [
{
"role": msg.role,
"content": msg.content,
"intent": msg.intent
}
for msg in recent_messages
]
# Build a context summary to put in the prompt when there is conversation history
if len(context) > 1:
context_parts = []
for msg in reversed(context[-3:]): # last 3 entries (presumably the most recent messages - depends on get_recent_messages ordering)
if msg["role"] == "user":
context_parts.append(f"Người dùng: {msg['content'][:100]}")
elif msg["role"] == "bot":
context_parts.append(f"Bot: {msg['content'][:100]}")
if context_parts:
context_summary = "\n\nNgữ cảnh cuộc trò chuyện trước đó:\n" + "\n".join(context_parts)
except Exception as exc:
logger.warning("[CONTEXT] Failed to load conversation context: %s", exc)
# Enhance query with context if available
enhanced_query = query
if context_summary:
enhanced_query = query + context_summary
# Generate response message using LLM if available and we have documents
message = None
if self.llm_generator and search_result["count"] > 0:
# For legal queries, use structured output (top-4 for good context and speed)
if intent == "search_legal" and search_result["results"]:
legal_docs = [r["data"] for r in search_result["results"] if r.get("type") == "legal"][:4] # Top-4 for balance
if legal_docs:
structured_answer = self.llm_generator.generate_structured_legal_answer(
enhanced_query, # use enhanced_query, which includes the context
legal_docs,
prefill_summary=None
)
if structured_answer:
message = format_structured_legal_answer(structured_answer)
# For other intents or if structured failed, use regular LLM generation
if not message:
documents = [r["data"] for r in search_result["results"][:4]] # Top-4 for balance
message = self.llm_generator.generate_answer(
enhanced_query, # use enhanced_query, which includes the context
context=context,
documents=documents
)
# Fallback to template if LLM not available or failed
if not message:
if search_result["count"] > 0:
# Special handling for legal queries: nicer formatting instead of the generic template
if intent == "search_legal" and search_result["results"]:
top_result = search_result["results"][0]
top_data = top_result.get("data", {})
doc_code = top_data.get("document_code", "")
doc_title = top_data.get("document_title", "văn bản pháp luật")
section_code = top_data.get("section_code", "")
section_title = top_data.get("section_title", "")
content = top_data.get("content", "") or top_data.get("excerpt", "")
if content and len(content) > 50:
content_preview = content[:400] + "..." if len(content) > 400 else content
message = (
f"Tôi tìm thấy {search_result['count']} điều khoản liên quan đến '{query}':\n\n"
f"**{section_code}**: {section_title or 'Nội dung liên quan'}\n\n"
f"{content_preview}\n\n"
f"Nguồn: {doc_title}" + (f" ({doc_code})" if doc_code else "")
)
else:
# Top hit has too little text to quote: use the intent template
template = RESPONSE_TEMPLATES.get(intent, RESPONSE_TEMPLATES["general_query"])
message = template.format(
count=search_result["count"],
query=query
)
else:
template = RESPONSE_TEMPLATES.get(intent, RESPONSE_TEMPLATES["general_query"])
message = template.format(
count=search_result["count"],
query=query
)
else:
message = RESPONSE_TEMPLATES["no_results"].format(query=query)
# Limit results to top 5 for response
results = search_result["results"][:5]
response = {
"message": message,
"intent": intent,
"confidence": 0.95, # High confidence for Slow Path (thorough search)
"results": results,
"count": len(results),
"_source": "slow_path"
}
return response
def _maybe_request_clarification(
    self,
    query: str,
    search_result: Dict[str, Any],
    selected_document_code: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Decide whether to ask the user to pick a document (wizard stage ``choose_document``).

    Option-first rule: a clarification payload is built only when the user
    has not selected a document yet, the search returned at least one hit,
    and the query does not spell out a document code itself.

    Args:
        query: Raw user query.
        search_result: Search result dict; only its ``"count"`` is read.
        selected_document_code: Document already chosen in the session, if any.

    Returns:
        The payload from ``_build_clarification_payload`` or ``None`` when
        no clarification should be requested.
    """
    # Nothing to ask when a document is already chosen or nothing was found.
    if (
        selected_document_code
        or not search_result
        or search_result.get("count", 0) == 0
    ):
        return None

    # A code written out in the question (e.g. 264/QĐ-TW) is used as-is.
    if self._has_explicit_document_code_in_query(query):
        return None

    # Prefer the canonical documents stored in the DB ...
    fallback_candidates: List[Dict[str, Any]] = []
    try:
        canonical_rows = list(
            LegalDocument.objects.filter(
                code__in=["264-QD-TW", "QD-69-TW", "TT-02-CAND"]
            )
        )
        for row in canonical_rows:
            row_summary = getattr(row, "summary", "") or ""
            row_metadata = getattr(row, "metadata", {}) or {}
            if not row_summary and isinstance(row_metadata, dict):
                row_summary = row_metadata.get("summary", "")
            fallback_candidates.append(
                {
                    "code": row.code,
                    "title": getattr(row, "title", "") or row.code,
                    "summary": row_summary,
                    "doc_type": getattr(row, "doc_type", "") or "",
                    "section_title": "",
                }
            )
    except Exception as exc:
        logger.warning(
            "[CLARIFICATION] Fallback documents lookup failed, using static list: %s",
            exc,
        )

    # ... and fall back to a minimal static list so the wizard still works.
    if not fallback_candidates:
        static_documents = (
            ("264-QD-TW", "Quyết định 264-QĐ/TW về kỷ luật đảng viên"),
            ("QD-69-TW", "Quy định 69-QĐ/TW về kỷ luật tổ chức đảng, đảng viên"),
            ("TT-02-CAND", "Thông tư 02/2021/TT-BCA về điều lệnh CAND"),
        )
        fallback_candidates = [
            {
                "code": static_code,
                "title": static_title,
                "summary": "",
                "doc_type": "",
                "section_title": "",
            }
            for static_code, static_title in static_documents
        ]

    payload = self._build_clarification_payload(query, fallback_candidates)
    if payload:
        logger.info(
            "[CLARIFICATION] Requesting user choice among canonical documents: %s",
            [c["code"] for c in fallback_candidates],
        )
    return payload
def _has_explicit_document_code_in_query(self, query: str) -> bool:
    """Tell whether the raw query spells out a document code.

    The query is passed through ``_remove_accents``, upper-cased, and
    tested against each regex in ``DOCUMENT_CODE_PATTERNS`` (e.g.
    '264/QĐ-TW', 'QD-69-TW', 'TT-02-CAND').

    Unlike ``_detect_document_code``, which compares against the codes
    stored in the LegalDocument table, this relies on fixed regexes only,
    to avoid over-detecting on generic questions.

    Returns:
        True as soon as one pattern matches, otherwise False.
    """
    haystack = self._remove_accents(query).upper()
    if not haystack:
        return False

    def _matches(pattern) -> bool:
        # An invalid pattern is ignored rather than blocking the flow.
        try:
            return bool(re.search(pattern, haystack))
        except re.error:
            return False

    return any(_matches(pattern) for pattern in DOCUMENT_CODE_PATTERNS)
def _collect_document_candidates(
    self,
    legal_results: List[Dict[str, Any]],
    limit: int = 4,
) -> List[Dict[str, Any]]:
    """Collect unique document candidates from legal results.

    Results are scanned in order. The first hit of each distinct document
    code (compared case-insensitively) is kept, and scanning stops once
    ``limit`` codes are collected.

    Args:
        legal_results: Result dicts whose ``"data"`` dict may carry
            ``document_code``, ``document_title``, ``section_title``,
            ``excerpt`` and ``content``. A missing or ``None`` ``"data"``
            is treated as empty.
        limit: Maximum number of distinct documents to collect.

    Returns:
        One candidate dict per document (``code``, ``title``, ``summary``,
        ``doc_type``, ``section_title``), or ``[]`` when fewer than two
        distinct documents were found, since there is nothing to choose
        between.
    """
    ordered_codes: List[str] = []
    # Upper-cased code -> "data" of the first result carrying that code.
    # Remembered here so the results are not rescanned for every code.
    first_data_by_code: Dict[str, Dict[str, Any]] = {}
    for result in legal_results:
        data = result.get("data") or {}
        code = (data.get("document_code") or "").strip()
        if not code:
            continue
        upper = code.upper()
        if upper in first_data_by_code:
            continue
        ordered_codes.append(code)
        first_data_by_code[upper] = data
        if len(ordered_codes) >= limit:
            break

    if len(ordered_codes) < 2:
        return []

    # Document rows are optional enrichment; on failure the candidates are
    # built from the search results alone.
    try:
        documents = {
            doc.code.upper(): doc
            for doc in LegalDocument.objects.filter(code__in=ordered_codes)
        }
    except Exception as exc:
        logger.warning("[CLARIFICATION] Unable to load documents for candidates: %s", exc)
        documents = {}

    candidates: List[Dict[str, Any]] = []
    for code in ordered_codes:
        upper = code.upper()
        doc_obj = documents.get(upper)
        data = first_data_by_code[upper]

        # Summary preference: document summary, metadata["summary"], then
        # the excerpt or the first 200 characters of the content.
        summary = ""
        if doc_obj:
            summary = doc_obj.summary or ""
            if not summary and isinstance(doc_obj.metadata, dict):
                summary = doc_obj.metadata.get("summary", "")
        if not summary:
            # "content" may be present but None; never slice None.
            summary = data.get("excerpt") or (data.get("content") or "")[:200]

        candidates.append(
            {
                "code": code,
                "title": data.get("document_title") or (doc_obj.title if doc_obj else code),
                "summary": summary,
                "doc_type": doc_obj.doc_type if doc_obj else "",
                "section_title": data.get("section_title") or "",
            }
        )
    return candidates
def _build_clarification_payload(
    self,
    query: str,
    candidates: List[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """Build the wizard payload asking the user to choose a document.

    LLM suggestions from ``_call_clarification_llm`` are preferred. Each
    suggested option is paired by position with the candidate at the same
    index, which supplies any missing code, title or reason. If the LLM
    gives nothing usable, options are built from the first three
    candidates. An "__other__" option is always present.

    Args:
        query: Raw user query, forwarded to the LLM helper.
        candidates: Candidate dicts with at least ``code`` and ``title``;
            ``summary`` and ``section_title`` are used as the reason.

    Returns:
        ``None`` when ``candidates`` is empty, otherwise a dict with
        ``type``, ``wizard_stage``, ``message``, ``options``,
        ``clarification``, ``results`` and ``count``.
    """
    if not candidates:
        return None

    default_message = (
        "Tôi tìm thấy một số văn bản có thể phù hợp. "
        "Bạn vui lòng chọn văn bản muốn tra cứu để tôi trả lời chính xác hơn."
    )
    llm_payload = self._call_clarification_llm(query, candidates)
    message = default_message
    options: List[Dict[str, Any]] = []

    # LLM output is not trusted to have the expected shape: anything that
    # is not a dict (payload or option item) is ignored instead of
    # raising on .get().
    if isinstance(llm_payload, dict):
        message = llm_payload.get("message") or default_message
        raw_options = llm_payload.get("options")
        if isinstance(raw_options, list):
            # zip stops at the shorter sequence, so surplus options are dropped.
            for opt, candidate in zip(raw_options, candidates):
                if not isinstance(opt, dict):
                    continue
                code = opt.get("code") or candidate.get("code")
                title = (
                    opt.get("title")
                    or opt.get("document_title")
                    or candidate.get("title")
                )
                if not (code and title):
                    continue
                options.append(
                    {
                        "code": str(code).upper(),
                        "title": title,
                        "reason": opt.get("reason")
                        or opt.get("summary")
                        or candidate.get("summary")
                        or candidate.get("section_title")
                        or "",
                    }
                )

    # No valid LLM options: build them from the candidates.
    if not options:
        options = [
            {
                "code": candidate["code"].upper(),
                "title": candidate["title"],
                "reason": candidate.get("summary") or candidate.get("section_title") or "",
            }
            for candidate in candidates[:3]
        ]

    if not any(opt.get("code") == "__other__" for opt in options):
        options.append(
            {
                "code": "__other__",
                "title": "Khác",
                "reason": "Tôi muốn hỏi văn bản hoặc chủ đề khác",
            }
        )

    return {
        # Wizard-style payload: the options form is what the UI consumes first
        "type": "options",
        "wizard_stage": "choose_document",
        "message": message,
        "options": options,
        "clarification": {
            "message": message,
            "options": options,
        },
        "results": [],
        "count": 0,
    }
def _call_clarification_llm(
    self,
    query: str,
    candidates: List[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """Ask the LLM generator for up to three clarification topics.

    Returns:
        Whatever ``suggest_clarification_topics`` returns, or ``None``
        when no generator is configured or the call raises (the failure
        is logged as a warning).
    """
    generator = self.llm_generator
    if not generator:
        return None

    try:
        suggestion = generator.suggest_clarification_topics(
            query,
            candidates,
            max_options=3,
        )
    except Exception as exc:
        logger.warning("[CLARIFICATION] LLM suggestion failed: %s", exc)
        suggestion = None
    return suggestion
def _parallel_search_prepare(
    self,
    document_code: str,
    keywords: List[str],
    session_id: Optional[str] = None,
) -> None:
    """Prefetch search results in the background once a document is selected.

    The task is submitted to the handler's thread pool and stores its
    result under ``"document_results"`` for the session, for use in
    Stage 2 (choose topic). Redis is consulted first; on a miss the
    search runs and the result is written to both Redis (with
    ``prefetch_cache_ttl``) and the in-memory cache.

    Args:
        document_code: Selected document code.
        keywords: Keywords extracted from query/options.
        session_id: Session ID for caching results; without it nothing
            is scheduled.
    """
    if not session_id:
        return

    def _remember(payload: Any) -> None:
        # Publish the payload as this session's document results.
        with self._cache_lock:
            self._prefetched_cache.setdefault(session_id, {})["document_results"] = payload

    def _search_task():
        try:
            logger.info(
                "[PARALLEL_SEARCH] Starting background search for doc=%s, keywords=%s",
                document_code,
                keywords[:5],
            )
            query_text = " ".join(keywords) if keywords else ""

            # Check Redis cache first
            cache_key = (
                f"prefetch:{document_code.upper()}:"
                f"{hashlib.sha256(query_text.encode()).hexdigest()[:16]}"
            )
            cached_result = None
            if self.redis_cache and self.redis_cache.is_available():
                cached_result = self.redis_cache.get(cache_key)
            if cached_result:
                logger.info(
                    "[PARALLEL_SEARCH] ✅ Cache hit for doc=%s",
                    document_code
                )
                # The Redis entry keeps the timestamp of the original
                # search, which may be older than the freshness window
                # applied when prefetched results are read back. Stamp
                # the in-memory copy with the time it was loaded so a
                # Redis hit stays usable for this session.
                if isinstance(cached_result, dict):
                    cached_result = {**cached_result, "timestamp": time.time()}
                _remember(cached_result)
                return

            # Search in the selected document
            search_result = self._search_by_intent(
                intent="search_legal",
                query=query_text,
                limit=20,  # Get more results for topic options
                preferred_document_code=document_code.upper(),
            )

            # Prepare cache data
            cache_data = {
                "document_code": document_code,
                "results": search_result.get("results", []),
                "count": search_result.get("count", 0),
                "timestamp": time.time(),
            }

            # Store in Redis cache
            if self.redis_cache and self.redis_cache.is_available():
                self.redis_cache.set(cache_key, cache_data, ttl_seconds=self.prefetch_cache_ttl)
                logger.debug(
                    "[PARALLEL_SEARCH] Cached prefetch results (TTL: %ds)",
                    self.prefetch_cache_ttl
                )

            # Store in in-memory cache (fallback)
            _remember(cache_data)
            logger.info(
                "[PARALLEL_SEARCH] Completed background search for doc=%s, found %d results",
                document_code,
                search_result.get("count", 0),
            )
        except Exception as exc:
            # Prefetching is best-effort: a failure only means no cached
            # results are available later.
            logger.warning("[PARALLEL_SEARCH] Background search failed: %s", exc)

    # Submit to thread pool
    self._executor.submit(_search_task)
def _parallel_search_topic(
    self,
    document_code: str,
    topic_keywords: List[str],
    session_id: Optional[str] = None,
) -> None:
    """Run a background search for the topic the user selected.

    The task is submitted to the handler's thread pool; its results are
    stored under ``"topic_results"`` for the session so they can be used
    for final answer generation.

    Args:
        document_code: Selected document code.
        topic_keywords: Keywords from the selected topic.
        session_id: Session ID for caching results; without it nothing
            is scheduled.
    """
    if not session_id:
        return

    def _search_task():
        try:
            logger.info(
                "[PARALLEL_SEARCH] Starting topic search for doc=%s, keywords=%s",
                document_code,
                topic_keywords[:5],
            )
            # Search restricted to the chosen document, using the topic keywords
            joined_keywords = " ".join(topic_keywords) if topic_keywords else ""
            found = self._search_by_intent(
                intent="search_legal",
                query=joined_keywords,
                limit=10,
                preferred_document_code=document_code.upper(),
            )
            # Publish the results for this session under the lock
            with self._cache_lock:
                session_entry = self._prefetched_cache.setdefault(session_id, {})
                session_entry["topic_results"] = {
                    "document_code": document_code,
                    "keywords": topic_keywords,
                    "results": found.get("results", []),
                    "count": found.get("count", 0),
                    "timestamp": time.time(),
                }
            logger.info(
                "[PARALLEL_SEARCH] Completed topic search, found %d results",
                found.get("count", 0),
            )
        except Exception as exc:
            logger.warning("[PARALLEL_SEARCH] Topic search failed: %s", exc)

    # Hand the task to the thread pool; the returned future is not kept
    self._executor.submit(_search_task)
def _get_prefetched_results(
    self,
    session_id: Optional[str],
    result_type: str = "document_results",
    max_age_seconds: float = 300,
) -> Optional[Dict[str, Any]]:
    """Get prefetched search results from the in-memory cache.

    Args:
        session_id: Session ID.
        result_type: "document_results" or "topic_results".
        max_age_seconds: Maximum age of an entry, measured from its
            ``"timestamp"``. Defaults to 300 (5 minutes).

    Returns:
        The cached results dict, or ``None`` when there is no session,
        no entry of that type, or the entry is older than
        ``max_age_seconds``. An expired entry is removed, so the cache
        does not keep it until the session is explicitly cleared.
    """
    if not session_id:
        return None

    with self._cache_lock:
        cache_entry = self._prefetched_cache.get(session_id)
        if not cache_entry:
            return None
        results = cache_entry.get(result_type)
        if not results:
            return None

        # An entry without a timestamp counts as expired (age since epoch 0).
        timestamp = results.get("timestamp", 0)
        if time.time() - timestamp > max_age_seconds:
            # Evict the stale entry, and the session slot once it is empty.
            cache_entry.pop(result_type, None)
            if not cache_entry:
                self._prefetched_cache.pop(session_id, None)
            logger.debug("[PARALLEL_SEARCH] Prefetched results expired for session=%s", session_id)
            return None
        return results
def _clear_prefetched_cache(self, session_id: Optional[str]) -> None:
    """Drop every prefetched result held in memory for ``session_id``.

    Does nothing when ``session_id`` is empty or has no entry. A debug
    line is logged only when an entry was actually removed.
    """
    if not session_id:
        return

    with self._cache_lock:
        try:
            del self._prefetched_cache[session_id]
        except KeyError:
            # Nothing cached for this session
            return
        logger.debug("[PARALLEL_SEARCH] Cleared cache for session=%s", session_id)
def _search_by_intent(
    self,
    intent: str,
    query: str,
    limit: int = 5,
    preferred_document_code: Optional[str] = None,
) -> Dict[str, Any]:
    """Run the search that matches ``intent`` and return a uniform result dict.

    The search text is the stripped query followed by the keywords the
    chatbot extracts from it. ``search_fine``, ``search_procedure``,
    ``search_office`` and ``search_advisory`` go through
    ``search_with_ml``; ``search_legal`` goes through
    ``pure_semantic_search`` over legal sections, restricted to one
    document when a code is preferred or detected and that document has
    sections. Any other intent yields no results.

    Args:
        intent: Classified intent.
        query: Raw user query.
        limit: Maximum number of hits requested from the search backend.
        preferred_document_code: Document code to restrict a legal search
            to; takes precedence over a code detected in the query.

    Returns:
        Dict with ``intent``, ``query``, ``keywords``, ``results``,
        ``count`` and ``detected_code`` (``None`` unless a legal search
        detected a code in the query).
    """
    # Use original query for better matching
    keywords = query.strip()
    extracted = " ".join(self.chatbot.extract_keywords(query))
    if extracted and len(extracted) > 2:
        keywords = f"{keywords} {extracted}"

    results: List[Dict[str, Any]] = []
    # Only the legal branch detects a code. It must still be bound for
    # every intent, because the returned dict always reports it.
    detected_code: Optional[str] = None

    if intent == "search_fine":
        qs = Fine.objects.all()
        text_fields = ["name", "code", "article", "decree", "remedial"]
        search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1)
        results = [{"type": "fine", "data": {
            "id": f.id,
            "name": f.name,
            "code": f.code,
            "min_fine": float(f.min_fine) if f.min_fine else None,
            "max_fine": float(f.max_fine) if f.max_fine else None,
            "article": f.article,
            "decree": f.decree,
        }} for f in search_results]
    elif intent == "search_procedure":
        qs = Procedure.objects.all()
        text_fields = ["title", "domain", "conditions", "dossier"]
        search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1)
        results = [{"type": "procedure", "data": {
            "id": p.id,
            "title": p.title,
            "domain": p.domain,
            "level": p.level,
        }} for p in search_results]
    elif intent == "search_office":
        qs = Office.objects.all()
        text_fields = ["unit_name", "address", "district", "service_scope"]
        search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1)
        results = [{"type": "office", "data": {
            "id": o.id,
            "unit_name": o.unit_name,
            "address": o.address,
            "district": o.district,
            "phone": o.phone,
            "working_hours": o.working_hours,
        }} for o in search_results]
    elif intent == "search_advisory":
        qs = Advisory.objects.all()
        text_fields = ["title", "summary"]
        search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1)
        results = [{"type": "advisory", "data": {
            "id": a.id,
            "title": a.title,
            "summary": a.summary,
        }} for a in search_results]
    elif intent == "search_legal":
        qs = LegalSection.objects.all()
        text_fields = ["section_title", "section_code", "content"]
        detected_code = self._detect_document_code(query)
        # An explicitly preferred code wins over one detected in the query
        effective_code = preferred_document_code or detected_code
        filtered = False
        if effective_code:
            filtered_qs = qs.filter(document__code__iexact=effective_code)
            if filtered_qs.exists():
                qs = filtered_qs
                filtered = True
                logger.info(
                    "[SEARCH] Prefiltering legal sections for document code %s (query='%s')",
                    effective_code,
                    query,
                )
            else:
                # Unknown document locally: search the whole corpus instead
                logger.info(
                    "[SEARCH] Document code %s detected but no sections found locally, falling back to full corpus",
                    effective_code,
                )
        else:
            logger.debug("[SEARCH] No document code detected for query: %s", query)
        # Use pure semantic search (100% vector, no BM25)
        search_results = pure_semantic_search(
            [keywords],
            qs,
            top_k=limit,
            text_fields=text_fields
        )
        results = self._format_legal_results(search_results, detected_code, query=query)
        logger.info(
            "[SEARCH] Legal intent processed (query='%s', code=%s, filtered=%s, results=%d)",
            query,
            detected_code or "None",
            filtered,
            len(results),
        )

    return {
        "intent": intent,
        "query": query,
        "keywords": keywords,
        "results": results,
        "count": len(results),
        "detected_code": detected_code,
    }
def _should_save_to_golden(self, query: str, response: Dict) -> bool:
"""
Decide if response should be saved to golden dataset.
Criteria:
- High confidence (>0.95)
- Has results
- Response is complete and well-formed
- Not already in golden dataset
"""
try:
from hue_portal.core.models import GoldenQuery
# Check if already exists
query_normalized = self._normalize_query(query)
if GoldenQuery.objects.filter(query_normalized=query_normalized, is_active=True).exists():
return False
# Check criteria
has_results = response.get("count", 0) > 0
has_message = bool(response.get("message", "").strip())
confidence = response.get("confidence", 0.0)
# Only save if high quality
if has_results and has_message and confidence >= 0.95:
# Additional check: message should be substantial (not just template)
message = response.get("message", "")
if len(message) > 50: # Substantial response
return True
return False
except Exception as e:
logger.warning(f"Error checking if should save to golden: {e}")
return False
def _normalize_query(self, query: str) -> str:
"""Normalize query for matching."""
normalized = query.lower().strip()
# Remove accents
normalized = unicodedata.normalize("NFD", normalized)
normalized = "".join(ch for ch in normalized if unicodedata.category(ch) != "Mn")
# Remove extra spaces
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized
def _detect_document_code(self, query: str) -> Optional[str]:
"""Detect known document code mentioned in the query."""
normalized_query = self._remove_accents(query).upper()
if not normalized_query:
return None
try:
codes = LegalDocument.objects.values_list("code", flat=True)
except Exception as exc:
logger.debug("Unable to fetch document codes: %s", exc)
return None
for code in codes:
if not code:
continue
tokens = self._split_code_tokens(code)
if tokens and all(token in normalized_query for token in tokens):
logger.info("[SEARCH] Detected document code %s in query", code)
return code
return None
def _split_code_tokens(self, code: str) -> List[str]:
"""Split a document code into uppercase accentless tokens."""
normalized = self._remove_accents(code).upper()
return [tok for tok in re.split(r"[-/\s]+", normalized) if tok]
def _remove_accents(self, text: str) -> str:
if not text:
return ""
normalized = unicodedata.normalize("NFD", text)
return "".join(ch for ch in normalized if unicodedata.category(ch) != "Mn")
def _format_legal_results(
self,
search_results: List[Any],
detected_code: Optional[str],
query: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Build legal result payload and apply ordering/boosting based on doc code and keywords."""
entries: List[Dict[str, Any]] = []
upper_detected = detected_code.upper() if detected_code else None
# Keywords that indicate important legal concepts (boost score if found)
important_keywords = []
if query:
query_lower = query.lower()
# Keywords for percentage/threshold queries
if any(kw in query_lower for kw in ["%", "phần trăm", "tỷ lệ", "12%", "20%", "10%"]):
important_keywords.extend(["%", "phần trăm", "tỷ lệ", "12", "20", "10"])
# Keywords for ranking/demotion queries
if any(kw in query_lower for kw in ["hạ bậc", "thi đua", "xếp loại", "đánh giá"]):
important_keywords.extend(["hạ bậc", "thi đua", "xếp loại", "đánh giá"])
for ls in search_results:
doc = ls.document
doc_code = doc.code if doc else None
score = getattr(ls, "_ml_score", getattr(ls, "rank", 0.0)) or 0.0
# Boost score if content contains important keywords
content_text = (ls.content or ls.section_title or "").lower()
keyword_boost = 0.0
if important_keywords and content_text:
for kw in important_keywords:
if kw.lower() in content_text:
keyword_boost += 0.15 # Boost 0.15 per keyword match
logger.debug(
"[BOOST] Keyword '%s' found in section %s, boosting score",
kw,
ls.section_code,
)
entries.append(
{
"type": "legal",
"score": float(score) + keyword_boost,
"data": {
"id": ls.id,
"section_code": ls.section_code,
"section_title": ls.section_title,
"content": ls.content[:500] if ls.content else "",
"excerpt": ls.excerpt,
"document_code": doc_code,
"document_title": doc.title if doc else None,
"page_start": ls.page_start,
"page_end": ls.page_end,
},
}
)
if upper_detected:
exact_matches = [
r for r in entries if (r["data"].get("document_code") or "").upper() == upper_detected
]
if exact_matches:
others = [r for r in entries if r not in exact_matches]
entries = exact_matches + others
else:
for entry in entries:
doc_code = (entry["data"].get("document_code") or "").upper()
if doc_code == upper_detected:
entry["score"] = (entry.get("score") or 0.1) * 10
entries.sort(key=lambda r: r.get("score") or 0, reverse=True)
else:
# Sort by boosted score
entries.sort(key=lambda r: r.get("score") or 0, reverse=True)
return entries
def _is_complex_query(self, query: str) -> bool:
"""
Detect if query is complex and requires LLM reasoning (not suitable for Fast Path).
Complex queries contain keywords like: %, bậc, thi đua, tỷ lệ, liên đới, tăng nặng, giảm nhẹ, đơn vị vi phạm
"""
if not query:
return False
query_lower = query.lower()
complex_keywords = [
"%", "phần trăm",
"bậc", "hạ bậc", "nâng bậc",
"thi đua", "xếp loại", "đánh giá",
"tỷ lệ", "tỉ lệ",
"liên đới", "liên quan",
"tăng nặng", "tăng nặng hình phạt",
"giảm nhẹ", "giảm nhẹ hình phạt",
"đơn vị vi phạm", "đơn vị có",
]
for keyword in complex_keywords:
if keyword in query_lower:
logger.info(
"[FAST_PATH] Complex query detected (keyword: '%s'), forcing Slow Path",
keyword,
)
return True
return False
def _maybe_fast_path_response(
self, results: List[Dict[str, Any]], query: Optional[str] = None
) -> Optional[Dict[str, Any]]:
"""Return fast-path response if results are confident enough."""
if not results:
return None
# Double-check: if query is complex, never use Fast Path
if query and self._is_complex_query(query):
return None
top_result = results[0]
top_score = top_result.get("score", 0.0) or 0.0
doc_code = (top_result.get("data", {}).get("document_code") or "").upper()
if top_score >= 0.88 and doc_code:
logger.info(
"[FAST_PATH] Top score hit (%.3f) for document %s", top_score, doc_code
)
message = self._format_fast_legal_message(top_result)
return {
"message": message,
"results": results[:3],
"count": min(3, len(results)),
"confidence": min(0.99, top_score + 0.05),
}
top_three = results[:3]
if len(top_three) >= 2:
doc_codes = [
(res.get("data", {}).get("document_code") or "").upper()
for res in top_three
if res.get("data", {}).get("document_code")
]
if doc_codes and len(set(doc_codes)) == 1:
logger.info(
"[FAST_PATH] Top-%d results share same document %s",
len(top_three),
doc_codes[0],
)
message = self._format_fast_legal_message(top_three[0])
return {
"message": message,
"results": top_three,
"count": len(top_three),
"confidence": min(0.97, (top_three[0].get("score") or 0.9) + 0.04),
}
return None
def _format_fast_legal_message(self, result: Dict[str, Any]) -> str:
"""Format a concise legal answer without LLM."""
data = result.get("data", {})
doc_title = data.get("document_title") or "văn bản pháp luật"
doc_code = data.get("document_code") or ""
section_code = data.get("section_code") or "Điều liên quan"
section_title = data.get("section_title") or ""
content = (data.get("content") or data.get("excerpt") or "").strip()
if len(content) > 400:
trimmed = content[:400].rsplit(" ", 1)[0]
content = f"{trimmed}..."
intro = "Kết quả chính xác nhất:"
lines = [intro]
if doc_title or doc_code:
lines.append(f"- Văn bản: {doc_title or 'văn bản pháp luật'}" + (f" ({doc_code})" if doc_code else ""))
section_label = section_code
if section_title:
section_label = f"{section_code}{section_title}"
lines.append(f"- Điều khoản: {section_label}")
lines.append("")
lines.append(content)
citation_doc = doc_title or doc_code or "nguồn chính thức"
lines.append(f"\nNguồn: {section_label}, {citation_doc}.")
return "\n".join(lines)