import httpx
import json
import time
import uuid

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Optional

# Upstream endpoints and credentials.
# NOTE: the API key is hardcoded for demonstration purposes; prefer loading it
# from an environment variable in production.
TYPEGPT_API_URL = "https://api.typegpt.net/v1/chat/completions"
TYPEGPT_API_KEY = "sk-oPdaZC7n1JlDq0sJ5NSSyHe7sYaeAXeEuj0wX4Lk8hlOGPF8"
SEARCH_API_URL = "https://superapis-bing.hf.space/search"

SYSTEM_PROMPT = """
You are an expert AI research assistant. Your primary goal is to provide accurate, comprehensive, and helpful answers based ONLY on the provided search results.

Instructions:
1. Carefully analyze the user's query and the provided search results.
2. Synthesize an answer directly from the information found in the search results.
3. For every statement or piece of information you provide, you MUST cite the corresponding search result number in the format `[<number>]`.
4. If multiple sources support a statement, cite them together, e.g. `[1, 2]`.
5. If the search results do not contain enough information to answer the query, explicitly state that you could not find the information in the provided context.
6. Do not use any prior knowledge or information from outside the provided search results.
7. Structure your response in a clear, easy-to-read format: start with a direct answer, followed by a more detailed explanation.
"""


class ChatMessage(BaseModel):
    role: str
    content: str


class ChatCompletionRequest(BaseModel):
    messages: List[ChatMessage] = Field(..., example=[{"role": "user", "content": "What are the benefits of learning Python?"}])
    model: str = "perplexity-like"
    stream: bool = Field(default=False, description="Enable streaming response")


class ChatDelta(BaseModel):
    content: Optional[str] = None
    role: Optional[str] = None


class ChatCompletionStreamChoice(BaseModel):
    delta: ChatDelta
    index: int = 0
    finish_reason: Optional[str] = None


class ChatCompletionStreamResponse(BaseModel):
    id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
    object: str = "chat.completion.chunk"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str = "perplexity-like"
    choices: List[ChatCompletionStreamChoice]
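
# For reference, one streamed SSE event serialized from the models above looks
# roughly like this (illustrative id and timestamp; unset Optional fields
# serialize as null, since model_dump_json() keeps None values by default):
#
#   data: {"id": "chatcmpl-3f2a...", "object": "chat.completion.chunk",
#          "created": 1718000000, "model": "perplexity-like",
#          "choices": [{"delta": {"content": "Hello", "role": null},
#                       "index": 0, "finish_reason": null}]}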
app = FastAPI(
    title="Perplexity-like API",
    description="An API that uses web search to answer questions with citations, supporting streaming.",
    version="2.0.0"
)


async def stream_llm_response(payload: dict):
    """
    Async generator that proxies the language model's SSE stream, re-emitting
    each content delta as an OpenAI-style `chat.completion.chunk` event.
    """
    start_time = time.time()
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            headers = {
                "Authorization": f"Bearer {TYPEGPT_API_KEY}",
                "Content-Type": "application/json"
            }
            async with client.stream("POST", TYPEGPT_API_URL, headers=headers, json=payload) as response:
                if response.status_code != 200:
                    # The HTTP response has already started, so raising an
                    # HTTPException here could not change the status code the
                    # client sees; report the upstream error inside the stream.
                    error_content = await response.aread()
                    error_message = f"Error from language model API: {error_content.decode()}"
                    stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=f"\n\nERROR: {error_message}"))
                    stream_response = ChatCompletionStreamResponse(choices=[stream_choice])
                    yield f"data: {stream_response.model_dump_json()}\n\n"
                    return

                async for line in response.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    data_str = line.removeprefix("data: ")
                    if data_str.strip() == "[DONE]":
                        break
                    try:
                        chunk = json.loads(data_str)
                        delta_content = chunk["choices"][0]["delta"].get("content")
                        if delta_content:
                            # Re-wrap the upstream delta in this API's own chunk format.
                            stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=delta_content))
                            stream_response = ChatCompletionStreamResponse(choices=[stream_choice])
                            yield f"data: {stream_response.model_dump_json()}\n\n"
                    except (json.JSONDecodeError, KeyError, IndexError):
                        # Skip malformed or unexpected lines instead of aborting the stream.
                        continue

    except httpx.RequestError as e:
        error_message = f"HTTP request error during streaming: {e}"
        stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=f"\n\nERROR: {error_message}"))
        stream_response = ChatCompletionStreamResponse(choices=[stream_choice])
        yield f"data: {stream_response.model_dump_json()}\n\n"

    except Exception as e:
        error_message = f"An unexpected error occurred during streaming: {e}"
        stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=f"\n\nERROR: {error_message}"))
        stream_response = ChatCompletionStreamResponse(choices=[stream_choice])
        yield f"data: {stream_response.model_dump_json()}\n\n"

    finally:
        # Always close the stream with a finish chunk and the [DONE] sentinel.
        finish_time = time.time()
        print(f"Stream finished in {finish_time - start_time:.2f} seconds.")
        final_choice = ChatCompletionStreamChoice(delta=ChatDelta(), finish_reason="stop")
        final_response = ChatCompletionStreamResponse(choices=[final_choice])
        yield f"data: {final_response.model_dump_json()}\n\n"
        yield "data: [DONE]\n\n"


@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """
    Takes a user's query, performs a web search, and streams a factual,
    cited response from a language model.

    NOTE: the response is always streamed as server-sent events; the
    request's `stream` flag is currently ignored.
    """
    if not request.messages or request.messages[-1].role != "user":
        raise HTTPException(status_code=400, detail="Invalid request. The last message must be from the 'user'.")

    user_query = request.messages[-1].content

    # Fetch web results for the user's query.
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            search_params = {"keywords": user_query, "max_results": 7}
            search_response = await client.get(SEARCH_API_URL, params=search_params)
            search_response.raise_for_status()
            search_results = search_response.json()
    except httpx.HTTPStatusError as e:
        # raise_for_status() raises HTTPStatusError, which is not a subclass
        # of RequestError, so it needs its own handler to map to 502.
        raise HTTPException(status_code=502, detail=f"Search API returned an error: {e}")
    except httpx.RequestError as e:
        raise HTTPException(status_code=502, detail=f"Error calling the search API: {e}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to process search results: {e}")
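
    # Assumed shape of the search API's JSON response (its schema is not
    # documented here; this sketch mirrors how the loop below reads it):
    #   [{"title": "...", "description": "...", "url": "https://..."}, ...]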
    # Build a numbered context block the model can cite as [1], [2], ...
    context = ""
    for i, result in enumerate(search_results):
        context += (
            f"Source [{i + 1}]:\n"
            f"Title: {result.get('title', 'N/A')}\n"
            f"Description: {result.get('description', '')}\n"
            f"URL: {result.get('url', 'N/A')}\n\n"
        )

    final_prompt = f"""
**Search Results:**
{context}
**User Query:** "{user_query}"

Please provide a comprehensive answer based on the search results above, following all instructions.
"""

    llm_payload = {
        "model": "gpt-4.1-mini",
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": final_prompt}
        ],
        "stream": True
    }

    return StreamingResponse(stream_llm_response(llm_payload), media_type="text/event-stream")
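
# A minimal client sketch for consuming the stream (illustrative only; assumes
# the server is running on localhost:8000):
#
#   import asyncio, json, httpx
#
#   async def demo() -> None:
#       payload = {"messages": [{"role": "user", "content": "What is FastAPI?"}]}
#       async with httpx.AsyncClient(timeout=None) as client:
#           async with client.stream("POST", "http://localhost:8000/v1/chat/completions", json=payload) as resp:
#               async for line in resp.aiter_lines():
#                   if not line.startswith("data: "):
#                       continue
#                   data = line.removeprefix("data: ")
#                   if data.strip() == "[DONE]":
#                       break
#                   delta = json.loads(data)["choices"][0]["delta"].get("content")
#                   if delta:
#                       print(delta, end="", flush=True)
#
#   asyncio.run(demo())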

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
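
# Quick smoke test once the server is up (illustrative; `-N` disables curl's
# output buffering so streamed chunks appear as they arrive):
#   curl -N http://localhost:8000/v1/chat/completions \
#        -H "Content-Type: application/json" \
#        -d '{"messages": [{"role": "user", "content": "What is FastAPI?"}]}'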