File size: 8,640 Bytes
7c5229e a388535 7c5229e a388535 7c5229e a388535 7c5229e a388535 7c5229e a388535 363d93e 7c5229e c2074e1 7c5229e c2074e1 7c5229e a388535 7c5229e a388535 7c5229e a388535 c2074e1 a388535 7c5229e a388535 7c5229e a388535 7c5229e a388535 7c5229e a388535 7c5229e a388535 7c5229e a388535 7c5229e a388535 7c5229e a388535 7c5229e a388535 7c5229e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
import json
import os
import time
import uuid
from typing import List, Optional

import httpx
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
# --- API Configuration ---
# Every setting can be overridden with an environment variable of the same
# name, so deployments do not have to edit this file.
TYPEGPT_API_URL = os.environ.get(
    "TYPEGPT_API_URL", "https://api.typegpt.net/v1/chat/completions"
)
# SECURITY NOTE(review): the fallback below is a credential committed to source
# control. It is kept only so existing deployments keep working; set the
# TYPEGPT_API_KEY environment variable, then rotate and remove this value.
TYPEGPT_API_KEY = os.environ.get(
    "TYPEGPT_API_KEY", "sk-oPdaZC7n1JlDq0sJ5NSSyHe7sYaeAXeEuj0wX4Lk8hlOGPF8"
)
SEARCH_API_URL = os.environ.get(
    "SEARCH_API_URL", "https://superapis-bing.hf.space/search"
)
# --- System Prompt ---
# Sent as the "system" message of every request to the language model. It
# restricts the model to the supplied search results and requires a `[n]`
# citation for each statement.
SYSTEM_PROMPT = """
You are an expert AI research assistant. Your primary goal is to provide accurate, comprehensive, and helpful answers based ONLY on the provided search results.
Instructions:
1. Carefully analyze the user's query and the provided search results.
2. Synthesize an answer directly from the information found in the search results.
3. For every statement or piece of information you provide, you MUST cite the corresponding search result number in the format `[<number>]`.
4. If multiple sources support a statement, you can cite them like `[1, 2]`.
5. If the search results do not contain enough information to answer the query, you must explicitly state that you could not find the information in the provided context.
6. Do not use any prior knowledge or information outside of the provided search results.
7. Structure your response in a clear and easy-to-read format. Start with a direct answer, followed by a more detailed explanation.
"""
# --- Pydantic Models ---
# For incoming requests
class ChatMessage(BaseModel):
    """A single message of the incoming conversation."""

    # Author of the message; the endpoint requires the last message to be "user".
    role: str
    # Text of the message; the last message's content is used as the search query.
    content: str
class ChatCompletionRequest(BaseModel):
    """Request body accepted by the chat-completions endpoint."""

    # Conversation so far. `examples` (a list of example values) replaces the
    # `example` extra keyword, which Pydantic v2 reports as deprecated.
    messages: List[ChatMessage] = Field(
        ...,
        examples=[
            [{"role": "user", "content": "What are the benefits of learning Python?"}]
        ],
    )
    model: str = "perplexity-like"  # Model name can be customized
    stream: bool = Field(default=False, description="Enable streaming response")
# For outgoing streaming responses (OpenAI compatible)
class ChatDelta(BaseModel):
    """Incremental payload carried by one streamed chunk."""

    # Text fragment for this chunk; left as None on the final "stop" chunk.
    content: Optional[str] = None
    # Optional message role; defaults to None when not supplied.
    role: Optional[str] = None
class ChatCompletionStreamChoice(BaseModel):
    """One choice entry inside a streamed chunk."""

    # The incremental content for this choice.
    delta: ChatDelta
    # Position of the choice in the chunk's `choices` list; defaults to 0.
    index: int = 0
    # None while streaming; set to "stop" on the closing chunk.
    finish_reason: Optional[str] = None
def _new_completion_id() -> str:
    """Return a fresh identifier of the form ``chatcmpl-<32 hex chars>``."""
    return "chatcmpl-" + uuid.uuid4().hex


def _current_epoch_seconds() -> int:
    """Return the current Unix time truncated to whole seconds."""
    return int(time.time())


class ChatCompletionStreamResponse(BaseModel):
    """Envelope for one streamed ``chat.completion.chunk`` event."""

    # Generated per instance unless the caller passes an explicit id.
    id: str = Field(default_factory=_new_completion_id)
    object: str = "chat.completion.chunk"
    # Creation time in Unix seconds; generated per instance unless supplied.
    created: int = Field(default_factory=_current_epoch_seconds)
    model: str = "perplexity-like"
    choices: List[ChatCompletionStreamChoice]
# --- FastAPI App Initialization ---
# Application instance that the endpoint decorator below registers routes on;
# title, description and version are the metadata passed to FastAPI.
app = FastAPI(
    title="Perplexity-like API",
    description="An API that uses web search to answer questions with citations, supporting streaming.",
    version="2.0.0"
)
# --- Streaming Logic ---
async def stream_llm_response(payload: dict):
    """
    Stream the language model's answer as OpenAI-style SSE chunks.

    Posts ``payload`` to ``TYPEGPT_API_URL`` and re-emits every non-empty
    content delta as a ``chat.completion.chunk`` event. Network failures,
    non-200 upstream responses and other unexpected errors are reported
    in-band as a content chunk starting with ``ERROR:``.

    Args:
        payload: JSON-serialisable request body forwarded to the upstream API.

    Yields:
        str: SSE-framed events. After the content (or error) chunks, a chunk
        with ``finish_reason="stop"`` and a ``data: [DONE]`` marker are sent.
        Neither is sent when the consumer closes or cancels the generator.
    """
    start_time = time.time()
    # OpenAI's streaming format uses the same id and creation time for every
    # chunk of one completion, so generate them once instead of per chunk.
    completion_id = f"chatcmpl-{uuid.uuid4().hex}"
    created_at = int(start_time)

    def _sse_chunk(delta: ChatDelta, finish_reason: Optional[str] = None) -> str:
        # Wrap a delta in the chunk envelope and frame it as one SSE event.
        choice = ChatCompletionStreamChoice(delta=delta, finish_reason=finish_reason)
        envelope = ChatCompletionStreamResponse(
            id=completion_id, created=created_at, choices=[choice]
        )
        return f"data: {envelope.model_dump_json()}\n\n"

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            headers = {
                "Authorization": f"Bearer {TYPEGPT_API_KEY}",
                "Content-Type": "application/json"
            }
            async with client.stream("POST", TYPEGPT_API_URL, headers=headers, json=payload) as response:
                # Check for errors from the upstream API. The exception is
                # caught by the generic handler below and reported in-band.
                if response.status_code != 200:
                    error_content = await response.aread()
                    raise HTTPException(
                        status_code=response.status_code,
                        detail=f"Error from language model API: {error_content.decode(errors='replace')}"
                    )
                # Process the stream line by line
                async for line in response.aiter_lines():
                    # SSE permits "data:" with or without a following space.
                    if not line.startswith("data:"):
                        continue
                    data_str = line[len("data:"):].strip()
                    if data_str == "[DONE]":
                        break
                    try:
                        upstream_chunk = json.loads(data_str)
                        delta_content = upstream_chunk["choices"][0]["delta"].get("content")
                    except (json.JSONDecodeError, KeyError, IndexError, TypeError, AttributeError):
                        # Skip malformed lines, including a null/non-dict delta
                        continue
                    if delta_content:
                        yield _sse_chunk(ChatDelta(content=delta_content))
    except httpx.RequestError as e:
        # Handle network-related errors during the streaming request
        error_message = f"HTTP Request Error during streaming: {e}"
        yield _sse_chunk(ChatDelta(content=f"\n\nERROR: {error_message}"))
    except Exception as e:
        # Handle other unexpected errors
        error_message = f"An unexpected error occurred during streaming: {e}"
        yield _sse_chunk(ChatDelta(content=f"\n\nERROR: {error_message}"))
    finally:
        # Only log here: yielding from `finally` while the generator is being
        # closed (client disconnect / cancellation) raises RuntimeError.
        finish_time = time.time()
        print(f"Stream finished in {finish_time - start_time:.2f} seconds.")

    # Send the final chunk with finish_reason, then the end-of-stream marker.
    yield _sse_chunk(ChatDelta(), finish_reason="stop")
    yield "data: [DONE]\n\n"
# --- API Endpoint ---
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """
    Answer the user's latest message from web search results, with citations.

    The content of the last message is sent to the search API, the results are
    formatted as numbered sources, and the language model's answer is streamed
    back as server-sent events.

    Args:
        request: Chat request; its last message must have role ``user``.

    Returns:
        StreamingResponse: ``text/event-stream`` body produced by
        ``stream_llm_response``.

    Raises:
        HTTPException: 400 if the last message is missing or not from the
            user; 502 if the search API cannot be reached, answers with an
            error status, or returns something other than a list; 500 if the
            search response cannot be processed.
    """
    if not request.messages or request.messages[-1].role != "user":
        raise HTTPException(status_code=400, detail="Invalid request. The last message must be from the 'user'.")
    user_query = request.messages[-1].content

    # NOTE(review): `request.stream` is never consulted; the response is always
    # an SSE stream, even with the default stream=False. Confirm this is intended.

    # 1. Perform a web search
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            search_params = {"keywords": user_query, "max_results": 7}
            search_response = await client.get(SEARCH_API_URL, params=search_params)
            search_response.raise_for_status()
            search_results = search_response.json()
    except httpx.HTTPError as e:
        # HTTPError covers transport failures (RequestError) and the
        # HTTPStatusError raised by raise_for_status(); both are upstream faults.
        raise HTTPException(status_code=502, detail=f"Error calling the search API: {e}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to process search results: {e}") from e

    # The formatting below needs a list of dicts; reject any other payload
    # explicitly instead of failing with an unhandled AttributeError.
    if not isinstance(search_results, list):
        raise HTTPException(
            status_code=502,
            detail="Error calling the search API: unexpected response format.",
        )
    sources = [result for result in search_results if isinstance(result, dict)]

    # 2. Format search results into a context for the language model
    # Using the 'description' field as per the new OpenAPI spec
    context = "".join(
        f"Source [{number}]:\nTitle: {result.get('title', 'N/A')}\nDescription: {result.get('description', '')}\nURL: {result.get('url', 'N/A')}\n\n"
        for number, result in enumerate(sources, start=1)
    )

    # 3. Construct the prompt for the language model
    final_prompt = f"""
**Search Results:**
{context}
**User Query:** "{user_query}"
Please provide a comprehensive answer based on the search results above, following all instructions.
"""

    # 4. Prepare the payload for the TypeGPT language model
    llm_payload = {
        "model": "gpt-4.1-mini",
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": final_prompt}
        ],
        "stream": True  # Enable streaming from the backing LLM
    }

    # 5. Return the streaming response
    return StreamingResponse(stream_llm_response(llm_payload), media_type="text/event-stream")
# --- Main execution ---
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    # To run this app:
    # 1. Save the code as main.py
    # 2. Install necessary packages: pip install fastapi "uvicorn[standard]" httpx
    # 3. Run in your terminal: uvicorn main:app --reload
    # 4. Access the interactive docs at http://127.0.0.1:8000/docs
    # (A stray trailing "|" after this call was a syntax error and is removed.)
    uvicorn.run(app, host="0.0.0.0", port=8000)