File size: 9,878 Bytes
4b17916
2a0098d
6142af3
 
46a015f
132c134
46a015f
6142af3
5b2a6b6
6142af3
 
2a0098d
4b17916
2a0098d
4b17916
 
1e679fd
e1111e0
 
2a0098d
 
 
 
 
0e14740
132c134
2a0098d
46a015f
132c134
2a0098d
4c88f38
1e679fd
46a015f
 
 
 
 
 
 
 
 
 
 
 
 
 
4b17916
132c134
46a015f
 
1e679fd
132c134
6142af3
 
 
132c134
 
 
 
46a015f
 
132c134
 
5b2a6b6
4b17916
6142af3
46a015f
 
4b17916
 
46a015f
 
4b17916
2a0098d
132c134
2a0098d
 
132c134
2a0098d
 
6142af3
4b17916
46a015f
 
 
6142af3
2a0098d
 
132c134
2a0098d
 
132c134
2a0098d
46a015f
 
 
 
 
 
 
 
2a0098d
6142af3
 
132c134
6142af3
 
46a015f
6142af3
46a015f
5b2a6b6
46a015f
 
 
 
5b2a6b6
132c134
 
5b2a6b6
6142af3
 
46a015f
 
 
 
 
 
 
6142af3
46a015f
 
0e14740
46a015f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e679fd
46a015f
6142af3
46a015f
 
132c134
46a015f
132c134
 
46a015f
 
6142af3
46a015f
132c134
6142af3
 
 
0e14740
 
6142af3
 
396db14
0e14740
 
1e679fd
46a015f
6142af3
46a015f
6142af3
 
 
 
 
 
 
0e14740
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import os
import asyncio
import json
import logging
import random
import re
from typing import AsyncGenerator, Optional, Tuple, List

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from dotenv import load_dotenv
import aiohttp
from bs4 import BeautifulSoup

# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")

if not LLM_API_KEY:
    raise RuntimeError("LLM_API_KEY must be set in a .env file.")
else:
    logger.info("LLM API Key loaded successfully.")

# --- Constants & Headers ---
# API Provider Constants
SNAPZION_API_URL = "https://search.snapzion.com/get-snippets"
LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
LLM_MODEL = "gpt-4.1-mini"

# Automatic Context Sizing based on Tokens
TARGET_TOKEN_LIMIT = 28000  # Safe limit for models with ~32k context windows
ESTIMATED_CHARS_PER_TOKEN = 4
MAX_CONTEXT_CHAR_LENGTH = TARGET_TOKEN_LIMIT * ESTIMATED_CHARS_PER_TOKEN

# Real Browser User Agents for Rotation
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_5_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Mobile/15E148 Safari/604.1"
]

# Headers
SNAPZION_HEADERS = {'Content-Type': 'application/json'}
LLM_HEADERS = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json"}

# --- Pydantic Models & Helper Functions ---
class DeepResearchRequest(BaseModel):
    query: str

def extract_json_from_llm_response(text: str) -> Optional[list]:
    match = re.search(r'\[.*\]', text, re.DOTALL)
    if match:
        json_str = match.group(0)
        try: return json.loads(json_str)
        except json.JSONDecodeError: return None
    return None

# --- FastAPI App ---
app = FastAPI(
    title="AI Deep Research API",
    description="Provides robust, streaming deep research completions.",
    version="3.0.0"  # Major version bump for robustness overhaul
)

# --- Core Service Functions ---
async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> List[dict]:
    try:
        async with session.post(SNAPZION_API_URL, headers=SNAPZION_HEADERS, json={"query": query}, timeout=15) as response:
            response.raise_for_status(); data = await response.json()
            return data.get("organic_results", [])
    except Exception as e:
        logger.error(f"Snapzion search failed for query '{query}': {e}"); return []

async def scrape_url(session: aiohttp.ClientSession, url: str) -> str:
    if url.lower().endswith('.pdf'): return "Error: PDF content cannot be scraped."
    try:
        # Rotate user agents for each request
        headers = {'User-Agent': random.choice(USER_AGENTS)}
        async with session.get(url, headers=headers, timeout=10, ssl=False) as response:
            if response.status != 200: return f"Error: HTTP status {response.status}"
            html = await response.text()
            soup = BeautifulSoup(html, "html.parser")
            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']): tag.decompose()
            return " ".join(soup.stripped_strings)
    except Exception as e:
        logger.warning(f"Scraping failed for {url}: {e}"); return f"Error: {e}"

async def research_and_process_source(session: aiohttp.ClientSession, source: dict) -> Tuple[str, dict]:
    """Scrapes a single source and falls back to its snippet if scraping fails."""
    scraped_content = await scrape_url(session, source['link'])
    if scraped_content.startswith("Error:"):
        # SNIPPET FALLBACK LOGIC
        logger.warning(f"Scraping failed for {source['link']}. Falling back to snippet.")
        return source['snippet'], source
    return scraped_content, source

# --- Streaming Deep Research Logic ---
async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
    def format_sse(data: dict) -> str: return f"data: {json.dumps(data)}\n\n"
    try:
        async with aiohttp.ClientSession() as session:
            # Step 1: Generate Research Plan
            yield format_sse({"event": "status", "data": "Generating research plan..."})
            plan_prompt = {"model": LLM_MODEL, "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array, without markdown. Example: [\"Question 1?\"]"}]}
            try:
                async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=20) as response:
                    response.raise_for_status(); result = await response.json()
                    sub_questions = result if isinstance(result, list) else extract_json_from_llm_response(result['choices'][0]['message']['content'])
                    if not isinstance(sub_questions, list): raise ValueError(f"Could not extract a valid list from LLM response: {result}")
            except Exception as e:
                logger.error(f"Failed to generate research plan: {e}")
                yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"}); return

            yield format_sse({"event": "plan", "data": sub_questions})

            # Step 2: Conduct Research in Parallel
            yield format_sse({"event": "status", "data": f"Searching for sources for {len(sub_questions)} topics..."})
            search_tasks = [call_snapzion_search(session, sq) for sq in sub_questions]
            all_search_results = await asyncio.gather(*search_tasks)
            
            # Deduplicate sources by link to avoid scraping the same page multiple times
            unique_sources = list({source['link']: source for results in all_search_results for source in results}.values())
            
            if not unique_sources:
                yield format_sse({"event": "error", "data": "Search did not return any usable sources."}); return
            
            yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Scraping and processing..."})
            
            # Process all unique sources concurrently with snippet fallback
            processing_tasks = [research_and_process_source(session, source) for source in unique_sources]
            
            consolidated_context = ""
            all_sources_used = []
            successful_scrapes = 0
            
            for task in asyncio.as_completed(processing_tasks):
                content, source_info = await task
                if content:
                    consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
                    all_sources_used.append(source_info)
                    if not content == source_info['snippet']: # Count as success only if not a snippet
                        successful_scrapes += 1

            logger.info(f"Context gathering complete. Successfully scraped {successful_scrapes}/{len(unique_sources)} pages. Used {len(all_sources_used)} total sources (including snippets).")

            if not consolidated_context.strip():
                yield format_sse({"event": "error", "data": "Failed to gather any research context from scraping or snippets."}); return

            # Step 3: Synthesize Final Report
            yield format_sse({"event": "status", "data": "Synthesizing final report..."})
            if len(consolidated_context) > MAX_CONTEXT_CHAR_LENGTH:
                logger.warning(f"Context truncated from {len(consolidated_context)} to {MAX_CONTEXT_CHAR_LENGTH} chars.")
                consolidated_context = consolidated_context[:MAX_CONTEXT_CHAR_LENGTH]

            report_prompt = f'Synthesize the provided context into a comprehensive, well-structured report on "{query}". Use markdown. Context:\n{consolidated_context}'
            report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": report_prompt}], "stream": True}

            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
                response.raise_for_status()
                async for line in response.content:
                    if line.strip():
                        line_str = line.decode('utf-8').strip()
                        if line_str.startswith('data:'): line_str = line_str[5:].strip()
                        if line_str == "[DONE]": break
                        try:
                            chunk = json.loads(line_str)
                            content = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
                            if content: yield format_sse({"event": "chunk", "data": content})
                        except json.JSONDecodeError: continue
            
            yield format_sse({"event": "sources", "data": all_sources_used})
    except Exception as e:
        logger.error(f"A critical error occurred in the main research stream: {e}", exc_info=True)
        yield format_sse({"event": "error", "data": str(e)})
    finally:
        yield format_sse({"event": "done", "data": "Deep research complete."})

# --- API Endpoints ---
@app.post("/v1/deepresearch/completions")
async def deep_research_endpoint(request: DeepResearchRequest):
    return StreamingResponse(run_deep_research_stream(request.query), media_type="text/event-stream")