#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import asyncio
import aiohttp
import requests
from urllib.parse import quote
from config import CONTENT_EXTRACTION, SEARCH_SELECTION
from src.core.web_loader import web_loader
class BrowserEngine:
    def __init__(self, configuration):
        self.config = configuration
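    # Build a browser-like header set from values supplied by web_loader
    # (rotating IPs, user agent, origin, referrer, and locale data).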
    def generate_headers(self):
        ipv4 = web_loader.get_ipv4()
        ipv6 = web_loader.get_ipv6()
        user_agent = web_loader.get_user_agent()
        origin = web_loader.get_origin()
        referrer = web_loader.get_referrer()
        location = web_loader.get_location()
        return {
            "User-Agent": user_agent,
            "X-Forwarded-For": f"{ipv4}, {ipv6}",
            "X-Real-IP": ipv4,
            "X-Originating-IP": ipv4,
            "X-Remote-IP": ipv4,
            "X-Remote-Addr": ipv4,
            "X-Client-IP": ipv4,
            "X-Forwarded-Host": origin.replace("https://", "").replace("http://", ""),
            "Origin": origin,
            "Referer": referrer,
            "Accept-Language": f"{location['language']},en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "cross-site",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
            "X-Country": location['country'],
            "X-Timezone": location['timezone']
        }
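    # Map a search provider to the reader-API URL and the CSS selector used to
    # scope result extraction: Baidu directly, Google/Bing via SearXNG bang prefixes.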
    def _build_search_url_and_selector(self, search_query: str, search_provider: str = "google"):
        if search_provider == "baidu":
            return (
                f"{self.config.content_reader_api}{self.config.baidu_endpoint}?wd={quote(search_query)}",
                "#content_left"
            )
        provider_prefix = "!go" if search_provider == "google" else "!bi"
        return (
            f"{self.config.content_reader_api}{self.config.searxng_endpoint}?q={quote(f'{provider_prefix} {search_query}')}",
            "#urls"
        )
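    # Asynchronous HTTP helpers: one aiohttp session per request, a shared timeout,
    # and an explicit ClientResponseError carrying the body text on HTTP errors.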
    async def _async_post(self, url: str, data: dict, headers: dict):
        timeout = aiohttp.ClientTimeout(total=self.config.request_timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, data=data, headers=headers) as response:
                text = await response.text()
                if response.status >= 400:
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status,
                        message=text,
                        headers=response.headers
                    )
                return text
    async def _async_get(self, url: str, headers: dict):
        timeout = aiohttp.ClientTimeout(total=self.config.request_timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, headers=headers) as response:
                text = await response.text()
                if response.status >= 400:
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status,
                        message=text,
                        headers=response.headers
                    )
                return text
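    # Synchronous fallbacks built on requests, used when asyncio.run is unavailable
    # (for example when an event loop is already running).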
    def _sync_post(self, url: str, data: dict, headers: dict):
        response = requests.post(url, data=data, headers=headers, timeout=self.config.request_timeout)
        response.raise_for_status()
        return response.text
    def _sync_get(self, url: str, headers: dict):
        response = requests.get(url, headers=headers, timeout=self.config.request_timeout)
        response.raise_for_status()
        return response.text
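    # Send a target URL to the content-reader API and append the
    # CONTENT_EXTRACTION instruction block to the returned text.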
    async def async_extract_page_content(self, target_url: str) -> str:
        headers = self.generate_headers()
        payload = {"url": target_url}
        extracted_content = await self._async_post(self.config.content_reader_api, payload, headers)
        return f"{extracted_content}\n\n\n{CONTENT_EXTRACTION}\n\n\n"
    def extract_page_content(self, target_url: str) -> str:
        try:
            return asyncio.run(self.async_extract_page_content(target_url))
        except Exception:
            try:
                headers = self.generate_headers()
                payload = {"url": target_url}
                extracted_content = self._sync_post(self.config.content_reader_api, payload, headers)
                return f"{extracted_content}\n\n\n{CONTENT_EXTRACTION}\n\n\n"
            except Exception as error:
                return f"Error reading URL: {str(error)}"
    async def async_perform_search(self, search_query: str, search_provider: str = "google") -> str:
        headers = self.generate_headers()
        full_url, selector = self._build_search_url_and_selector(search_query, search_provider)
        headers["X-Target-Selector"] = selector
        search_results = await self._async_get(full_url, headers)
        return f"{search_results}\n\n\n{SEARCH_SELECTION}\n\n\n"
    def perform_search(self, search_query: str, search_provider: str = "google") -> str:
        try:
            return asyncio.run(self.async_perform_search(search_query, search_provider))
        except Exception:
            try:
                headers = self.generate_headers()
                full_url, selector = self._build_search_url_and_selector(search_query, search_provider)
                headers["X-Target-Selector"] = selector
                search_results = self._sync_get(full_url, headers)
                return f"{search_results}\n\n\n{SEARCH_SELECTION}\n\n\n"
            except Exception as error:
                return f"Error during search: {str(error)}"