import httpx import json import time import uuid from fastapi import FastAPI, HTTPException from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field from typing import List, Optional # --- API Configuration --- # It's recommended to use environment variables for sensitive data in production. TYPEGPT_API_URL = "https://api.typegpt.net/v1/chat/completions" TYPEGPT_API_KEY = "sk-oPdaZC7n1JlDq0sJ5NSSyHe7sYaeAXeEuj0wX4Lk8hlOGPF8" # Replace with your actual key SEARCH_API_URL = "https://superapis-bing.hf.space/search" # --- System Prompt --- # This prompt guides the AI to behave like a factual research assistant. SYSTEM_PROMPT = """ You are an expert AI research assistant. Your primary goal is to provide accurate, comprehensive, and helpful answers based ONLY on the provided search results. Instructions: 1. Carefully analyze the user's query and the provided search results. 2. Synthesize an answer directly from the information found in the search results. 3. For every statement or piece of information you provide, you MUST cite the corresponding search result number in the format `[]`. 4. If multiple sources support a statement, you can cite them like `[1, 2]`. 5. If the search results do not contain enough information to answer the query, you must explicitly state that you could not find the information in the provided context. 6. Do not use any prior knowledge or information outside of the provided search results. 7. Structure your response in a clear and easy-to-read format. Start with a direct answer, followed by a more detailed explanation. """ # --- Pydantic Models --- # For incoming requests class ChatMessage(BaseModel): role: str content: str class ChatCompletionRequest(BaseModel): messages: List[ChatMessage] = Field(..., example=[{"role": "user", "content": "What are the benefits of learning Python?"}]) model: str = "perplexity-like" # Model name can be customized stream: bool = Field(default=False, description="Enable streaming response") # For outgoing streaming responses (OpenAI compatible) class ChatDelta(BaseModel): content: Optional[str] = None role: Optional[str] = None class ChatCompletionStreamChoice(BaseModel): delta: ChatDelta index: int = 0 finish_reason: Optional[str] = None class ChatCompletionStreamResponse(BaseModel): id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}") object: str = "chat.completion.chunk" created: int = Field(default_factory=lambda: int(time.time())) model: str = "perplexity-like" choices: List[ChatCompletionStreamChoice] # --- FastAPI App Initialization --- app = FastAPI( title="Perplexity-like API", description="An API that uses web search to answer questions with citations, supporting streaming.", version="2.0.0" ) # --- Streaming Logic --- async def stream_llm_response(payload: dict): """ An async generator that streams the response from the language model. """ start_time = time.time() try: async with httpx.AsyncClient(timeout=60.0) as client: headers = { "Authorization": f"Bearer {TYPEGPT_API_KEY}", "Content-Type": "application/json" } async with client.stream("POST", TYPEGPT_API_URL, headers=headers, json=payload) as response: # Check for errors from the upstream API if response.status_code != 200: error_content = await response.aread() raise HTTPException( status_code=response.status_code, detail=f"Error from language model API: {error_content.decode()}" ) # Process the stream line by line async for line in response.aiter_lines(): if line.startswith("data: "): data_str = line.removeprefix("data: ") if data_str.strip() == "[DONE]": break try: chunk = json.loads(data_str) delta_content = chunk["choices"][0]["delta"].get("content") if delta_content: # Create a streaming-compliant response chunk stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=delta_content)) stream_response = ChatCompletionStreamResponse(choices=[stream_choice]) yield f"data: {stream_response.model_dump_json()}\n\n" except (json.JSONDecodeError, KeyError, IndexError): # Skip malformed lines continue except httpx.RequestError as e: # Handle network-related errors during the streaming request error_message = f"HTTP Request Error during streaming: {e}" stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=f"\n\nERROR: {error_message}")) stream_response = ChatCompletionStreamResponse(choices=[stream_choice]) yield f"data: {stream_response.model_dump_json()}\n\n" except Exception as e: # Handle other unexpected errors error_message = f"An unexpected error occurred during streaming: {e}" stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=f"\n\nERROR: {error_message}")) stream_response = ChatCompletionStreamResponse(choices=[stream_choice]) yield f"data: {stream_response.model_dump_json()}\n\n" # Send the final chunk with finish_reason finally: finish_time = time.time() print(f"Stream finished in {finish_time - start_time:.2f} seconds.") final_choice = ChatCompletionStreamChoice(delta=ChatDelta(), finish_reason="stop") final_response = ChatCompletionStreamResponse(choices=[final_choice]) yield f"data: {final_response.model_dump_json()}\n\n" yield "data: [DONE]\n\n" # --- API Endpoint --- @app.post("/v1/chat/completions") async def chat_completions(request: ChatCompletionRequest): """ Takes a user's query, performs a web search, and streams a factual, cited response from a language model. """ if not request.messages or request.messages[-1].role != "user": raise HTTPException(status_code=400, detail="Invalid request. The last message must be from the 'user'.") user_query = request.messages[-1].content # 1. Perform a web search try: async with httpx.AsyncClient(timeout=30.0) as client: search_params = {"keywords": user_query, "max_results": 7} search_response = await client.get(SEARCH_API_URL, params=search_params) search_response.raise_for_status() search_results = search_response.json() except httpx.RequestError as e: raise HTTPException(status_code=502, detail=f"Error calling the search API: {e}") except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to process search results: {e}") # 2. Format search results into a context for the language model # Using the 'description' field as per the new OpenAPI spec context = "" for i, result in enumerate(search_results): context += f"Source [{i+1}]:\nTitle: {result.get('title', 'N/A')}\nDescription: {result.get('description', '')}\nURL: {result.get('url', 'N/A')}\n\n" # 3. Construct the prompt for the language model final_prompt = f""" **Search Results:** {context} **User Query:** "{user_query}" Please provide a comprehensive answer based on the search results above, following all instructions. """ # 4. Prepare the payload for the TypeGPT language model llm_payload = { "model": "gpt-4.1-mini", "messages": [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": final_prompt} ], "stream": True # Enable streaming from the backing LLM } # 5. Return the streaming response return StreamingResponse(stream_llm_response(llm_payload), media_type="text/event-stream") # --- Main execution --- if __name__ == "__main__": import uvicorn # To run this app: # 1. Save the code as main.py # 2. Install necessary packages: pip install fastapi "uvicorn[standard]" httpx # 3. Run in your terminal: uvicorn main:app --reload # 4. Access the interactive docs at http://127.0.0.1:8000/docs uvicorn.run(app, host="0.0.0.0", port=8000)