""" Agent Service - Central Brain for Sales & Feedback Agents Manages LLM conversation loop with tool calling """ from typing import Dict, Any, List, Optional import os from tools_service import ToolsService class AgentService: """ Manages the conversation loop between User -> LLM -> Tools -> Response """ def __init__( self, tools_service: ToolsService, embedding_service, qdrant_service, advanced_rag, hf_token: str, feedback_tracking=None # NEW: Optional feedback tracking ): self.tools_service = tools_service self.embedding_service = embedding_service self.qdrant_service = qdrant_service self.advanced_rag = advanced_rag self.hf_token = hf_token self.feedback_tracking = feedback_tracking # Load system prompts self.prompts = self._load_prompts() def _load_prompts(self) -> Dict[str, str]: """Load system prompts from files""" prompts = {} prompts_dir = "prompts" for mode in ["sales_agent", "feedback_agent"]: filepath = os.path.join(prompts_dir, f"{mode}.txt") try: with open(filepath, 'r', encoding='utf-8') as f: prompts[mode] = f.read() print(f"✓ Loaded prompt: {mode}") except Exception as e: print(f"⚠️ Error loading {mode} prompt: {e}") prompts[mode] = "" return prompts async def chat( self, user_message: str, conversation_history: List[Dict], mode: str = "sales", # "sales" or "feedback" user_id: Optional[str] = None, access_token: Optional[str] = None, # NEW: For authenticated API calls max_iterations: int = 3 ) -> Dict[str, Any]: """ Main conversation loop Args: user_message: User's input conversation_history: Previous messages [{"role": "user", "content": ...}, ...] mode: "sales" or "feedback" user_id: User ID (for feedback mode to check purchase history) access_token: JWT token for authenticated API calls max_iterations: Maximum tool call iterations to prevent infinite loops Returns: { "message": "Bot response", "tool_calls": [...], # List of tools called (for debugging) "mode": mode } """ print(f"\n🤖 Agent Mode: {mode}") print(f"👤 User Message: {user_message}") print(f"🔑 Auth Info:") print(f" - User ID: {user_id}") print(f" - Access Token: {'✅ Received' if access_token else '❌ None'}") # Store user_id and access_token for tool calls self.current_user_id = user_id self.current_access_token = access_token if access_token: print(f" - Stored access_token for tools: {access_token[:20]}...") if user_id: print(f" - Stored user_id for tools: {user_id}") # Select system prompt system_prompt = self._get_system_prompt(mode) # Build conversation context messages = self._build_messages(system_prompt, conversation_history, user_message) # Agentic loop: LLM may call tools multiple times tool_calls_made = [] current_response = None for iteration in range(max_iterations): print(f"\n🔄 Iteration {iteration + 1}") # Call LLM llm_response = await self._call_llm(messages) print(f"🧠 LLM Response: {llm_response[:200]}...") # Check if LLM wants to call a tool tool_call = self._parse_tool_call(llm_response) if not tool_call: # No tool call -> This is the final response current_response = llm_response break # Execute tool print(f"🔧 Tool Called: {tool_call['tool_name']}") # Auto-inject real user_id for get_purchased_events if tool_call['tool_name'] == 'get_purchased_events' and self.current_user_id: print(f"🔄 Auto-injecting real user_id: {self.current_user_id}") tool_call['arguments']['user_id'] = self.current_user_id tool_result = await self.tools_service.execute_tool( tool_call['tool_name'], tool_call['arguments'], access_token=self.current_access_token # Pass access_token ) # Record tool call tool_calls_made.append({ "function": tool_call['tool_name'], "arguments": tool_call['arguments'], "result": tool_result }) # Add tool result to conversation messages.append({ "role": "assistant", "content": llm_response }) messages.append({ "role": "system", "content": f"Tool Result:\n{self._format_tool_result({'result': tool_result})}" }) # If tool returns "run_rag_search", handle it specially if isinstance(tool_result, dict) and tool_result.get("action") == "run_rag_search": rag_results = await self._execute_rag_search(tool_result["query"]) messages[-1]["content"] = f"RAG Search Results:\n{rag_results}" # Clean up response final_response = current_response or llm_response final_response = self._clean_response(final_response) return { "message": final_response, "tool_calls": tool_calls_made, "mode": mode } def _get_system_prompt(self, mode: str) -> str: """Get system prompt for selected mode with tools definition""" prompt_key = f"{mode}_agent" if mode in ["sales", "feedback"] else "sales_agent" base_prompt = self.prompts.get(prompt_key, "") # Add tools definition tools_definition = self._get_tools_definition() return f"{base_prompt}\n\n{tools_definition}" def _get_tools_definition(self) -> str: """Get tools definition in text format for prompt""" return """ # AVAILABLE TOOLS You can call the following tools when needed. To call a tool, output a JSON block like this: ```json { "tool_call": "tool_name", "arguments": { "arg1": "value1", "arg2": "value2" } } ``` ## Tools List: ### 1. search_events Search for events matching user criteria. Arguments: - query (string): Search keywords - vibe (string, optional): Mood/vibe (e.g., "chill", "sôi động") - time (string, optional): Time period (e.g., "cuối tuần này") Example: ```json {"tool_call": "search_events", "arguments": {"query": "nhạc rock", "vibe": "sôi động"}} ``` ### 2. get_event_details Get detailed information about a specific event. Arguments: - event_id (string): Event ID from search results Example: ```json {"tool_call": "get_event_details", "arguments": {"event_id": "6900ae38eb03f29702c7fd1d"}} ``` ### 3. get_purchased_events (Feedback mode only) Check which events the user has attended. Arguments: - user_id (string): User ID Example: ```json {"tool_call": "get_purchased_events", "arguments": {"user_id": "user_123"}} ``` ### 4. save_feedback Save user's feedback/review for an event. Arguments: - event_id (string): Event ID - rating (integer): 1-5 stars - comment (string, optional): User's comment Example: ```json {"tool_call": "save_feedback", "arguments": {"event_id": "abc123", "rating": 5, "comment": "Tuyệt vời!"}} ``` ### 5. save_lead Save customer contact information. Arguments: - email (string, optional): Email address - phone (string, optional): Phone number - interest (string, optional): What they're interested in Example: ```json {"tool_call": "save_lead", "arguments": {"email": "user@example.com", "interest": "Rock show"}} ``` **IMPORTANT:** - Call tools ONLY when you need real-time data - After receiving tool results, respond naturally to the user - Don't expose raw JSON to users - always format nicely """ def _build_messages( self, system_prompt: str, history: List[Dict], user_message: str ) -> List[Dict]: """Build messages array for LLM""" messages = [{"role": "system", "content": system_prompt}] # Add conversation history messages.extend(history) # Add current user message messages.append({"role": "user", "content": user_message}) return messages async def _call_llm(self, messages: List[Dict]) -> str: """ Call HuggingFace LLM directly using chat_completion (conversational) """ try: from huggingface_hub import AsyncInferenceClient # Create async client client = AsyncInferenceClient(token=self.hf_token) # Call HF API with chat completion (conversational) response_text = "" async for message in await client.chat_completion( messages=messages, # Use messages directly model="openai/gpt-oss-20b", max_tokens=512, temperature=0.7, stream=True ): if message.choices and message.choices[0].delta.content: response_text += message.choices[0].delta.content return response_text except Exception as e: print(f"⚠️ LLM Call Error: {e}") return "Xin lỗi, tôi đang gặp chút vấn đề kỹ thuật. Bạn thử lại sau nhé!" def _messages_to_prompt(self, messages: List[Dict]) -> str: """Convert messages array to single prompt string""" prompt_parts = [] for msg in messages: role = msg["role"] content = msg["content"] if role == "system": prompt_parts.append(f"[SYSTEM]\n{content}\n") elif role == "user": prompt_parts.append(f"[USER]\n{content}\n") elif role == "assistant": prompt_parts.append(f"[ASSISTANT]\n{content}\n") return "\n".join(prompt_parts) def _format_tool_result(self, tool_result: Dict) -> str: """Format tool result for feeding back to LLM""" result = tool_result.get("result", {}) # Special handling for purchased events list if isinstance(result, list): print(f"\n🔍 Formatting {len(result)} purchased events for LLM") if not result: return "User has not purchased any events yet." # Format each event clearly formatted_events = [] for i, event in enumerate(result, 1): event_info = [] event_info.append(f"Event {i}:") # Extract key fields if 'eventName' in event: event_info.append(f" Name: {event['eventName']}") if 'eventCode' in event: event_info.append(f" Code: {event['eventCode']}") if '_id' in event: event_info.append(f" ID: {event['_id']}") if 'startTimeEventTime' in event: event_info.append(f" Date: {event['startTimeEventTime']}") formatted_events.append("\n".join(event_info)) formatted = "User's Purchased Events:\n\n" + "\n\n".join(formatted_events) print(f"📤 Sending to LLM:\n{formatted}") return formatted # Default formatting for other results if isinstance(result, dict): # Pretty print key info formatted = [] for key, value in result.items(): if key not in ["success", "error"]: formatted.append(f"{key}: {value}") return "\n".join(formatted) return str(result) async def _execute_rag_search(self, query_params: Dict) -> str: """ Execute RAG search for event discovery Called when LLM wants to search_events """ query = query_params.get("query", "") vibe = query_params.get("vibe", "") # Build search query search_text = f"{query} {vibe}".strip() print(f"🔍 RAG Search: {search_text}") # Use embedding + qdrant embedding = self.embedding_service.encode_text(search_text) results = self.qdrant_service.search( query_embedding=embedding, limit=5 ) # Format results formatted = [] for i, result in enumerate(results, 1): # Result is a dict with keys: id, score, payload payload = result.get("payload", {}) texts = payload.get("texts", []) text = texts[0] if texts else "" event_id = payload.get("id_use", "") formatted.append(f"{i}. {text[:100]}... (ID: {event_id})") return "\n".join(formatted) if formatted else "Không tìm thấy sự kiện phù hợp." def _parse_tool_call(self, llm_response: str) -> Optional[Dict]: """ Parse LLM response to detect tool calls using structured JSON Returns: {"tool_name": "...", "arguments": {...}} or None """ import json import re # Method 1: Look for JSON code block json_match = re.search(r'```json\s*(\{.*?\})\s*```', llm_response, re.DOTALL) if json_match: try: data = json.loads(json_match.group(1)) return self._extract_tool_from_json(data) except json.JSONDecodeError: pass # Method 2: Look for inline JSON object # Find all potential JSON objects json_objects = re.findall(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', llm_response) for json_str in json_objects: try: data = json.loads(json_str) tool_call = self._extract_tool_from_json(data) if tool_call: return tool_call except json.JSONDecodeError: continue # Method 3: Nested JSON (for complex structures) try: # Find outermost curly braces if '{' in llm_response and '}' in llm_response: start = llm_response.find('{') # Find matching closing brace count = 0 for i, char in enumerate(llm_response[start:], start): if char == '{': count += 1 elif char == '}': count -= 1 if count == 0: json_str = llm_response[start:i+1] data = json.loads(json_str) return self._extract_tool_from_json(data) except (json.JSONDecodeError, ValueError): pass return None def _extract_tool_from_json(self, data: dict) -> Optional[Dict]: """ Extract tool call information from parsed JSON Supports multiple formats: - {"tool_call": "search_events", "arguments": {...}} - {"function": "search_events", "parameters": {...}} - {"name": "search_events", "args": {...}} """ # Format 1: tool_call + arguments if "tool_call" in data and isinstance(data["tool_call"], str): return { "tool_name": data["tool_call"], "arguments": data.get("arguments", {}) } # Format 2: function + parameters if "function" in data: return { "tool_name": data["function"], "arguments": data.get("parameters", data.get("arguments", {})) } # Format 3: name + args if "name" in data: return { "tool_name": data["name"], "arguments": data.get("args", data.get("arguments", {})) } # Format 4: Direct tool name as key valid_tools = ["search_events", "get_event_details", "get_purchased_events", "save_feedback", "save_lead"] for tool in valid_tools: if tool in data: return { "tool_name": tool, "arguments": data[tool] if isinstance(data[tool], dict) else {} } return None def _clean_response(self, response: str) -> str: """Remove JSON artifacts from final response""" # Remove JSON blocks if "```json" in response: response = response.split("```json")[0] if "```" in response: response = response.split("```")[0] # Remove tool call markers if "{" in response and "tool_call" in response: # Find the last natural sentence before JSON lines = response.split("\n") cleaned = [] for line in lines: if "{" in line and "tool_call" in line: break cleaned.append(line) response = "\n".join(cleaned) return response.strip()