Spaces:
Running
Running
| """ | |
| Scenario Engine for FSM-based Conversations | |
| Executes multi-turn scripted conversations from JSON definitions | |
| """ | |
| import json | |
| import os | |
| import re | |
| from typing import Dict, Optional, List, Any | |
| from datetime import datetime | |
class ScenarioEngine:
    """
    Execute scenario-based (scripted, multi-turn) conversations.

    Scenarios are read once, at construction time, from every ``*.json``
    file found in ``scenarios_dir``. Each scenario is a dict carrying a
    ``scenario_id`` and a list of ``steps``; each step is a dict that may
    use the keys read by this class: ``id``, ``bot_message``,
    ``bot_message_template``, ``rag_query_template``,
    ``expected_input_type``, ``save_as``, ``branches``, ``next_step``,
    ``end_scenario`` and ``action``.

    The engine itself keeps no per-conversation state: callers pass the
    current step and collected data in, and receive the new state back.
    """

    # Keys of a ``branches`` table that describe the fallback rather than
    # a branch to be matched against user input.
    _RESERVED_BRANCH_KEYS = ('default', 'default_branch')

    class _PlaceholderVars(dict):
        """Template variables that render unknown names as ``[name]``."""

        def __missing__(self, key):
            placeholder = f"[{key}]"
            print(f"⚠ Adding placeholder for: {key}")
            # Remember the placeholder so a repeated field is reported once.
            self[key] = placeholder
            return placeholder

    def __init__(self, scenarios_dir: str = "scenarios"):
        """Remember ``scenarios_dir`` and load every scenario found in it."""
        self.scenarios_dir = scenarios_dir
        self.scenarios = self._load_scenarios()

    def _load_scenarios(self) -> Dict[str, Dict]:
        """
        Load all scenario JSON files from ``self.scenarios_dir``.

        Returns:
            Mapping of ``scenario_id`` to the parsed scenario dict. Files
            that cannot be read or parsed, that do not contain a JSON
            object, or that have no ``scenario_id`` are skipped so one bad
            file cannot prevent the other scenarios from loading.
        """
        scenarios: Dict[str, Dict] = {}
        # isdir (not exists): a plain file at this path cannot be listed.
        if not os.path.isdir(self.scenarios_dir):
            print(f"⚠ Scenarios directory not found: {self.scenarios_dir}")
            return scenarios

        # Sorted so that duplicate scenario ids resolve deterministically
        # (the alphabetically last file wins).
        for filename in sorted(os.listdir(self.scenarios_dir)):
            if not filename.endswith('.json'):
                continue
            filepath = os.path.join(self.scenarios_dir, filename)
            try:
                with open(filepath, 'r', encoding='utf-8') as f:
                    scenario = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                print(f"⚠ Skipping unreadable scenario file {filename}: {e}")
                continue
            if not isinstance(scenario, dict):
                print(f"⚠ Skipping scenario file {filename}: top level is not an object")
                continue
            scenario_id = scenario.get('scenario_id')
            if scenario_id:
                scenarios[scenario_id] = scenario
                print(f"✓ Loaded scenario: {scenario_id}")
        return scenarios

    @staticmethod
    def _end_result(message: str, action: Optional[str] = None) -> Dict[str, Any]:
        """Build a terminal result that carries every documented key."""
        return {
            "message": message,
            "new_state": None,
            "end_scenario": True,
            "action": action
        }

    def start_scenario(self, scenario_id: str, initial_data: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Start a new scenario with optional initial data.

        Args:
            scenario_id: Scenario to start
            initial_data: External data to inject (event_name, mood, etc.);
                it is copied, never mutated

        Returns:
            {
                "message": str,
                "new_state": {...},
                "end_scenario": bool
            }
            For an unknown scenario, or one without steps, ``new_state`` is
            empty and ``end_scenario`` is True.
        """
        scenario = self.scenarios.get(scenario_id)
        steps = scenario.get('steps') if scenario else None
        if not steps:
            return {
                "message": "Xin lỗi, tính năng này đang được cập nhật.",
                "new_state": {},
                "end_scenario": True
            }

        first_step = steps[0]
        # Initialize with external data
        scenario_data = initial_data.copy() if initial_data else {}
        # Build first message with initial data (no RAG service at start)
        message = self._build_message(first_step, scenario_data, None)
        return {
            "message": message,
            "new_state": {
                "active_scenario": scenario_id,
                # Report the first step's real id so next_step() can find
                # it even when steps are not numbered from 1.
                "scenario_step": first_step.get('id', 1),
                "scenario_data": scenario_data,
                "last_activity": datetime.utcnow().isoformat()
            },
            "end_scenario": False
        }

    def next_step(
        self,
        scenario_id: str,
        current_step: int,
        user_input: str,
        scenario_data: Dict,
        rag_service: Optional[Any] = None
    ) -> Dict[str, Any]:
        """
        Process user input and move to next step.

        Args:
            scenario_id: Active scenario ID
            current_step: Current step number
            user_input: User's message
            scenario_data: Data collected so far; updated in place with the
                branch's ``save_data`` and the user's input
            rag_service: Optional RAG service for hybrid queries

        Returns:
            {
                "message": str,
                "new_state": {...} | None,
                "end_scenario": bool,
                "action": str | None
            }
            ``new_state`` is None when the state must not change (invalid
            input) or when the scenario has ended.
        """
        scenario = self.scenarios.get(scenario_id)
        if scenario is None:
            return self._end_result("Error: Scenario not found")

        current_step_config = self._get_step(scenario, current_step)
        if not current_step_config:
            return self._end_result("Error: Step not found")

        if scenario_data is None:
            scenario_data = {}

        # Validate input if needed
        expected_type = current_step_config.get('expected_input_type')
        if expected_type:
            validation_error = self._validate_input(user_input, expected_type)
            if validation_error:
                return {
                    "message": validation_error,
                    "new_state": None,  # Don't change state
                    "end_scenario": False,
                    "action": None
                }

        # Handle branching; an empty/invalid table falls back to next_step
        branches = current_step_config.get('branches')
        if isinstance(branches, dict) and branches:
            branch_result = self._handle_branches(branches, user_input, scenario_data)
            next_step_id = branch_result['next_step']
            scenario_data.update(branch_result.get('save_data', {}))
        else:
            next_step_id = current_step_config.get('next_step')

        # Save user input (after branch data, so the raw input wins on a clash)
        input_field = current_step_config.get('save_as', f'step_{current_step}_input')
        scenario_data[input_field] = user_input

        # Get next step config
        next_step_config = self._get_step(scenario, next_step_id)
        if not next_step_config:
            return self._end_result("Cảm ơn bạn!")

        # Check if scenario ends
        if next_step_config.get('end_scenario'):
            if 'bot_message' in next_step_config:
                final_message = next_step_config['bot_message']
            else:
                # A closing step defined only by a template used to raise
                # KeyError here; render it like any other step instead.
                final_message = self._build_message(
                    next_step_config, scenario_data, rag_service
                )
            return self._end_result(final_message, next_step_config.get('action'))

        # Build next message
        message = self._build_message(next_step_config, scenario_data, rag_service)
        return {
            "message": message,
            "new_state": {
                "active_scenario": scenario_id,
                "scenario_step": next_step_id,
                "scenario_data": scenario_data,
                "last_activity": datetime.utcnow().isoformat()
            },
            "end_scenario": False,
            "action": next_step_config.get('action')
        }

    def _get_step(self, scenario: Dict, step_id: int) -> Optional[Dict]:
        """Return the step whose ``id`` equals ``step_id``, or None."""
        if step_id is None:
            # Avoid matching a step that merely lacks an 'id' key.
            return None
        for step in scenario.get('steps') or []:
            if step.get('id') == step_id:
                return step
        return None

    def _validate_input(self, user_input: str, expected_type: str) -> Optional[str]:
        """
        Validate user input against ``expected_type``.

        Only 'email' and 'phone' are checked; any other type is accepted.

        Returns:
            Error message to show the user, or None if the input is valid.
        """
        if expected_type == 'email':
            if not re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', user_input):
                return "Email không hợp lệ. Vui lòng nhập lại (vd: ten@email.com)"
        elif expected_type == 'phone':
            # Simple Vietnamese phone validation: count digits only, so
            # spaces, dashes and a leading '+' are tolerated.
            clean = re.sub(r'[^\d]', '', user_input)
            # NOTE(review): 9-digit numbers are accepted although the
            # message asks for 10-11 digits - confirm which is intended.
            if len(clean) < 9 or len(clean) > 11:
                return "Số điện thoại không hợp lệ. Vui lòng nhập lại (10-11 số)"
        return None

    def _handle_branches(
        self,
        branches: Dict,
        user_input: str,
        scenario_data: Dict
    ) -> Dict:
        """
        Pick the branch to follow for ``user_input``.

        A branch matches when any of its ``patterns`` occurs, compared
        case-insensitively, as a substring of the input; branches are tried
        in definition order. When nothing matches, the fallback is, in
        order: the branch referenced by ``default_branch``, the branch
        named ``default``, then the first branch defined.

        ``scenario_data`` is accepted for interface compatibility and is
        not read here.

        Returns:
            {"next_step": int | None, "save_data": {...}}
        """
        user_input_lower = user_input.lower().strip()

        for branch_name, branch_config in branches.items():
            # Skip fallback metadata and anything that is not a branch dict
            # (e.g. a string-valued 'default_branch' entry).
            if branch_name in self._RESERVED_BRANCH_KEYS or not isinstance(branch_config, dict):
                continue
            for pattern in branch_config.get('patterns', []):
                if pattern.lower() in user_input_lower:
                    return {
                        "next_step": branch_config.get('next_step'),
                        "save_data": branch_config.get('save_data', {})
                    }

        # Default branch
        default_ref = branches.get('default_branch')
        if isinstance(default_ref, dict):
            default_branch = default_ref
        elif isinstance(default_ref, str) and isinstance(branches.get(default_ref), dict):
            default_branch = branches[default_ref]
        elif isinstance(branches.get('default'), dict):
            default_branch = branches['default']
        else:
            default_branch = next(
                (cfg for cfg in branches.values() if isinstance(cfg, dict)),
                {}
            )
        return {
            "next_step": default_branch.get('next_step'),
            "save_data": default_branch.get('save_data', {})
        }

    def _build_message(
        self,
        step_config: Dict,
        scenario_data: Dict,
        rag_service: Optional[Any]
    ) -> str:
        """
        Build bot message with 3-layer data resolution:
        1. scenario_data (initial + user inputs)
        2. RAG results (if rag_query_template exists)
        3. Merged template vars

        Unknown template variables are rendered as ``[name]`` rather than
        failing. Steps without ``bot_message_template`` return their plain
        ``bot_message`` (or an empty string).
        """
        # Layer 1: Base data (initial + user inputs)
        # Common template vars default to the first user answer; explicit
        # values in scenario_data override those defaults.
        first_input = scenario_data.get('step_1_input', '')
        template_data = {
            'event_name': scenario_data.get('step_1_input', 'sự kiện này'),
            'mood': first_input,
            'interest': first_input,
            'interest_tag': first_input,
        }
        template_data.update(scenario_data)

        # Layer 2: RAG query (if specified)
        if 'rag_query_template' in step_config:
            try:
                # Build query from template
                query = step_config['rag_query_template'].format(**template_data)
                if rag_service:
                    # Execute RAG search
                    template_data['rag_results'] = self._execute_rag_query(query, rag_service)
                else:
                    # Fallback if no RAG service
                    template_data['rag_results'] = "(Đang tải thông tin...)"
            except Exception as e:
                # Best effort: a failed lookup must not break the reply.
                print(f"⚠ RAG query error: {e}")
                template_data['rag_results'] = ""

        # Layer 3: Build final message
        if 'bot_message_template' in step_config:
            template = step_config['bot_message_template']
            try:
                return template.format(**template_data)
            except KeyError as e:
                print(f"⚠ Template var missing: {e}")
                print(f"📋 Available vars: {list(template_data.keys())}")
                # Fallback: substitute "[var]" for each missing variable.
                # format_map also covers fields with a format spec
                # (e.g. "{var:>5}"), which a plain regex scan would miss.
                return template.format_map(self._PlaceholderVars(template_data))
        return step_config.get('bot_message', '')

    def _execute_rag_query(self, query: str, rag_service: Any) -> str:
        """
        Execute RAG query and format results.

        Placeholder implementation: ``rag_service`` is not called yet and a
        fixed two-item result list is returned for any query.

        Returns:
            Formatted string of top results
        """
        # Simple search (we'll integrate with actual RAG later)
        return f"[Kết quả tìm kiếm cho: {query}]\n1. Sự kiện A\n2. Sự kiện B"
# Manual smoke test: walk the "price_inquiry" scenario with scripted replies.
if __name__ == "__main__":
    engine = ScenarioEngine()
    print("\nTest: Start price_inquiry scenario")
    result = engine.start_scenario("price_inquiry")
    print(f"Bot: {result['message']}")
    print(f"State: {result['new_state']}")

    # Last usable conversation state; empty/None when the scenario could
    # not be started (e.g. no scenarios directory next to this script).
    state = result.get('new_state')
    scripted_turns = [
        ("User answers 'Show A'", "Show A"),
        ("User answers 'nhóm'", "nhóm 5 người"),
    ]
    for label, reply in scripted_turns:
        if result.get('end_scenario') or not state:
            # Nothing left to drive: the scenario is missing or finished.
            print("\nScenario ended; skipping remaining scripted turns")
            break
        print(f"\nTest: {label}")
        result = engine.next_step(
            scenario_id=state['active_scenario'],
            current_step=state['scenario_step'],
            user_input=reply,
            scenario_data=state['scenario_data']
        )
        print(f"Bot: {result['message']}")
        # new_state is None when input was rejected or the scenario ended;
        # keep the previous state so the collected data stays reachable.
        state = result.get('new_state') or state

    if state:
        # next_step() updates scenario_data in place, so the last known
        # state holds everything collected, even after the final step.
        print(f"Data collected: {state['scenario_data']}")