|
|
import os |
|
|
import asyncio |
|
|
import json |
|
|
import logging |
|
|
import random |
|
|
import re |
|
|
from typing import AsyncGenerator, Optional, Tuple, List |
|
|
|
|
|
from fastapi import FastAPI |
|
|
from fastapi.responses import StreamingResponse |
|
|
from pydantic import BaseModel |
|
|
from dotenv import load_dotenv |
|
|
import aiohttp |
|
|
from bs4 import BeautifulSoup |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
load_dotenv() |
|
|
LLM_API_KEY = os.getenv("LLM_API_KEY") |
|
|
|
|
|
if not LLM_API_KEY: |
|
|
raise RuntimeError("LLM_API_KEY must be set in a .env file.") |
|
|
else: |
|
|
logger.info("LLM API Key loaded successfully.") |
|
|
|
|
|
|
|
|
|
|
|
SNAPZION_API_URL = "https://search.snapzion.com/get-snippets" |
|
|
LLM_API_URL = "https://api.typegpt.net/v1/chat/completions" |
|
|
LLM_MODEL = "gpt-4.1-mini" |
|
|
|
|
|
|
|
|
TARGET_TOKEN_LIMIT = 28000 |
|
|
ESTIMATED_CHARS_PER_TOKEN = 4 |
|
|
MAX_CONTEXT_CHAR_LENGTH = TARGET_TOKEN_LIMIT * ESTIMATED_CHARS_PER_TOKEN |
|
|
|
|
|
|
|
|
SNAPZION_HEADERS = { |
|
|
'accept': '*/*', |
|
|
'accept-language': 'en-US,en;q=0.9', |
|
|
'content-type': 'application/json', |
|
|
'origin': 'https://search.snapzion.com', |
|
|
'priority': 'u=1, i', |
|
|
'referer': 'https://search.snapzion.com/docs', |
|
|
'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"', |
|
|
'sec-ch-ua-mobile': '?0', |
|
|
'sec-ch-ua-platform': '"Windows"', |
|
|
'sec-fetch-dest': 'empty', |
|
|
'sec-fetch-mode': 'cors', |
|
|
'sec-fetch-site': 'same-origin', |
|
|
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36', |
|
|
} |
|
|
|
|
|
|
|
|
USER_AGENTS = [ |
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36", |
|
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36", |
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0" |
|
|
] |
|
|
|
|
|
LLM_HEADERS = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json"} |
|
|
|
|
|
class DeepResearchRequest(BaseModel): |
|
|
query: str |
|
|
|
|
|
def extract_json_from_llm_response(text: str) -> Optional[list]: |
|
|
match = re.search(r'\[.*\]', text, re.DOTALL) |
|
|
if match: |
|
|
try: return json.loads(match.group(0)) |
|
|
except json.JSONDecodeError: return None |
|
|
return None |
|
|
|
|
|
app = FastAPI( |
|
|
title="AI Deep Research API", |
|
|
description="Provides robust, streaming deep research completions.", |
|
|
version="4.0.0" |
|
|
) |
|
|
|
|
|
|
|
|
async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> List[dict]: |
|
|
logger.info(f"Searching Snapzion for: '{query}'") |
|
|
try: |
|
|
async with session.post(SNAPZION_API_URL, headers=SNAPZION_HEADERS, json={"query": query}, timeout=20) as response: |
|
|
response.raise_for_status() |
|
|
data = await response.json() |
|
|
results = data.get("organic_results", []) |
|
|
logger.info(f"Found {len(results)} sources for: '{query}'") |
|
|
return results |
|
|
except Exception as e: |
|
|
logger.error(f"Snapzion search failed for query '{query}': {e}"); return [] |
|
|
|
|
|
async def scrape_url(session: aiohttp.ClientSession, url: str) -> str: |
|
|
if url.lower().endswith('.pdf'): return "Error: PDF" |
|
|
try: |
|
|
headers = {'User-Agent': random.choice(USER_AGENTS)} |
|
|
async with session.get(url, headers=headers, timeout=10, ssl=False) as response: |
|
|
if response.status != 200: return f"Error: HTTP {response.status}" |
|
|
return await response.text() |
|
|
except Exception as e: |
|
|
return f"Error: {e}" |
|
|
|
|
|
def parse_html(html: str) -> str: |
|
|
soup = BeautifulSoup(html, "html.parser") |
|
|
for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']): tag.decompose() |
|
|
return " ".join(soup.stripped_strings) |
|
|
|
|
|
async def research_and_process_source(session: aiohttp.ClientSession, source: dict) -> Tuple[str, dict]: |
|
|
html_or_error = await scrape_url(session, source['link']) |
|
|
if html_or_error.startswith("Error:"): |
|
|
logger.warning(f"Scraping failed for {source['link']} ({html_or_error}). Falling back to snippet.") |
|
|
return source.get('snippet', ''), source |
|
|
|
|
|
content = parse_html(html_or_error) |
|
|
return content, source |
|
|
|
|
|
|
|
|
async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]: |
|
|
def format_sse(data: dict) -> str: return f"data: {json.dumps(data)}\n\n" |
|
|
try: |
|
|
async with aiohttp.ClientSession() as session: |
|
|
|
|
|
yield format_sse({"event": "status", "data": "Generating research plan..."}) |
|
|
plan_prompt = {"model": LLM_MODEL, "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array. Example: [\"Question 1?\"]"}]} |
|
|
try: |
|
|
async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=25) as response: |
|
|
response.raise_for_status(); result = await response.json() |
|
|
sub_questions = result if isinstance(result, list) else extract_json_from_llm_response(result['choices'][0]['message']['content']) |
|
|
if not isinstance(sub_questions, list): raise ValueError(f"Invalid plan from LLM: {result}") |
|
|
except Exception as e: |
|
|
yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"}); return |
|
|
|
|
|
yield format_sse({"event": "plan", "data": sub_questions}) |
|
|
|
|
|
|
|
|
yield format_sse({"event": "status", "data": f"Searching sources for {len(sub_questions)} topics..."}) |
|
|
search_tasks = [call_snapzion_search(session, sq) for sq in sub_questions] |
|
|
all_search_results = await asyncio.gather(*search_tasks) |
|
|
|
|
|
unique_sources = list({source['link']: source for results in all_search_results for source in results if 'link' in source and 'snippet' in source}.values()) |
|
|
|
|
|
if not unique_sources: |
|
|
yield format_sse({"event": "error", "data": "All search queries returned zero usable sources. The search provider might be blocking requests or the topic is too obscure."}); return |
|
|
|
|
|
yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Processing..."}) |
|
|
|
|
|
processing_tasks = [research_and_process_source(session, source) for source in unique_sources] |
|
|
|
|
|
consolidated_context, all_sources_used = "", [] |
|
|
successful_scrapes = 0 |
|
|
|
|
|
for task in asyncio.as_completed(processing_tasks): |
|
|
content, source_info = await task |
|
|
if content: |
|
|
consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n" |
|
|
all_sources_used.append(source_info) |
|
|
if not content == source_info.get('snippet'): successful_scrapes += 1 |
|
|
|
|
|
logger.info(f"Context complete. Scraped {successful_scrapes}/{len(unique_sources)} pages. Used {len(all_sources_used)} total sources (with snippet fallbacks).") |
|
|
|
|
|
if not consolidated_context.strip(): |
|
|
yield format_sse({"event": "error", "data": "Failed to gather any research context from scraping or snippets."}); return |
|
|
|
|
|
|
|
|
yield format_sse({"event": "status", "data": "Synthesizing final report..."}) |
|
|
if len(consolidated_context) > MAX_CONTEXT_CHAR_LENGTH: |
|
|
consolidated_context = consolidated_context[:MAX_CONTEXT_CHAR_LENGTH] |
|
|
|
|
|
report_prompt = f'Synthesize the provided context into a comprehensive, well-structured report on "{query}". Use markdown. Context:\n{consolidated_context}' |
|
|
report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": report_prompt}], "stream": True} |
|
|
|
|
|
async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response: |
|
|
response.raise_for_status() |
|
|
async for line in response.content: |
|
|
line_str = line.decode('utf-8').strip() |
|
|
if line_str.startswith('data:'): line_str = line_str[5:].strip() |
|
|
if line_str == "[DONE]": break |
|
|
try: |
|
|
chunk = json.loads(line_str) |
|
|
content = chunk.get("choices", [{}])[0].get("delta", {}).get("content") |
|
|
if content: yield format_sse({"event": "chunk", "data": content}) |
|
|
except json.JSONDecodeError: continue |
|
|
|
|
|
yield format_sse({"event": "sources", "data": all_sources_used}) |
|
|
except Exception as e: |
|
|
logger.error(f"A critical error occurred: {e}", exc_info=True) |
|
|
yield format_sse({"event": "error", "data": str(e)}) |
|
|
finally: |
|
|
yield format_sse({"event": "done", "data": "Deep research complete."}) |
|
|
|
|
|
|
|
|
@app.post("/v1/deepresearch/completions") |
|
|
async def deep_research_endpoint(request: DeepResearchRequest): |
|
|
return StreamingResponse(run_deep_research_stream(request.query), media_type="text/event-stream") |