ChatbotRAG / scenario_handlers /base_handler.py
minhvtt's picture
Upload 36 files
ffb5f88 verified
"""
Base Scenario Handler - Abstract class for all scenario handlers
Provides common functionality: RAG search, formatting, unexpected input handling
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
class BaseScenarioHandler(ABC):
"""
Abstract base class for scenario handlers
Each scenario (price_inquiry, event_recommendation, etc.)
should inherit from this and implement start() and next_step()
"""
def __init__(self, embedding_service, qdrant_service, lead_storage):
"""
Initialize handler with required services
Args:
embedding_service: JinaClipEmbeddingService for text encoding
qdrant_service: QdrantVectorService for vector search
lead_storage: LeadStorageService for saving customer data
"""
self.embedding_service = embedding_service
self.qdrant_service = qdrant_service
self.lead_storage = lead_storage
@abstractmethod
def start(self, initial_data: Dict = None) -> Dict[str, Any]:
"""
Start the scenario - return first message
Args:
initial_data: Optional initial context (e.g., event_name, mood)
Returns:
{
"message": "First bot message",
"new_state": {
"active_scenario": "scenario_id",
"scenario_step": 1,
"scenario_data": {...}
}
}
"""
pass
@abstractmethod
def next_step(self, current_step: int, user_input: str, scenario_data: Dict) -> Dict[str, Any]:
"""
Process user input and advance to next step
Args:
current_step: Current step number (1-indexed)
user_input: User's message
scenario_data: Accumulated scenario data
Returns:
{
"message": "Bot response",
"new_state": {...} or None if don't advance,
"loading_message": "Optional loading text",
"end_scenario": True/False,
"action": "Optional action to execute"
}
"""
pass
def _search_rag(self, query: str, limit: int = 3) -> list:
"""
Execute RAG search using Qdrant
Args:
query: Search query text
limit: Max number of results
Returns:
List of search results with metadata
"""
try:
embedding = self.embedding_service.encode_text(query)
results = self.qdrant_service.search(
query_embedding=embedding,
limit=limit,
score_threshold=0.5,
ef=256
)
return results
except Exception as e:
print(f"⚠️ RAG search error: {e}")
return []
def _format_rag_results(self, results: list, max_length: int = 200) -> str:
"""
Format RAG search results as readable text
Args:
results: Search results from _search_rag()
max_length: Max chars per result
Returns:
Formatted string with numbered results
"""
if not results or len(results) == 0:
return "Không tìm thấy kết quả."
formatted = []
for i, r in enumerate(results[:3], 1):
text = r['metadata'].get('text', '')
if text:
snippet = text[:max_length].strip()
if len(text) > max_length:
snippet += "..."
formatted.append(f"{i}. {snippet}")
return "\n".join(formatted) if formatted else "Không tìm thấy kết quả."
def handle_unexpected_input(
self,
user_input: str,
expected_type: str,
current_step: int
) -> Optional[Dict[str, Any]]:
"""
Handle when user gives unexpected input (e.g., asks question instead of answering)
Args:
user_input: User's message
expected_type: What we expected (email, choice, event_name, etc.)
current_step: Current step number
Returns:
None - Continue with normal flow
Dict - Return this response (RAG answer + retry prompt)
"""
# Detect if user is asking a question instead of answering
question_indicators = [
"?", "đâu", "gì", "sao", "where", "what", "how",
"khi nào", "mấy giờ", "thế nào", "bao nhiêu"
]
message_lower = user_input.lower()
is_question = any(q in message_lower for q in question_indicators)
if is_question:
# User asking off-topic question → Answer with RAG, then retry
print(f"🔀 Unexpected input detected: '{user_input}' (expected: {expected_type})")
results = self._search_rag(user_input)
rag_answer = self._format_rag_results(results)
# Build retry prompt based on expected_type
retry_prompts = {
'interest_tag': "Vậy nha! Quay lại câu hỏi: Bạn thích vibe nào? (Chill / Sôi động / Hài / Workshop)",
'event_name': "OK! Vậy bạn muốn xem event nào trong danh sách trên?",
'email': "Được rồi! Cho mình xin email nhé?",
'phone': "Okie! Vậy cho mình số điện thoại để liên hệ nhé?",
'choice': "Hiểu rồi! Vậy bạn chọn gì?",
'rating': "Vậy nha! Bạn đánh giá mấy sao? (1-5)"
}
retry_msg = retry_prompts.get(expected_type, "Vậy nha! Quay lại câu hỏi trước nhé ^^")
return {
"message": f"{rag_answer}\n\n---\n💬 {retry_msg}",
"new_state": None, # Don't advance step
"scenario_active": True,
"loading_message": "⏳ Bạn đợi tôi tìm 1 chút nhé..."
}
return None # Continue normal flow
def _validate_email(self, email: str) -> bool:
"""Simple email validation"""
import re
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def _validate_phone(self, phone: str) -> bool:
"""Simple phone validation (Vietnam format)"""
import re
# Accept formats: 0123456789, +84123456789, 84123456789
pattern = r'^(\+?84|0)[0-9]{9,10}$'
return re.match(pattern, phone.replace(' ', '')) is not None