ChatbotRAG / event_service.py
minhvtt's picture
Upload 3 files
5f63f29 verified
"""
Event Service - Get event details from MongoDB ID or eventCode
"""
import httpx
from typing import Dict, Any, Optional
class EventService:
"""Service to fetch event details from external API"""
def __init__(self, base_url: str = "https://hoalacrent.io.vn/api/v0"):
self.base_url = base_url
self.client = httpx.AsyncClient(timeout=10.0)
async def get_event_by_code(self, event_code: str) -> Optional[Dict[str, Any]]:
"""
Get event details by eventCode
Args:
event_code: Event code (e.g., "EVENT001")
Returns:
Event data or None if not found
"""
try:
url = f"{self.base_url}/event/get-event-by-event-code"
response = await self.client.get(url, params={"eventCode": event_code})
response.raise_for_status()
data = response.json()
if data.get("success") and data.get("data"):
event = data["data"]
return {
"eventCode": event.get("eventCode"),
"eventName": event.get("eventName"),
"eventAddress": event.get("eventAddress"),
"eventStartTime": event.get("eventStartTime"),
"eventEndTime": event.get("eventEndTime"),
"maxParticipants": event.get("maxParticipants"),
"eventOrganizer": event.get("eventOrganizer"),
"eventDescription": event.get("eventDescription")
}
return None
except Exception as e:
print(f"⚠️ Error fetching event by code: {e}")
return None
async def get_event_by_id(self, event_id: str) -> Optional[Dict[str, Any]]:
"""
Get event details by MongoDB _id
IMPORTANT: Backend needs to implement this endpoint:
GET /api/v0/event/get-event-by-id?id={mongodb_id}
Args:
event_id: MongoDB _id (e.g., "6900ae38eb03f29702c7fd1d")
Returns:
Event data or None if not found
"""
try:
# Option 1: If backend has this endpoint
url = f"{self.base_url}/event/get-event-by-id"
response = await self.client.get(url, params={"id": event_id})
response.raise_for_status()
data = response.json()
if data.get("success") and data.get("data"):
event = data["data"]
return {
"eventCode": event.get("eventCode"),
"eventName": event.get("eventName"),
"eventAddress": event.get("eventAddress"),
"eventStartTime": event.get("eventStartTime"),
"eventEndTime": event.get("eventEndTime"),
"maxParticipants": event.get("maxParticipants"),
"eventOrganizer": event.get("eventOrganizer"),
"eventDescription": event.get("eventDescription")
}
return None
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
print(f"⚠️ Event not found: {event_id}")
else:
print(f"⚠️ Error fetching event by ID: {e}")
return None
except Exception as e:
print(f"⚠️ Error fetching event by ID: {e}")
return None
def format_event_for_display(self, event: Dict[str, Any]) -> str:
"""
Format event data for chat display
Args:
event: Event data dict
Returns:
Formatted string
"""
name = event.get("eventName", "Sự kiện")
# Format date
start = event.get("eventStartTime", "")
date_str = start[:10] if start else "TBA" # Extract date part
# Format location
location = event.get("eventAddress", "")
result = f"**{name}**"
if date_str != "TBA":
result += f" ({date_str})"
if location:
result += f" - {location}"
return result
async def close(self):
"""Close HTTP client"""
await self.client.aclose()