Spaces:
Running
Running
| """ | |
| Agent Chat Streaming Endpoint | |
| SSE-based real-time streaming for Sales & Feedback agents | |
| """ | |
| from typing import AsyncGenerator | |
| from stream_utils import format_sse, EVENT_STATUS, EVENT_TOKEN, EVENT_DONE, EVENT_ERROR, EVENT_METADATA | |
| from datetime import datetime | |
async def agent_chat_stream(
    request,
    agent_service,
    conversation_service
) -> AsyncGenerator[str, None]:
    """
    Stream an agent reply to the client as Server-Sent Events (SSE).

    The agent is called once for the complete reply; the reply text is then
    sliced into small fixed-size chunks that are emitted one by one with a
    short pause in between.

    Args:
        request: Chat request object. ``message`` and ``session_id`` are read
            directly; ``mode``, ``user_id`` and ``access_token`` are optional
            and read defensively.
        agent_service: Object whose awaitable ``chat(...)`` returns a mapping
            with a ``"message"`` entry and, optionally, ``"tool_calls"``.
        conversation_service: Object providing ``create_session``,
            ``get_conversation_history`` and ``add_message``.

    Yields:
        Strings produced by ``format_sse`` for these events:
        - metadata: the id of a newly created session
        - status: a progress message chosen by mode
        - token: consecutive chunks of the agent reply
        - done: session id, timestamp, mode and number of tool calls
        - error: ``str(exc)`` of any exception raised while processing
    """
    # Imported once per call instead of on every iteration of the token loop.
    import asyncio

    chunk_size = 5      # characters per token event
    chunk_delay = 0.02  # seconds to pause between token events

    try:
        # === REQUEST FIELDS ===
        # Optional fields are read once, up front, so the session-creation
        # path and the agent call see the same values. An unset/None mode
        # falls back to 'sales'.
        mode = getattr(request, 'mode', None) or 'sales'
        user_id = getattr(request, 'user_id', None)
        access_token = getattr(request, 'access_token', None)

        # === SESSION MANAGEMENT ===
        session_id = request.session_id
        if not session_id:
            session_id = conversation_service.create_session(
                metadata={"user_agent": "api", "created_via": "agent_stream"},
                user_id=user_id
            )
            # NOTE(review): assumed to be emitted only for newly created
            # sessions so the client can learn the generated id - confirm.
            yield format_sse(EVENT_METADATA, {"session_id": session_id})

        # Reduce stored history entries to the role/content pairs that are
        # passed to the agent.
        history = conversation_service.get_conversation_history(session_id)
        messages = [
            {"role": entry["role"], "content": entry["content"]}
            for entry in history
        ]

        # Debug logging. Only the presence of the access token is reported;
        # no part of the token value is written to the log.
        print("📋 Request Info:")
        print(f" - Mode: {mode}")
        print(f" - User ID: {user_id}")
        print(f" - Access Token: {'✅ Present' if access_token else '❌ Missing'}")

        # === STATUS UPDATE ===
        if mode == 'feedback':
            # "Checking your event history..."
            yield format_sse(EVENT_STATUS, "Đang kiểm tra lịch sử sự kiện của bạn...")
        else:
            # "Consulting..."
            yield format_sse(EVENT_STATUS, "Đang tư vấn...")

        # === CALL AGENT ===
        result = await agent_service.chat(
            user_message=request.message,
            conversation_history=messages,
            mode=mode,
            user_id=user_id,
            access_token=access_token
        )
        agent_response = result["message"]

        # === STREAM RESPONSE IN CHUNKS ===
        for start in range(0, len(agent_response), chunk_size):
            yield format_sse(EVENT_TOKEN, agent_response[start:start + chunk_size])
            # Small delay for smoother streaming
            await asyncio.sleep(chunk_delay)

        # === SAVE HISTORY ===
        # NOTE(review): history is persisted only after the whole reply has
        # been streamed; if the consumer stops iterating early these calls
        # are never reached.
        conversation_service.add_message(
            session_id=session_id,
            role="user",
            content=request.message
        )
        conversation_service.add_message(
            session_id=session_id,
            role="assistant",
            content=agent_response
        )

        # === DONE ===
        yield format_sse(EVENT_DONE, {
            "session_id": session_id,
            "timestamp": datetime.utcnow().isoformat(),
            "mode": mode,
            "tool_calls": len(result.get("tool_calls", []))
        })
    except Exception as e:
        # Top-level boundary of the stream: report the failure to the client
        # as an SSE error event instead of letting the generator raise.
        print(f"⚠️ Agent Stream Error: {e}")
        yield format_sse(EVENT_ERROR, str(e))