"""
Agent Chat Streaming Endpoint
SSE-based real-time streaming for Sales & Feedback agents
"""
import asyncio
from datetime import datetime, timezone
from typing import AsyncGenerator

from stream_utils import format_sse, EVENT_STATUS, EVENT_TOKEN, EVENT_DONE, EVENT_ERROR, EVENT_METADATA

# Number of characters sent per token event when replaying the agent response.
_STREAM_CHUNK_SIZE = 5
# Pause between token events (seconds) so the client renders a smooth stream.
_STREAM_CHUNK_DELAY_SECONDS = 0.02


async def agent_chat_stream(
    request,
    agent_service,
    conversation_service
) -> AsyncGenerator[str, None]:
    """
    Stream agent responses in real-time (SSE format)

    The agent is called once for the full reply; the reply is then replayed
    to the client in small fixed-size chunks. The user message and the
    assistant reply are stored in the conversation only after the whole
    reply has been streamed.

    Args:
        request: ChatRequest with message, session_id, and optionally mode,
            user_id and access_token (missing optional attributes fall back
            to 'sales' / None / None respectively)
        agent_service: AgentService instance (its async ``chat`` is awaited)
        conversation_service: ConversationService instance

    Yields SSE events:
        - metadata: Session info (only when a new session is created)
        - status: Processing updates
        - token: Text chunks
        - done: Completion signal
        - error: Error messages (any exception is reported this way
          instead of propagating to the caller)
    """
    try:
        # Resolve optional request attributes once so that session creation
        # and the agent call agree on the same values.
        mode = getattr(request, 'mode', 'sales')  # Default to sales
        user_id = getattr(request, 'user_id', None)
        access_token = getattr(request, 'access_token', None)

        # === SESSION MANAGEMENT ===
        session_id = request.session_id
        if not session_id:
            session_id = conversation_service.create_session(
                metadata={"user_agent": "api", "created_via": "agent_stream"},
                user_id=user_id
            )
            # Tell the client which session was just created for it.
            yield format_sse(EVENT_METADATA, {"session_id": session_id})

        # Get conversation history and reduce it to role/content messages
        history = conversation_service.get_conversation_history(session_id)
        messages = [
            {"role": entry["role"], "content": entry["content"]}
            for entry in history
        ]

        # Debug logging. Only the presence of the token is reported; the
        # token itself (even a prefix) must never be written to logs.
        print("📋 Request Info:")
        print(f" - Mode: {mode}")
        print(f" - User ID: {user_id}")
        print(f" - Access Token: {'✅ Present' if access_token else '❌ Missing'}")

        # === STATUS UPDATE ===
        if mode == 'feedback':
            yield format_sse(EVENT_STATUS, "Đang kiểm tra lịch sử sự kiện của bạn...")
        else:
            yield format_sse(EVENT_STATUS, "Đang tư vấn...")

        # === CALL AGENT ===
        result = await agent_service.chat(
            user_message=request.message,
            conversation_history=messages,
            mode=mode,
            user_id=user_id,
            access_token=access_token
        )

        agent_response = result["message"]

        # === STREAM RESPONSE CHUNK BY CHUNK ===
        for start in range(0, len(agent_response), _STREAM_CHUNK_SIZE):
            yield format_sse(
                EVENT_TOKEN,
                agent_response[start:start + _STREAM_CHUNK_SIZE]
            )
            # Small delay for smoother streaming
            await asyncio.sleep(_STREAM_CHUNK_DELAY_SECONDS)

        # === SAVE HISTORY ===
        conversation_service.add_message(
            session_id=session_id,
            role="user",
            content=request.message
        )
        conversation_service.add_message(
            session_id=session_id,
            role="assistant",
            content=agent_response
        )

        # === DONE ===
        # Aware "now" in UTC with tzinfo stripped: same naive-UTC ISO string
        # the deprecated datetime.utcnow() produced.
        finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
        yield format_sse(EVENT_DONE, {
            "session_id": session_id,
            "timestamp": finished_at.isoformat(),
            "mode": mode,
            "tool_calls": len(result.get("tool_calls", []))
        })

    except Exception as e:
        # Top-level boundary of the stream: report the failure to the client
        # as an SSE error event rather than breaking the connection.
        print(f"⚠️ Agent Stream Error: {e}")
        yield format_sse(EVENT_ERROR, str(e))