import os
import httpx
import json
import time
import uuid
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Optional
# --- API Configuration ---
# Read sensitive values from environment variables in production.
TYPEGPT_API_URL = "https://api.typegpt.net/v1/chat/completions"
TYPEGPT_API_KEY = os.getenv("TYPEGPT_API_KEY", "sk-oPdaZC7n1JlDq0sJ5NSSyHe7sYaeAXeEuj0wX4Lk8hlOGPF8")  # Replace the fallback with your actual key
SEARCH_API_URL = "https://superapis-bing.hf.space/search"
# --- System Prompt ---
# This prompt guides the AI to behave like a factual research assistant.
SYSTEM_PROMPT = """
You are an expert AI research assistant. Your primary goal is to provide accurate, comprehensive, and helpful answers based ONLY on the provided search results.
Instructions:
1. Carefully analyze the user's query and the provided search results.
2. Synthesize an answer directly from the information found in the search results.
3. For every statement or piece of information you provide, you MUST cite the corresponding search result number in the format `[<number>]`.
4. If multiple sources support a statement, you can cite them like `[1, 2]`.
5. If the search results do not contain enough information to answer the query, you must explicitly state that you could not find the information in the provided context.
6. Do not use any prior knowledge or information outside of the provided search results.
7. Structure your response in a clear and easy-to-read format. Start with a direct answer, followed by a more detailed explanation.
"""
# --- Pydantic Models ---
# For incoming requests
class ChatMessage(BaseModel):
    role: str
    content: str
class ChatCompletionRequest(BaseModel):
    messages: List[ChatMessage] = Field(
        ...,
        examples=[[{"role": "user", "content": "What are the benefits of learning Python?"}]],
    )
    model: str = "perplexity-like"  # Model name can be customized
    stream: bool = Field(default=False, description="Enable streaming response")
# For outgoing streaming responses (OpenAI-compatible)
class ChatDelta(BaseModel):
    content: Optional[str] = None
    role: Optional[str] = None

class ChatCompletionStreamChoice(BaseModel):
    delta: ChatDelta
    index: int = 0
    finish_reason: Optional[str] = None
class ChatCompletionStreamResponse(BaseModel):
    id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
    object: str = "chat.completion.chunk"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str = "perplexity-like"
    choices: List[ChatCompletionStreamChoice]
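# A single serialized chunk (one SSE "data:" line) looks roughly like this;
# the id/created values are illustrative:
# {"id": "chatcmpl-...", "object": "chat.completion.chunk", "created": 1700000000,
#  "model": "perplexity-like",
#  "choices": [{"delta": {"content": "Hello", "role": null}, "index": 0, "finish_reason": null}]}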
# --- FastAPI App Initialization ---
app = FastAPI(
    title="Perplexity-like API",
    description="An API that uses web search to answer questions with citations, supporting streaming.",
    version="2.0.0",
)
# --- Streaming Logic ---
async def stream_llm_response(payload: dict):
    """
    An async generator that streams the response from the language model
    as OpenAI-compatible SSE chunks.
    """
    start_time = time.time()
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            headers = {
                "Authorization": f"Bearer {TYPEGPT_API_KEY}",
                "Content-Type": "application/json",
            }
            async with client.stream("POST", TYPEGPT_API_URL, headers=headers, json=payload) as response:
                # Check for errors from the upstream API. The HTTP response has
                # already started, so report the error as a stream chunk instead
                # of raising an HTTPException (the status code of an in-flight
                # streaming response can no longer be changed).
                if response.status_code != 200:
                    error_content = await response.aread()
                    error_message = f"Error from language model API ({response.status_code}): {error_content.decode()}"
                    stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=f"\n\nERROR: {error_message}"))
                    stream_response = ChatCompletionStreamResponse(choices=[stream_choice])
                    yield f"data: {stream_response.model_dump_json()}\n\n"
                    return
                # Process the stream line by line
                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        data_str = line.removeprefix("data: ")
                        if data_str.strip() == "[DONE]":
                            break
                        try:
                            chunk = json.loads(data_str)
                            delta_content = chunk["choices"][0]["delta"].get("content")
                            if delta_content:
                                # Re-wrap the delta in a streaming-compliant response chunk
                                stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=delta_content))
                                stream_response = ChatCompletionStreamResponse(choices=[stream_choice])
                                yield f"data: {stream_response.model_dump_json()}\n\n"
                        except (json.JSONDecodeError, KeyError, IndexError):
                            # Skip malformed lines
                            continue
    except httpx.RequestError as e:
        # Handle network-related errors during the streaming request
        error_message = f"HTTP request error during streaming: {e}"
        stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=f"\n\nERROR: {error_message}"))
        stream_response = ChatCompletionStreamResponse(choices=[stream_choice])
        yield f"data: {stream_response.model_dump_json()}\n\n"
    except Exception as e:
        # Handle other unexpected errors
        error_message = f"An unexpected error occurred during streaming: {e}"
        stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=f"\n\nERROR: {error_message}"))
        stream_response = ChatCompletionStreamResponse(choices=[stream_choice])
        yield f"data: {stream_response.model_dump_json()}\n\n"
    # Always send the final chunk with finish_reason and the [DONE] sentinel
    finally:
        finish_time = time.time()
        print(f"Stream finished in {finish_time - start_time:.2f} seconds.")
        final_choice = ChatCompletionStreamChoice(delta=ChatDelta(), finish_reason="stop")
        final_response = ChatCompletionStreamResponse(choices=[final_choice])
        yield f"data: {final_response.model_dump_json()}\n\n"
        yield "data: [DONE]\n\n"
# --- API Endpoint ---
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """
    Takes a user's query, performs a web search, and returns a factual,
    cited response from a language model, either as an SSE stream or as a
    single aggregated JSON body.
    """
    if not request.messages or request.messages[-1].role != "user":
        raise HTTPException(status_code=400, detail="Invalid request. The last message must be from the 'user'.")
    user_query = request.messages[-1].content
    # 1. Perform a web search
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            search_params = {"keywords": user_query, "max_results": 7}
            search_response = await client.get(SEARCH_API_URL, params=search_params)
            search_response.raise_for_status()
            search_results = search_response.json()
    except httpx.HTTPStatusError as e:
        # raise_for_status() raises HTTPStatusError, which is not a RequestError
        raise HTTPException(status_code=502, detail=f"Search API returned an error: {e}")
    except httpx.RequestError as e:
        raise HTTPException(status_code=502, detail=f"Error calling the search API: {e}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to process search results: {e}")
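    # Each search result is assumed to look roughly like this, based on the
    # fields accessed below (not a confirmed schema):
    #   {"title": "...", "description": "...", "url": "https://..."}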
    # 2. Format search results into a context for the language model,
    # using the 'description' field as per the new OpenAPI spec.
    context = ""
    for i, result in enumerate(search_results):
        context += (
            f"Source [{i + 1}]:\n"
            f"Title: {result.get('title', 'N/A')}\n"
            f"Description: {result.get('description', '')}\n"
            f"URL: {result.get('url', 'N/A')}\n\n"
        )
    # 3. Construct the prompt for the language model
    final_prompt = f"""
**Search Results:**
{context}
**User Query:** "{user_query}"
Please provide a comprehensive answer based on the search results above, following all instructions.
"""
    # 4. Prepare the payload for the TypeGPT language model
    llm_payload = {
        "model": "gpt-4.1-mini",
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": final_prompt},
        ],
        "stream": True,  # Always stream from the backing LLM
    }
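    # The request model exposes `stream`, but the original flow always streams.
    # A minimal sketch to honor `stream=False` by aggregating the SSE chunks
    # into a single OpenAI-style JSON response (shape assumed from the chunk
    # models above):
    if not request.stream:
        full_content = ""
        async for sse_line in stream_llm_response(llm_payload):
            data_str = sse_line.removeprefix("data: ").strip()
            if not data_str or data_str == "[DONE]":
                continue
            try:
                chunk = json.loads(data_str)
                full_content += chunk["choices"][0]["delta"].get("content") or ""
            except (json.JSONDecodeError, KeyError, IndexError):
                continue
        return {
            "id": f"chatcmpl-{uuid.uuid4().hex}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": request.model,
            "choices": [
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": full_content},
                    "finish_reason": "stop",
                }
            ],
        }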
    # 5. Return the streaming response
    return StreamingResponse(stream_llm_response(llm_payload), media_type="text/event-stream")
# --- Main execution ---
if __name__ == "__main__":
    import uvicorn
    # To run this app:
    # 1. Save the code as main.py
    # 2. Install the necessary packages: pip install fastapi "uvicorn[standard]" httpx
    # 3. Run in your terminal: uvicorn main:app --reload
    # 4. Access the interactive docs at http://127.0.0.1:8000/docs
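    # Example request once the server is running (endpoint and payload as
    # defined above; the query itself is illustrative):
    #   curl -N http://127.0.0.1:8000/v1/chat/completions \
    #     -H "Content-Type: application/json" \
    #     -d '{"messages": [{"role": "user", "content": "What is FastAPI?"}], "stream": true}'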
    uvicorn.run(app, host="0.0.0.0", port=8000)