File size: 2,625 Bytes
4851501 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
import logging
from typing import Any, Optional

from fastapi import APIRouter
from pydantic import BaseModel

from backend.services.executor import QueryExecutor

# Router that the chat endpoints in this module register themselves on
# via the @router.post decorators.
router = APIRouter()
class MessageHistory(BaseModel):
    """One earlier turn of the conversation, as sent by the client."""

    # Speaker of the turn. Not validated beyond being a string.
    role: str  # "user" or "assistant"
    # Text of the turn.
    content: str
class ChatRequest(BaseModel):
    """Request body shared by the chat endpoints: a new message plus prior turns."""

    # The new message to process.
    message: str
    # Earlier turns of the conversation; defaults to an empty list when the
    # client sends none. Pydantic copies field defaults per instance, so the
    # list literal is not shared between requests.
    history: list[MessageHistory] = []
class ChartData(BaseModel):
    """Chart description that can be attached to a chat response."""

    # Chart kind. Not validated beyond being a string.
    type: str  # 'bar', 'line', 'pie', 'donut'
    # Optional chart title.
    title: Optional[str] = None
    # Chart rows, one dict per entry; the dict schema is not constrained here.
    data: list[dict] = []
    # NOTE(review): presumably the keys inside each `data` row used for the
    # x and y axes -- confirm against the chart consumer.
    xKey: Optional[str] = None
    yKey: Optional[str] = None
    # NOTE(review): presumably per-series settings for multi-line charts --
    # confirm against the chart consumer.
    lines: Optional[list[dict]] = None
class ChatResponse(BaseModel):
    """Response body of the non-streaming chat endpoint."""

    # Text answer shown to the user.
    response: str
    # SQL text reported by the executor, when it provides one.
    sql_query: Optional[str] = None
    # GeoJSON payload reported by the executor, when it provides one.
    geojson: Optional[dict] = None
    # Citation strings reported by the executor; empty when there are none.
    data_citations: list[str] = []
    # Intent label reported by the executor, when it provides one.
    intent: Optional[str] = None
    chart_data: Optional[ChartData] = None  # NEW: For STAT_QUERY responses
    # The chat endpoint passes `raw_data=...` when building this model.
    # Without a declared field pydantic ignores the unknown keyword by default
    # and the value never reaches the client. It is typed as Any because the
    # shape of the executor's raw payload is not visible from this module.
    raw_data: Optional[Any] = None
@router.post("/", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """
    Answer one chat message, using the supplied history as context.

    The history is flattened into plain ``{"role", "content"}`` dicts and
    passed with the new message to
    ``QueryExecutor.process_query_with_context``. Any intent detection or
    routing happens inside the executor; this endpoint only repackages the
    mapping it returns as a ``ChatResponse``.
    """
    query_executor = QueryExecutor()

    # The executor takes plain dicts rather than pydantic models.
    prior_turns = []
    for turn in request.history:
        prior_turns.append({"role": turn.role, "content": turn.content})

    outcome = await query_executor.process_query_with_context(
        query=request.message,
        history=prior_turns,
    )

    # These keys are forwarded unchanged; a key the executor did not set
    # becomes None. A forwarded value only reaches the client when
    # ChatResponse declares a field of the same name, because pydantic
    # ignores undeclared keyword arguments by default.
    passthrough_keys = ("sql_query", "geojson", "intent", "chart_data", "raw_data")
    forwarded = {key: outcome.get(key) for key in passthrough_keys}

    return ChatResponse(
        response=outcome.get("response", "I processed your request."),
        data_citations=outcome.get("data_citations", []),
        **forwarded,
    )
from sse_starlette.sse import EventSourceResponse
import json
import asyncio
@router.post("/stream")
async def chat_stream(request: ChatRequest):
    """
    Streaming chat endpoint that returns Server-Sent Events (SSE).

    The request history is flattened into plain ``{"role", "content"}`` dicts
    and passed with the message to ``QueryExecutor.process_query_stream``.
    Every event the executor yields is relayed to the client unchanged.

    If the executor raises while streaming, the error is logged with its
    traceback and one final ``chunk`` event carrying the error text is sent,
    so the stream ends with a visible message instead of stopping silently.
    """
    executor = QueryExecutor()
    history = [{"role": h.role, "content": h.content} for h in request.history]

    async def event_generator():
        try:
            # Delegate entirely to the executor's streaming process
            async for event in executor.process_query_stream(request.message, history):
                yield event
        except Exception as e:
            # Top-level boundary of the stream: log through the logging
            # module so the traceback is kept, which print() discarded.
            logging.getLogger(__name__).exception("Stream error: %s", e)
            # NOTE(review): the raw exception text is sent to the client and
            # may expose internal details -- confirm this is acceptable.
            yield {
                "event": "chunk",
                "data": json.dumps({"type": "text", "content": f"\n\nError: {e}"}),
            }

    return EventSourceResponse(event_generator())
|