Spaces:
Running
Running
| """ | |
| Base Scenario Handler - Abstract class for all scenario handlers | |
| Provides common functionality: RAG search, formatting, unexpected input handling | |
| """ | |
| from abc import ABC, abstractmethod | |
| from typing import Dict, Any, Optional | |
| class BaseScenarioHandler(ABC): | |
| """ | |
| Abstract base class for scenario handlers | |
| Each scenario (price_inquiry, event_recommendation, etc.) | |
| should inherit from this and implement start() and next_step() | |
| """ | |
| def __init__(self, embedding_service, qdrant_service, lead_storage): | |
| """ | |
| Initialize handler with required services | |
| Args: | |
| embedding_service: JinaClipEmbeddingService for text encoding | |
| qdrant_service: QdrantVectorService for vector search | |
| lead_storage: LeadStorageService for saving customer data | |
| """ | |
| self.embedding_service = embedding_service | |
| self.qdrant_service = qdrant_service | |
| self.lead_storage = lead_storage | |
| def start(self, initial_data: Dict = None) -> Dict[str, Any]: | |
| """ | |
| Start the scenario - return first message | |
| Args: | |
| initial_data: Optional initial context (e.g., event_name, mood) | |
| Returns: | |
| { | |
| "message": "First bot message", | |
| "new_state": { | |
| "active_scenario": "scenario_id", | |
| "scenario_step": 1, | |
| "scenario_data": {...} | |
| } | |
| } | |
| """ | |
| pass | |
| def next_step(self, current_step: int, user_input: str, scenario_data: Dict) -> Dict[str, Any]: | |
| """ | |
| Process user input and advance to next step | |
| Args: | |
| current_step: Current step number (1-indexed) | |
| user_input: User's message | |
| scenario_data: Accumulated scenario data | |
| Returns: | |
| { | |
| "message": "Bot response", | |
| "new_state": {...} or None if don't advance, | |
| "loading_message": "Optional loading text", | |
| "end_scenario": True/False, | |
| "action": "Optional action to execute" | |
| } | |
| """ | |
| pass | |
| def _search_rag(self, query: str, limit: int = 3) -> list: | |
| """ | |
| Execute RAG search using Qdrant | |
| Args: | |
| query: Search query text | |
| limit: Max number of results | |
| Returns: | |
| List of search results with metadata | |
| """ | |
| try: | |
| embedding = self.embedding_service.encode_text(query) | |
| results = self.qdrant_service.search( | |
| query_embedding=embedding, | |
| limit=limit, | |
| score_threshold=0.5, | |
| ef=256 | |
| ) | |
| return results | |
| except Exception as e: | |
| print(f"⚠️ RAG search error: {e}") | |
| return [] | |
| def _format_rag_results(self, results: list, max_length: int = 200) -> str: | |
| """ | |
| Format RAG search results as readable text | |
| Args: | |
| results: Search results from _search_rag() | |
| max_length: Max chars per result | |
| Returns: | |
| Formatted string with numbered results | |
| """ | |
| if not results or len(results) == 0: | |
| return "Không tìm thấy kết quả." | |
| formatted = [] | |
| for i, r in enumerate(results[:3], 1): | |
| text = r['metadata'].get('text', '') | |
| if text: | |
| snippet = text[:max_length].strip() | |
| if len(text) > max_length: | |
| snippet += "..." | |
| formatted.append(f"{i}. {snippet}") | |
| return "\n".join(formatted) if formatted else "Không tìm thấy kết quả." | |
| def handle_unexpected_input( | |
| self, | |
| user_input: str, | |
| expected_type: str, | |
| current_step: int | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Handle when user gives unexpected input (e.g., asks question instead of answering) | |
| Args: | |
| user_input: User's message | |
| expected_type: What we expected (email, choice, event_name, etc.) | |
| current_step: Current step number | |
| Returns: | |
| None - Continue with normal flow | |
| Dict - Return this response (RAG answer + retry prompt) | |
| """ | |
| # Detect if user is asking a question instead of answering | |
| question_indicators = [ | |
| "?", "đâu", "gì", "sao", "where", "what", "how", | |
| "khi nào", "mấy giờ", "thế nào", "bao nhiêu" | |
| ] | |
| message_lower = user_input.lower() | |
| is_question = any(q in message_lower for q in question_indicators) | |
| if is_question: | |
| # User asking off-topic question → Answer with RAG, then retry | |
| print(f"🔀 Unexpected input detected: '{user_input}' (expected: {expected_type})") | |
| results = self._search_rag(user_input) | |
| rag_answer = self._format_rag_results(results) | |
| # Build retry prompt based on expected_type | |
| retry_prompts = { | |
| 'interest_tag': "Vậy nha! Quay lại câu hỏi: Bạn thích vibe nào? (Chill / Sôi động / Hài / Workshop)", | |
| 'event_name': "OK! Vậy bạn muốn xem event nào trong danh sách trên?", | |
| 'email': "Được rồi! Cho mình xin email nhé?", | |
| 'phone': "Okie! Vậy cho mình số điện thoại để liên hệ nhé?", | |
| 'choice': "Hiểu rồi! Vậy bạn chọn gì?", | |
| 'rating': "Vậy nha! Bạn đánh giá mấy sao? (1-5)" | |
| } | |
| retry_msg = retry_prompts.get(expected_type, "Vậy nha! Quay lại câu hỏi trước nhé ^^") | |
| return { | |
| "message": f"{rag_answer}\n\n---\n💬 {retry_msg}", | |
| "new_state": None, # Don't advance step | |
| "scenario_active": True, | |
| "loading_message": "⏳ Bạn đợi tôi tìm 1 chút nhé..." | |
| } | |
| return None # Continue normal flow | |
| def _validate_email(self, email: str) -> bool: | |
| """Simple email validation""" | |
| import re | |
| pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' | |
| return re.match(pattern, email) is not None | |
| def _validate_phone(self, phone: str) -> bool: | |
| """Simple phone validation (Vietnam format)""" | |
| import re | |
| # Accept formats: 0123456789, +84123456789, 84123456789 | |
| pattern = r'^(\+?84|0)[0-9]{9,10}$' | |
| return re.match(pattern, phone.replace(' ', '')) is not None | |