""" Web Scraping and Article Extraction Module Comprehensive web scraping capabilities for: - News articles - Analysis pieces - Intelligence reports - Research papers - Real-time news feeds Supports multiple extraction methods for robustness. """ import re import requests from datetime import datetime from urllib.parse import urlparse from typing import ( List, Dict, Any, Tuple, Optional, Callable, ) # ----------------------------------------------------- # WEB SCRAPER # ----------------------------------------------------- class WebScraper: """ General-purpose web scraper for geopolitical content. Handles various website structures and content types. """ def __init__(self, user_agent: Optional[str] = None): self.user_agent = user_agent or "GeoBotv1/1.0 (Geopolitical Analysis)" self.session = requests.Session() self.session.headers.update({"User-Agent": self.user_agent}) def fetch_url(self, url: str, timeout: int = 30) -> Dict[str, Any]: """Fetch raw HTML from a URL.""" try: response = self.session.get(url, timeout=timeout) response.raise_for_status() return { "url": url, "status_code": response.status_code, "content": response.text, "headers": dict(response.headers), "encoding": response.encoding, "timestamp": datetime.now().isoformat(), } except requests.RequestException as e: return { "url": url, "error": str(e), "status_code": None, "content": None, "timestamp": datetime.now().isoformat(), } def parse_html(self, html_content: str) -> Dict[str, Any]: """Parse HTML using BeautifulSoup if available.""" try: from bs4 import BeautifulSoup soup = BeautifulSoup(html_content, "html.parser") parsed = { "title": soup.title.string if soup.title else "", "text": soup.get_text(), "links": [a.get("href") for a in soup.find_all("a", href=True)], "images": [img.get("src") for img in soup.find_all("img", src=True)], "meta": {}, } for meta in soup.find_all("meta"): name = meta.get("name") or meta.get("property") content = meta.get("content") if name and content: parsed["meta"][name] = content return parsed except ImportError: # Fallback if soup is missing return { "title": "", "text": self._simple_html_strip(html_content), "links": [], "images": [], "meta": {}, } def _simple_html_strip(self, html: str) -> str: """Simple fallback for removing HTML tags.""" return re.sub(r"<[^>]+>", "", html) def scrape_url(self, url: str) -> Dict[str, Any]: """Fetch + parse a URL.""" response = self.fetch_url(url) if response.get("error"): return response parsed = self.parse_html(response["content"]) return { "url": url, "domain": urlparse(url).netloc, "title": parsed["title"], "text": parsed["text"], "meta": parsed["meta"], "links": parsed["links"], "images": parsed["images"], "timestamp": response["timestamp"], "status_code": response["status_code"], } # ----------------------------------------------------- # ARTICLE EXTRACTION # ----------------------------------------------------- class ArticleExtractor: """ Wrapper for newspaper3k / trafilatura / fallback extraction. 
""" def __init__(self, method: str = "auto"): self.method = method self._check_dependencies() def _check_dependencies(self) -> None: self.has_newspaper = False self.has_trafilatura = False try: import newspaper # noqa self.has_newspaper = True except ImportError: pass try: import trafilatura # noqa self.has_trafilatura = True except ImportError: pass def extract_article(self, url: str) -> Dict[str, Any]: """Choose extraction method based on dependencies.""" method = self.method if method == "auto": if self.has_newspaper: method = "newspaper" elif self.has_trafilatura: method = "trafilatura" else: method = "basic" if method == "newspaper": return self._extract_with_newspaper(url) elif method == "trafilatura": return self._extract_with_trafilatura(url) else: return self._extract_basic(url) def _extract_with_newspaper(self, url: str) -> Dict[str, Any]: """Extract article using newspaper3k.""" try: from newspaper import Article article = Article(url) article.download() article.parse() try: article.nlp() keywords = article.keywords summary = article.summary except Exception: keywords = [] summary = "" return { "url": url, "title": article.title, "text": article.text, "authors": article.authors, "publish_date": article.publish_date.isoformat() if article.publish_date else None, "keywords": keywords, "summary": summary, "top_image": article.top_image, "images": list(article.images), "method": "newspaper", "timestamp": datetime.now().isoformat(), } except Exception as e: return {"url": url, "error": str(e), "method": "newspaper"} def _extract_with_trafilatura(self, url: str) -> Dict[str, Any]: """Extract article using trafilatura.""" try: import trafilatura downloaded = trafilatura.fetch_url(url) text = trafilatura.extract(downloaded) metadata = trafilatura.extract_metadata(downloaded) return { "url": url, "title": metadata.title if metadata else "", "text": text or "", "authors": [metadata.author] if metadata and metadata.author else [], "publish_date": metadata.date if metadata else None, "description": metadata.description if metadata else "", "method": "trafilatura", "timestamp": datetime.now().isoformat(), } except Exception as e: return {"url": url, "error": str(e), "method": "trafilatura"} def _extract_basic(self, url: str) -> Dict[str, Any]: scraper = WebScraper() content = scraper.scrape_url(url) return { "url": url, "title": content.get("title", ""), "text": content.get("text", ""), "meta": content.get("meta", {}), "method": "basic", "timestamp": datetime.now().isoformat(), } def batch_extract(self, urls: List[str]) -> List[Dict[str, Any]]: articles = [] for url in urls: try: articles.append(self.extract_article(url)) except Exception as e: print(f"Error extracting {url}: {e}") return articles # ----------------------------------------------------- # NEWS AGGREGATOR # ----------------------------------------------------- class NewsAggregator: """Aggregate RSS feeds + websites into normalized article objects.""" def __init__(self): self.extractor = ArticleExtractor() self.sources: List[Dict[str, Any]] = [] def add_source(self, name: str, url: str, source_type: str = "rss") -> None: self.sources.append({"name": name, "url": url, "type": source_type}) def fetch_news(self, keywords: Optional[List[str]] = None) -> List[Dict[str, Any]]: articles = [] for source in self.sources: try: if source["type"] == "rss": pulled = self._fetch_rss(source["url"]) else: pulled = self._fetch_website(source["url"]) for a in pulled: a["source"] = source["name"] if keywords: txt = a.get("text", "").lower() if 


# -----------------------------------------------------
# NEWS AGGREGATOR
# -----------------------------------------------------

class NewsAggregator:
    """Aggregate RSS feeds + websites into normalized article objects."""

    def __init__(self):
        self.extractor = ArticleExtractor()
        self.sources: List[Dict[str, Any]] = []

    def add_source(self, name: str, url: str, source_type: str = "rss") -> None:
        self.sources.append({"name": name, "url": url, "type": source_type})

    def fetch_news(self, keywords: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        articles = []
        for source in self.sources:
            try:
                if source["type"] == "rss":
                    pulled = self._fetch_rss(source["url"])
                else:
                    pulled = self._fetch_website(source["url"])
                for a in pulled:
                    a["source"] = source["name"]
                    if keywords:
                        # Keep only articles mentioning at least one keyword.
                        txt = a.get("text", "").lower()
                        if any(kw.lower() in txt for kw in keywords):
                            articles.append(a)
                    else:
                        articles.append(a)
            except Exception as e:
                print(f"Error fetching from {source['name']}: {e}")
        return articles

    def _fetch_rss(self, rss_url: str) -> List[Dict[str, Any]]:
        try:
            import feedparser

            feed = feedparser.parse(rss_url)
            articles = []
            for entry in feed.entries:
                base = {
                    "title": entry.get("title", ""),
                    "url": entry.get("link", ""),
                    "summary": entry.get("summary", ""),
                    "publish_date": entry.get("published", ""),
                    "authors": [a.get("name") for a in entry.get("authors", [])],
                }
                if base["url"]:
                    try:
                        # Pull the full text for each entry; fall back to the feed summary.
                        full = self.extractor.extract_article(base["url"])
                        base["text"] = full.get("text", base["summary"])
                    except Exception:
                        base["text"] = base["summary"]
                articles.append(base)
            return articles
        except ImportError:
            print("feedparser not installed: pip install feedparser")
            return []

    def _fetch_website(self, url: str) -> List[Dict[str, Any]]:
        article = self.extractor.extract_article(url)
        return [article] if not article.get("error") else []

    def monitor_sources(
        self,
        keywords: List[str],
        callback: Optional[Callable[[List[Dict[str, Any]]], None]] = None,
        interval: int = 3600,
    ) -> None:
        """Continuously monitor sources for new articles."""
        import time

        seen: set = set()
        while True:
            articles = self.fetch_news(keywords)
            new_articles = [a for a in articles if a["url"] not in seen]
            if new_articles and callback:
                callback(new_articles)
            seen.update(a["url"] for a in new_articles)
            time.sleep(interval)

    def get_trending_topics(
        self, articles: List[Dict[str, Any]], n_topics: int = 10
    ) -> List[Tuple[str, int]]:
        """Compute most common keywords."""
        from collections import Counter

        words = []
        stop = {
            "the", "a", "an", "and", "or", "but", "in", "on", "at",
            "to", "for", "of", "with", "by", "from",
        }
        for art in articles:
            text = (art.get("text", "") + " " + art.get("title", "")).lower()
            ws = [w for w in text.split() if len(w) > 3 and w not in stop]
            words.extend(ws)
        return Counter(words).most_common(n_topics)
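

# Minimal end-to-end sketch: the feed URL and keywords below are placeholders,
# and RSS handling requires the optional feedparser package. Intended as a
# smoke test rather than a production entry point.
if __name__ == "__main__":
    aggregator = NewsAggregator()
    aggregator.add_source("Example Feed", "https://example.com/rss.xml", source_type="rss")

    # Pull articles mentioning any of the keywords, then show trending terms.
    items = aggregator.fetch_news(keywords=["sanctions", "ceasefire"])
    print(f"Fetched {len(items)} matching articles")
    for word, count in aggregator.get_trending_topics(items, n_topics=5):
        print(f"{word}: {count}")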