ChatbotRAG / tools_service.py
minhvtt's picture
Upload 4 files
4fafc3c verified
raw
history blame
9.91 kB
"""
Tools Service for LLM Function Calling
HuggingFace-compatible, using prompt engineering
"""
import httpx
from typing import List, Dict, Any, Optional
import json
import asyncio
class ToolsService:
    """
    Manage the external API tools that the LLM can call via prompt engineering.

    The service exposes the tool catalogue (``get_tools_definition``) and a
    dispatcher (``execute_tool``) that routes a tool name plus arguments to the
    matching private coroutine. HTTP calls go through one shared
    ``httpx.AsyncClient``; call ``close()`` when the service is no longer needed.
    """

    # Header names (lower-case) whose values must never be written to logs.
    _SENSITIVE_HEADERS = frozenset(
        {"authorization", "proxy-authorization", "cookie", "set-cookie"}
    )

    def __init__(self, base_url: str = "https://hoalacrent.io.vn/api/v0", feedback_tracking=None):
        """
        Args:
            base_url: Root URL that the endpoint paths are appended to.
            feedback_tracking: Optional collaborator; when set, ``_save_feedback``
                calls its ``mark_feedback_given(...)`` method.
        """
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=10.0)
        self.feedback_tracking = feedback_tracking

    @staticmethod
    def _redact_headers(headers: Dict[str, str]) -> Dict[str, str]:
        """Return a copy of *headers* that is safe to log (credential values masked)."""
        return {
            name: "***" if str(name).lower() in ToolsService._SENSITIVE_HEADERS else value
            for name, value in headers.items()
        }

    def get_tools_definition(self) -> List[Dict]:
        """
        Return the list of tool definitions (OpenAI-format style).

        Used for constructing the system prompt. The description strings are
        runtime data shown to the model and are intentionally left as-is.
        """
        return [
            {
                "name": "search_events",
                "description": "Tìm kiếm sự kiện phù hợp theo từ khóa, vibe, hoặc thời gian.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "Từ khóa tìm kiếm (VD: 'nhạc rock', 'hài kịch')"},
                        "vibe": {"type": "string", "description": "Vibe/Mood (VD: 'chill', 'sôi động', 'hẹn hò')"},
                        "time": {"type": "string", "description": "Thời gian (VD: 'cuối tuần này', 'tối nay')"}
                    }
                }
            },
            {
                "name": "get_event_details",
                "description": "Lấy thông tin chi tiết (giá, địa điểm, thời gian) của sự kiện.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "event_id": {"type": "string", "description": "ID của sự kiện (MongoDB ID)"}
                    },
                    "required": ["event_id"]
                }
            },
            {
                "name": "get_purchased_events",
                "description": "Kiểm tra lịch sử các sự kiện user đã mua vé hoặc tham gia.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "user_id": {"type": "string", "description": "ID của user"}
                    },
                    "required": ["user_id"]
                }
            },
            {
                "name": "save_feedback",
                "description": "Lưu đánh giá/feedback của user về sự kiện.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "event_id": {"type": "string", "description": "ID sự kiện"},
                        "rating": {"type": "integer", "description": "Số sao đánh giá (1-5)"},
                        "comment": {"type": "string", "description": "Nội dung nhận xét"}
                    },
                    "required": ["event_id", "rating"]
                }
            },
            {
                "name": "save_lead",
                "description": "Lưu thông tin khách hàng quan tâm (Lead).",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "email": {"type": "string"},
                        "phone": {"type": "string"},
                        "interest": {"type": "string"}
                    }
                }
            }
        ]

    async def execute_tool(self, tool_name: str, arguments: Dict, access_token: Optional[str] = None) -> Any:
        """
        Execute a tool by name with the given arguments.

        Args:
            tool_name: Name of the tool (one of the names in ``get_tools_definition``).
            arguments: Tool arguments as a dict.
            access_token: JWT token for authenticated API calls.

        Returns:
            The tool's result. Unknown tools and any exception raised while
            dispatching are reported as ``{"error": ...}`` instead of raising.
        """
        print("\n🔧 ===== TOOL EXECUTION =====")
        print(f"Tool: {tool_name}")
        print(f"Arguments: {arguments}")
        # Only report whether a token is present; never echo any part of it.
        print(f"Access Token: {'✅ Present' if access_token else '❌ Missing'}")

        try:
            if tool_name == "get_event_details":
                return await self._get_event_details(
                    arguments.get("event_id") or arguments.get("event_code")
                )

            if tool_name == "get_purchased_events":
                print("→ Calling _get_purchased_events with:")
                print(f" user_id: {arguments.get('user_id')}")
                print(f" access_token: {'✅' if access_token else '❌'}")
                return await self._get_purchased_events(
                    arguments.get("user_id"),
                    access_token=access_token,
                )

            if tool_name == "save_feedback":
                # Forward user_id/event_code as well: _save_feedback only
                # records into the tracking system when both are supplied.
                return await self._save_feedback(
                    arguments.get("event_id"),
                    arguments.get("rating"),
                    arguments.get("comment"),
                    user_id=arguments.get("user_id"),
                    event_code=arguments.get("event_code"),
                )

            if tool_name == "search_events":
                # This usually requires the RAG service, so return a special
                # signal; the Agent Service handles the actual RAG search.
                return {"action": "run_rag_search", "query": arguments}

            if tool_name == "save_lead":
                # Placeholder for lead saving.
                return {"success": True, "message": "Lead saved successfully"}

            return {"error": f"Unknown tool: {tool_name}"}
        except Exception as e:
            # Boundary handler: tool failures are returned to the caller as data.
            print(f"⚠️ Tool Execution Error: {e}")
            return {"error": str(e)}

    async def _get_event_details(self, event_id: Optional[str]) -> Dict:
        """
        Call the API to get event details.

        Returns the ``data`` field of a successful response, or an
        ``{"error": ...}`` dict when the id is missing, the request fails, or
        the response does not report success.
        """
        if not event_id:
            return {"error": "Missing event_id"}

        try:
            url = f"{self.base_url}/event/get-event-by-id"
            response = await self.client.get(url, params={"id": event_id})
            if response.status_code == 200:
                data = response.json()
                if data.get("success"):
                    return data.get("data")
            return {"error": "Event not found", "details": response.text}
        except Exception as e:
            return {"error": str(e)}

    async def _get_purchased_events(self, user_id: Optional[str], access_token: Optional[str] = None) -> List[Dict]:
        """
        Call the API to get the purchased events for a user (requires auth).

        Returns the ``data`` field of a 200 response, or an empty list when
        ``user_id`` is missing, the API answers with another status, or any
        exception occurs.
        """
        from urllib.parse import quote

        print("\n🎫 ===== GET PURCHASED EVENTS =====")
        print(f"User ID: {user_id}")
        print(f"Access Token: {'✅ Present' if access_token else '❌ Missing'}")

        if not user_id:
            print("⚠️ No user_id provided, returning empty list")
            return []

        try:
            # user_id originates from model-produced arguments; percent-encode
            # it so it cannot add extra path segments or a query string.
            url = f"{self.base_url}/event/get-purchase-event-by-user-id/{quote(str(user_id), safe='')}"
            print(f"🔍 API URL: {url}")

            # Add the Authorization header if an access token was provided.
            headers = {}
            if access_token:
                headers["Authorization"] = f"Bearer {access_token}"
                print("🔐 Authorization Header Added (token redacted)")
            else:
                print("⚠️ No access_token - calling API without auth")

            print(f"📡 Headers: {self._redact_headers(headers)}")
            print("🚀 Calling API...")
            response = await self.client.get(url, headers=headers)
            print(f"📥 Response Status: {response.status_code}")
            print(f"📦 Response Headers: {self._redact_headers(dict(response.headers))}")

            if response.status_code == 200:
                data = response.json()
                print(f"✅ Success! Data keys: {list(data.keys())}")
                # "data" may be present but null; treat that as no events.
                events = data.get("data") or []
                print(f"📊 Found {len(events)} purchased events")
                return events

            print(f"❌ API Error: {response.status_code}")
            print(f"Response body: {response.text[:500]}")
            return []
        except Exception as e:
            print(f"⚠️ Exception in _get_purchased_events: {type(e).__name__}: {e}")
            import traceback
            traceback.print_exc()
            return []

    async def _save_feedback(self, event_id: str, rating: int, comment: str, user_id: Optional[str] = None, event_code: Optional[str] = None) -> Dict:
        """
        Save feedback and mark it as completed in the tracking system.

        The tracking call happens only when a ``feedback_tracking`` object is
        configured and both ``user_id`` and ``event_code`` are provided.

        NOTE(review): the returned dict reports success even when the tracking
        call fails or is skipped; confirm this is intended once the real API
        call is implemented.
        """
        print("\n📝 ===== SAVE FEEDBACK =====")
        print(f"Event ID: {event_id}")
        print(f"Event Code: {event_code}")
        print(f"User ID: {user_id}")
        print(f"Rating: {rating}")
        print(f"Comment: {comment}")

        # TODO: Implement real API call to save feedback.
        # For now, just mark it in the tracking system.
        if self.feedback_tracking and user_id and event_code:
            success = self.feedback_tracking.mark_feedback_given(
                user_id=user_id,
                event_code=event_code,
                rating=rating,
                comment=comment,
            )
            if success:
                print("✅ Feedback tracked in database")
            else:
                print("⚠️ Failed to track feedback")

        return {"success": True, "message": "Feedback recorded"}

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()