import streamlit as st
import requests
from bs4 import BeautifulSoup
import re
from requests.sessions import Session
from langdetect import detect, LangDetectException
from deep_translator import GoogleTranslator
from requests.exceptions import RequestException
import time
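
# Assumed third-party dependencies (inferred from the imports above):
#   pip install streamlit requests beautifulsoup4 langdetect deep-translator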

# --- CONSTANTS ---
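# A browser-like User-Agent is sent because many sites block or throttle the
# default "python-requests" UA; the string below is just a common desktop Chrome UA.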
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5"
}

# --- HELPER FUNCTIONS ---
def get_session():
    """Creates a session with standard headers."""
    session = Session()
    session.headers.update(HEADERS)
    return session

def perform_login(session, login_url, email, password):
    """
    Attempts a login.
    ENHANCEMENT: Fetches the page first to find hidden CSRF tokens.
    """
    try:
        # 1. Get the login page first to set cookies/CSRF tokens
        page = session.get(login_url, timeout=10)
        page.raise_for_status()
        soup = BeautifulSoup(page.content, 'html.parser')

        payload = {
            'email': email,  # Note: some sites use 'username'; this key is specific to the target site
            'password': password
        }

        # 2. Find hidden inputs (often required for CSRF protection)
        form = soup.find('form')
        if form:
            for input_tag in form.find_all('input', type='hidden'):
                name = input_tag.get('name')
                value = input_tag.get('value')
                if name and value:
                    payload[name] = value

        # 3. Post credentials
        response = session.post(login_url, data=payload, timeout=10)
        response.raise_for_status()
        return True, "Login request sent."
    except Exception as e:
        return False, str(e)
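
# NOTE: perform_login only handles classic HTML form logins that POST back to the
# login URL; JavaScript-driven or OAuth flows will not work this way, and a 200
# response does not guarantee the credentials were accepted. A rough sanity check
# (hypothetical URLs) is to fetch a members-only page afterwards, e.g.:
#   ok, msg = perform_login(session, "https://example.com/login", email, password)
#   assert "Logout" in session.get("https://example.com/account", timeout=10).text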

def chunk_text(text, max_chars=4500):
    """Splits text into chunks to respect translation API limits."""
    return [text[i:i+max_chars] for i in range(0, len(text), max_chars)]
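
# The 4500-character chunk size is meant to stay below the roughly 5000-character
# per-request cap enforced by deep-translator's GoogleTranslator, with some headroom.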

def translate_content(text, target_lang='en'):
    """
    Translates text using Deep Translator (more stable).
    Optimized to check language first, then translate in chunks.
    """
    if not text or len(text) < 5:
        return text

    try:
        # Check language first to avoid unnecessary API calls
        detected_lang = detect(text[:1000])  # Detect based on first 1000 chars
        if detected_lang == target_lang:
            return text
    except LangDetectException:
        pass  # If detection fails, attempt translation anyway

    translator = GoogleTranslator(source='auto', target=target_lang)
    chunks = chunk_text(text)
    translated_chunks = []

    progress_bar = st.progress(0)
    for i, chunk in enumerate(chunks):
        try:
            translated = translator.translate(chunk)
            # Guard against an empty/None result so the join below never fails
            translated_chunks.append(translated if translated else chunk)
        except Exception:
            # Fallback: append original if translation fails
            translated_chunks.append(chunk)
        # Update progress
        progress_bar.progress((i + 1) / len(chunks))

    progress_bar.empty()  # Remove bar when done
    return " ".join(translated_chunks)

def scrape_url(url, query_selector=None, auth_details=None):
    session = get_session()

    # --- Authentication Phase ---
    if auth_details and auth_details.get('login_url'):
        st.info("🔐 Attempting authentication...")
        success, msg = perform_login(
            session,
            auth_details['login_url'],
            auth_details['email'],
            auth_details['password']
        )
        if not success:
            st.error(f"Authentication failed (proceeding as guest): {msg}")
        else:
            st.success("Authentication request sent (Session updated).")

    # --- Fetching Phase ---
    try:
        response = session.get(url, timeout=15)
        response.raise_for_status()
    except RequestException as e:
        return None, f"Network Error: {e}"

    # --- Parsing Phase ---
    soup = BeautifulSoup(response.content, 'html.parser')

    # Remove clutter
    for tag in soup(["script", "style", "meta", "link", "noscript", "header", "footer", "nav", "aside", "svg", "iframe", "ad"]):
        tag.decompose()  # decompose is faster than extract

    # Extraction
    if query_selector:
        elements = soup.select(query_selector)
        if not elements:
            return None, "Query selector found no elements."
        # Get text with separator to prevent words merging
        text_content = "\n".join([el.get_text(separator=' ', strip=True) for el in elements])
    else:
        # Get all visible text from body, using separator
        if soup.body:
            text_content = soup.body.get_text(separator=' ', strip=True)
        else:
            text_content = soup.get_text(separator=' ', strip=True)

    # Clean up whitespace
    clean_text = re.sub(r'\s+', ' ', text_content).strip()
    return clean_text, None
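
# Example call (hypothetical URL and selector):
#   text, err = scrape_url("https://example.com", query_selector="div.article-content")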

# --- MAIN APP ---
def main():
    st.set_page_config(page_title="Universal Web Scraper", page_icon="🕷️", layout="wide")
    st.title("🕷️ Universal Web Scraper & Translator")
    st.markdown("---")

    # --- Sidebar Inputs ---
    with st.sidebar:
        st.header("⚙️ Configuration")
        url_input = st.text_input("Target URL", placeholder="https://example.com")
        query_selector = st.text_input("CSS Selector (Optional)", placeholder="e.g., div.article-content")
        enable_translation = st.checkbox("Enable Auto-Translation", value=True)

        with st.expander("🔐 Authentication (Advanced)"):
            st.caption("Only use if the content is behind a login.")
            login_url = st.text_input("Login Page URL")
            email = st.text_input("Email/Username")
            password = st.text_input("Password", type="password")

    # --- Main Logic ---
    if st.button("🚀 Start Scraping", type="primary"):
        if not url_input:
            st.warning("Please enter a URL to proceed.")
            return

        auth_details = None
        if login_url and email and password:
            auth_details = {'login_url': login_url, 'email': email, 'password': password}

        with st.spinner("Fetching and processing data..."):
            # 1. Scrape
            scraped_text, error = scrape_url(url_input, query_selector, auth_details)

            if error:
                st.error(error)
            elif not scraped_text:
                st.warning("No text content found at this URL. The site might be JavaScript-rendered (SPA).")
            else:
                # 2. Translate (if enabled)
                final_text = scraped_text
                if enable_translation:
                    with st.status("Detecting language and translating...", expanded=True) as status:
                        final_text = translate_content(scraped_text)
                        status.update(label="Processing Complete!", state="complete", expanded=False)

                # 3. Display Results
                st.success(f"Successfully extracted {len(final_text)} characters.")

                tab1, tab2 = st.tabs(["📄 Cleaned Text", "🔍 Raw Preview"])
                with tab1:
                    st.text_area("Content", final_text, height=400)
                    # Download Button
                    st.download_button(
                        label="📥 Download Text",
                        data=final_text,
                        file_name="scraped_data.txt",
                        mime="text/plain"
                    )
                with tab2:
                    st.json({
                        "source": url_input,
                        "length": len(final_text),
                        "selector_used": query_selector if query_selector else "Body (Default)",
                        "snippet": final_text[:500] + "..."
                    })


if __name__ == "__main__":
    main()
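
# To run locally (assuming this file is saved as app.py):
#   streamlit run app.py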