import os
import asyncio
import json
import logging
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from dotenv import load_dotenv
import aiohttp
from bs4 import BeautifulSoup

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")

if not LLM_API_KEY:
    raise RuntimeError("LLM_API_KEY must be set in a .env file.")

SNAPZION_API_URL = "https://search.snapzion.com/get-snippets"
LLM_API_URL = "https://api.inference.net/v1/chat/completions"
LLM_MODEL = "mistralai/mistral-nemo-12b-instruct/fp-8"

SNAPZION_HEADERS = {"Content-Type": "application/json", "User-Agent": "AI-Deep-Research-Agent/1.0"}
SCRAPING_HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36"}
LLM_HEADERS = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"}
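
# Note: the helpers below assume the Snapzion response is shaped roughly like
# {"organic_results": [{"link": "https://...", ...}, ...]}. Only the "link"
# field is accessed directly; everything else in each result dict is passed
# through verbatim to the client in the final "sources" event.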


class DeepResearchRequest(BaseModel):
    query: str


app = FastAPI(
    title="AI Deep Research API",
    description="Provides single-shot AI search and streaming deep research completions.",
    version="2.0.0"
)


async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> list:
    """Calls the Snapzion search API and returns its organic results, or [] on failure."""
    try:
        async with session.post(
            SNAPZION_API_URL,
            headers=SNAPZION_HEADERS,
            json={"query": query},
            timeout=aiohttp.ClientTimeout(total=15),
        ) as response:
            response.raise_for_status()
            data = await response.json()
            return data.get("organic_results", [])
    except Exception as e:
        logger.error(f"Snapzion search failed for query '{query}': {e}")
        return []


async def scrape_url(session: aiohttp.ClientSession, url: str) -> str:
    """Fetches a URL and returns its visible text, or an 'Error: ...' string on failure."""
    if url.lower().endswith('.pdf'):
        return "Error: PDF content cannot be scraped."
    try:
        # ssl=False skips certificate verification so sites with broken TLS can still be scraped.
        async with session.get(url, headers=SCRAPING_HEADERS, timeout=aiohttp.ClientTimeout(total=10), ssl=False) as response:
            if response.status != 200:
                return f"Error: HTTP status {response.status}"
            html = await response.text()
            soup = BeautifulSoup(html, "html.parser")
            # Drop non-content elements before extracting visible text.
            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                tag.decompose()
            return " ".join(soup.stripped_strings)
    except Exception as e:
        logger.warning(f"Scraping failed for {url}: {e}")
        return f"Error: {e}"


async def search_and_scrape(session: aiohttp.ClientSession, query: str) -> tuple[str, list]:
    """Runs the search-and-scrape pipeline for a query, returning (context, sources)."""
    search_results = await call_snapzion_search(session, query)
    sources = search_results[:4]
    if not sources:
        return "", []

    # Scrape the top sources concurrently.
    scrape_tasks = [scrape_url(session, source["link"]) for source in sources]
    scraped_contents = await asyncio.gather(*scrape_tasks)

    # Keep only successful scrapes, each labelled with a citation index and its origin URL.
    context = "\n\n".join(
        f"Source [{i+1}] (from {sources[i]['link']}):\n{content}"
        for i, content in enumerate(scraped_contents)
        if not content.startswith("Error:")
    )
    return context, sources


async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
    """The main async generator for the deep research process."""

    def format_sse(data: dict) -> str:
        """Formats a dictionary as a Server-Sent Events (SSE) string."""
        return f"data: {json.dumps(data)}\n\n"
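
    # Every frame on the wire looks like, for example:
    #   data: {"event": "status", "data": "Generating research plan..."}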

    try:
        async with aiohttp.ClientSession() as session:
            # Step 1: Ask the LLM to decompose the query into research sub-questions.
            yield format_sse({"event": "status", "data": "Generating research plan..."})
            sub_question_prompt = {
                "model": LLM_MODEL,
                "messages": [{
                    "role": "user",
                    "content": f"You are a research planner. Based on the user's query '{query}', generate a list of 3 to 4 crucial sub-questions that would form the basis of a comprehensive research report. Respond with ONLY a JSON array of strings. Example: [\"Question 1?\", \"Question 2?\"]"
                }]
            }
            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=sub_question_prompt) as response:
                response.raise_for_status()
                result = await response.json()
                try:
                    sub_questions = json.loads(result['choices'][0]['message']['content'])
                except (json.JSONDecodeError, KeyError, IndexError):
                    yield format_sse({"event": "error", "data": "Failed to parse sub-questions from LLM."})
                    return

            yield format_sse({"event": "plan", "data": sub_questions})

            # Step 2: Research every sub-question concurrently.
            for sq in sub_questions:
                yield format_sse({"event": "status", "data": f"Researching: \"{sq}\""})
            research_tasks = [search_and_scrape(session, sq) for sq in sub_questions]
            all_research_results = []

            # as_completed yields tasks in *completion* order, which generally does not
            # match the order of sub_questions, so progress is reported by count instead
            # of by pairing the loop index with a question.
            for finished in asyncio.as_completed(research_tasks):
                all_research_results.append(await finished)
                yield format_sse({
                    "event": "status",
                    "data": f"Research progress: {len(all_research_results)}/{len(sub_questions)} complete."
                })

            # Step 3: Merge the gathered contexts and de-duplicate sources by link.
            yield format_sse({"event": "status", "data": "Consolidating research..."})
            full_context = "\n\n---\n\n".join(res[0] for res in all_research_results if res[0])
            all_sources = [source for res in all_research_results for source in res[1]]
            unique_sources = list({s['link']: s for s in all_sources}.values())

            if not full_context.strip():
                yield format_sse({"event": "error", "data": "Failed to gather any research context."})
                return

            # Step 4: Ask the LLM for the final report, streamed token by token.
            yield format_sse({"event": "status", "data": "Generating final report..."})

            final_report_prompt = f"""
You are a research analyst. Your task is to synthesize the provided context into a comprehensive, well-structured report on the topic: "{query}".
Use the context below exclusively. Do not use outside knowledge. Structure the report with markdown headings.

## Research Context ##
{full_context}
"""

            final_report_payload = {
                "model": LLM_MODEL,
                "messages": [{"role": "user", "content": final_report_prompt}],
                "stream": True
            }

            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=final_report_payload) as response:
                response.raise_for_status()
                # The LLM streams OpenAI-style SSE lines ("data: {json}"),
                # terminated by a literal "data: [DONE]".
                async for line in response.content:
                    line_str = line.decode('utf-8').strip()
                    if not line_str:
                        continue
                    if line_str.startswith('data:'):
                        line_str = line_str[5:].strip()
                    if line_str == "[DONE]":
                        break
                    try:
                        chunk = json.loads(line_str)
                        content = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
                        if content:
                            yield format_sse({"event": "chunk", "data": content})
                    except json.JSONDecodeError:
                        # Skip keep-alives or partial lines that are not valid JSON.
                        continue

            yield format_sse({"event": "sources", "data": unique_sources})

    except Exception as e:
        logger.error(f"An error occurred during deep research: {e}")
        yield format_sse({"event": "error", "data": str(e)})
    finally:
        yield format_sse({"event": "done", "data": "Deep research complete."})


@app.get("/", include_in_schema=False)
def root():
    return {"message": "AI Deep Research API is active. See /docs for details."}


@app.post("/v1/deepresearch/completions")
async def deep_research_endpoint(request: DeepResearchRequest):
    """
    Performs a multi-step, streaming deep research task.

    **Events Streamed:**
    - `status`: Provides updates on the current stage of the process.
    - `plan`: The list of sub-questions that will be researched.
    - `chunk`: A piece of the final generated report.
    - `sources`: The list of web sources used for the report.
    - `error`: Indicates a fatal error occurred.
    - `done`: Signals the end of the stream.
    """
    return StreamingResponse(
        run_deep_research_stream(request.query),
        media_type="text/event-stream"
    )
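

# Example usage (a sketch; assumes this module is saved as main.py and that
# uvicorn is installed -- neither is guaranteed by the code above):
#
#   uvicorn main:app --reload
#
#   curl -N -X POST http://127.0.0.1:8000/v1/deepresearch/completions \
#        -H "Content-Type: application/json" \
#        -d '{"query": "What is the current state of solid-state battery research?"}'
#
# curl's -N flag disables output buffering so SSE frames print as they arrive.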