import os import asyncio import json import logging import random import re import time from typing import AsyncGenerator, Optional, Tuple, List, Dict from urllib.parse import quote_plus, urlparse from fastapi import FastAPI, HTTPException from fastapi.responses import StreamingResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from dotenv import load_dotenv import aiohttp from bs4 import BeautifulSoup from fake_useragent import UserAgent from collections import defaultdict # --- Configuration --- logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) load_dotenv() LLM_API_KEY = os.getenv("LLM_API_KEY") if not LLM_API_KEY: raise RuntimeError("LLM_API_KEY must be set in a .env file.") else: logging.info("LLM API Key loaded successfully.") # --- Constants & Headers --- LLM_API_URL = "https://api.typegpt.net/v1/chat/completions" LLM_MODEL = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8" MAX_SOURCES_TO_PROCESS = 10 MAX_CONCURRENT_REQUESTS = 5 SEARCH_TIMEOUT = 120 # Default search time in seconds TOTAL_TIMEOUT = 180 # Total time limit in seconds REQUEST_DELAY = 1.0 # Delay between requests in seconds USER_AGENT_ROTATION = True # Initialize fake user agent generator try: ua = UserAgent() except: class SimpleUA: def random(self): return random.choice([ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0" ]) ua = SimpleUA() LLM_HEADERS = { "Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json" } class DeepResearchRequest(BaseModel): query: str search_time: int = SEARCH_TIMEOUT # Default search time app = FastAPI( title="AI Deep Research API", description="Provides comprehensive research reports from real web searches.", version="3.1.0" ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"] ) def extract_json_from_llm_response(text: str) -> Optional[list]: """Extract JSON array from LLM response text.""" match = re.search(r'$$ .* $$', text, re.DOTALL) if match: try: return json.loads(match.group(0)) except json.JSONDecodeError: return None return None async def get_real_user_agent() -> str: """Get a realistic user agent string.""" try: if isinstance(ua, UserAgent): return ua.random() return ua.random() # For our fallback class except: return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36" def clean_url(url: str) -> str: """Clean up and normalize URLs.""" if not url: return "" # Handle DuckDuckGo redirect URLs if url.startswith('//duckduckgo.com/l/'): url = f"https:{url}" # Make it a proper URL try: parsed = urlparse(url) query_params = parsed.query if 'uddg=' in query_params: match = re.search(r'uddg=([^&]+)', query_params) if match: encoded_url = match.group(1) try: # URL decode the parameter decoded_url = quote_plus(encoded_url) # Sometimes it's double-encoded if '%25' in decoded_url: decoded_url = quote_plus(decoded_url) return decoded_url except: pass except: pass # Ensure URL has proper scheme if url.startswith('//'): url = 'https:' + url elif not url.startswith(('http://', 'https://')): url = 'https://' + url return url async def check_robots_txt(url: str) -> bool: """Check if scraping is allowed by robots.txt.""" try: domain_match = re.search(r'https?://([^/]+)', url) if not domain_match: return False domain = domain_match.group(1) robots_url = f"https://{domain}/robots.txt" async with aiohttp.ClientSession() as session: headers = {'User-Agent': await get_real_user_agent()} async with session.get(robots_url, headers=headers, timeout=5) as response: if response.status == 200: robots = await response.text() if "Disallow: /" in robots: return False # Check for specific path disallows path = re.sub(r'https?://[^/]+', '', url) if any(f"Disallow: {p}" in robots for p in [path, path.rstrip('/') + '/']): return False return True except Exception as e: logging.warning(f"Could not check robots.txt for {url}: {e}") return False async def fetch_search_results(query: str, max_results: int = 5) -> List[dict]: """ Perform a real search using DuckDuckGo's HTML interface with improved URL handling. """ try: search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}" headers = { "User-Agent": await get_real_user_agent(), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Referer": "https://duckduckgo.com/", "DNT": "1" } async with aiohttp.ClientSession() as session: async with session.get(search_url, headers=headers, timeout=10) as response: if response.status != 200: logging.warning(f"Search failed with status {response.status}") return [] html = await response.text() soup = BeautifulSoup(html, 'html.parser') results = [] # Try multiple selectors as DuckDuckGo may change their HTML structure for selector in ['.result__body', '.result__a', '.result']: if len(results) >= max_results: break for result in soup.select(selector)[:max_results]: try: title_elem = result.select_one('.result__title .result__a') or result.select_one('.result__a') if not title_elem: continue link = title_elem['href'] snippet_elem = result.select_one('.result__snippet') or result.select_one('.result__body') # Clean the URL clean_link = clean_url(link) # Skip if we couldn't get a clean URL if not clean_link or clean_link.startswith('javascript:'): continue # Get snippet if available snippet = snippet_elem.get_text(strip=True) if snippet_elem else "" # Skip if we already have this URL if any(r['link'] == clean_link for r in results): continue results.append({ 'title': title_elem.get_text(strip=True), 'link': clean_link, 'snippet': snippet, 'source': 'duckduckgo' }) except Exception as e: logging.warning(f"Error parsing search result: {e}") continue logging.info(f"Found {len(results)} real search results for '{query}'") return results[:max_results] except Exception as e: logging.error(f"Real search failed: {e}") return [] async def process_web_source(session: aiohttp.ClientSession, source: dict, timeout: int = 15) -> Tuple[str, dict]: """ Process a real web source with improved content extraction and error handling. """ headers = {'User-Agent': await get_real_user_agent()} source_info = source.copy() source_info['link'] = clean_url(source['link']) # Ensure URL is clean # Skip if URL is invalid if not source_info['link'] or not source_info['link'].startswith(('http://', 'https://')): return source.get('snippet', ''), source_info # Check robots.txt first if not await check_robots_txt(source_info['link']): logging.info(f"Scraping disallowed by robots.txt for {source_info['link']}") return source.get('snippet', ''), source_info try: logging.info(f"Processing source: {source_info['link']}") start_time = time.time() # Skip non-HTML content if any(source_info['link'].lower().endswith(ext) for ext in ['.pdf', '.doc', '.docx', '.ppt', '.pptx', '.xls', '.xlsx']): logging.info(f"Skipping non-HTML content at {source_info['link']}") return source.get('snippet', ''), source_info # Add delay between requests to be polite await asyncio.sleep(REQUEST_DELAY) async with session.get(source_info['link'], headers=headers, timeout=timeout, ssl=False) as response: if response.status != 200: logging.warning(f"HTTP {response.status} for {source_info['link']}") return source.get('snippet', ''), source_info content_type = response.headers.get('Content-Type', '').lower() if 'text/html' not in content_type: logging.info(f"Non-HTML content at {source_info['link']} (type: {content_type})") return source.get('snippet', ''), source_info html = await response.text() soup = BeautifulSoup(html, "html.parser") # Remove unwanted elements for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'iframe', 'noscript', 'form']): tag.decompose() # Try to find main content by common patterns selectors_to_try = [ 'main', 'article', '[role="main"]', '.main-content', '.content', '.article-body', '.post-content', '.entry-content', '#content', '#main', '.main', '.article' ] main_content = None for selector in selectors_to_try: main_content = soup.select_one(selector) if main_content: break if not main_content: # If no main content found, try to find the largest text block all_elements = soup.find_all() candidates = [el for el in all_elements if el.name not in ['script', 'style', 'nav', 'footer', 'header']] if candidates: candidates.sort(key=lambda x: len(x.get_text()), reverse=True) main_content = candidates[0] if candidates else soup if not main_content: main_content = soup.find('body') or soup # Clean up the content content = " ".join(main_content.stripped_strings) content = re.sub(r'\s+', ' ', content).strip() # If content is too short, try alternative extraction methods if len(content.split()) < 50 and len(html) > 10000: # Try extracting all paragraphs paras = soup.find_all('p') content = " ".join([p.get_text() for p in paras if p.get_text().strip()]) content = re.sub(r'\s+', ' ', content).strip() # If still too short, try getting all text nodes if len(content.split()) < 50: content = " ".join(soup.stripped_strings) content = re.sub(r'\s+', ' ', content).strip() # If content is still too short, try to extract from specific tags if len(content.split()) < 30: # Try to get content from divs with certain classes for tag in ['div', 'section', 'article']: for element in soup.find_all(tag): if len(element.get_text().split()) > 200: # If this element has substantial content content = " ".join(element.stripped_strings) content = re.sub(r'\s+', ' ', content).strip() if len(content.split()) >= 30: # If we got enough content break if len(content.split()) >= 30: break if len(content.split()) < 30: logging.warning(f"Very little content extracted from {source_info['link']}") return source.get('snippet', ''), source_info source_info['word_count'] = len(content.split()) source_info['processing_time'] = time.time() - start_time return content, source_info except asyncio.TimeoutError: logging.warning(f"Timeout while processing {source_info['link']}") return source.get('snippet', ''), source_info except Exception as e: logging.warning(f"Error processing {source_info['link']}: {str(e)[:200]}") return source.get('snippet', ''), source_info async def generate_research_plan(query: str, session: aiohttp.ClientSession) -> List[str]: """Generate a comprehensive research plan with sub-questions.""" try: plan_prompt = { "model": LLM_MODEL, "messages": [{ "role": "user", "content": f"""Generate 4-6 comprehensive sub-questions for in-depth research on '{query}'. Focus on key aspects that would provide a complete understanding of the topic. Your response MUST be ONLY the raw JSON array with no additional text. Example: ["What is the historical background of X?", "What are the current trends in X?"]""" }], "temperature": 0.7, "max_tokens": 300 } async with session.post(LLM_API_URL, headers=LLM_HEADERS, json=plan_prompt, timeout=30) as response: response.raise_for_status() result = await response.json() if isinstance(result, list): return result elif isinstance(result, dict) and 'choices' in result: content = result['choices'][0]['message']['content'] sub_questions = extract_json_from_llm_response(content) if sub_questions and isinstance(sub_questions, list): cleaned = [] for q in sub_questions: if isinstance(q, str) and q.strip(): cleaned_q = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*\$', '', q) if cleaned_q: cleaned.append(cleaned_q) return cleaned[:6] # Limit to 6 questions max # Fallback if we couldn't get good questions from LLM return [ f"What is {query} and its key features?", f"How does {query} compare to alternatives?", f"What are the current developments in {query}?", f"What are the main challenges with {query}?", f"What does the future hold for {query}?" ] except Exception as e: logging.error(f"Failed to generate research plan: {e}") return [ f"What is {query}?", f"What are the key aspects of {query}?", f"What are current trends in {query}?", f"What are the challenges with {query}?" ] async def continuous_search(query: str, search_time: int = 120) -> List[dict]: """ Perform continuous searching for better results within time constraints. Provides detailed feedback about the search process. """ start_time = time.time() all_results = [] seen_urls = set() seen_domains = defaultdict(int) search_iterations = 0 # Generate multiple variations of the query query_variations = [ query, f"{query} comparison", f"{query} analysis", f"{query} review", f"{query} features", f"{query} vs alternatives", f"latest {query} news", f"{query} pros and cons" ] async with aiohttp.ClientSession() as session: while time.time() - start_time < search_time: search_iterations += 1 # Shuffle the query variations to get diverse results random.shuffle(query_variations) # Use only a subset of queries each iteration queries_for_this_iteration = query_variations[:min(3, len(query_variations))] for q in queries_for_this_iteration: if time.time() - start_time >= search_time: break try: # Notify about current search logging.info(f"Searching for: '{q}'") results = await fetch_search_results(q, max_results=5) if results: for result in results: clean_link = clean_url(result['link']) domain = urlparse(clean_link).netloc if clean_link else "" # Skip if we've already seen this URL if clean_link in seen_urls: continue # Skip if we have too many results from this domain if domain and seen_domains[domain] >= 2: # Max 2 results per domain continue seen_urls.add(clean_link) if domain: seen_domains[domain] += 1 result['link'] = clean_link all_results.append(result) logging.info(f"Found new result: {result['title']} ({domain})") # Small delay between searches await asyncio.sleep(1.0) # If we have enough unique results, we can stop early if len(all_results) >= MAX_SOURCES_TO_PROCESS * 2: # Get more than we need for selection logging.info(f"Found enough unique results ({len(all_results)})") break except Exception as e: logging.error(f"Error during continuous search: {e}") await asyncio.sleep(2.0) # Wait a bit before trying again # Break if we've done several iterations if search_iterations >= 4: # Limit to 4 search iterations break # Filter and sort results by relevance if all_results: # Simple relevance scoring def score_result(result): query_terms = set(query.lower().split()) title = result['title'].lower() snippet = result['snippet'].lower() matches = 0 for term in query_terms: if term in title or term in snippet: matches += 1 # Also consider length of snippet as a proxy for content richness snippet_length = len(result['snippet'].split()) # Prefer results from diverse domains domain = urlparse(result['link']).netloc if result['link'] else "" domain_score = 10 if seen_domains[domain] <= 1 else 5 # Bonus for unique domains return matches * 10 + snippet_length + domain_score # Sort by score, descending all_results.sort(key=lambda x: score_result(x), reverse=True) return all_results[:MAX_SOURCES_TO_PROCESS * 2] # Return more than we need for selection async def filter_and_select_sources(results: List[dict]) -> List[dict]: """ Filter and select the best sources from search results. Returns a tuple of (selected_sources, rejected_sources_with_reasons) """ if not results: return [], [] # Group by domain to ensure diversity domain_counts = defaultdict(int) domain_results = defaultdict(list) for result in results: domain = urlparse(result['link']).netloc if result['link'] else "" domain_counts[domain] += 1 domain_results[domain].append(result) selected = [] rejected = [] # First pass: take the top result from each domain for domain, domain_res in domain_results.items(): if len(selected) >= MAX_SOURCES_TO_PROCESS: break # Take the best result from this domain (sorted by position in original results) if domain_res: # Sort domain results by snippet length (proxy for content richness) domain_res.sort(key=lambda x: len(x['snippet'].split()), reverse=True) selected.append(domain_res[0]) # Second pass: if we need more, take additional results from domains with good content if len(selected) < MAX_SOURCES_TO_PROCESS: # Calculate average snippet length as a proxy for content quality domain_quality = {} for domain, domain_res in domain_results.items(): if not domain_res: continue avg_length = sum(len(r['snippet'].split()) for r in domain_res) / len(domain_res) domain_quality[domain] = avg_length # Sort domains by quality sorted_domains = sorted(domain_quality.items(), key=lambda x: x[1], reverse=True) # Add more results from high-quality domains for domain, _ in sorted_domains: if len(selected) >= MAX_SOURCES_TO_PROCESS: break for res in domain_results[domain]: if res not in selected: selected.append(res) if len(selected) >= MAX_SOURCES_TO_PROCESS: break # Third pass: if still need more, add remaining high-snippet-length results if len(selected) < MAX_SOURCES_TO_PROCESS: # Sort all results by snippet length remaining_results = [res for res in results if res not in selected] remaining_results.sort(key=lambda x: len(x['snippet'].split()), reverse=True) while len(selected) < MAX_SOURCES_TO_PROCESS and remaining_results: selected.append(remaining_results.pop(0)) # The remaining results are our rejected ones (for now we won't track reasons) rejected = [res for res in results if res not in selected] return selected, rejected async def run_deep_research_stream(query: str, search_time: int = 120) -> AsyncGenerator[str, None]: def format_sse(data: dict) -> str: return f"data: {json.dumps(data)}\n\n" start_time = time.time() processed_sources = 0 successful_sources = 0 total_tokens = 0 try: # Initialize the SSE stream with start message yield format_sse({ "event": "status", "data": f"Starting deep research on '{query}'. Searching for comprehensive sources..." }) async with aiohttp.ClientSession() as session: # Step 1: Generate research plan yield format_sse({"event": "status", "data": "Generating comprehensive research plan..."}) sub_questions = await generate_research_plan(query, session) yield format_sse({ "event": "plan", "data": { "sub_questions": sub_questions, "message": f"Research will focus on these {len(sub_questions)} key aspects" } }) # Step 2: Continuous search for better results yield format_sse({ "event": "status", "data": "Performing intelligent search for high-quality sources..." }) # Show search variations we'll use query_variations = [ query, f"{query} comparison", f"{query} analysis", f"{query} review", f"{query} features", f"{query} vs alternatives" ] yield format_sse({ "event": "status", "data": f"Using {len(query_variations)} different search variations to find diverse sources" }) search_results = await continuous_search(query, search_time) # Report on search results unique_domains = len({urlparse(r['link']).netloc for r in search_results if r['link']}) yield format_sse({ "event": "status", "data": f"Found {len(search_results)} potential sources from {unique_domains} unique domains" }) # Display some of the top sources found if search_results: top_sources = search_results[:5] # Show top 5 sources_list = [] for i, source in enumerate(top_sources, 1): domain = urlparse(source['link']).netloc if source['link'] else "Unknown" sources_list.append(f"{i}. {source['title']} ({domain})") yield format_sse({ "event": "sources_found", "data": { "top_sources": sources_list, "total_sources": len(search_results) } }) if not search_results: yield format_sse({ "event": "error", "data": "No search results found. Check your query and try again." }) return # Select the best sources selected_sources, rejected_sources = await filter_and_select_sources(search_results) # Report on selected sources unique_selected_domains = len({urlparse(r['link']).netloc for r in selected_sources if r['link']}) yield format_sse({ "event": "status", "data": f"Selected {len(selected_sources)} high-quality sources from {unique_selected_domains} unique domains for in-depth analysis" }) if not selected_sources: yield format_sse({ "event": "error", "data": "No valid sources found after filtering." }) return # Show selected sources selected_sources_list = [] for i, source in enumerate(selected_sources, 1): domain = urlparse(source['link']).netloc if source['link'] else "Unknown" selected_sources_list.append(f"{i}. {source['title']} ({domain})") yield format_sse({ "event": "sources_selected", "data": { "selected_sources": selected_sources_list, "message": "Proceeding with in-depth analysis of these sources" } }) # Step 3: Process selected sources with concurrency control semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) consolidated_context = "" all_sources_used = [] processing_errors = 0 async def process_with_semaphore(source): async with semaphore: return await process_web_source(session, source, timeout=20) # Process sources with progress updates processing_tasks = [] for i, source in enumerate(selected_sources): # Check if we're running out of time elapsed = time.time() - start_time if elapsed > TOTAL_TIMEOUT * 0.8: # Leave 20% of time for synthesis yield format_sse({ "event": "status", "data": f"Approaching time limit, stopping source processing after {i}/{len(selected_sources)} sources" }) break # Add delay between processing each source to be polite if i > 0: await asyncio.sleep(REQUEST_DELAY * 0.5) # Notify about processing this source domain = urlparse(source['link']).netloc if source['link'] else "Unknown" yield format_sse({ "event": "processing_source", "data": { "index": i + 1, "total": len(selected_sources), "title": source['title'], "domain": domain, "url": source['link'] } }) task = asyncio.create_task(process_with_semaphore(source)) processing_tasks.append(task) # Process completed tasks as they finish for future in asyncio.as_completed(processing_tasks): processed_sources += 1 content, source_info = await future if content and content.strip(): # Report successful processing domain = urlparse(source_info['link']).netloc if source_info['link'] else "Unknown" word_count = len(content.split()) yield format_sse({ "event": "source_processed", "data": { "title": source_info['title'], "domain": domain, "word_count": word_count, "status": "success" } }) # Add to our consolidated context consolidated_context += f"Source: {source_info['link']}\nContent: {content}\n\n---\n\n" all_sources_used.append(source_info) successful_sources += 1 total_tokens += word_count # Add to token count else: processing_errors += 1 yield format_sse({ "event": "source_processed", "data": { "title": source_info['title'], "status": "failed", "reason": "Could not extract sufficient content" } }) if not consolidated_context.strip(): yield format_sse({ "event": "error", "data": f"Failed to extract content from any sources. {processing_errors} errors occurred." }) return # Report on processing results yield format_sse({ "event": "status", "data": f"Successfully processed {successful_sources} of {processed_sources} sources, extracting approximately {total_tokens} words of content" }) # Step 4: Synthesize comprehensive report time_remaining = max(0, TOTAL_TIMEOUT - (time.time() - start_time)) yield format_sse({ "event": "status", "data": f"Generating comprehensive analysis report from {successful_sources} sources..." }) max_output_tokens = min(2000, int(time_remaining * 6)) # More aggressive token count report_prompt = f"""Compose a comprehensive analysis report on "{query}". Structure the report with these sections: 1. Executive Summary 2. Key Features and Capabilities 3. Comparative Analysis 4. Strengths and Weaknesses 5. Current Trends and