import os
import asyncio
import json
import logging
import random
import re
from typing import AsyncGenerator, Optional, Tuple, List
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from dotenv import load_dotenv
import aiohttp
from bs4 import BeautifulSoup
# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
if not LLM_API_KEY:
    raise RuntimeError("LLM_API_KEY must be set in a .env file.")
logger.info("LLM API Key loaded successfully.")
# --- Constants & Headers ---
# API Provider Constants
SNAPZION_API_URL = "https://search.snapzion.com/get-snippets"
LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
LLM_MODEL = "gpt-4.1-mini"
# Automatic context sizing: cap the consolidated context by an estimated token budget
# instead of a fixed character limit.
TARGET_TOKEN_LIMIT = 28000
ESTIMATED_CHARS_PER_TOKEN = 4
MAX_CONTEXT_CHAR_LENGTH = TARGET_TOKEN_LIMIT * ESTIMATED_CHARS_PER_TOKEN
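# Rough budget check: 28,000 tokens * ~4 chars/token = 112,000 characters of raw context.
# The 4-chars-per-token figure is a common heuristic, not an exact tokenizer measurement.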
# Full browser-equivalent headers for the Snapzion API call; the endpoint tends to
# reject requests that don't look like they came from its own web frontend.
SNAPZION_HEADERS = {
    'accept': '*/*',
    'accept-language': 'en-US,en;q=0.9',
    'content-type': 'application/json',
    'origin': 'https://search.snapzion.com',
    'priority': 'u=1, i',
    'referer': 'https://search.snapzion.com/docs',
    'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'sec-fetch-dest': 'empty',
    'sec-fetch-mode': 'cors',
    'sec-fetch-site': 'same-origin',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
}
# Real browser user agents, rotated per scraping request
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0",
]
LLM_HEADERS = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json"}
class DeepResearchRequest(BaseModel):
    query: str
def extract_json_from_llm_response(text: str) -> Optional[list]:
    """Pull the first JSON array out of a free-form LLM reply."""
    match = re.search(r'\[.*\]', text, re.DOTALL)
    if not match:
        return None
    try:
        return json.loads(match.group(0))
    except json.JSONDecodeError:
        return None
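# Example (hypothetical LLM reply; models often wrap the array in prose):
#   extract_json_from_llm_response('Sure! ["What is X?", "Why does Y matter?"]')
# returns ["What is X?", "Why does Y matter?"].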
app = FastAPI(
    title="AI Deep Research API",
    description="Provides robust, streaming deep research completions.",
    version="4.0.0",  # final production version
)
# --- Core Service Functions ---
async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> List[dict]:
    logger.info(f"Searching Snapzion for: '{query}'")
    try:
        async with session.post(SNAPZION_API_URL, headers=SNAPZION_HEADERS, json={"query": query}, timeout=20) as response:
            response.raise_for_status()
            data = await response.json()
            results = data.get("organic_results", [])
            logger.info(f"Found {len(results)} sources for: '{query}'")
            return results
    except Exception as e:
        logger.error(f"Snapzion search failed for query '{query}': {e}")
        return []
async def scrape_url(session: aiohttp.ClientSession, url: str) -> str:
    if url.lower().endswith('.pdf'):
        return "Error: PDF"
    try:
        headers = {'User-Agent': random.choice(USER_AGENTS)}
        async with session.get(url, headers=headers, timeout=10, ssl=False) as response:
            if response.status != 200:
                return f"Error: HTTP {response.status}"
            return await response.text()  # Return full HTML for parsing
    except Exception as e:
        return f"Error: {e}"
def parse_html(html: str) -> str:
    soup = BeautifulSoup(html, "html.parser")
    # Drop non-content elements before extracting visible text.
    for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
        tag.decompose()
    return " ".join(soup.stripped_strings)
async def research_and_process_source(session: aiohttp.ClientSession, source: dict) -> Tuple[str, dict]:
    html_or_error = await scrape_url(session, source['link'])
    if html_or_error.startswith("Error:"):
        logger.warning(f"Scraping failed for {source['link']} ({html_or_error}). Falling back to snippet.")
        return source.get('snippet', ''), source
    content = parse_html(html_or_error)
    return content, source
# --- Streaming Deep Research Logic ---
async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
    def format_sse(data: dict) -> str:
        # Server-Sent Events framing: one JSON payload per "data:" line.
        return f"data: {json.dumps(data)}\n\n"

    try:
        async with aiohttp.ClientSession() as session:
            # Step 1: Generate research plan
            yield format_sse({"event": "status", "data": "Generating research plan..."})
            plan_prompt = {"model": LLM_MODEL, "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array. Example: [\"Question 1?\"]"}]}
            try:
                async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=25) as response:
                    response.raise_for_status()
                    result = await response.json()
                    sub_questions = result if isinstance(result, list) else extract_json_from_llm_response(result['choices'][0]['message']['content'])
                    if not isinstance(sub_questions, list):
                        raise ValueError(f"Invalid plan from LLM: {result}")
            except Exception as e:
                yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"})
                return
            yield format_sse({"event": "plan", "data": sub_questions})
            # Step 2: Conduct research in parallel
            yield format_sse({"event": "status", "data": f"Searching sources for {len(sub_questions)} topics..."})
            search_tasks = [call_snapzion_search(session, sq) for sq in sub_questions]
            all_search_results = await asyncio.gather(*search_tasks)
            # De-duplicate by link; the dict comprehension keeps the last entry seen for each URL.
            unique_sources = list({source['link']: source for results in all_search_results for source in results if 'link' in source and 'snippet' in source}.values())
            if not unique_sources:
                yield format_sse({"event": "error", "data": "All search queries returned zero usable sources. The search provider might be blocking requests, or the topic is too obscure."})
                return
            yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Processing..."})
            processing_tasks = [research_and_process_source(session, source) for source in unique_sources]
            consolidated_context, all_sources_used = "", []
            successful_scrapes = 0
            for task in asyncio.as_completed(processing_tasks):
                content, source_info = await task
                if content:
                    consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
                    all_sources_used.append(source_info)
                    # Content differing from the snippet means the full-page scrape succeeded.
                    if content != source_info.get('snippet'):
                        successful_scrapes += 1
            logger.info(f"Context complete. Scraped {successful_scrapes}/{len(unique_sources)} pages. Used {len(all_sources_used)} total sources (with snippet fallbacks).")
            if not consolidated_context.strip():
                yield format_sse({"event": "error", "data": "Failed to gather any research context from scraping or snippets."})
                return
            # Step 3: Synthesize final report
            yield format_sse({"event": "status", "data": "Synthesizing final report..."})
            if len(consolidated_context) > MAX_CONTEXT_CHAR_LENGTH:
                consolidated_context = consolidated_context[:MAX_CONTEXT_CHAR_LENGTH]
            report_prompt = f'Synthesize the provided context into a comprehensive, well-structured report on "{query}". Use markdown. Context:\n{consolidated_context}'
            report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": report_prompt}], "stream": True}
            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
                response.raise_for_status()
                # Relay the OpenAI-style SSE stream to the client chunk by chunk.
                async for line in response.content:
                    line_str = line.decode('utf-8').strip()
                    if line_str.startswith('data:'):
                        line_str = line_str[5:].strip()
                    if line_str == "[DONE]":
                        break
                    try:
                        chunk = json.loads(line_str)
                        content = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
                        if content:
                            yield format_sse({"event": "chunk", "data": content})
                    except json.JSONDecodeError:
                        continue
            yield format_sse({"event": "sources", "data": all_sources_used})
    except Exception as e:
        logger.error(f"A critical error occurred: {e}", exc_info=True)
        yield format_sse({"event": "error", "data": str(e)})
    finally:
        yield format_sse({"event": "done", "data": "Deep research complete."})
# --- API Endpoints ---
@app.post("/v1/deepresearch/completions")
async def deep_research_endpoint(request: DeepResearchRequest):
    return StreamingResponse(run_deep_research_stream(request.query), media_type="text/event-stream")
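# --- Local run & usage (sketch) ---
# Assumes this file is saved as main.py; host, port, and the sample query are illustrative.
# The stream emits one JSON event per SSE "data:" line: status / plan / chunk / sources / done.
#   curl -N -X POST http://localhost:8000/v1/deepresearch/completions \
#        -H "Content-Type: application/json" \
#        -d '{"query": "impact of RISC-V on embedded systems"}'
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)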