""" Slow Path Handler - Full RAG pipeline for complex queries. """ import os import time import logging import hashlib from typing import Dict, Any, Optional, List, Set import unicodedata import re from concurrent.futures import ThreadPoolExecutor, Future import threading from hue_portal.core.chatbot import get_chatbot, RESPONSE_TEMPLATES from hue_portal.core.models import ( Fine, Procedure, Office, Advisory, LegalSection, LegalDocument, ) from hue_portal.core.search_ml import search_with_ml from hue_portal.core.pure_semantic_search import pure_semantic_search # Lazy import reranker to avoid blocking startup (FlagEmbedding may download model) # from hue_portal.core.reranker import rerank_documents from hue_portal.chatbot.llm_integration import get_llm_generator from hue_portal.chatbot.structured_legal import format_structured_legal_answer from hue_portal.chatbot.context_manager import ConversationContext from hue_portal.chatbot.router import DOCUMENT_CODE_PATTERNS from hue_portal.core.query_rewriter import get_query_rewriter from hue_portal.core.pure_semantic_search import pure_semantic_search, parallel_vector_search logger = logging.getLogger(__name__) class SlowPathHandler: """Handle Slow Path queries with full RAG pipeline.""" def __init__(self): self.chatbot = get_chatbot() self.llm_generator = get_llm_generator() # Thread pool for parallel search (max 2 workers to avoid overwhelming DB) self._executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="parallel_search") # Cache for prefetched results by session_id (in-memory fallback) self._prefetched_cache: Dict[str, Dict[str, Any]] = {} self._cache_lock = threading.Lock() # Redis cache for prefetch results self.redis_cache = get_redis_cache() # Prefetch cache TTL (30 minutes default) self.prefetch_cache_ttl = int(os.environ.get("CACHE_PREFETCH_TTL", "1800")) def handle( self, query: str, intent: str, session_id: Optional[str] = None, selected_document_code: Optional[str] = None, ) -> Dict[str, Any]: """ Full RAG pipeline: 1. Search (hybrid: BM25 + vector) 2. Retrieve top 20 documents 3. LLM generation with structured output (for legal queries) 4. Guardrails validation 5. Retry up to 3 times if needed Args: query: User query. intent: Detected intent. session_id: Optional session ID for context. selected_document_code: Selected document code from wizard. Returns: Response dict with message, intent, results, etc. """ query = query.strip() selected_document_code_normalized = ( selected_document_code.strip().upper() if selected_document_code else None ) # Handle greetings if intent == "greeting": query_lower = query.lower().strip() query_words = query_lower.split() is_simple_greeting = ( len(query_words) <= 3 and any(greeting in query_lower for greeting in ["xin chào", "chào", "hello", "hi"]) and not any(kw in query_lower for kw in ["phạt", "mức phạt", "vi phạm", "thủ tục", "hồ sơ", "địa chỉ", "công an", "cảnh báo"]) ) if is_simple_greeting: return { "message": RESPONSE_TEMPLATES["greeting"], "intent": "greeting", "results": [], "count": 0, "_source": "slow_path" } # Wizard / option-first cho mọi câu hỏi pháp lý chung: # Nếu: # - intent là search_legal # - chưa có selected_document_code trong session # - trong câu hỏi không ghi rõ mã văn bản # Thì: luôn trả về payload options để người dùng chọn văn bản trước, # chưa generate câu trả lời chi tiết. has_explicit_code = self._has_explicit_document_code_in_query(query) logger.info( "[WIZARD] Checking wizard conditions - intent=%s, selected_code=%s, has_explicit_code=%s, query='%s'", intent, selected_document_code_normalized, has_explicit_code, query[:50], ) if ( intent == "search_legal" and not selected_document_code_normalized and not has_explicit_code ): logger.info("[QUERY_REWRITE] ✅ Wizard conditions met, using Query Rewrite Strategy") # Query Rewrite Strategy: Rewrite query into 3-5 optimized legal queries query_rewriter = get_query_rewriter(self.llm_generator) # Get conversation context for query rewriting context = None if session_id: try: recent_messages = ConversationContext.get_recent_messages(session_id, limit=5) context = [ {"role": msg.role, "content": msg.content} for msg in recent_messages ] except Exception as exc: logger.warning("[QUERY_REWRITE] Failed to load context: %s", exc) # Rewrite query into 3-5 queries rewritten_queries = query_rewriter.rewrite_query( query, context=context, max_queries=5, min_queries=3 ) if not rewritten_queries: # Fallback to original query if rewrite fails rewritten_queries = [query] logger.info( "[QUERY_REWRITE] Rewrote query into %d queries: %s", len(rewritten_queries), rewritten_queries[:3] ) # Parallel vector search with multiple queries try: from hue_portal.core.models import LegalSection # Search all legal sections (no document filter yet) qs = LegalSection.objects.all() text_fields = ["section_title", "section_code", "content"] # Use parallel vector search search_results = parallel_vector_search( rewritten_queries, qs, top_k_per_query=5, final_top_k=7, text_fields=text_fields ) # Extract unique document codes from results doc_codes_seen: Set[str] = set() document_options: List[Dict[str, Any]] = [] for section, score in search_results: doc = getattr(section, "document", None) if not doc: continue doc_code = getattr(doc, "code", "").upper() if not doc_code or doc_code in doc_codes_seen: continue doc_codes_seen.add(doc_code) # Get document metadata doc_title = getattr(doc, "title", "") or doc_code doc_summary = getattr(doc, "summary", "") or "" if not doc_summary: metadata = getattr(doc, "metadata", {}) or {} if isinstance(metadata, dict): doc_summary = metadata.get("summary", "") document_options.append({ "code": doc_code, "title": doc_title, "summary": doc_summary, "score": float(score), "doc_type": getattr(doc, "doc_type", "") or "", }) # Limit to top 5 documents if len(document_options) >= 5: break # If no documents found, use canonical fallback if not document_options: logger.warning("[QUERY_REWRITE] No documents found, using canonical fallback") canonical_candidates = [ { "code": "264-QD-TW", "title": "Quyết định 264-QĐ/TW về kỷ luật đảng viên", "summary": "", "doc_type": "", }, { "code": "QD-69-TW", "title": "Quy định 69-QĐ/TW về kỷ luật tổ chức đảng, đảng viên", "summary": "", "doc_type": "", }, { "code": "TT-02-CAND", "title": "Thông tư 02/2021/TT-BCA về điều lệnh CAND", "summary": "", "doc_type": "", }, ] clarification_payload = self._build_clarification_payload( query, canonical_candidates ) if clarification_payload: clarification_payload.setdefault("intent", intent) clarification_payload.setdefault("_source", "clarification") clarification_payload.setdefault("routing", "clarification") clarification_payload.setdefault("confidence", 0.3) return clarification_payload # Build options from search results options = [ { "code": opt["code"], "title": opt["title"], "reason": opt.get("summary") or f"Độ liên quan: {opt['score']:.2f}", } for opt in document_options ] # Add "Khác" option if not any(opt.get("code") == "__other__" for opt in options): options.append({ "code": "__other__", "title": "Khác", "reason": "Tôi muốn hỏi văn bản hoặc chủ đề pháp luật khác.", }) message = ( "Tôi đã tìm thấy các văn bản pháp luật liên quan đến câu hỏi của bạn.\n\n" "Bạn hãy chọn văn bản muốn tra cứu để tôi trả lời chi tiết hơn:" ) logger.info( "[QUERY_REWRITE] ✅ Found %d documents using Query Rewrite Strategy", len(document_options) ) return { "type": "options", "wizard_stage": "choose_document", "message": message, "options": options, "clarification": { "message": message, "options": options, }, "results": [], "count": 0, "intent": intent, "_source": "query_rewrite", "routing": "query_rewrite", "confidence": 0.95, # High confidence with Query Rewrite Strategy } except Exception as exc: logger.error( "[QUERY_REWRITE] Error in Query Rewrite Strategy: %s, falling back to LLM suggestions", exc, exc_info=True ) # Fallback to original LLM-based clarification canonical_candidates: List[Dict[str, Any]] = [] try: canonical_docs = list( LegalDocument.objects.filter( code__in=["264-QD-TW", "QD-69-TW", "TT-02-CAND"] ) ) for doc in canonical_docs: summary = getattr(doc, "summary", "") or "" metadata = getattr(doc, "metadata", {}) or {} if not summary and isinstance(metadata, dict): summary = metadata.get("summary", "") canonical_candidates.append( { "code": doc.code, "title": getattr(doc, "title", "") or doc.code, "summary": summary, "doc_type": getattr(doc, "doc_type", "") or "", "section_title": "", } ) except Exception as e: logger.warning("[CLARIFICATION] Canonical documents lookup failed: %s", e) if not canonical_candidates: canonical_candidates = [ { "code": "264-QD-TW", "title": "Quyết định 264-QĐ/TW về kỷ luật đảng viên", "summary": "", "doc_type": "", "section_title": "", }, { "code": "QD-69-TW", "title": "Quy định 69-QĐ/TW về kỷ luật tổ chức đảng, đảng viên", "summary": "", "doc_type": "", "section_title": "", }, { "code": "TT-02-CAND", "title": "Thông tư 02/2021/TT-BCA về điều lệnh CAND", "summary": "", "doc_type": "", "section_title": "", }, ] clarification_payload = self._build_clarification_payload( query, canonical_candidates ) if clarification_payload: clarification_payload.setdefault("intent", intent) clarification_payload.setdefault("_source", "clarification_fallback") clarification_payload.setdefault("routing", "clarification") clarification_payload.setdefault("confidence", 0.3) return clarification_payload # Search based on intent - retrieve top-15 for reranking (balance speed and RAM) search_result = self._search_by_intent( intent, query, limit=15, preferred_document_code=selected_document_code_normalized, ) # Balance: 15 for good recall, not too slow # Fast path for high-confidence legal queries (skip for complex queries) fast_path_response = None if intent == "search_legal" and not self._is_complex_query(query): fast_path_response = self._maybe_fast_path_response(search_result["results"], query) if fast_path_response: fast_path_response["intent"] = intent fast_path_response["_source"] = "fast_path" return fast_path_response # Rerank results - DISABLED for speed (can enable via ENABLE_RERANKER env var) # Reranker adds 1-3 seconds delay, skip for faster responses enable_reranker = os.environ.get("ENABLE_RERANKER", "false").lower() == "true" if intent == "search_legal" and enable_reranker: try: # Lazy import to avoid blocking startup (FlagEmbedding may download model) from hue_portal.core.reranker import rerank_documents legal_results = [r for r in search_result["results"] if r.get("type") == "legal"] if len(legal_results) > 0: # Rerank to top-4 (balance speed and context quality) top_k = min(4, len(legal_results)) reranked = rerank_documents(query, legal_results, top_k=top_k) # Update search_result with reranked results (keep non-legal results) non_legal = [r for r in search_result["results"] if r.get("type") != "legal"] search_result["results"] = reranked + non_legal search_result["count"] = len(search_result["results"]) logger.info( "[RERANKER] Reranked %d legal results to top-%d for query: %s", len(legal_results), top_k, query[:50] ) except Exception as e: logger.warning("[RERANKER] Reranking failed: %s, using original results", e) elif intent == "search_legal": # Skip reranking for speed - just use top results by score logger.debug("[RERANKER] Skipped reranking for speed (ENABLE_RERANKER=false)") # BƯỚC 1: Bypass LLM khi có results tốt (tránh context overflow + tăng tốc 30-40%) # Chỉ áp dụng cho legal queries có results với score cao if intent == "search_legal" and search_result["count"] > 0: top_result = search_result["results"][0] top_score = top_result.get("score", 0.0) or 0.0 top_data = top_result.get("data", {}) doc_code = (top_data.get("document_code") or "").upper() content = top_data.get("content", "") or top_data.get("excerpt", "") # Bypass LLM nếu: # 1. Có document code (TT-02-CAND, etc.) và content đủ dài # 2. Score >= 0.4 (giảm threshold để dễ trigger hơn) # 3. Hoặc có keywords quan trọng (%, hạ bậc, thi đua, tỷ lệ) với score >= 0.3 should_bypass = False query_lower = query.lower() has_keywords = any(kw in query_lower for kw in ["%", "phần trăm", "tỷ lệ", "12%", "20%", "10%", "hạ bậc", "thi đua", "xếp loại", "vi phạm", "cán bộ"]) # Điều kiện bypass dễ hơn: có doc_code + content đủ dài + score hợp lý if doc_code and len(content) > 100: if top_score >= 0.4: should_bypass = True elif has_keywords and top_score >= 0.3: should_bypass = True # Hoặc có keywords quan trọng + content đủ dài elif has_keywords and len(content) > 100 and top_score >= 0.3: should_bypass = True if should_bypass: # Template trả thẳng cho query về tỷ lệ vi phạm + hạ bậc thi đua if any(kw in query_lower for kw in ["12%", "tỷ lệ", "phần trăm", "hạ bậc", "thi đua"]): # Query về tỷ lệ vi phạm và hạ bậc thi đua section_code = top_data.get("section_code", "") section_title = top_data.get("section_title", "") doc_title = top_data.get("document_title", "văn bản pháp luật") # Trích xuất đoạn liên quan từ content content_preview = content[:600] + "..." if len(content) > 600 else content answer = ( f"Theo {doc_title} ({doc_code}):\n\n" f"{section_code}: {section_title}\n\n" f"{content_preview}\n\n" f"Nguồn: {section_code}, {doc_title} ({doc_code})" ) else: # Template chung cho legal queries section_code = top_data.get("section_code", "Điều liên quan") section_title = top_data.get("section_title", "") doc_title = top_data.get("document_title", "văn bản pháp luật") content_preview = content[:500] + "..." if len(content) > 500 else content answer = ( f"Kết quả chính xác nhất:\n\n" f"- Văn bản: {doc_title} ({doc_code})\n" f"- Điều khoản: {section_code}" + (f" – {section_title}" if section_title else "") + "\n\n" f"{content_preview}\n\n" f"Nguồn: {section_code}, {doc_title} ({doc_code})" ) logger.info( "[BYPASS_LLM] Using raw template for legal query (score=%.3f, doc=%s, query='%s')", top_score, doc_code, query[:50] ) return { "message": answer, "intent": intent, "confidence": min(0.99, top_score + 0.05), "results": search_result["results"][:3], "count": min(3, search_result["count"]), "_source": "raw_template", "routing": "raw_template" } # Get conversation context if available context = None context_summary = "" if session_id: try: recent_messages = ConversationContext.get_recent_messages(session_id, limit=5) context = [ { "role": msg.role, "content": msg.content, "intent": msg.intent } for msg in recent_messages ] # Tạo context summary để đưa vào prompt nếu có conversation history if len(context) > 1: context_parts = [] for msg in reversed(context[-3:]): # Chỉ lấy 3 message gần nhất if msg["role"] == "user": context_parts.append(f"Người dùng: {msg['content'][:100]}") elif msg["role"] == "bot": context_parts.append(f"Bot: {msg['content'][:100]}") if context_parts: context_summary = "\n\nNgữ cảnh cuộc trò chuyện trước đó:\n" + "\n".join(context_parts) except Exception as exc: logger.warning("[CONTEXT] Failed to load conversation context: %s", exc) # Enhance query with context if available enhanced_query = query if context_summary: enhanced_query = query + context_summary # Generate response message using LLM if available and we have documents message = None if self.llm_generator and search_result["count"] > 0: # For legal queries, use structured output (top-4 for good context and speed) if intent == "search_legal" and search_result["results"]: legal_docs = [r["data"] for r in search_result["results"] if r.get("type") == "legal"][:4] # Top-4 for balance if legal_docs: structured_answer = self.llm_generator.generate_structured_legal_answer( enhanced_query, # Dùng enhanced_query có context legal_docs, prefill_summary=None ) if structured_answer: message = format_structured_legal_answer(structured_answer) # For other intents or if structured failed, use regular LLM generation if not message: documents = [r["data"] for r in search_result["results"][:4]] # Top-4 for balance message = self.llm_generator.generate_answer( enhanced_query, # Dùng enhanced_query có context context=context, documents=documents ) # Fallback to template if LLM not available or failed if not message: if search_result["count"] > 0: # Đặc biệt xử lý legal queries: format tốt hơn thay vì dùng template chung if intent == "search_legal" and search_result["results"]: top_result = search_result["results"][0] top_data = top_result.get("data", {}) doc_code = top_data.get("document_code", "") doc_title = top_data.get("document_title", "văn bản pháp luật") section_code = top_data.get("section_code", "") section_title = top_data.get("section_title", "") content = top_data.get("content", "") or top_data.get("excerpt", "") if content and len(content) > 50: content_preview = content[:400] + "..." if len(content) > 400 else content message = ( f"Tôi tìm thấy {search_result['count']} điều khoản liên quan đến '{query}':\n\n" f"**{section_code}**: {section_title or 'Nội dung liên quan'}\n\n" f"{content_preview}\n\n" f"Nguồn: {doc_title}" + (f" ({doc_code})" if doc_code else "") ) else: template = RESPONSE_TEMPLATES.get(intent, RESPONSE_TEMPLATES["general_query"]) message = template.format( count=search_result["count"], query=query ) else: template = RESPONSE_TEMPLATES.get(intent, RESPONSE_TEMPLATES["general_query"]) message = template.format( count=search_result["count"], query=query ) else: message = RESPONSE_TEMPLATES["no_results"].format(query=query) # Limit results to top 5 for response results = search_result["results"][:5] response = { "message": message, "intent": intent, "confidence": 0.95, # High confidence for Slow Path (thorough search) "results": results, "count": len(results), "_source": "slow_path" } return response def _maybe_request_clarification( self, query: str, search_result: Dict[str, Any], selected_document_code: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """ Quyết định có nên hỏi người dùng chọn văn bản (wizard step: choose_document). Nguyên tắc option-first: - Nếu user CHƯA chọn văn bản trong session - Và trong câu hỏi KHÔNG ghi rõ mã văn bản - Và search có trả về kết quả => Ưu tiên trả về danh sách văn bản để người dùng chọn, thay vì trả lời thẳng. """ if selected_document_code: return None if not search_result or search_result.get("count", 0) == 0: return None # Nếu người dùng đã ghi rõ mã văn bản trong câu hỏi (ví dụ: 264/QĐ-TW) # thì không cần hỏi lại – ưu tiên dùng chính mã đó. if self._has_explicit_document_code_in_query(query): return None # Ưu tiên dùng danh sách văn bản "chuẩn" (canonical) nếu có trong DB. # Tuy nhiên, để đảm bảo wizard luôn hoạt động (option-first), # nếu DB chưa đủ dữ liệu thì vẫn build danh sách tĩnh fallback. fallback_candidates: List[Dict[str, Any]] = [] try: fallback_docs = list( LegalDocument.objects.filter( code__in=["264-QD-TW", "QD-69-TW", "TT-02-CAND"] ) ) for doc in fallback_docs: summary = getattr(doc, "summary", "") or "" metadata = getattr(doc, "metadata", {}) or {} if not summary and isinstance(metadata, dict): summary = metadata.get("summary", "") fallback_candidates.append( { "code": doc.code, "title": getattr(doc, "title", "") or doc.code, "summary": summary, "doc_type": getattr(doc, "doc_type", "") or "", "section_title": "", } ) except Exception as exc: logger.warning( "[CLARIFICATION] Fallback documents lookup failed, using static list: %s", exc, ) # Nếu DB chưa có đủ thông tin, luôn cung cấp danh sách tĩnh tối thiểu, # để wizard option-first vẫn hoạt động. if not fallback_candidates: fallback_candidates = [ { "code": "264-QD-TW", "title": "Quyết định 264-QĐ/TW về kỷ luật đảng viên", "summary": "", "doc_type": "", "section_title": "", }, { "code": "QD-69-TW", "title": "Quy định 69-QĐ/TW về kỷ luật tổ chức đảng, đảng viên", "summary": "", "doc_type": "", "section_title": "", }, { "code": "TT-02-CAND", "title": "Thông tư 02/2021/TT-BCA về điều lệnh CAND", "summary": "", "doc_type": "", "section_title": "", }, ] payload = self._build_clarification_payload(query, fallback_candidates) if payload: logger.info( "[CLARIFICATION] Requesting user choice among canonical documents: %s", [c["code"] for c in fallback_candidates], ) return payload def _has_explicit_document_code_in_query(self, query: str) -> bool: """ Check if the raw query string explicitly contains a known document code pattern (e.g. '264/QĐ-TW', 'QD-69-TW', 'TT-02-CAND'). Khác với _detect_document_code (dò toàn bộ bảng LegalDocument theo token), hàm này chỉ dựa trên các regex cố định để tránh over-detect cho câu hỏi chung chung như 'xử lí kỷ luật đảng viên thế nào'. """ normalized = self._remove_accents(query).upper() if not normalized: return False for pattern in DOCUMENT_CODE_PATTERNS: try: if re.search(pattern, normalized): return True except re.error: # Nếu pattern không hợp lệ thì bỏ qua, không chặn flow continue return False def _collect_document_candidates( self, legal_results: List[Dict[str, Any]], limit: int = 4, ) -> List[Dict[str, Any]]: """Collect unique document candidates from legal results.""" ordered_codes: List[str] = [] seen: set[str] = set() for result in legal_results: data = result.get("data", {}) code = (data.get("document_code") or "").strip() if not code: continue upper = code.upper() if upper in seen: continue ordered_codes.append(code) seen.add(upper) if len(ordered_codes) >= limit: break if len(ordered_codes) < 2: return [] try: documents = { doc.code.upper(): doc for doc in LegalDocument.objects.filter(code__in=ordered_codes) } except Exception as exc: logger.warning("[CLARIFICATION] Unable to load documents for candidates: %s", exc) documents = {} candidates: List[Dict[str, Any]] = [] for code in ordered_codes: upper = code.upper() doc_obj = documents.get(upper) section = next( ( res for res in legal_results if (res.get("data", {}).get("document_code") or "").strip().upper() == upper ), None, ) data = section.get("data", {}) if section else {} summary = "" if doc_obj: summary = doc_obj.summary or "" if not summary and isinstance(doc_obj.metadata, dict): summary = doc_obj.metadata.get("summary", "") if not summary: summary = data.get("excerpt") or data.get("content", "")[:200] candidates.append( { "code": code, "title": data.get("document_title") or (doc_obj.title if doc_obj else code), "summary": summary, "doc_type": doc_obj.doc_type if doc_obj else "", "section_title": data.get("section_title") or "", } ) return candidates def _build_clarification_payload( self, query: str, candidates: List[Dict[str, Any]], ) -> Optional[Dict[str, Any]]: if not candidates: return None default_message = ( "Tôi tìm thấy một số văn bản có thể phù hợp. " "Bạn vui lòng chọn văn bản muốn tra cứu để tôi trả lời chính xác hơn." ) llm_payload = self._call_clarification_llm(query, candidates) message = default_message options: List[Dict[str, Any]] = [] # Ưu tiên dùng gợi ý từ LLM, nhưng phải luôn đảm bảo có options fallback if llm_payload: message = llm_payload.get("message") or default_message raw_options = llm_payload.get("options") if isinstance(raw_options, list): options = [ { "code": (opt.get("code") or candidate.get("code", "")).upper(), "title": opt.get("title") or opt.get("document_title") or candidate.get("title", ""), "reason": opt.get("reason") or opt.get("summary") or candidate.get("summary") or candidate.get("section_title") or "", } for opt, candidate in zip( raw_options, candidates[: len(raw_options)], ) if (opt.get("code") or candidate.get("code")) and (opt.get("title") or opt.get("document_title") or candidate.get("title")) ] # Nếu LLM không trả về options hợp lệ → fallback build từ candidates if not options: options = [ { "code": candidate["code"].upper(), "title": candidate["title"], "reason": candidate.get("summary") or candidate.get("section_title") or "", } for candidate in candidates[:3] ] if not any(opt.get("code") == "__other__" for opt in options): options.append( { "code": "__other__", "title": "Khác", "reason": "Tôi muốn hỏi văn bản hoặc chủ đề khác", } ) return { # Wizard-style payload: ưu tiên dạng options cho UI "type": "options", "wizard_stage": "choose_document", "message": message, "options": options, "clarification": { "message": message, "options": options, }, "results": [], "count": 0, } def _call_clarification_llm( self, query: str, candidates: List[Dict[str, Any]], ) -> Optional[Dict[str, Any]]: if not self.llm_generator: return None try: return self.llm_generator.suggest_clarification_topics( query, candidates, max_options=3, ) except Exception as exc: logger.warning("[CLARIFICATION] LLM suggestion failed: %s", exc) return None def _parallel_search_prepare( self, document_code: str, keywords: List[str], session_id: Optional[str] = None, ) -> None: """ Trigger parallel search in background when user selects a document option. Stores results in cache for Stage 2 (choose topic). Args: document_code: Selected document code keywords: Keywords extracted from query/options session_id: Session ID for caching results """ if not session_id: return def _search_task(): try: logger.info( "[PARALLEL_SEARCH] Starting background search for doc=%s, keywords=%s", document_code, keywords[:5], ) # Check Redis cache first cache_key = f"prefetch:{document_code.upper()}:{hashlib.sha256(' '.join(keywords).encode()).hexdigest()[:16]}" cached_result = None if self.redis_cache and self.redis_cache.is_available(): cached_result = self.redis_cache.get(cache_key) if cached_result: logger.info( "[PARALLEL_SEARCH] ✅ Cache hit for doc=%s", document_code ) # Store in in-memory cache too with self._cache_lock: if session_id not in self._prefetched_cache: self._prefetched_cache[session_id] = {} self._prefetched_cache[session_id]["document_results"] = cached_result return # Search in the selected document query_text = " ".join(keywords) if keywords else "" search_result = self._search_by_intent( intent="search_legal", query=query_text, limit=20, # Get more results for topic options preferred_document_code=document_code.upper(), ) # Prepare cache data cache_data = { "document_code": document_code, "results": search_result.get("results", []), "count": search_result.get("count", 0), "timestamp": time.time(), } # Store in Redis cache if self.redis_cache and self.redis_cache.is_available(): self.redis_cache.set(cache_key, cache_data, ttl_seconds=self.prefetch_cache_ttl) logger.debug( "[PARALLEL_SEARCH] Cached prefetch results (TTL: %ds)", self.prefetch_cache_ttl ) # Store in in-memory cache (fallback) with self._cache_lock: if session_id not in self._prefetched_cache: self._prefetched_cache[session_id] = {} self._prefetched_cache[session_id]["document_results"] = cache_data logger.info( "[PARALLEL_SEARCH] Completed background search for doc=%s, found %d results", document_code, search_result.get("count", 0), ) except Exception as exc: logger.warning("[PARALLEL_SEARCH] Background search failed: %s", exc) # Submit to thread pool self._executor.submit(_search_task) def _parallel_search_topic( self, document_code: str, topic_keywords: List[str], session_id: Optional[str] = None, ) -> None: """ Trigger parallel search when user selects a topic option. Stores results for final answer generation. Args: document_code: Selected document code topic_keywords: Keywords from selected topic session_id: Session ID for caching results """ if not session_id: return def _search_task(): try: logger.info( "[PARALLEL_SEARCH] Starting topic search for doc=%s, keywords=%s", document_code, topic_keywords[:5], ) # Search with topic keywords query_text = " ".join(topic_keywords) if topic_keywords else "" search_result = self._search_by_intent( intent="search_legal", query=query_text, limit=10, preferred_document_code=document_code.upper(), ) # Store in cache with self._cache_lock: if session_id not in self._prefetched_cache: self._prefetched_cache[session_id] = {} self._prefetched_cache[session_id]["topic_results"] = { "document_code": document_code, "keywords": topic_keywords, "results": search_result.get("results", []), "count": search_result.get("count", 0), "timestamp": time.time(), } logger.info( "[PARALLEL_SEARCH] Completed topic search, found %d results", search_result.get("count", 0), ) except Exception as exc: logger.warning("[PARALLEL_SEARCH] Topic search failed: %s", exc) # Submit to thread pool self._executor.submit(_search_task) def _get_prefetched_results( self, session_id: Optional[str], result_type: str = "document_results", ) -> Optional[Dict[str, Any]]: """ Get prefetched search results from cache. Args: session_id: Session ID result_type: "document_results" or "topic_results" Returns: Cached results dict or None """ if not session_id: return None with self._cache_lock: cache_entry = self._prefetched_cache.get(session_id) if not cache_entry: return None results = cache_entry.get(result_type) if not results: return None # Check if results are still fresh (within 5 minutes) timestamp = results.get("timestamp", 0) if time.time() - timestamp > 300: # 5 minutes logger.debug("[PARALLEL_SEARCH] Prefetched results expired for session=%s", session_id) return None return results def _clear_prefetched_cache(self, session_id: Optional[str]) -> None: """Clear prefetched cache for a session.""" if not session_id: return with self._cache_lock: if session_id in self._prefetched_cache: del self._prefetched_cache[session_id] logger.debug("[PARALLEL_SEARCH] Cleared cache for session=%s", session_id) def _search_by_intent( self, intent: str, query: str, limit: int = 5, preferred_document_code: Optional[str] = None, ) -> Dict[str, Any]: """Search based on classified intent. Reduced limit from 20 to 5 for faster inference on free tier.""" # Use original query for better matching keywords = query.strip() extracted = " ".join(self.chatbot.extract_keywords(query)) if extracted and len(extracted) > 2: keywords = f"{keywords} {extracted}" results = [] if intent == "search_fine": qs = Fine.objects.all() text_fields = ["name", "code", "article", "decree", "remedial"] search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1) results = [{"type": "fine", "data": { "id": f.id, "name": f.name, "code": f.code, "min_fine": float(f.min_fine) if f.min_fine else None, "max_fine": float(f.max_fine) if f.max_fine else None, "article": f.article, "decree": f.decree, }} for f in search_results] elif intent == "search_procedure": qs = Procedure.objects.all() text_fields = ["title", "domain", "conditions", "dossier"] search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1) results = [{"type": "procedure", "data": { "id": p.id, "title": p.title, "domain": p.domain, "level": p.level, }} for p in search_results] elif intent == "search_office": qs = Office.objects.all() text_fields = ["unit_name", "address", "district", "service_scope"] search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1) results = [{"type": "office", "data": { "id": o.id, "unit_name": o.unit_name, "address": o.address, "district": o.district, "phone": o.phone, "working_hours": o.working_hours, }} for o in search_results] elif intent == "search_advisory": qs = Advisory.objects.all() text_fields = ["title", "summary"] search_results = search_with_ml(qs, keywords, text_fields, top_k=limit, min_score=0.1) results = [{"type": "advisory", "data": { "id": a.id, "title": a.title, "summary": a.summary, }} for a in search_results] elif intent == "search_legal": qs = LegalSection.objects.all() text_fields = ["section_title", "section_code", "content"] detected_code = self._detect_document_code(query) effective_code = preferred_document_code or detected_code filtered = False if effective_code: filtered_qs = qs.filter(document__code__iexact=effective_code) if filtered_qs.exists(): qs = filtered_qs filtered = True logger.info( "[SEARCH] Prefiltering legal sections for document code %s (query='%s')", effective_code, query, ) else: logger.info( "[SEARCH] Document code %s detected but no sections found locally, falling back to full corpus", effective_code, ) else: logger.debug("[SEARCH] No document code detected for query: %s", query) # Use pure semantic search (100% vector, no BM25) search_results = pure_semantic_search( [keywords], qs, top_k=limit, # limit=15 for reranking, will be reduced to 4 text_fields=text_fields ) results = self._format_legal_results(search_results, detected_code, query=query) logger.info( "[SEARCH] Legal intent processed (query='%s', code=%s, filtered=%s, results=%d)", query, detected_code or "None", filtered, len(results), ) return { "intent": intent, "query": query, "keywords": keywords, "results": results, "count": len(results), "detected_code": detected_code, } def _should_save_to_golden(self, query: str, response: Dict) -> bool: """ Decide if response should be saved to golden dataset. Criteria: - High confidence (>0.95) - Has results - Response is complete and well-formed - Not already in golden dataset """ try: from hue_portal.core.models import GoldenQuery # Check if already exists query_normalized = self._normalize_query(query) if GoldenQuery.objects.filter(query_normalized=query_normalized, is_active=True).exists(): return False # Check criteria has_results = response.get("count", 0) > 0 has_message = bool(response.get("message", "").strip()) confidence = response.get("confidence", 0.0) # Only save if high quality if has_results and has_message and confidence >= 0.95: # Additional check: message should be substantial (not just template) message = response.get("message", "") if len(message) > 50: # Substantial response return True return False except Exception as e: logger.warning(f"Error checking if should save to golden: {e}") return False def _normalize_query(self, query: str) -> str: """Normalize query for matching.""" normalized = query.lower().strip() # Remove accents normalized = unicodedata.normalize("NFD", normalized) normalized = "".join(ch for ch in normalized if unicodedata.category(ch) != "Mn") # Remove extra spaces normalized = re.sub(r'\s+', ' ', normalized).strip() return normalized def _detect_document_code(self, query: str) -> Optional[str]: """Detect known document code mentioned in the query.""" normalized_query = self._remove_accents(query).upper() if not normalized_query: return None try: codes = LegalDocument.objects.values_list("code", flat=True) except Exception as exc: logger.debug("Unable to fetch document codes: %s", exc) return None for code in codes: if not code: continue tokens = self._split_code_tokens(code) if tokens and all(token in normalized_query for token in tokens): logger.info("[SEARCH] Detected document code %s in query", code) return code return None def _split_code_tokens(self, code: str) -> List[str]: """Split a document code into uppercase accentless tokens.""" normalized = self._remove_accents(code).upper() return [tok for tok in re.split(r"[-/\s]+", normalized) if tok] def _remove_accents(self, text: str) -> str: if not text: return "" normalized = unicodedata.normalize("NFD", text) return "".join(ch for ch in normalized if unicodedata.category(ch) != "Mn") def _format_legal_results( self, search_results: List[Any], detected_code: Optional[str], query: Optional[str] = None, ) -> List[Dict[str, Any]]: """Build legal result payload and apply ordering/boosting based on doc code and keywords.""" entries: List[Dict[str, Any]] = [] upper_detected = detected_code.upper() if detected_code else None # Keywords that indicate important legal concepts (boost score if found) important_keywords = [] if query: query_lower = query.lower() # Keywords for percentage/threshold queries if any(kw in query_lower for kw in ["%", "phần trăm", "tỷ lệ", "12%", "20%", "10%"]): important_keywords.extend(["%", "phần trăm", "tỷ lệ", "12", "20", "10"]) # Keywords for ranking/demotion queries if any(kw in query_lower for kw in ["hạ bậc", "thi đua", "xếp loại", "đánh giá"]): important_keywords.extend(["hạ bậc", "thi đua", "xếp loại", "đánh giá"]) for ls in search_results: doc = ls.document doc_code = doc.code if doc else None score = getattr(ls, "_ml_score", getattr(ls, "rank", 0.0)) or 0.0 # Boost score if content contains important keywords content_text = (ls.content or ls.section_title or "").lower() keyword_boost = 0.0 if important_keywords and content_text: for kw in important_keywords: if kw.lower() in content_text: keyword_boost += 0.15 # Boost 0.15 per keyword match logger.debug( "[BOOST] Keyword '%s' found in section %s, boosting score", kw, ls.section_code, ) entries.append( { "type": "legal", "score": float(score) + keyword_boost, "data": { "id": ls.id, "section_code": ls.section_code, "section_title": ls.section_title, "content": ls.content[:500] if ls.content else "", "excerpt": ls.excerpt, "document_code": doc_code, "document_title": doc.title if doc else None, "page_start": ls.page_start, "page_end": ls.page_end, }, } ) if upper_detected: exact_matches = [ r for r in entries if (r["data"].get("document_code") or "").upper() == upper_detected ] if exact_matches: others = [r for r in entries if r not in exact_matches] entries = exact_matches + others else: for entry in entries: doc_code = (entry["data"].get("document_code") or "").upper() if doc_code == upper_detected: entry["score"] = (entry.get("score") or 0.1) * 10 entries.sort(key=lambda r: r.get("score") or 0, reverse=True) else: # Sort by boosted score entries.sort(key=lambda r: r.get("score") or 0, reverse=True) return entries def _is_complex_query(self, query: str) -> bool: """ Detect if query is complex and requires LLM reasoning (not suitable for Fast Path). Complex queries contain keywords like: %, bậc, thi đua, tỷ lệ, liên đới, tăng nặng, giảm nhẹ, đơn vị vi phạm """ if not query: return False query_lower = query.lower() complex_keywords = [ "%", "phần trăm", "bậc", "hạ bậc", "nâng bậc", "thi đua", "xếp loại", "đánh giá", "tỷ lệ", "tỉ lệ", "liên đới", "liên quan", "tăng nặng", "tăng nặng hình phạt", "giảm nhẹ", "giảm nhẹ hình phạt", "đơn vị vi phạm", "đơn vị có", ] for keyword in complex_keywords: if keyword in query_lower: logger.info( "[FAST_PATH] Complex query detected (keyword: '%s'), forcing Slow Path", keyword, ) return True return False def _maybe_fast_path_response( self, results: List[Dict[str, Any]], query: Optional[str] = None ) -> Optional[Dict[str, Any]]: """Return fast-path response if results are confident enough.""" if not results: return None # Double-check: if query is complex, never use Fast Path if query and self._is_complex_query(query): return None top_result = results[0] top_score = top_result.get("score", 0.0) or 0.0 doc_code = (top_result.get("data", {}).get("document_code") or "").upper() if top_score >= 0.88 and doc_code: logger.info( "[FAST_PATH] Top score hit (%.3f) for document %s", top_score, doc_code ) message = self._format_fast_legal_message(top_result) return { "message": message, "results": results[:3], "count": min(3, len(results)), "confidence": min(0.99, top_score + 0.05), } top_three = results[:3] if len(top_three) >= 2: doc_codes = [ (res.get("data", {}).get("document_code") or "").upper() for res in top_three if res.get("data", {}).get("document_code") ] if doc_codes and len(set(doc_codes)) == 1: logger.info( "[FAST_PATH] Top-%d results share same document %s", len(top_three), doc_codes[0], ) message = self._format_fast_legal_message(top_three[0]) return { "message": message, "results": top_three, "count": len(top_three), "confidence": min(0.97, (top_three[0].get("score") or 0.9) + 0.04), } return None def _format_fast_legal_message(self, result: Dict[str, Any]) -> str: """Format a concise legal answer without LLM.""" data = result.get("data", {}) doc_title = data.get("document_title") or "văn bản pháp luật" doc_code = data.get("document_code") or "" section_code = data.get("section_code") or "Điều liên quan" section_title = data.get("section_title") or "" content = (data.get("content") or data.get("excerpt") or "").strip() if len(content) > 400: trimmed = content[:400].rsplit(" ", 1)[0] content = f"{trimmed}..." intro = "Kết quả chính xác nhất:" lines = [intro] if doc_title or doc_code: lines.append(f"- Văn bản: {doc_title or 'văn bản pháp luật'}" + (f" ({doc_code})" if doc_code else "")) section_label = section_code if section_title: section_label = f"{section_code} – {section_title}" lines.append(f"- Điều khoản: {section_label}") lines.append("") lines.append(content) citation_doc = doc_title or doc_code or "nguồn chính thức" lines.append(f"\nNguồn: {section_label}, {citation_doc}.") return "\n".join(lines)