|
|
""" |
|
|
Slow Path Handler - Full RAG pipeline for complex queries. |
|
|
""" |
|
|
import os |
|
|
import time |
|
|
import logging |
|
|
import hashlib |
|
|
from typing import Dict, Any, Optional, List, Set |
|
|
import unicodedata |
|
|
import re |
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import threading |
|
|
|
|
|
from hue_portal.core.chatbot import get_chatbot, RESPONSE_TEMPLATES |
|
|
from hue_portal.core.models import ( |
|
|
Fine, |
|
|
Procedure, |
|
|
Office, |
|
|
Advisory, |
|
|
LegalSection, |
|
|
LegalDocument, |
|
|
) |
|
|
from hue_portal.core.search_ml import search_with_ml |
|
|
|
|
|
|
|
|
|
|
from hue_portal.chatbot.llm_integration import get_llm_generator |
|
|
from hue_portal.chatbot.structured_legal import format_structured_legal_answer |
|
|
from hue_portal.chatbot.context_manager import ConversationContext |
|
|
from hue_portal.chatbot.router import DOCUMENT_CODE_PATTERNS |
|
|
from hue_portal.core.query_rewriter import get_query_rewriter |
|
|
from hue_portal.core.pure_semantic_search import pure_semantic_search, parallel_vector_search

# get_redis_cache is called in SlowPathHandler.__init__ but was never imported;
# the module path below is an assumption about the project layout.
from hue_portal.core.cache import get_redis_cache
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class SlowPathHandler: |
|
|
"""Handle Slow Path queries with full RAG pipeline.""" |
|
|
|
|
|
def __init__(self): |
|
|
self.chatbot = get_chatbot() |
|
|
self.llm_generator = get_llm_generator() |
|
|
|
|
|
self._executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="parallel_search") |
|
|
|
|
|
self._prefetched_cache: Dict[str, Dict[str, Any]] = {} |
|
|
self._cache_lock = threading.Lock() |
|
|
|
|
|
self.redis_cache = get_redis_cache() |
|
|
|
|
|
self.prefetch_cache_ttl = int(os.environ.get("CACHE_PREFETCH_TTL", "1800")) |
|
|
|
|
|
def handle( |
|
|
self, |
|
|
query: str, |
|
|
intent: str, |
|
|
session_id: Optional[str] = None, |
|
|
selected_document_code: Optional[str] = None, |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
        Full RAG pipeline:
        1. Search (hybrid: BM25 + vector), retrieving up to 15 candidate sections
        2. Optional document-selection wizard for ambiguous legal queries
        3. Optional rerank and raw-template bypass for high-confidence hits
        4. LLM generation with structured output (for legal queries)
        5. Template fallback when the LLM is unavailable or returns nothing
|
|
|
|
|
Args: |
|
|
query: User query. |
|
|
intent: Detected intent. |
|
|
session_id: Optional session ID for context. |
|
|
selected_document_code: Selected document code from wizard. |
|
|
|
|
|
Returns: |
|
|
Response dict with message, intent, results, etc. |
|
|
""" |
|
|
query = query.strip() |
|
|
selected_document_code_normalized = ( |
|
|
selected_document_code.strip().upper() if selected_document_code else None |
|
|
) |
|
|
|
|
|
|
|
|
if intent == "greeting": |
|
|
query_lower = query.lower().strip() |
|
|
query_words = query_lower.split() |
|
|
is_simple_greeting = ( |
|
|
len(query_words) <= 3 and |
|
|
any(greeting in query_lower for greeting in ["xin chào", "chào", "hello", "hi"]) and |
|
|
not any(kw in query_lower for kw in ["phạt", "mức phạt", "vi phạm", "thủ tục", "hồ sơ", "địa chỉ", "công an", "cảnh báo"]) |
|
|
) |
|
|
if is_simple_greeting: |
|
|
return { |
|
|
"message": RESPONSE_TEMPLATES["greeting"], |
|
|
"intent": "greeting", |
|
|
"results": [], |
|
|
"count": 0, |
|
|
"_source": "slow_path" |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
has_explicit_code = self._has_explicit_document_code_in_query(query) |
|
|
logger.info( |
|
|
"[WIZARD] Checking wizard conditions - intent=%s, selected_code=%s, has_explicit_code=%s, query='%s'", |
|
|
intent, |
|
|
selected_document_code_normalized, |
|
|
has_explicit_code, |
|
|
query[:50], |
|
|
) |
|
|
if ( |
|
|
intent == "search_legal" |
|
|
and not selected_document_code_normalized |
|
|
and not has_explicit_code |
|
|
): |
|
|
logger.info("[QUERY_REWRITE] ✅ Wizard conditions met, using Query Rewrite Strategy") |
|
|
|
|
|
|
|
|
query_rewriter = get_query_rewriter(self.llm_generator) |
|
|
|
|
|
|
|
|
context = None |
|
|
if session_id: |
|
|
try: |
|
|
recent_messages = ConversationContext.get_recent_messages(session_id, limit=5) |
|
|
context = [ |
|
|
{"role": msg.role, "content": msg.content} |
|
|
for msg in recent_messages |
|
|
] |
|
|
except Exception as exc: |
|
|
logger.warning("[QUERY_REWRITE] Failed to load context: %s", exc) |
|
|
|
|
|
|
|
|
rewritten_queries = query_rewriter.rewrite_query( |
|
|
query, |
|
|
context=context, |
|
|
max_queries=5, |
|
|
min_queries=3 |
|
|
) |
|
|
|
|
|
if not rewritten_queries: |
|
|
|
|
|
rewritten_queries = [query] |
|
|
|
|
|
logger.info( |
|
|
"[QUERY_REWRITE] Rewrote query into %d queries: %s", |
|
|
len(rewritten_queries), |
|
|
rewritten_queries[:3] |
|
|
) |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
|
|
qs = LegalSection.objects.all() |
|
|
text_fields = ["section_title", "section_code", "content"] |
|
|
|
|
|
|
|
|
search_results = parallel_vector_search( |
|
|
rewritten_queries, |
|
|
qs, |
|
|
top_k_per_query=5, |
|
|
final_top_k=7, |
|
|
text_fields=text_fields |
|
|
) |
|
|
|
|
|
|
|
|
doc_codes_seen: Set[str] = set() |
|
|
document_options: List[Dict[str, Any]] = [] |
|
|
|
|
|
for section, score in search_results: |
|
|
doc = getattr(section, "document", None) |
|
|
if not doc: |
|
|
continue |
|
|
|
|
|
doc_code = getattr(doc, "code", "").upper() |
|
|
if not doc_code or doc_code in doc_codes_seen: |
|
|
continue |
|
|
|
|
|
doc_codes_seen.add(doc_code) |
|
|
|
|
|
|
|
|
doc_title = getattr(doc, "title", "") or doc_code |
|
|
doc_summary = getattr(doc, "summary", "") or "" |
|
|
if not doc_summary: |
|
|
metadata = getattr(doc, "metadata", {}) or {} |
|
|
if isinstance(metadata, dict): |
|
|
doc_summary = metadata.get("summary", "") |
|
|
|
|
|
document_options.append({ |
|
|
"code": doc_code, |
|
|
"title": doc_title, |
|
|
"summary": doc_summary, |
|
|
"score": float(score), |
|
|
"doc_type": getattr(doc, "doc_type", "") or "", |
|
|
}) |
|
|
|
|
|
|
|
|
if len(document_options) >= 5: |
|
|
break |
|
|
|
|
|
|
|
|
if not document_options: |
|
|
logger.warning("[QUERY_REWRITE] No documents found, using canonical fallback") |
|
|
canonical_candidates = [ |
|
|
{ |
|
|
"code": "264-QD-TW", |
|
|
"title": "Quyết định 264-QĐ/TW về kỷ luật đảng viên", |
|
|
"summary": "", |
|
|
"doc_type": "", |
|
|
}, |
|
|
{ |
|
|
"code": "QD-69-TW", |
|
|
"title": "Quy định 69-QĐ/TW về kỷ luật tổ chức đảng, đảng viên", |
|
|
"summary": "", |
|
|
"doc_type": "", |
|
|
}, |
|
|
{ |
|
|
"code": "TT-02-CAND", |
|
|
"title": "Thông tư 02/2021/TT-BCA về điều lệnh CAND", |
|
|
"summary": "", |
|
|
"doc_type": "", |
|
|
}, |
|
|
] |
|
|
clarification_payload = self._build_clarification_payload( |
|
|
query, canonical_candidates |
|
|
) |
|
|
if clarification_payload: |
|
|
clarification_payload.setdefault("intent", intent) |
|
|
clarification_payload.setdefault("_source", "clarification") |
|
|
clarification_payload.setdefault("routing", "clarification") |
|
|
clarification_payload.setdefault("confidence", 0.3) |
|
|
return clarification_payload |
|
|
|
|
|
|
|
|
options = [ |
|
|
{ |
|
|
"code": opt["code"], |
|
|
"title": opt["title"], |
|
|
"reason": opt.get("summary") or f"Độ liên quan: {opt['score']:.2f}", |
|
|
} |
|
|
for opt in document_options |
|
|
] |
|
|
|
|
|
|
|
|
if not any(opt.get("code") == "__other__" for opt in options): |
|
|
options.append({ |
|
|
"code": "__other__", |
|
|
"title": "Khác", |
|
|
"reason": "Tôi muốn hỏi văn bản hoặc chủ đề pháp luật khác.", |
|
|
}) |
|
|
|
|
|
message = ( |
|
|
"Tôi đã tìm thấy các văn bản pháp luật liên quan đến câu hỏi của bạn.\n\n" |
|
|
"Bạn hãy chọn văn bản muốn tra cứu để tôi trả lời chi tiết hơn:" |
|
|
) |
|
|
|
|
|
logger.info( |
|
|
"[QUERY_REWRITE] ✅ Found %d documents using Query Rewrite Strategy", |
|
|
len(document_options) |
|
|
) |
|
|
|
|
|
return { |
|
|
"type": "options", |
|
|
"wizard_stage": "choose_document", |
|
|
"message": message, |
|
|
"options": options, |
|
|
"clarification": { |
|
|
"message": message, |
|
|
"options": options, |
|
|
}, |
|
|
"results": [], |
|
|
"count": 0, |
|
|
"intent": intent, |
|
|
"_source": "query_rewrite", |
|
|
"routing": "query_rewrite", |
|
|
"confidence": 0.95, |
|
|
} |
|
|
|
|
|
except Exception as exc: |
|
|
logger.error( |
|
|
"[QUERY_REWRITE] Error in Query Rewrite Strategy: %s, falling back to LLM suggestions", |
|
|
exc, |
|
|
exc_info=True |
|
|
) |
|
|
|
|
|
canonical_candidates: List[Dict[str, Any]] = [] |
|
|
try: |
|
|
canonical_docs = list( |
|
|
LegalDocument.objects.filter( |
|
|
code__in=["264-QD-TW", "QD-69-TW", "TT-02-CAND"] |
|
|
) |
|
|
) |
|
|
for doc in canonical_docs: |
|
|
summary = getattr(doc, "summary", "") or "" |
|
|
metadata = getattr(doc, "metadata", {}) or {} |
|
|
if not summary and isinstance(metadata, dict): |
|
|
summary = metadata.get("summary", "") |
|
|
canonical_candidates.append( |
|
|
{ |
|
|
"code": doc.code, |
|
|
"title": getattr(doc, "title", "") or doc.code, |
|
|
"summary": summary, |
|
|
"doc_type": getattr(doc, "doc_type", "") or "", |
|
|
"section_title": "", |
|
|
} |
|
|
) |
|
|
except Exception as e: |
|
|
logger.warning("[CLARIFICATION] Canonical documents lookup failed: %s", e) |
|
|
|
|
|
if not canonical_candidates: |
|
|
canonical_candidates = [ |
|
|
{ |
|
|
"code": "264-QD-TW", |
|
|
"title": "Quyết định 264-QĐ/TW về kỷ luật đảng viên", |
|
|
"summary": "", |
|
|
"doc_type": "", |
|
|
"section_title": "", |
|
|
}, |
|
|
{ |
|
|
"code": "QD-69-TW", |
|
|
"title": "Quy định 69-QĐ/TW về kỷ luật tổ chức đảng, đảng viên", |
|
|
"summary": "", |
|
|
"doc_type": "", |
|
|
"section_title": "", |
|
|
}, |
|
|
{ |
|
|
"code": "TT-02-CAND", |
|
|
"title": "Thông tư 02/2021/TT-BCA về điều lệnh CAND", |
|
|
"summary": "", |
|
|
"doc_type": "", |
|
|
"section_title": "", |
|
|
}, |
|
|
] |
|
|
|
|
|
clarification_payload = self._build_clarification_payload( |
|
|
query, canonical_candidates |
|
|
) |
|
|
if clarification_payload: |
|
|
clarification_payload.setdefault("intent", intent) |
|
|
clarification_payload.setdefault("_source", "clarification_fallback") |
|
|
clarification_payload.setdefault("routing", "clarification") |
|
|
clarification_payload.setdefault("confidence", 0.3) |
|
|
return clarification_payload |
|
|
|
|
|
|
|
|
search_result = self._search_by_intent( |
|
|
intent, |
|
|
query, |
|
|
limit=15, |
|
|
preferred_document_code=selected_document_code_normalized, |
|
|
) |
|
|
|
|
|
|
|
|
fast_path_response = None |
|
|
if intent == "search_legal" and not self._is_complex_query(query): |
|
|
fast_path_response = self._maybe_fast_path_response(search_result["results"], query) |
|
|
if fast_path_response: |
|
|
fast_path_response["intent"] = intent |
|
|
fast_path_response["_source"] = "fast_path" |
|
|
return fast_path_response |
|
|
|
|
|
|
|
|
|
|
|
enable_reranker = os.environ.get("ENABLE_RERANKER", "false").lower() == "true" |
|
|
if intent == "search_legal" and enable_reranker: |
|
|
try: |
|
|
|
|
|
from hue_portal.core.reranker import rerank_documents |
|
|
|
|
|
legal_results = [r for r in search_result["results"] if r.get("type") == "legal"] |
|
|
if len(legal_results) > 0: |
|
|
|
|
|
top_k = min(4, len(legal_results)) |
|
|
reranked = rerank_documents(query, legal_results, top_k=top_k) |
|
|
|
|
|
non_legal = [r for r in search_result["results"] if r.get("type") != "legal"] |
|
|
search_result["results"] = reranked + non_legal |
|
|
search_result["count"] = len(search_result["results"]) |
|
|
logger.info( |
|
|
"[RERANKER] Reranked %d legal results to top-%d for query: %s", |
|
|
len(legal_results), |
|
|
top_k, |
|
|
query[:50] |
|
|
) |
|
|
except Exception as e: |
|
|
logger.warning("[RERANKER] Reranking failed: %s, using original results", e) |
|
|
elif intent == "search_legal": |
|
|
|
|
|
logger.debug("[RERANKER] Skipped reranking for speed (ENABLE_RERANKER=false)") |
|
|
|
|
|
|
|
|
|
|
|
if intent == "search_legal" and search_result["count"] > 0: |
|
|
top_result = search_result["results"][0] |
|
|
top_score = top_result.get("score", 0.0) or 0.0 |
|
|
top_data = top_result.get("data", {}) |
|
|
doc_code = (top_data.get("document_code") or "").upper() |
|
|
content = top_data.get("content", "") or top_data.get("excerpt", "") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
should_bypass = False |
|
|
query_lower = query.lower() |
|
|
has_keywords = any(kw in query_lower for kw in ["%", "phần trăm", "tỷ lệ", "12%", "20%", "10%", "hạ bậc", "thi đua", "xếp loại", "vi phạm", "cán bộ"]) |
|
|
|
|
|
|
|
|
if doc_code and len(content) > 100: |
|
|
if top_score >= 0.4: |
|
|
should_bypass = True |
|
|
elif has_keywords and top_score >= 0.3: |
|
|
should_bypass = True |
|
|
|
|
|
elif has_keywords and len(content) > 100 and top_score >= 0.3: |
|
|
should_bypass = True |
|
|
|
|
|
if should_bypass: |
|
|
|
|
|
if any(kw in query_lower for kw in ["12%", "tỷ lệ", "phần trăm", "hạ bậc", "thi đua"]): |
|
|
|
|
|
section_code = top_data.get("section_code", "") |
|
|
section_title = top_data.get("section_title", "") |
|
|
doc_title = top_data.get("document_title", "văn bản pháp luật") |
|
|
|
|
|
|
|
|
content_preview = content[:600] + "..." if len(content) > 600 else content |
|
|
|
|
|
answer = ( |
|
|
f"Theo {doc_title} ({doc_code}):\n\n" |
|
|
f"{section_code}: {section_title}\n\n" |
|
|
f"{content_preview}\n\n" |
|
|
f"Nguồn: {section_code}, {doc_title} ({doc_code})" |
|
|
) |
|
|
else: |
|
|
|
|
|
section_code = top_data.get("section_code", "Điều liên quan") |
|
|
section_title = top_data.get("section_title", "") |
|
|
doc_title = top_data.get("document_title", "văn bản pháp luật") |
|
|
content_preview = content[:500] + "..." if len(content) > 500 else content |
|
|
|
|
|
answer = ( |
|
|
f"Kết quả chính xác nhất:\n\n" |
|
|
f"- Văn bản: {doc_title} ({doc_code})\n" |
|
|
f"- Điều khoản: {section_code}" + (f" – {section_title}" if section_title else "") + "\n\n" |
|
|
f"{content_preview}\n\n" |
|
|
f"Nguồn: {section_code}, {doc_title} ({doc_code})" |
|
|
) |
|
|
|
|
|
logger.info( |
|
|
"[BYPASS_LLM] Using raw template for legal query (score=%.3f, doc=%s, query='%s')", |
|
|
top_score, |
|
|
doc_code, |
|
|
query[:50] |
|
|
) |
|
|
|
|
|
return { |
|
|
"message": answer, |
|
|
"intent": intent, |
|
|
"confidence": min(0.99, top_score + 0.05), |
|
|
"results": search_result["results"][:3], |
|
|
"count": min(3, search_result["count"]), |
|
|
"_source": "raw_template", |
|
|
"routing": "raw_template" |
|
|
} |
|
|
|
|
|
|
|
|
context = None |
|
|
context_summary = "" |
|
|
if session_id: |
|
|
try: |
|
|
recent_messages = ConversationContext.get_recent_messages(session_id, limit=5) |
|
|
context = [ |
|
|
{ |
|
|
"role": msg.role, |
|
|
"content": msg.content, |
|
|
"intent": msg.intent |
|
|
} |
|
|
for msg in recent_messages |
|
|
] |
|
|
|
|
|
if len(context) > 1: |
|
|
context_parts = [] |
|
|
for msg in reversed(context[-3:]): |
|
|
if msg["role"] == "user": |
|
|
context_parts.append(f"Người dùng: {msg['content'][:100]}") |
|
|
elif msg["role"] == "bot": |
|
|
context_parts.append(f"Bot: {msg['content'][:100]}") |
|
|
if context_parts: |
|
|
context_summary = "\n\nNgữ cảnh cuộc trò chuyện trước đó:\n" + "\n".join(context_parts) |
|
|
except Exception as exc: |
|
|
logger.warning("[CONTEXT] Failed to load conversation context: %s", exc) |
|
|
|
|
|
|
|
|
enhanced_query = query |
|
|
if context_summary: |
|
|
enhanced_query = query + context_summary |
|
|
|
|
|
|
|
|
message = None |
|
|
if self.llm_generator and search_result["count"] > 0: |
|
|
|
|
|
if intent == "search_legal" and search_result["results"]: |
|
|
legal_docs = [r["data"] for r in search_result["results"] if r.get("type") == "legal"][:4] |
|
|
if legal_docs: |
|
|
structured_answer = self.llm_generator.generate_structured_legal_answer( |
|
|
enhanced_query, |
|
|
legal_docs, |
|
|
prefill_summary=None |
|
|
) |
|
|
if structured_answer: |
|
|
message = format_structured_legal_answer(structured_answer) |
|
|
|
|
|
|
|
|
if not message: |
|
|
documents = [r["data"] for r in search_result["results"][:4]] |
|
|
message = self.llm_generator.generate_answer( |
|
|
enhanced_query, |
|
|
context=context, |
|
|
documents=documents |
|
|
) |
|
|
|
|
|
|
|
|
if not message: |
|
|
if search_result["count"] > 0: |
|
|
|
|
|
if intent == "search_legal" and search_result["results"]: |
|
|
top_result = search_result["results"][0] |
|
|
top_data = top_result.get("data", {}) |
|
|
doc_code = top_data.get("document_code", "") |
|
|
doc_title = top_data.get("document_title", "văn bản pháp luật") |
|
|
section_code = top_data.get("section_code", "") |
|
|
section_title = top_data.get("section_title", "") |
|
|
content = top_data.get("content", "") or top_data.get("excerpt", "") |
|
|
|
|
|
if content and len(content) > 50: |
|
|
content_preview = content[:400] + "..." if len(content) > 400 else content |
|
|
message = ( |
|
|
f"Tôi tìm thấy {search_result['count']} điều khoản liên quan đến '{query}':\n\n" |
|
|
f"**{section_code}**: {section_title or 'Nội dung liên quan'}\n\n" |
|
|
f"{content_preview}\n\n" |
|
|
f"Nguồn: {doc_title}" + (f" ({doc_code})" if doc_code else "") |
|
|
) |
|
|
else: |
|
|
template = RESPONSE_TEMPLATES.get(intent, RESPONSE_TEMPLATES["general_query"]) |
|
|
message = template.format( |
|
|
count=search_result["count"], |
|
|
query=query |
|
|
) |
|
|
else: |
|
|
template = RESPONSE_TEMPLATES.get(intent, RESPONSE_TEMPLATES["general_query"]) |
|
|
message = template.format( |
|
|
count=search_result["count"], |
|
|
query=query |
|
|
) |
|
|
else: |
|
|
message = RESPONSE_TEMPLATES["no_results"].format(query=query) |
|
|
|
|
|
|
|
|
results = search_result["results"][:5] |
|
|
|
|
|
response = { |
|
|
"message": message, |
|
|
"intent": intent, |
|
|
"confidence": 0.95, |
|
|
"results": results, |
|
|
"count": len(results), |
|
|
"_source": "slow_path" |
|
|
} |
|
|
|
|
|
return response |
|
|
|
|
|
def _maybe_request_clarification( |
|
|
self, |
|
|
query: str, |
|
|
search_result: Dict[str, Any], |
|
|
selected_document_code: Optional[str] = None, |
|
|
) -> Optional[Dict[str, Any]]: |
|
|
""" |
|
|
Quyết định có nên hỏi người dùng chọn văn bản (wizard step: choose_document). |
|
|
|
|
|
Nguyên tắc option-first: |
|
|
- Nếu user CHƯA chọn văn bản trong session |
|
|
- Và trong câu hỏi KHÔNG ghi rõ mã văn bản |
|
|
- Và search có trả về kết quả |
|
|
=> Ưu tiên trả về danh sách văn bản để người dùng chọn, thay vì trả lời thẳng. |
|
|
""" |
|
|
if selected_document_code: |
|
|
return None |
|
|
if not search_result or search_result.get("count", 0) == 0: |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
if self._has_explicit_document_code_in_query(query): |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fallback_candidates: List[Dict[str, Any]] = [] |
|
|
try: |
|
|
fallback_docs = list( |
|
|
LegalDocument.objects.filter( |
|
|
code__in=["264-QD-TW", "QD-69-TW", "TT-02-CAND"] |
|
|
) |
|
|
) |
|
|
for doc in fallback_docs: |
|
|
summary = getattr(doc, "summary", "") or "" |
|
|
metadata = getattr(doc, "metadata", {}) or {} |
|
|
if not summary and isinstance(metadata, dict): |
|
|
summary = metadata.get("summary", "") |
|
|
fallback_candidates.append( |
|
|
{ |
|
|
"code": doc.code, |
|
|
"title": getattr(doc, "title", "") or doc.code, |
|
|
"summary": summary, |
|
|
"doc_type": getattr(doc, "doc_type", "") or "", |
|
|
"section_title": "", |
|
|
} |
|
|
) |
|
|
except Exception as exc: |
|
|
logger.warning( |
|
|
"[CLARIFICATION] Fallback documents lookup failed, using static list: %s", |
|
|
exc, |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
if not fallback_candidates: |
|
|
fallback_candidates = [ |
|
|
{ |
|
|
"code": "264-QD-TW", |
|
|
"title": "Quyết định 264-QĐ/TW về kỷ luật đảng viên", |
|
|
"summary": "", |
|
|
"doc_type": "", |
|
|
"section_title": "", |
|
|
}, |
|
|
{ |
|
|
"code": "QD-69-TW", |
|
|
"title": "Quy định 69-QĐ/TW về kỷ luật tổ chức đảng, đảng viên", |
|
|
"summary": "", |
|
|
"doc_type": "", |
|
|
"section_title": "", |
|
|
}, |
|
|
{ |
|
|
"code": "TT-02-CAND", |
|
|
"title": "Thông tư 02/2021/TT-BCA về điều lệnh CAND", |
|
|
"summary": "", |
|
|
"doc_type": "", |
|
|
"section_title": "", |
|
|
}, |
|
|
] |
|
|
|
|
|
payload = self._build_clarification_payload(query, fallback_candidates) |
|
|
if payload: |
|
|
logger.info( |
|
|
"[CLARIFICATION] Requesting user choice among canonical documents: %s", |
|
|
[c["code"] for c in fallback_candidates], |
|
|
) |
|
|
return payload |
|
|
|
|
|
def _has_explicit_document_code_in_query(self, query: str) -> bool: |
|
|
""" |
|
|
Check if the raw query string explicitly contains a known document code |
|
|
pattern (e.g. '264/QĐ-TW', 'QD-69-TW', 'TT-02-CAND'). |
|
|
|
|
|
Khác với _detect_document_code (dò toàn bộ bảng LegalDocument theo token), |
|
|
hàm này chỉ dựa trên các regex cố định để tránh over-detect cho câu hỏi |
|
|
chung chung như 'xử lí kỷ luật đảng viên thế nào'. |
|
|
""" |
|
|
normalized = self._remove_accents(query).upper() |
|
|
if not normalized: |
|
|
return False |
|
|
for pattern in DOCUMENT_CODE_PATTERNS: |
|
|
try: |
|
|
if re.search(pattern, normalized): |
|
|
return True |
|
|
except re.error: |
|
|
|
|
|
continue |
|
|
return False |
|
|
|
|
|
def _collect_document_candidates( |
|
|
self, |
|
|
legal_results: List[Dict[str, Any]], |
|
|
limit: int = 4, |
|
|
) -> List[Dict[str, Any]]: |
|
|
"""Collect unique document candidates from legal results.""" |
|
|
ordered_codes: List[str] = [] |
|
|
        seen: Set[str] = set()
|
|
for result in legal_results: |
|
|
data = result.get("data", {}) |
|
|
code = (data.get("document_code") or "").strip() |
|
|
if not code: |
|
|
continue |
|
|
upper = code.upper() |
|
|
if upper in seen: |
|
|
continue |
|
|
ordered_codes.append(code) |
|
|
seen.add(upper) |
|
|
if len(ordered_codes) >= limit: |
|
|
break |
|
|
if len(ordered_codes) < 2: |
|
|
return [] |
|
|
try: |
|
|
documents = { |
|
|
doc.code.upper(): doc |
|
|
for doc in LegalDocument.objects.filter(code__in=ordered_codes) |
|
|
} |
|
|
except Exception as exc: |
|
|
logger.warning("[CLARIFICATION] Unable to load documents for candidates: %s", exc) |
|
|
documents = {} |
|
|
candidates: List[Dict[str, Any]] = [] |
|
|
for code in ordered_codes: |
|
|
upper = code.upper() |
|
|
doc_obj = documents.get(upper) |
|
|
section = next( |
|
|
( |
|
|
res |
|
|
for res in legal_results |
|
|
if (res.get("data", {}).get("document_code") or "").strip().upper() == upper |
|
|
), |
|
|
None, |
|
|
) |
|
|
data = section.get("data", {}) if section else {} |
|
|
summary = "" |
|
|
if doc_obj: |
|
|
summary = doc_obj.summary or "" |
|
|
if not summary and isinstance(doc_obj.metadata, dict): |
|
|
summary = doc_obj.metadata.get("summary", "") |
|
|
if not summary: |
|
|
summary = data.get("excerpt") or data.get("content", "")[:200] |
|
|
candidates.append( |
|
|
{ |
|
|
"code": code, |
|
|
"title": data.get("document_title") or (doc_obj.title if doc_obj else code), |
|
|
"summary": summary, |
|
|
"doc_type": doc_obj.doc_type if doc_obj else "", |
|
|
"section_title": data.get("section_title") or "", |
|
|
} |
|
|
) |
|
|
return candidates |
|
|
|
|
|
def _build_clarification_payload( |
|
|
self, |
|
|
query: str, |
|
|
candidates: List[Dict[str, Any]], |
|
|
) -> Optional[Dict[str, Any]]: |
|
|
if not candidates: |
|
|
return None |
|
|
default_message = ( |
|
|
"Tôi tìm thấy một số văn bản có thể phù hợp. " |
|
|
"Bạn vui lòng chọn văn bản muốn tra cứu để tôi trả lời chính xác hơn." |
|
|
) |
|
|
llm_payload = self._call_clarification_llm(query, candidates) |
|
|
message = default_message |
|
|
options: List[Dict[str, Any]] = [] |
|
|
|
|
|
|
|
|
if llm_payload: |
|
|
message = llm_payload.get("message") or default_message |
|
|
raw_options = llm_payload.get("options") |
|
|
if isinstance(raw_options, list): |
|
|
options = [ |
|
|
{ |
|
|
"code": (opt.get("code") or candidate.get("code", "")).upper(), |
|
|
"title": opt.get("title") or opt.get("document_title") or candidate.get("title", ""), |
|
|
"reason": opt.get("reason") |
|
|
or opt.get("summary") |
|
|
or candidate.get("summary") |
|
|
or candidate.get("section_title") |
|
|
or "", |
|
|
} |
|
|
for opt, candidate in zip( |
|
|
raw_options, |
|
|
candidates[: len(raw_options)], |
|
|
) |
|
|
if (opt.get("code") or candidate.get("code")) |
|
|
and (opt.get("title") or opt.get("document_title") or candidate.get("title")) |
|
|
] |
|
|
|
|
|
|
|
|
if not options: |
|
|
options = [ |
|
|
{ |
|
|
"code": candidate["code"].upper(), |
|
|
"title": candidate["title"], |
|
|
"reason": candidate.get("summary") or candidate.get("section_title") or "", |
|
|
} |
|
|
for candidate in candidates[:3] |
|
|
] |
|
|
if not any(opt.get("code") == "__other__" for opt in options): |
|
|
options.append( |
|
|
{ |
|
|
"code": "__other__", |
|
|
"title": "Khác", |
|
|
"reason": "Tôi muốn hỏi văn bản hoặc chủ đề khác", |
|
|
} |
|
|
) |
|
|
return { |
|
|
|
|
|
"type": "options", |
|
|
"wizard_stage": "choose_document", |
|
|
"message": message, |
|
|
"options": options, |
|
|
"clarification": { |
|
|
"message": message, |
|
|
"options": options, |
|
|
}, |
|
|
"results": [], |
|
|
"count": 0, |
|
|
} |
|
|
|
|
|
def _call_clarification_llm( |
|
|
self, |
|
|
query: str, |
|
|
candidates: List[Dict[str, Any]], |
|
|
) -> Optional[Dict[str, Any]]: |
|
|
if not self.llm_generator: |
|
|
return None |
|
|
try: |
|
|
return self.llm_generator.suggest_clarification_topics( |
|
|
query, |
|
|
candidates, |
|
|
max_options=3, |
|
|
) |
|
|
except Exception as exc: |
|
|
logger.warning("[CLARIFICATION] LLM suggestion failed: %s", exc) |
|
|
return None |
|
|
|
|
|
def _parallel_search_prepare( |
|
|
self, |
|
|
document_code: str, |
|
|
keywords: List[str], |
|
|
session_id: Optional[str] = None, |
|
|
) -> None: |
|
|
""" |
|
|
Trigger parallel search in background when user selects a document option. |
|
|
Stores results in cache for Stage 2 (choose topic). |
|
|
|
|
|
Args: |
|
|
document_code: Selected document code |
|
|
keywords: Keywords extracted from query/options |
|
|
session_id: Session ID for caching results |
|
|
""" |
|
|
if not session_id: |
|
|
return |
|
|
|
|
|
def _search_task(): |
|
|
try: |
|
|
logger.info( |
|
|
"[PARALLEL_SEARCH] Starting background search for doc=%s, keywords=%s", |
|
|
document_code, |
|
|
keywords[:5], |
|
|
) |
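
                # Redis key sketch: "prefetch:<DOC_CODE>:<sha256(joined keywords)[:16]>"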
|
|
|
|
|
|
|
|
cache_key = f"prefetch:{document_code.upper()}:{hashlib.sha256(' '.join(keywords).encode()).hexdigest()[:16]}" |
|
|
cached_result = None |
|
|
if self.redis_cache and self.redis_cache.is_available(): |
|
|
cached_result = self.redis_cache.get(cache_key) |
|
|
if cached_result: |
|
|
logger.info( |
|
|
"[PARALLEL_SEARCH] ✅ Cache hit for doc=%s", |
|
|
document_code |
|
|
) |
|
|
|
|
|
with self._cache_lock: |
|
|
if session_id not in self._prefetched_cache: |
|
|
self._prefetched_cache[session_id] = {} |
|
|
self._prefetched_cache[session_id]["document_results"] = cached_result |
|
|
return |
|
|
|
|
|
|
|
|
query_text = " ".join(keywords) if keywords else "" |
|
|
search_result = self._search_by_intent( |
|
|
intent="search_legal", |
|
|
query=query_text, |
|
|
limit=20, |
|
|
preferred_document_code=document_code.upper(), |
|
|
) |
|
|
|
|
|
|
|
|
cache_data = { |
|
|
"document_code": document_code, |
|
|
"results": search_result.get("results", []), |
|
|
"count": search_result.get("count", 0), |
|
|
"timestamp": time.time(), |
|
|
} |
|
|
|
|
|
|
|
|
if self.redis_cache and self.redis_cache.is_available(): |
|
|
self.redis_cache.set(cache_key, cache_data, ttl_seconds=self.prefetch_cache_ttl) |
|
|
logger.debug( |
|
|
"[PARALLEL_SEARCH] Cached prefetch results (TTL: %ds)", |
|
|
self.prefetch_cache_ttl |
|
|
) |
|
|
|
|
|
|
|
|
with self._cache_lock: |
|
|
if session_id not in self._prefetched_cache: |
|
|
self._prefetched_cache[session_id] = {} |
|
|
self._prefetched_cache[session_id]["document_results"] = cache_data |
|
|
|
|
|
logger.info( |
|
|
"[PARALLEL_SEARCH] Completed background search for doc=%s, found %d results", |
|
|
document_code, |
|
|
search_result.get("count", 0), |
|
|
) |
|
|
except Exception as exc: |
|
|
logger.warning("[PARALLEL_SEARCH] Background search failed: %s", exc) |
|
|
|
|
|
|
|
|
self._executor.submit(_search_task) |
|
|
|
|
|
def _parallel_search_topic( |
|
|
self, |
|
|
document_code: str, |
|
|
topic_keywords: List[str], |
|
|
session_id: Optional[str] = None, |
|
|
) -> None: |
|
|
""" |
|
|
Trigger parallel search when user selects a topic option. |
|
|
Stores results for final answer generation. |
|
|
|
|
|
Args: |
|
|
document_code: Selected document code |
|
|
topic_keywords: Keywords from selected topic |
|
|
session_id: Session ID for caching results |
|
|
""" |
|
|
if not session_id: |
|
|
return |
|
|
|
|
|
def _search_task(): |
|
|
try: |
|
|
logger.info( |
|
|
"[PARALLEL_SEARCH] Starting topic search for doc=%s, keywords=%s", |
|
|
document_code, |
|
|
topic_keywords[:5], |
|
|
) |
|
|
|
|
|
|
|
|
query_text = " ".join(topic_keywords) if topic_keywords else "" |
|
|
search_result = self._search_by_intent( |
|
|
intent="search_legal", |
|
|
query=query_text, |
|
|
limit=10, |
|
|
preferred_document_code=document_code.upper(), |
|
|
) |
|
|
|
|
|
|
|
|
with self._cache_lock: |
|
|
if session_id not in self._prefetched_cache: |
|
|
self._prefetched_cache[session_id] = {} |
|
|
self._prefetched_cache[session_id]["topic_results"] = { |
|
|
"document_code": document_code, |
|
|
"keywords": topic_keywords, |
|
|
"results": search_result.get("results", []), |
|
|
"count": search_result.get("count", 0), |
|
|
"timestamp": time.time(), |
|
|
} |
|
|
|
|
|
logger.info( |
|
|
"[PARALLEL_SEARCH] Completed topic search, found %d results", |
|
|
search_result.get("count", 0), |
|
|
) |
|
|
except Exception as exc: |
|
|
logger.warning("[PARALLEL_SEARCH] Topic search failed: %s", exc) |
|
|
|
|
|
|
|
|
self._executor.submit(_search_task) |
|
|
|
|
|
def _get_prefetched_results( |
|
|
self, |
|
|
session_id: Optional[str], |
|
|
result_type: str = "document_results", |
|
|
) -> Optional[Dict[str, Any]]: |
|
|
""" |
|
|
Get prefetched search results from cache. |
|
|
|
|
|
Args: |
|
|
session_id: Session ID |
|
|
result_type: "document_results" or "topic_results" |
|
|
|
|
|
Returns: |
|
|
Cached results dict or None |
|
|
""" |
|
|
if not session_id: |
|
|
return None |
|
|
|
|
|
with self._cache_lock: |
|
|
cache_entry = self._prefetched_cache.get(session_id) |
|
|
if not cache_entry: |
|
|
return None |
|
|
|
|
|
results = cache_entry.get(result_type) |
|
|
if not results: |
|
|
return None |
|
|
|
|
|
|
|
|
timestamp = results.get("timestamp", 0) |
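
            # Note: in-memory prefetch entries expire after 5 minutes,
            # independent of the Redis-side prefetch_cache_ttl.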
|
|
if time.time() - timestamp > 300: |
|
|
logger.debug("[PARALLEL_SEARCH] Prefetched results expired for session=%s", session_id) |
|
|
return None |
|
|
|
|
|
return results |
|
|
|
|
|
def _clear_prefetched_cache(self, session_id: Optional[str]) -> None: |
|
|
"""Clear prefetched cache for a session.""" |
|
|
if not session_id: |
|
|
return |
|
|
|
|
|
with self._cache_lock: |
|
|
if session_id in self._prefetched_cache: |
|
|
del self._prefetched_cache[session_id] |
|
|
logger.debug("[PARALLEL_SEARCH] Cleared cache for session=%s", session_id) |
|
|
|
|
|
def _search_by_intent( |
|
|
self, |
|
|
intent: str, |
|
|
query: str, |
|
|
limit: int = 5, |
|
|
preferred_document_code: Optional[str] = None, |
|
|
) -> Dict[str, Any]: |
|
|
"""Search based on classified intent. Reduced limit from 20 to 5 for faster inference on free tier.""" |
|
|
|
|
|
keywords = query.strip() |
|
|
extracted = " ".join(self.chatbot.extract_keywords(query)) |
|
|
if extracted and len(extracted) > 2: |
|
|
keywords = f"{keywords} {extracted}" |
|
|
|
|
|
        results = []
        detected_code: Optional[str] = None  # only set by the search_legal branch; returned below
|
|
|
|
|
if intent == "search_fine": |
|
|
qs = Fine.objects.all() |
|
|
text_fields = ["name", "code", "article", "decree", "remedial"] |
|
|
search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1) |
|
|
results = [{"type": "fine", "data": { |
|
|
"id": f.id, |
|
|
"name": f.name, |
|
|
"code": f.code, |
|
|
"min_fine": float(f.min_fine) if f.min_fine else None, |
|
|
"max_fine": float(f.max_fine) if f.max_fine else None, |
|
|
"article": f.article, |
|
|
"decree": f.decree, |
|
|
}} for f in search_results] |
|
|
|
|
|
elif intent == "search_procedure": |
|
|
qs = Procedure.objects.all() |
|
|
text_fields = ["title", "domain", "conditions", "dossier"] |
|
|
search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1) |
|
|
results = [{"type": "procedure", "data": { |
|
|
"id": p.id, |
|
|
"title": p.title, |
|
|
"domain": p.domain, |
|
|
"level": p.level, |
|
|
}} for p in search_results] |
|
|
|
|
|
elif intent == "search_office": |
|
|
qs = Office.objects.all() |
|
|
text_fields = ["unit_name", "address", "district", "service_scope"] |
|
|
search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1) |
|
|
results = [{"type": "office", "data": { |
|
|
"id": o.id, |
|
|
"unit_name": o.unit_name, |
|
|
"address": o.address, |
|
|
"district": o.district, |
|
|
"phone": o.phone, |
|
|
"working_hours": o.working_hours, |
|
|
}} for o in search_results] |
|
|
|
|
|
elif intent == "search_advisory": |
|
|
qs = Advisory.objects.all() |
|
|
text_fields = ["title", "summary"] |
|
|
search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1) |
|
|
results = [{"type": "advisory", "data": { |
|
|
"id": a.id, |
|
|
"title": a.title, |
|
|
"summary": a.summary, |
|
|
}} for a in search_results] |
|
|
|
|
|
elif intent == "search_legal": |
|
|
qs = LegalSection.objects.all() |
|
|
text_fields = ["section_title", "section_code", "content"] |
|
|
detected_code = self._detect_document_code(query) |
|
|
effective_code = preferred_document_code or detected_code |
|
|
filtered = False |
|
|
if effective_code: |
|
|
filtered_qs = qs.filter(document__code__iexact=effective_code) |
|
|
if filtered_qs.exists(): |
|
|
qs = filtered_qs |
|
|
filtered = True |
|
|
logger.info( |
|
|
"[SEARCH] Prefiltering legal sections for document code %s (query='%s')", |
|
|
effective_code, |
|
|
query, |
|
|
) |
|
|
else: |
|
|
logger.info( |
|
|
"[SEARCH] Document code %s detected but no sections found locally, falling back to full corpus", |
|
|
effective_code, |
|
|
) |
|
|
else: |
|
|
logger.debug("[SEARCH] No document code detected for query: %s", query) |
|
|
|
|
|
search_results = pure_semantic_search( |
|
|
[keywords], |
|
|
qs, |
|
|
top_k=limit, |
|
|
text_fields=text_fields |
|
|
) |
|
|
results = self._format_legal_results(search_results, detected_code, query=query) |
|
|
logger.info( |
|
|
"[SEARCH] Legal intent processed (query='%s', code=%s, filtered=%s, results=%d)", |
|
|
query, |
|
|
detected_code or "None", |
|
|
filtered, |
|
|
len(results), |
|
|
) |
|
|
|
|
|
return { |
|
|
"intent": intent, |
|
|
"query": query, |
|
|
"keywords": keywords, |
|
|
"results": results, |
|
|
"count": len(results), |
|
|
"detected_code": detected_code, |
|
|
} |
|
|
|
|
|
def _should_save_to_golden(self, query: str, response: Dict) -> bool: |
|
|
""" |
|
|
Decide if response should be saved to golden dataset. |
|
|
|
|
|
Criteria: |
|
|
        - High confidence (>= 0.95)
|
|
- Has results |
|
|
- Response is complete and well-formed |
|
|
- Not already in golden dataset |
|
|
""" |
|
|
try: |
|
|
from hue_portal.core.models import GoldenQuery |
|
|
|
|
|
|
|
|
query_normalized = self._normalize_query(query) |
|
|
if GoldenQuery.objects.filter(query_normalized=query_normalized, is_active=True).exists(): |
|
|
return False |
|
|
|
|
|
|
|
|
has_results = response.get("count", 0) > 0 |
|
|
has_message = bool(response.get("message", "").strip()) |
|
|
confidence = response.get("confidence", 0.0) |
|
|
|
|
|
|
|
|
if has_results and has_message and confidence >= 0.95: |
|
|
|
|
|
message = response.get("message", "") |
|
|
if len(message) > 50: |
|
|
return True |
|
|
|
|
|
return False |
|
|
except Exception as e: |
|
|
logger.warning(f"Error checking if should save to golden: {e}") |
|
|
return False |
|
|
|
|
|
def _normalize_query(self, query: str) -> str: |
|
|
"""Normalize query for matching.""" |
|
|
normalized = query.lower().strip() |
|
|
|
|
|
normalized = unicodedata.normalize("NFD", normalized) |
|
|
normalized = "".join(ch for ch in normalized if unicodedata.category(ch) != "Mn") |
|
|
|
|
|
normalized = re.sub(r'\s+', ' ', normalized).strip() |
|
|
return normalized |
|
|
|
|
|
def _detect_document_code(self, query: str) -> Optional[str]: |
|
|
"""Detect known document code mentioned in the query.""" |
|
|
normalized_query = self._remove_accents(query).upper() |
|
|
if not normalized_query: |
|
|
return None |
|
|
try: |
|
|
codes = LegalDocument.objects.values_list("code", flat=True) |
|
|
except Exception as exc: |
|
|
logger.debug("Unable to fetch document codes: %s", exc) |
|
|
return None |
|
|
|
|
|
for code in codes: |
|
|
if not code: |
|
|
continue |
|
|
tokens = self._split_code_tokens(code) |
|
|
if tokens and all(token in normalized_query for token in tokens): |
|
|
logger.info("[SEARCH] Detected document code %s in query", code) |
|
|
return code |
|
|
return None |
|
|
|
|
|
def _split_code_tokens(self, code: str) -> List[str]: |
|
|
"""Split a document code into uppercase accentless tokens.""" |
|
|
normalized = self._remove_accents(code).upper() |
|
|
return [tok for tok in re.split(r"[-/\s]+", normalized) if tok] |
|
|
|
|
|
def _remove_accents(self, text: str) -> str: |
|
|
if not text: |
|
|
return "" |
|
|
normalized = unicodedata.normalize("NFD", text) |
|
|
return "".join(ch for ch in normalized if unicodedata.category(ch) != "Mn") |
|
|
|
|
|
def _format_legal_results( |
|
|
self, |
|
|
search_results: List[Any], |
|
|
detected_code: Optional[str], |
|
|
query: Optional[str] = None, |
|
|
) -> List[Dict[str, Any]]: |
|
|
"""Build legal result payload and apply ordering/boosting based on doc code and keywords.""" |
|
|
entries: List[Dict[str, Any]] = [] |
|
|
upper_detected = detected_code.upper() if detected_code else None |
|
|
|
|
|
|
|
|
important_keywords = [] |
|
|
if query: |
|
|
query_lower = query.lower() |
|
|
|
|
|
if any(kw in query_lower for kw in ["%", "phần trăm", "tỷ lệ", "12%", "20%", "10%"]): |
|
|
important_keywords.extend(["%", "phần trăm", "tỷ lệ", "12", "20", "10"]) |
|
|
|
|
|
if any(kw in query_lower for kw in ["hạ bậc", "thi đua", "xếp loại", "đánh giá"]): |
|
|
important_keywords.extend(["hạ bậc", "thi đua", "xếp loại", "đánh giá"]) |
|
|
|
|
|
for ls in search_results: |
|
|
doc = ls.document |
|
|
doc_code = doc.code if doc else None |
|
|
score = getattr(ls, "_ml_score", getattr(ls, "rank", 0.0)) or 0.0 |
|
|
|
|
|
|
|
|
content_text = (ls.content or ls.section_title or "").lower() |
|
|
keyword_boost = 0.0 |
|
|
if important_keywords and content_text: |
|
|
for kw in important_keywords: |
|
|
if kw.lower() in content_text: |
|
|
keyword_boost += 0.15 |
|
|
logger.debug( |
|
|
"[BOOST] Keyword '%s' found in section %s, boosting score", |
|
|
kw, |
|
|
ls.section_code, |
|
|
) |
|
|
|
|
|
entries.append( |
|
|
{ |
|
|
"type": "legal", |
|
|
"score": float(score) + keyword_boost, |
|
|
"data": { |
|
|
"id": ls.id, |
|
|
"section_code": ls.section_code, |
|
|
"section_title": ls.section_title, |
|
|
"content": ls.content[:500] if ls.content else "", |
|
|
"excerpt": ls.excerpt, |
|
|
"document_code": doc_code, |
|
|
"document_title": doc.title if doc else None, |
|
|
"page_start": ls.page_start, |
|
|
"page_end": ls.page_end, |
|
|
}, |
|
|
} |
|
|
) |
|
|
|
|
|
if upper_detected: |
|
|
exact_matches = [ |
|
|
r for r in entries if (r["data"].get("document_code") or "").upper() == upper_detected |
|
|
] |
|
|
if exact_matches: |
|
|
others = [r for r in entries if r not in exact_matches] |
|
|
entries = exact_matches + others |
|
|
else: |
|
|
for entry in entries: |
|
|
doc_code = (entry["data"].get("document_code") or "").upper() |
|
|
if doc_code == upper_detected: |
|
|
entry["score"] = (entry.get("score") or 0.1) * 10 |
|
|
entries.sort(key=lambda r: r.get("score") or 0, reverse=True) |
|
|
else: |
|
|
|
|
|
entries.sort(key=lambda r: r.get("score") or 0, reverse=True) |
|
|
return entries |
|
|
|
|
|
def _is_complex_query(self, query: str) -> bool: |
|
|
""" |
|
|
Detect if query is complex and requires LLM reasoning (not suitable for Fast Path). |
|
|
|
|
|
Complex queries contain keywords like: %, bậc, thi đua, tỷ lệ, liên đới, tăng nặng, giảm nhẹ, đơn vị vi phạm |
|
|
""" |
|
|
if not query: |
|
|
return False |
|
|
query_lower = query.lower() |
|
|
complex_keywords = [ |
|
|
"%", "phần trăm", |
|
|
"bậc", "hạ bậc", "nâng bậc", |
|
|
"thi đua", "xếp loại", "đánh giá", |
|
|
"tỷ lệ", "tỉ lệ", |
|
|
"liên đới", "liên quan", |
|
|
"tăng nặng", "tăng nặng hình phạt", |
|
|
"giảm nhẹ", "giảm nhẹ hình phạt", |
|
|
"đơn vị vi phạm", "đơn vị có", |
|
|
] |
|
|
for keyword in complex_keywords: |
|
|
if keyword in query_lower: |
|
|
logger.info( |
|
|
"[FAST_PATH] Complex query detected (keyword: '%s'), forcing Slow Path", |
|
|
keyword, |
|
|
) |
|
|
return True |
|
|
return False |
|
|
|
|
|
def _maybe_fast_path_response( |
|
|
self, results: List[Dict[str, Any]], query: Optional[str] = None |
|
|
) -> Optional[Dict[str, Any]]: |
|
|
"""Return fast-path response if results are confident enough.""" |
|
|
if not results: |
|
|
return None |
|
|
|
|
|
|
|
|
if query and self._is_complex_query(query): |
|
|
return None |
|
|
top_result = results[0] |
|
|
top_score = top_result.get("score", 0.0) or 0.0 |
|
|
doc_code = (top_result.get("data", {}).get("document_code") or "").upper() |
|
|
|
|
|
if top_score >= 0.88 and doc_code: |
|
|
logger.info( |
|
|
"[FAST_PATH] Top score hit (%.3f) for document %s", top_score, doc_code |
|
|
) |
|
|
message = self._format_fast_legal_message(top_result) |
|
|
return { |
|
|
"message": message, |
|
|
"results": results[:3], |
|
|
"count": min(3, len(results)), |
|
|
"confidence": min(0.99, top_score + 0.05), |
|
|
} |
|
|
|
|
|
top_three = results[:3] |
|
|
if len(top_three) >= 2: |
|
|
doc_codes = [ |
|
|
(res.get("data", {}).get("document_code") or "").upper() |
|
|
for res in top_three |
|
|
if res.get("data", {}).get("document_code") |
|
|
] |
|
|
if doc_codes and len(set(doc_codes)) == 1: |
|
|
logger.info( |
|
|
"[FAST_PATH] Top-%d results share same document %s", |
|
|
len(top_three), |
|
|
doc_codes[0], |
|
|
) |
|
|
message = self._format_fast_legal_message(top_three[0]) |
|
|
return { |
|
|
"message": message, |
|
|
"results": top_three, |
|
|
"count": len(top_three), |
|
|
"confidence": min(0.97, (top_three[0].get("score") or 0.9) + 0.04), |
|
|
} |
|
|
return None |
|
|
|
|
|
def _format_fast_legal_message(self, result: Dict[str, Any]) -> str: |
|
|
"""Format a concise legal answer without LLM.""" |
|
|
data = result.get("data", {}) |
|
|
doc_title = data.get("document_title") or "văn bản pháp luật" |
|
|
doc_code = data.get("document_code") or "" |
|
|
section_code = data.get("section_code") or "Điều liên quan" |
|
|
section_title = data.get("section_title") or "" |
|
|
content = (data.get("content") or data.get("excerpt") or "").strip() |
|
|
if len(content) > 400: |
|
|
trimmed = content[:400].rsplit(" ", 1)[0] |
|
|
content = f"{trimmed}..." |
|
|
intro = "Kết quả chính xác nhất:" |
|
|
lines = [intro] |
|
|
if doc_title or doc_code: |
|
|
lines.append(f"- Văn bản: {doc_title or 'văn bản pháp luật'}" + (f" ({doc_code})" if doc_code else "")) |
|
|
section_label = section_code |
|
|
if section_title: |
|
|
section_label = f"{section_code} – {section_title}" |
|
|
lines.append(f"- Điều khoản: {section_label}") |
|
|
lines.append("") |
|
|
lines.append(content) |
|
|
citation_doc = doc_title or doc_code or "nguồn chính thức" |
|
|
lines.append(f"\nNguồn: {section_label}, {citation_doc}.") |
|
|
return "\n".join(lines) |
|
|
|
|
|
|