# ChatbotRAG / agent_chat_stream.py
# minhvtt's picture
# Upload 2 files
# afb1117 verified
"""
Agent Chat Streaming Endpoint
SSE-based real-time streaming for Sales & Feedback agents
"""
import asyncio
from datetime import datetime
from typing import AsyncGenerator

from stream_utils import format_sse, EVENT_STATUS, EVENT_TOKEN, EVENT_DONE, EVENT_ERROR, EVENT_METADATA
async def agent_chat_stream(
    request,
    agent_service,
    conversation_service
) -> AsyncGenerator[str, None]:
    """
    Stream one agent reply to the client as Server-Sent Events.

    The agent is awaited for its complete reply first; the reply text is then
    re-emitted in small fixed-size character chunks with a short pause between
    them so the client sees incremental output.

    Args:
        request: Chat request object. ``message`` and ``session_id`` are read
            directly; ``mode`` (default ``'sales'``), ``user_id`` and
            ``access_token`` (default ``None``) are read with ``getattr``.
        agent_service: Object exposing an awaitable ``chat(...)`` whose result
            is indexed with ``"message"`` and queried for ``"tool_calls"``.
        conversation_service: Object exposing ``create_session``,
            ``get_conversation_history`` and ``add_message``.

    Yields:
        SSE strings built by ``format_sse``:
        - metadata: the new session id, when a session had to be created
        - status: a progress message chosen by mode
        - token: successive chunks of the reply text
        - done: session id, timestamp, mode and tool-call count
        - error: ``str(exc)`` if any step raises ``Exception``
    """
    chunk_size = 5       # Characters per token event
    chunk_delay = 0.02   # Seconds to pause after each token event

    try:
        # Read the optional request fields once so every later use agrees.
        mode = getattr(request, 'mode', 'sales')  # Default to sales
        user_id = getattr(request, 'user_id', None)
        access_token = getattr(request, 'access_token', None)

        # === SESSION MANAGEMENT ===
        session_id = request.session_id
        if not session_id:
            session_id = conversation_service.create_session(
                metadata={"user_agent": "api", "created_via": "agent_stream"},
                user_id=user_id
            )
            # Tell the client which session id was allocated for it.
            yield format_sse(EVENT_METADATA, {"session_id": session_id})

        # Prior turns, reduced to the role/content pairs passed to the agent.
        history = conversation_service.get_conversation_history(session_id)
        messages = [
            {"role": h["role"], "content": h["content"]}
            for h in history
        ]

        # Debug logging. Only the presence of the token is reported; no part
        # of the token value is printed, so credentials never reach the logs.
        print(f"📋 Request Info:")
        print(f" - Mode: {mode}")
        print(f" - User ID: {user_id}")
        print(f" - Access Token: {'✅ Present' if access_token else '❌ Missing'}")

        # === STATUS UPDATE ===
        if mode == 'feedback':
            yield format_sse(EVENT_STATUS, "Đang kiểm tra lịch sử sự kiện của bạn...")
        else:
            yield format_sse(EVENT_STATUS, "Đang tư vấn...")

        # === CALL AGENT ===
        result = await agent_service.chat(
            user_message=request.message,
            conversation_history=messages,
            mode=mode,
            user_id=user_id,
            access_token=access_token
        )
        agent_response = result["message"]

        # === SAVE HISTORY ===
        # Persist the turn before streaming: if the consumer stops iterating
        # part-way through (generator closed or task cancelled), the code
        # after the streaming loop never runs, and those signals are not
        # caught by ``except Exception`` below.
        conversation_service.add_message(
            session_id=session_id,
            role="user",
            content=request.message
        )
        conversation_service.add_message(
            session_id=session_id,
            role="assistant",
            content=agent_response
        )

        # === STREAM RESPONSE CHUNK BY CHUNK ===
        for start in range(0, len(agent_response), chunk_size):
            yield format_sse(EVENT_TOKEN, agent_response[start:start + chunk_size])
            # Small delay for smoother streaming
            await asyncio.sleep(chunk_delay)

        # === DONE ===
        # ``or []`` also covers a "tool_calls" key that is present but None.
        tool_calls = result.get("tool_calls") or []
        yield format_sse(EVENT_DONE, {
            "session_id": session_id,
            "timestamp": datetime.utcnow().isoformat(),
            "mode": mode,
            "tool_calls": len(tool_calls)
        })
    except Exception as e:
        # Report the failure to the client as an SSE error event rather than
        # letting the exception propagate out of the generator.
        print(f"⚠️ Agent Stream Error: {e}")
        yield format_sse(EVENT_ERROR, str(e))