import httpx
import json
import os
import time
import uuid
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Optional

# --- API Configuration ---
# Read the API key from an environment variable; never commit real keys to source.
TYPEGPT_API_URL = "https://api.typegpt.net/v1/chat/completions"
TYPEGPT_API_KEY = os.environ.get("TYPEGPT_API_KEY", "")  # e.g. export TYPEGPT_API_KEY=sk-...
SEARCH_API_URL = "https://superapis-bing.hf.space/search"

# --- System Prompt ---
# This prompt guides the AI to behave like a factual research assistant.
SYSTEM_PROMPT = """
You are an expert AI research assistant. Your primary goal is to provide accurate, comprehensive, and helpful answers based ONLY on the provided search results.
Instructions:
1.  Carefully analyze the user's query and the provided search results.
2.  Synthesize an answer directly from the information found in the search results.
3.  For every statement or piece of information you provide, you MUST cite the corresponding search result number in the format `[<number>]`.
4.  If multiple sources support a statement, you can cite them like `[1, 2]`.
5.  If the search results do not contain enough information to answer the query, you must explicitly state that you could not find the information in the provided context.
6.  Do not use any prior knowledge or information outside of the provided search results.
7.  Structure your response in a clear and easy-to-read format. Start with a direct answer, followed by a more detailed explanation.
"""

# --- Pydantic Models ---

# For incoming requests
class ChatMessage(BaseModel):
    role: str
    content: str

class ChatCompletionRequest(BaseModel):
    messages: List[ChatMessage] = Field(..., examples=[[{"role": "user", "content": "What are the benefits of learning Python?"}]])
    model: str = "perplexity-like"  # Model name can be customized
    stream: bool = Field(default=False, description="Accepted for OpenAI compatibility; this endpoint currently always streams.")

# For outgoing streaming responses (OpenAI compatible)
class ChatDelta(BaseModel):
    content: Optional[str] = None
    role: Optional[str] = None

class ChatCompletionStreamChoice(BaseModel):
    delta: ChatDelta
    index: int = 0
    finish_reason: Optional[str] = None

class ChatCompletionStreamResponse(BaseModel):
    id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
    object: str = "chat.completion.chunk"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str = "perplexity-like"
    choices: List[ChatCompletionStreamChoice]
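
# For reference, one serialized chunk from the models above, framed as a single
# SSE event, looks roughly like this (id/created values are illustrative):
#   data: {"id": "chatcmpl-3f2a...", "object": "chat.completion.chunk",
#          "created": 1700000000, "model": "perplexity-like",
#          "choices": [{"delta": {"content": "Hello", "role": null},
#                       "index": 0, "finish_reason": null}]}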

# --- FastAPI App Initialization ---
app = FastAPI(
    title="Perplexity-like API",
    description="An API that uses web search to answer questions with citations, supporting streaming.",
    version="2.0.0"
)


# --- Streaming Logic ---
async def stream_llm_response(payload: dict):
    """
    An async generator that streams the response from the language model.
    """
    start_time = time.time()
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            headers = {
                "Authorization": f"Bearer {TYPEGPT_API_KEY}",
                "Content-Type": "application/json"
            }
            async with client.stream("POST", TYPEGPT_API_URL, headers=headers, json=payload) as response:
                # Check for errors from the upstream API
                if response.status_code != 200:
                    error_content = await response.aread()
                    raise HTTPException(
                        status_code=response.status_code,
                        detail=f"Error from language model API: {error_content.decode()}"
                    )

                # Process the stream line by line
                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        data_str = line.removeprefix("data: ")
                        if data_str.strip() == "[DONE]":
                            break
                        try:
                            chunk = json.loads(data_str)
                            delta_content = chunk["choices"][0]["delta"].get("content")
                            if delta_content:
                                # Create a streaming-compliant response chunk
                                stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=delta_content))
                                stream_response = ChatCompletionStreamResponse(choices=[stream_choice])
                                yield f"data: {stream_response.model_dump_json()}\n\n"
                        except (json.JSONDecodeError, KeyError, IndexError):
                            # Skip malformed lines
                            continue
    
    except httpx.RequestError as e:
        # Handle network-related errors during the streaming request
        error_message = f"HTTP Request Error during streaming: {e}"
        stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=f"\n\nERROR: {error_message}"))
        stream_response = ChatCompletionStreamResponse(choices=[stream_choice])
        yield f"data: {stream_response.model_dump_json()}\n\n"
    
    except Exception as e:
        # Handle other unexpected errors
        error_message = f"An unexpected error occurred during streaming: {e}"
        stream_choice = ChatCompletionStreamChoice(delta=ChatDelta(content=f"\n\nERROR: {error_message}"))
        stream_response = ChatCompletionStreamResponse(choices=[stream_choice])
        yield f"data: {stream_response.model_dump_json()}\n\n"

    # Send the final chunk with finish_reason
    finally:
        finish_time = time.time()
        print(f"Stream finished in {finish_time - start_time:.2f} seconds.")
        final_choice = ChatCompletionStreamChoice(delta=ChatDelta(), finish_reason="stop")
        final_response = ChatCompletionStreamResponse(choices=[final_choice])
        yield f"data: {final_response.model_dump_json()}\n\n"
        yield "data: [DONE]\n\n"


# --- API Endpoint ---
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """
    Takes a user's query, performs a web search, and streams a factual,
    cited response from a language model.
    """
    if not request.messages or request.messages[-1].role != "user":
        raise HTTPException(status_code=400, detail="Invalid request. The last message must be from the 'user'.")

    user_query = request.messages[-1].content

    # 1. Perform a web search
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            search_params = {"keywords": user_query, "max_results": 7}
            search_response = await client.get(SEARCH_API_URL, params=search_params)
            search_response.raise_for_status()
            search_results = search_response.json()
    except httpx.HTTPStatusError as e:
        raise HTTPException(status_code=502, detail=f"Search API returned status {e.response.status_code}.")
    except httpx.RequestError as e:
        raise HTTPException(status_code=502, detail=f"Error calling the search API: {e}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to process search results: {e}")

    # 2. Format search results into a context for the language model
    # Using the 'description' field as per the new OpenAPI spec
    context = ""
    for i, result in enumerate(search_results):
        context += f"Source [{i+1}]:\nTitle: {result.get('title', 'N/A')}\nDescription: {result.get('description', '')}\nURL: {result.get('url', 'N/A')}\n\n"
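    # Each rendered entry in `context` then reads like this (values illustrative):
    #   Source [1]:
    #   Title: Benefits of Python
    #   Description: Python is a versatile, readable language...
    #   URL: https://example.com/python-benefits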

    # 3. Construct the prompt for the language model
    # Built with string concatenation rather than an indented triple-quoted
    # string so no leading whitespace leaks into the prompt.
    final_prompt = (
        f"**Search Results:**\n{context}\n"
        f'**User Query:** "{user_query}"\n'
        "Please provide a comprehensive answer based on the search results above, following all instructions."
    )

    # 4. Prepare the payload for the TypeGPT language model
    llm_payload = {
        "model": "gpt-4.1-mini",
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": final_prompt}
        ],
        "stream": True  # Enable streaming from the backing LLM
    }

    # 5. Return the streaming response
    return StreamingResponse(stream_llm_response(llm_payload), media_type="text/event-stream")
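
# Example request against the endpoint above (illustrative; start the server first):
#   curl -N http://127.0.0.1:8000/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"messages": [{"role": "user", "content": "What are the benefits of learning Python?"}]}'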

# --- Main execution ---
if __name__ == "__main__":
    import uvicorn
    # To run this app:
    # 1. Save the code as main.py
    # 2. Install necessary packages: pip install fastapi "uvicorn[standard]" httpx
    # 3. Run in your terminal: uvicorn main:app --reload
    # 4. Access the interactive docs at http://127.0.0.1:8000/docs
    uvicorn.run(app, host="0.0.0.0", port=8000)