""" Conversation Service for Multi-turn Chat Server-side session management """ from typing import List, Dict, Optional from datetime import datetime from pymongo.collection import Collection import uuid class ConversationService: """ Manages multi-turn conversation history với server-side session """ def __init__(self, mongo_collection: Collection, max_history: int = 10): """ Args: mongo_collection: MongoDB collection for storing conversations max_history: Maximum số messages giữ lại (sliding window) """ self.collection = mongo_collection self.max_history = max_history # Create indexes self._ensure_indexes() def _ensure_indexes(self): """Create necessary indexes""" try: self.collection.create_index("session_id", unique=True) self.collection.create_index("user_id") # NEW: Index for user filtering # Auto-delete sessions sau 7 ngày không dùng self.collection.create_index( "updated_at", expireAfterSeconds=604800 # 7 days ) print("✓ Conversation indexes created") except Exception as e: print(f"Conversation indexes already exist or error: {e}") def create_session(self, metadata: Optional[Dict] = None, user_id: Optional[str] = None) -> str: """ Create new conversation session Args: metadata: Additional metadata user_id: User identifier (optional) Returns: session_id (UUID string) """ session_id = str(uuid.uuid4()) self.collection.insert_one({ "session_id": session_id, "user_id": user_id, # NEW: Store user_id "messages": [], "scenario_state": None, # NEW: Scenario state "metadata": metadata or {}, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow() }) return session_id def add_message( self, session_id: str, role: str, content: str, metadata: Optional[Dict] = None ): """ Add message to conversation history Args: session_id: Session identifier role: "user" or "assistant" content: Message text metadata: Additional info (rag_stats, tool_calls, etc.) """ message = { "role": role, "content": content, "timestamp": datetime.utcnow().isoformat(), "metadata": metadata or {} } # Upsert: tạo session nếu chưa tồn tại self.collection.update_one( {"session_id": session_id}, { "$push": { "messages": { "$each": [message], "$slice": -self.max_history # Keep only last N messages } }, "$set": {"updated_at": datetime.utcnow()} }, upsert=True ) def get_conversation_history( self, session_id: str, limit: Optional[int] = None, include_metadata: bool = False ) -> List[Dict]: """ Get conversation messages for LLM context Args: session_id: Session identifier limit: Override max_history với số lượng tùy chỉnh include_metadata: Include metadata trong response Returns: List of messages in format: [{"role": "user", "content": "..."}, ...] """ session = self.collection.find_one({"session_id": session_id}) if not session: return [] messages = session.get("messages", []) # Limit to recent messages if limit: messages = messages[-limit:] else: messages = messages[-self.max_history:] # Format for LLM if include_metadata: return messages else: return [ { "role": msg["role"], "content": msg["content"] } for msg in messages ] def get_session_info(self, session_id: str) -> Optional[Dict]: """ Get session metadata Returns: Session info hoặc None nếu không tồn tại """ session = self.collection.find_one( {"session_id": session_id}, {"_id": 0, "session_id": 1, "user_id": 1, "created_at": 1, "updated_at": 1, "metadata": 1} ) return session def clear_session(self, session_id: str) -> bool: """ Clear conversation history for session Returns: True nếu xóa thành công, False nếu session không tồn tại """ result = self.collection.delete_one({"session_id": session_id}) return result.deleted_count > 0 def session_exists(self, session_id: str) -> bool: """ Check if session exists """ return self.collection.count_documents({"session_id": session_id}) > 0 def get_last_user_message(self, session_id: str) -> Optional[str]: """ Get the last user message in conversation Useful for context extraction """ session = self.collection.find_one({"session_id": session_id}) if not session: return None messages = session.get("messages", []) # Tìm message cuối cùng từ user for msg in reversed(messages): if msg["role"] == "user": return msg["content"] return None def list_sessions( self, limit: int = 50, skip: int = 0, sort_by: str = "updated_at", descending: bool = True, user_id: Optional[str] = None # NEW: Filter by user ) -> List[Dict]: """ List all conversation sessions Args: limit: Maximum number of sessions to return skip: Number of sessions to skip (for pagination) sort_by: Field to sort by (created_at, updated_at) descending: Sort in descending order user_id: Filter sessions by user_id (optional) Returns: List of session summaries """ sort_order = -1 if descending else 1 # Build query filter query = {} if user_id: query["user_id"] = user_id sessions = self.collection.find( query, # Use query filter {"_id": 0, "session_id": 1, "user_id": 1, "created_at": 1, "updated_at": 1, "metadata": 1} ).sort(sort_by, sort_order).skip(skip).limit(limit) result = [] for session in sessions: # Count messages message_count = len( self.collection.find_one({"session_id": session["session_id"]}, {"messages": 1}) .get("messages", []) ) result.append({ "session_id": session["session_id"], "user_id": session.get("user_id"), # NEW: Include user_id "created_at": session["created_at"], "updated_at": session["updated_at"], "message_count": message_count, "metadata": session.get("metadata", {}) }) return result def count_sessions(self, user_id: Optional[str] = None) -> int: """ Get total number of sessions Args: user_id: Filter count by user_id (optional) """ query = {} if user_id: query["user_id"] = user_id return self.collection.count_documents(query) # ===== Scenario State Management ===== def get_scenario_state(self, session_id: str) -> Optional[Dict]: """ Get current scenario state for session Returns: { "active_scenario": "price_inquiry", "scenario_step": 3, "scenario_data": {...}, "last_activity": "..." } or None if no active scenario """ session = self.collection.find_one({"session_id": session_id}) if not session: return None return session.get("scenario_state") def set_scenario_state(self, session_id: str, state: Dict): """ Set scenario state for session Args: session_id: Session ID state: Scenario state dict """ self.collection.update_one( {"session_id": session_id}, { "$set": { "scenario_state": state, "updated_at": datetime.utcnow() } }, upsert=True ) def clear_scenario(self, session_id: str): """ Clear scenario state (end scenario) """ self.collection.update_one( {"session_id": session_id}, { "$set": { "scenario_state": None, "updated_at": datetime.utcnow() } } )