"""AI Deep Research API.

A FastAPI service exposing one streaming endpoint. For a user query it:

1. asks an LLM for a handful of research sub-questions,
2. searches DuckDuckGo (HTML endpoint) for each sub-question,
3. scrapes the resulting pages (falling back to the search snippet),
4. asks the LLM to synthesize a long-form report from the scraped text,

and streams progress, report chunks and the source list to the client as
Server-Sent Events.
"""
import asyncio
import json
import logging
import os
import random
import re
from typing import AsyncGenerator, List, Optional, Tuple
from urllib.parse import quote_plus, unquote

import aiohttp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")

# Fail fast at import time: nothing in this service works without the key.
if not LLM_API_KEY:
    raise RuntimeError("LLM_API_KEY must be set in a .env file.")
logger.info("LLM API Key loaded successfully.")

# --- Constants & Headers ---
LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
LLM_MODEL = "gpt-4.1-mini"
MAX_SOURCES_TO_PROCESS = 15

# Real browser user agents, rotated per request.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0",
]

LLM_HEADERS = {
    "Authorization": f"Bearer {LLM_API_KEY}",
    "Content-Type": "application/json",
    "Accept": "application/json",
}


class DeepResearchRequest(BaseModel):
    """Request body for the deep-research endpoint."""

    # The topic or question to research.
    query: str


app = FastAPI(
    title="AI Deep Research API",
    description="Provides robust, long-form, streaming deep research completions.",
    version="7.0.0",  # Final Production Version
)

# Enable CORS for all origins.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive; confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)
logger.info("CORS middleware enabled for all origins.")


# --- Helper Functions ---
def extract_json_from_llm_response(text: str) -> Optional[list]:
    """Return the JSON array embedded in *text*, or ``None``.

    The search is greedy: it spans from the first ``[`` to the last ``]`` in
    *text*, so surrounding prose from the LLM is tolerated. ``None`` is
    returned when no bracketed span exists or the span is not valid JSON.
    """
    match = re.search(r'\[.*\]', text, re.DOTALL)
    if match is None:
        return None
    try:
        return json.loads(match.group(0))
    except json.JSONDecodeError:
        return None


def _format_sse(data: dict) -> str:
    """Serialize *data* as a single Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(data)}\n\n"


def _extract_result_url(raw_link: str) -> Optional[str]:
    """Decode the target URL from a DuckDuckGo result link.

    The link carries the real URL percent-encoded in its ``uddg`` query
    parameter. Only the value of that parameter is decoded; any parameters
    that follow it are dropped so they do not end up appended to the
    returned URL. Returns ``None`` when there is no ``uddg`` parameter or
    the decoded value is not an ``http``/``https`` URL.
    """
    _, marker, tail = raw_link.partition('uddg=')
    if not marker:
        # Unexpected link format: nothing to decode.
        return None
    actual_url = unquote(tail.split('&', 1)[0])
    return actual_url if actual_url.startswith("http") else None


# --- Core Service Functions ---
async def call_duckduckgo_search(session: aiohttp.ClientSession, query: str) -> List[dict]:
    """Search DuckDuckGo's HTML endpoint and return the parsed results.

    Args:
        session: Shared aiohttp client session.
        query: Free-text search query.

    Returns:
        A list of ``{'title', 'link', 'snippet'}`` dicts, one per result
        whose target URL could be decoded. Best-effort: any failure is
        logged and an empty list is returned.
    """
    # quote_plus escapes '&', '#', '?', '+', '%' etc., which would otherwise
    # truncate or corrupt the query string.
    search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
    headers = {'User-Agent': random.choice(USER_AGENTS)}
    try:
        async with session.get(
            search_url, headers=headers, timeout=aiohttp.ClientTimeout(total=15)
        ) as response:
            response.raise_for_status()
            html = await response.text()

        soup = BeautifulSoup(html, "html.parser")
        results = []
        for res in soup.find_all('div', class_='result'):
            title_tag = res.find('a', class_='result__a')
            snippet_tag = res.find('a', class_='result__snippet')
            if not (title_tag and snippet_tag and 'href' in title_tag.attrs):
                continue
            actual_url = _extract_result_url(title_tag['href'])
            if actual_url is None:
                continue
            results.append({
                'title': title_tag.text,
                'link': actual_url,
                'snippet': snippet_tag.text,
            })

        logger.info("Found %d valid sources from DuckDuckGo for: '%s'", len(results), query)
        return results
    except Exception as e:
        # Deliberately broad: one failed search must not abort the research run.
        logger.error("DuckDuckGo search failed for query '%s': %s", query, e)
        return []


async def research_and_process_source(session: aiohttp.ClientSession, source: dict) -> Tuple[str, dict]:
    """Fetch one source page and return its visible text.

    Args:
        session: Shared aiohttp client session.
        source: Search result dict; ``'link'`` is required, ``'snippet'`` is
            used as the fallback content.

    Returns:
        ``(content, source)``. *content* is the page text with script, style
        and page-chrome tags removed; if scraping fails for any reason
        (PDF link, non-200 status, empty page, network error) it is the
        source's snippet, or ``''`` when there is none.
    """
    headers = {'User-Agent': random.choice(USER_AGENTS)}
    try:
        logger.info("Scraping: %s", source['link'])
        if source['link'].lower().endswith('.pdf'):
            raise ValueError("PDF content")

        # NOTE(review): ssl=False disables certificate verification for
        # scraped sites; kept as in the original, confirm it is intended.
        async with session.get(
            source['link'],
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=10),
            ssl=False,
        ) as response:
            if response.status != 200:
                raise ValueError(f"HTTP status {response.status}")
            html = await response.text()

        soup = BeautifulSoup(html, "html.parser")
        # Drop non-content elements before extracting text.
        for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
            tag.decompose()
        content = " ".join(soup.stripped_strings)
        if not content.strip():
            raise ValueError("Parsed content is empty.")
        return content, source
    except Exception as e:
        # Deliberately broad: any scraping problem degrades to the snippet.
        logger.warning("Scraping failed for %s (%s). Falling back to snippet.", source['link'], e)
        return source.get('snippet', ''), source


# --- Streaming Deep Research Logic ---
async def _research_events(query: str) -> AsyncGenerator[dict, None]:
    """Run the research pipeline, yielding event dicts as it progresses.

    Yields ``{"event": ..., "data": ...}`` dicts with event types
    ``status``, ``plan``, ``chunk``, ``sources`` and ``error``. After
    yielding an ``error`` event for an expected failure the generator simply
    returns; unexpected exceptions propagate to the caller.
    """
    async with aiohttp.ClientSession() as session:
        yield {"event": "status", "data": "Generating research plan..."}
        plan_prompt = {
            "model": LLM_MODEL,
            "messages": [{
                "role": "user",
                "content": (
                    f"Generate 3-4 key sub-questions for a research report on '{query}'. "
                    "Your response MUST be ONLY the raw JSON array. "
                    'Example: ["Question 1?"]'
                ),
            }],
        }
        try:
            async with session.post(
                LLM_API_URL,
                headers=LLM_HEADERS,
                json=plan_prompt,
                timeout=aiohttp.ClientTimeout(total=25),
            ) as response:
                response.raise_for_status()
                result = await response.json()
            # Accept either a bare JSON array or a chat-completion envelope
            # whose message content contains the array.
            if isinstance(result, list):
                sub_questions = result
            else:
                sub_questions = extract_json_from_llm_response(
                    result['choices'][0]['message']['content']
                )
            if not isinstance(sub_questions, list):
                raise ValueError(f"Invalid plan from LLM: {result}")
        except Exception as e:
            yield {"event": "error", "data": f"Could not generate research plan. Reason: {e}"}
            return

        yield {"event": "plan", "data": sub_questions}
        yield {"event": "status", "data": f"Searching sources for {len(sub_questions)} topics..."}

        search_tasks = [call_duckduckgo_search(session, sq) for sq in sub_questions]
        all_search_results = await asyncio.gather(*search_tasks)
        # De-duplicate by link while keeping first-seen order.
        unique_sources = list({
            source['link']: source
            for results in all_search_results
            for source in results
        }.values())
        if not unique_sources:
            yield {"event": "error", "data": "All search queries returned zero usable sources."}
            return

        sources_to_process = unique_sources[:MAX_SOURCES_TO_PROCESS]
        yield {
            "event": "status",
            "data": f"Found {len(unique_sources)} unique sources. Processing the top {len(sources_to_process)}...",
        }

        processing_tasks = [
            research_and_process_source(session, source) for source in sources_to_process
        ]
        context_parts: List[str] = []
        all_sources_used: List[dict] = []
        # Consume results in completion order so slow sites do not block fast ones.
        for task in asyncio.as_completed(processing_tasks):
            content, source_info = await task
            if content:
                context_parts.append(
                    f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
                )
                all_sources_used.append(source_info)
        consolidated_context = "".join(context_parts)

        if not consolidated_context.strip():
            yield {"event": "error", "data": "Failed to gather any research context."}
            return

        yield {"event": "status", "data": "Synthesizing final report..."}
        report_prompt = (
            'Synthesize the provided context into a long-form, comprehensive, '
            f'multi-page report on "{query}". Use markdown. '
            'Elaborate extensively on each point. '
            'Base your entire report ONLY on the provided context.'
            f'\n\n## Research Context ##\n{consolidated_context}'
        )
        report_payload = {
            "model": LLM_MODEL,
            "messages": [{"role": "user", "content": report_prompt}],
            "stream": True,
        }

        async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
            response.raise_for_status()
            async for line in response.content:
                line_str = line.decode('utf-8').strip()
                if not line_str.startswith('data:'):
                    continue
                line_str = line_str[5:].strip()
                if line_str == "[DONE]":
                    break
                try:
                    chunk = json.loads(line_str)
                except json.JSONDecodeError:
                    continue
                # Guard every level: keep-alive or malformed chunks (no
                # choices, null delta, non-object payload) are skipped
                # instead of aborting the whole report.
                if not isinstance(chunk, dict):
                    continue
                choices = chunk.get("choices")
                if not (isinstance(choices, list) and choices and isinstance(choices[0], dict)):
                    continue
                delta = choices[0].get("delta")
                content = delta.get("content") if isinstance(delta, dict) else None
                if content:
                    yield {"event": "chunk", "data": content}

        yield {"event": "sources", "data": all_sources_used}


async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
    """Stream the deep-research run for *query* as Server-Sent Events.

    Each yielded string is one ``data: <json>\\n\\n`` frame. Unexpected
    failures are logged and reported as an ``error`` event. A final ``done``
    event is emitted whenever the stream ends on its own (success or error).

    The ``done`` event is intentionally NOT yielded from a ``finally``
    block: when the consumer closes or cancels the generator (client
    disconnect), yielding during finalization is an error. The ``finally``
    below only awaits cleanup of the inner generator, which is permitted.
    """
    events = _research_events(query)
    try:
        async for event in events:
            yield _format_sse(event)
    except Exception as e:
        logger.error(f"A critical error occurred: {e}", exc_info=True)
        yield _format_sse({"event": "error", "data": str(e)})
    finally:
        # Release the HTTP session promptly, including on early close.
        await events.aclose()
    yield _format_sse({"event": "done", "data": "Deep research complete."})


@app.post("/v1/deepresearch/completions")
async def deep_research_endpoint(request: DeepResearchRequest):
    """Start a deep-research run and stream its events to the client."""
    return StreamingResponse(
        run_deep_research_stream(request.query),
        media_type="text/event-stream",
    )