rkihacker committed
Commit 46a015f · verified · 1 Parent(s): 396db14

Update main.py

Files changed (1)
  1. main.py +79 -61
main.py CHANGED
@@ -2,8 +2,9 @@ import os
 import asyncio
 import json
 import logging
+import random
 import re
-from typing import AsyncGenerator, Optional
+from typing import AsyncGenerator, Optional, Tuple, List

 from fastapi import FastAPI
 from fastapi.responses import StreamingResponse
@@ -24,16 +25,29 @@ if not LLM_API_KEY:
 else:
     logger.info("LLM API Key loaded successfully.")

+# --- Constants & Headers ---
 # API Provider Constants
 SNAPZION_API_URL = "https://search.snapzion.com/get-snippets"
 LLM_API_URL = "https://api.typegpt.net/v1/chat/completions"
 LLM_MODEL = "gpt-4.1-mini"
-MAX_CONTEXT_CHAR_LENGTH = 120000
+
+# Automatic Context Sizing based on Tokens
+TARGET_TOKEN_LIMIT = 28000  # Safe limit for models with ~32k context windows
+ESTIMATED_CHARS_PER_TOKEN = 4
+MAX_CONTEXT_CHAR_LENGTH = TARGET_TOKEN_LIMIT * ESTIMATED_CHARS_PER_TOKEN
+
+# Real Browser User Agents for Rotation
+USER_AGENTS = [
+    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
+    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
+    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0",
+    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
+    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_5_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Mobile/15E148 Safari/604.1"
+]

 # Headers
-SNAPZION_HEADERS = { 'Content-Type': 'application/json', 'User-Agent': 'AI-Deep-Research-Agent/1.0' }
-SCRAPING_HEADERS = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36' }
-LLM_HEADERS = { "Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json" }
+SNAPZION_HEADERS = {'Content-Type': 'application/json'}
+LLM_HEADERS = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json"}

 # --- Pydantic Models & Helper Functions ---
 class DeepResearchRequest(BaseModel):
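
Note on the new sizing constants: the cap is an estimate (roughly 4 characters per token), not an exact count. For comparison, a minimal sketch of exact-token truncation using the tiktoken library; this is not part of the commit, and `cl100k_base` is an assumed tokenizer:

```python
# Hypothetical alternative to the chars-per-token estimate above (not in this commit).
# Requires: pip install tiktoken
import tiktoken

def truncate_to_tokens(text: str, limit: int = 28000) -> str:
    """Truncate text to at most `limit` tokens rather than estimating by characters."""
    enc = tiktoken.get_encoding("cl100k_base")  # assumed tokenizer; the served model's may differ
    tokens = enc.encode(text)
    return text if len(tokens) <= limit else enc.decode(tokens[:limit])
```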
@@ -43,23 +57,19 @@ def extract_json_from_llm_response(text: str) -> Optional[list]:
     match = re.search(r'\[.*\]', text, re.DOTALL)
     if match:
         json_str = match.group(0)
-        try:
-            return json.loads(json_str)
-        except json.JSONDecodeError:
-            logger.error(f"Failed to parse extracted JSON string: {json_str}")
-            return None
-    logger.warning(f"No JSON array found in LLM response: {text}")
+        try: return json.loads(json_str)
+        except json.JSONDecodeError: return None
     return None

 # --- FastAPI App ---
 app = FastAPI(
     title="AI Deep Research API",
-    description="Provides streaming deep research completions.",
-    version="2.5.0"  # Version bump for AttributeError fix
+    description="Provides robust, streaming deep research completions.",
+    version="3.0.0"  # Major version bump for robustness overhaul
 )

-# --- Core Service Functions (Unchanged) ---
-async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> list:
+# --- Core Service Functions ---
+async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> List[dict]:
     try:
         async with session.post(SNAPZION_API_URL, headers=SNAPZION_HEADERS, json={"query": query}, timeout=15) as response:
             response.raise_for_status(); data = await response.json()
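
The compacted helper keeps the same extraction behavior as before, minus the logging. A few illustrative calls (outputs shown as comments, assuming the new one-line try/except):

```python
# Illustrative behaviour of extract_json_from_llm_response (not part of the commit).
print(extract_json_from_llm_response('Sure! ["Q1?", "Q2?"]'))  # ['Q1?', 'Q2?']
print(extract_json_from_llm_response('no array here'))         # None (regex finds no [...] span)
print(extract_json_from_llm_response('["trailing comma",]'))   # None (json.loads rejects it)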
@@ -70,7 +80,9 @@ async def call_snapzion_search(session: aiohttp.ClientSession, query: str) -> list:
 async def scrape_url(session: aiohttp.ClientSession, url: str) -> str:
     if url.lower().endswith('.pdf'): return "Error: PDF content cannot be scraped."
     try:
-        async with session.get(url, headers=SCRAPING_HEADERS, timeout=10, ssl=False) as response:
+        # Rotate user agents for each request
+        headers = {'User-Agent': random.choice(USER_AGENTS)}
+        async with session.get(url, headers=headers, timeout=10, ssl=False) as response:
             if response.status != 200: return f"Error: HTTP status {response.status}"
             html = await response.text()
             soup = BeautifulSoup(html, "html.parser")
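
The rotation itself is a per-call `random.choice` over the pool defined above; a standalone illustration with placeholder strings rather than the real pool:

```python
# Standalone sketch of per-request User-Agent rotation (placeholder values).
import random

USER_AGENTS = ["UA-Chrome", "UA-Firefox", "UA-Safari"]

def pick_headers() -> dict:
    # A fresh draw per request, so consecutive fetches need not share a fingerprint.
    return {"User-Agent": random.choice(USER_AGENTS)}

print([pick_headers()["User-Agent"] for _ in range(5)])
```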
@@ -79,70 +91,77 @@ async def scrape_url(session: aiohttp.ClientSession, url: str) -> str:
     except Exception as e:
         logger.warning(f"Scraping failed for {url}: {e}"); return f"Error: {e}"

-async def search_and_scrape(session: aiohttp.ClientSession, query: str) -> tuple[str, list]:
-    search_results = await call_snapzion_search(session, query); sources = search_results[:4]
-    if not sources: return "", []
-    scrape_tasks = [scrape_url(session, source["link"]) for source in sources]
-    scraped_contents = await asyncio.gather(*scrape_tasks)
-    context = "\n\n".join(f"Source: {sources[i]['link']}\nContent: {content}" for i, content in enumerate(scraped_contents) if not content.startswith("Error:"))
-    return context, sources
+async def research_and_process_source(session: aiohttp.ClientSession, source: dict) -> Tuple[str, dict]:
+    """Scrapes a single source and falls back to its snippet if scraping fails."""
+    scraped_content = await scrape_url(session, source['link'])
+    if scraped_content.startswith("Error:"):
+        # SNIPPET FALLBACK LOGIC
+        logger.warning(f"Scraping failed for {source['link']}. Falling back to snippet.")
+        return source['snippet'], source
+    return scraped_content, source

 # --- Streaming Deep Research Logic ---
 async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
     def format_sse(data: dict) -> str: return f"data: {json.dumps(data)}\n\n"
     try:
         async with aiohttp.ClientSession() as session:
-            # Step 1: Generate Sub-Questions
+            # Step 1: Generate Research Plan
             yield format_sse({"event": "status", "data": "Generating research plan..."})
-
-            sub_question_prompt = {
-                "model": LLM_MODEL, "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array, without markdown, explanations, or any other text. Example: [\"Question 1?\", \"Question 2?\"]"}]
-            }
+            plan_prompt = {"model": LLM_MODEL, "messages": [{"role": "user", "content": f"Generate 3-4 key sub-questions for a research report on '{query}'. Your response MUST be ONLY the raw JSON array, without markdown. Example: [\"Question 1?\"]"}]}
             try:
-                async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=sub_question_prompt, timeout=20) as response:
-                    response.raise_for_status()
-                    result = await response.json()
-
-                    # ***** CHANGE 1: The definitive fix for the AttributeError *****
-                    sub_questions = None
-                    if isinstance(result, dict) and 'choices' in result:
-                        # Handle standard OpenAI dictionary format
-                        llm_content = result.get('choices', [{}])[0].get('message', {}).get('content', '')
-                        sub_questions = extract_json_from_llm_response(llm_content)
-                    elif isinstance(result, list):
-                        # Handle the case where the API returns the list directly
-                        sub_questions = result
-
-                    if not sub_questions or not isinstance(sub_questions, list):
-                        raise ValueError(f"Could not extract a valid list of questions from LLM response: {result}")
-
+                async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=20) as response:
+                    response.raise_for_status(); result = await response.json()
+                    sub_questions = result if isinstance(result, list) else extract_json_from_llm_response(result['choices'][0]['message']['content'])
+                    if not isinstance(sub_questions, list): raise ValueError(f"Could not extract a valid list from LLM response: {result}")
             except Exception as e:
                 logger.error(f"Failed to generate research plan: {e}")
                 yield format_sse({"event": "error", "data": f"Could not generate research plan. Reason: {e}"}); return

             yield format_sse({"event": "plan", "data": sub_questions})

-            # The rest of the pipeline can now execute
-            research_tasks = [search_and_scrape(session, sq) for sq in sub_questions]
-            yield format_sse({"event": "status", "data": f"Starting research on {len(sub_questions)} topics..."})
-
-            consolidated_context, all_sources = "", []
-            for task in asyncio.as_completed(research_tasks):
-                context, sources = await task
-                if context: consolidated_context += context + "\n\n---\n\n"
-                if sources: all_sources.extend(sources)
+            # Step 2: Conduct Research in Parallel
+            yield format_sse({"event": "status", "data": f"Searching for sources for {len(sub_questions)} topics..."})
+            search_tasks = [call_snapzion_search(session, sq) for sq in sub_questions]
+            all_search_results = await asyncio.gather(*search_tasks)
+
+            # Deduplicate sources by link to avoid scraping the same page multiple times
+            unique_sources = list({source['link']: source for results in all_search_results for source in results}.values())
+
+            if not unique_sources:
+                yield format_sse({"event": "error", "data": "Search did not return any usable sources."}); return
+
+            yield format_sse({"event": "status", "data": f"Found {len(unique_sources)} unique sources. Scraping and processing..."})
+
+            # Process all unique sources concurrently with snippet fallback
+            processing_tasks = [research_and_process_source(session, source) for source in unique_sources]
+
+            consolidated_context = ""
+            all_sources_used = []
+            successful_scrapes = 0
+
+            for task in asyncio.as_completed(processing_tasks):
+                content, source_info = await task
+                if content:
+                    consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n"
+                    all_sources_used.append(source_info)
+                    if content != source_info['snippet']:  # Count as a successful scrape only if not a snippet
+                        successful_scrapes += 1
+
+            logger.info(f"Context gathering complete. Successfully scraped {successful_scrapes}/{len(unique_sources)} pages. Used {len(all_sources_used)} total sources (including snippets).")

             if not consolidated_context.strip():
-                yield format_sse({"event": "error", "data": "Failed to gather any research context."}); return
+                yield format_sse({"event": "error", "data": "Failed to gather any research context from scraping or snippets."}); return

-            yield format_sse({"event": "status", "data": "Generating final report..."})
+            # Step 3: Synthesize Final Report
+            yield format_sse({"event": "status", "data": "Synthesizing final report..."})
             if len(consolidated_context) > MAX_CONTEXT_CHAR_LENGTH:
+                logger.warning(f"Context truncated from {len(consolidated_context)} to {MAX_CONTEXT_CHAR_LENGTH} chars.")
                 consolidated_context = consolidated_context[:MAX_CONTEXT_CHAR_LENGTH]

-            final_report_prompt = f'Synthesize the provided context into a comprehensive report on "{query}". Use markdown. Context:\n{consolidated_context}'
-            final_report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": final_report_prompt}], "stream": True}
+            report_prompt = f'Synthesize the provided context into a comprehensive, well-structured report on "{query}". Use markdown. Context:\n{consolidated_context}'
+            report_payload = {"model": LLM_MODEL, "messages": [{"role": "user", "content": report_prompt}], "stream": True}

-            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=final_report_payload) as response:
+            async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=report_payload) as response:
                 response.raise_for_status()
                 async for line in response.content:
                     if line.strip():
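
The deduplication one-liner in Step 2 leans on two dict properties: keys are unique (later entries overwrite earlier ones) and insertion order is preserved in Python 3.7+. A standalone sketch with made-up data:

```python
# Standalone sketch of the dedup-by-link pattern from Step 2 (made-up data).
all_search_results = [
    [{"link": "https://a.example", "snippet": "A"}, {"link": "https://b.example", "snippet": "B"}],
    [{"link": "https://a.example", "snippet": "A again"}],
]
unique_sources = list({s["link"]: s for results in all_search_results for s in results}.values())
print([s["link"] for s in unique_sources])  # ['https://a.example', 'https://b.example']
# The duplicate of a.example overwrites the first entry, so only two sources remain.
```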
@@ -155,10 +174,9 @@ async def run_deep_research_stream(query: str) -> AsyncGenerator[str, None]:
                         if content: yield format_sse({"event": "chunk", "data": content})
                     except json.JSONDecodeError: continue

-            unique_sources = list({s['link']: s for s in all_sources}.values())
-            yield format_sse({"event": "sources", "data": unique_sources})
+            yield format_sse({"event": "sources", "data": all_sources_used})
     except Exception as e:
-        logger.error(f"A critical error occurred in the main research stream: {e}")
+        logger.error(f"A critical error occurred in the main research stream: {e}", exc_info=True)
         yield format_sse({"event": "error", "data": str(e)})
     finally:
         yield format_sse({"event": "done", "data": "Deep research complete."})
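
For completeness, a minimal client sketch for consuming this SSE stream. The route path and request body are assumptions; the FastAPI endpoint definition sits outside the hunks shown here:

```python
# Hypothetical client for the stream above; the /deep-research path is an assumption.
import asyncio
import json
import aiohttp

async def consume(query: str) -> None:
    async with aiohttp.ClientSession() as session:
        async with session.post("http://localhost:8000/deep-research", json={"query": query}) as resp:
            async for raw in resp.content:  # aiohttp yields the body line by line
                line = raw.decode("utf-8").strip()
                if not line.startswith("data: "):
                    continue
                event = json.loads(line[len("data: "):])
                if event["event"] == "chunk":
                    print(event["data"], end="", flush=True)
                elif event["event"] == "done":
                    break

if __name__ == "__main__":
    asyncio.run(consume("example topic"))
```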
 