|
|
""" |
|
|
Web Scraping and Article Extraction Module |
|
|
|
|
|
Comprehensive web scraping capabilities for: |
|
|
- News articles |
|
|
- Analysis pieces |
|
|
- Intelligence reports |
|
|
- Research papers |
|
|
- Real-time news feeds |
|
|
|
|
|
Supports multiple extraction methods for robustness. |
|
|
""" |
|
|
|
|
|
import re |
|
|
import requests |
|
|
from datetime import datetime |
|
|
from urllib.parse import urlparse |
|
|
from typing import ( |
|
|
List, |
|
|
Dict, |
|
|
Any, |
|
|
Tuple, |
|
|
Optional, |
|
|
Callable, |
|
|
) |
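
# Optional third-party dependencies, imported lazily where they are used:
# beautifulsoup4 (HTML parsing), newspaper3k and trafilatura (article extraction),
# and feedparser (RSS). Each code path degrades gracefully (a built-in fallback or
# an empty result) when the corresponding library is not installed.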
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WebScraper: |
|
|
""" |
|
|
General-purpose web scraper for geopolitical content. |
|
|
Handles various website structures and content types. |
|
|
""" |
|
|
|
|
|
def __init__(self, user_agent: Optional[str] = None): |
|
|
self.user_agent = user_agent or "GeoBotv1/1.0 (Geopolitical Analysis)" |
|
|
self.session = requests.Session() |
|
|
self.session.headers.update({"User-Agent": self.user_agent}) |
|
|
|
|
|
def fetch_url(self, url: str, timeout: int = 30) -> Dict[str, Any]: |
|
|
"""Fetch raw HTML from a URL.""" |
|
|
try: |
|
|
response = self.session.get(url, timeout=timeout) |
|
|
response.raise_for_status() |
|
|
return { |
|
|
"url": url, |
|
|
"status_code": response.status_code, |
|
|
"content": response.text, |
|
|
"headers": dict(response.headers), |
|
|
"encoding": response.encoding, |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
} |
|
|
|
|
|
except requests.RequestException as e: |
|
|
return { |
|
|
"url": url, |
|
|
"error": str(e), |
|
|
"status_code": None, |
|
|
"content": None, |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
} |
|
|
|
|
|
def parse_html(self, html_content: str) -> Dict[str, Any]: |
|
|
"""Parse HTML using BeautifulSoup if available.""" |
|
|
try: |
|
|
from bs4 import BeautifulSoup |
|
|
|
|
|
soup = BeautifulSoup(html_content, "html.parser") |
|
|
|
|
|
parsed = { |
|
|
"title": soup.title.string if soup.title else "", |
|
|
"text": soup.get_text(), |
|
|
"links": [a.get("href") for a in soup.find_all("a", href=True)], |
|
|
"images": [img.get("src") for img in soup.find_all("img", src=True)], |
|
|
"meta": {}, |
|
|
} |
|
|
|
|
|
for meta in soup.find_all("meta"): |
|
|
name = meta.get("name") or meta.get("property") |
|
|
content = meta.get("content") |
|
|
if name and content: |
|
|
parsed["meta"][name] = content |
|
|
|
|
|
return parsed |
|
|
|
|
|
except ImportError: |
|
|
|
|
|
return { |
|
|
"title": "", |
|
|
"text": self._simple_html_strip(html_content), |
|
|
"links": [], |
|
|
"images": [], |
|
|
"meta": {}, |
|
|
} |
|
|
|
|
|
    def _simple_html_strip(self, html: str) -> str:
        """Simple fallback for removing HTML tags when BeautifulSoup is unavailable."""
        # Drop script/style blocks first so their contents do not leak into the text.
        html = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=re.IGNORECASE | re.DOTALL)
        return re.sub(r"<[^>]+>", "", html)
|
|
|
|
|
def scrape_url(self, url: str) -> Dict[str, Any]: |
|
|
"""Fetch + parse a URL.""" |
|
|
response = self.fetch_url(url) |
|
|
if response.get("error"): |
|
|
return response |
|
|
|
|
|
parsed = self.parse_html(response["content"]) |
|
|
|
|
|
return { |
|
|
"url": url, |
|
|
"domain": urlparse(url).netloc, |
|
|
"title": parsed["title"], |
|
|
"text": parsed["text"], |
|
|
"meta": parsed["meta"], |
|
|
"links": parsed["links"], |
|
|
"images": parsed["images"], |
|
|
"timestamp": response["timestamp"], |
|
|
"status_code": response["status_code"], |
|
|
} |
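

# A minimal usage sketch for WebScraper, kept out of the import path as a plain
# function. Illustrative only: the URL below is a placeholder, and real callers
# should expect network failures (reported via the "error" key rather than raised).
def _demo_web_scraper() -> None:
    scraper = WebScraper()
    page = scraper.scrape_url("https://example.com/")
    if page.get("error"):
        print(f"Fetch failed: {page['error']}")
    else:
        print(f"{page['domain']}: {page['title']!r} ({len(page['text'])} characters of text)")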
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ArticleExtractor: |
|
|
""" |
|
|
    Article extractor that prefers newspaper3k or trafilatura when available,
    falling back to the built-in basic scraper otherwise.
|
|
""" |
|
|
|
|
|
def __init__(self, method: str = "auto"): |
|
|
self.method = method |
|
|
self._check_dependencies() |
|
|
|
|
|
    def _check_dependencies(self) -> None:
        """Detect which optional extraction libraries are importable."""
|
|
self.has_newspaper = False |
|
|
self.has_trafilatura = False |
|
|
|
|
|
try: |
|
|
import newspaper |
|
|
self.has_newspaper = True |
|
|
except ImportError: |
|
|
pass |
|
|
|
|
|
try: |
|
|
import trafilatura |
|
|
self.has_trafilatura = True |
|
|
except ImportError: |
|
|
pass |
|
|
|
|
|
def extract_article(self, url: str) -> Dict[str, Any]: |
|
|
"""Choose extraction method based on dependencies.""" |
|
|
method = self.method |
|
|
|
|
|
if method == "auto": |
|
|
if self.has_newspaper: |
|
|
method = "newspaper" |
|
|
elif self.has_trafilatura: |
|
|
method = "trafilatura" |
|
|
else: |
|
|
method = "basic" |
|
|
|
|
|
if method == "newspaper": |
|
|
return self._extract_with_newspaper(url) |
|
|
elif method == "trafilatura": |
|
|
return self._extract_with_trafilatura(url) |
|
|
else: |
|
|
return self._extract_basic(url) |
|
|
|
|
|
def _extract_with_newspaper(self, url: str) -> Dict[str, Any]: |
|
|
"""Extract article using newspaper3k.""" |
|
|
try: |
|
|
from newspaper import Article |
|
|
|
|
|
article = Article(url) |
|
|
article.download() |
|
|
article.parse() |
|
|
|
|
|
try: |
|
|
article.nlp() |
|
|
keywords = article.keywords |
|
|
summary = article.summary |
|
|
except Exception: |
|
|
keywords = [] |
|
|
summary = "" |
|
|
|
|
|
return { |
|
|
"url": url, |
|
|
"title": article.title, |
|
|
"text": article.text, |
|
|
"authors": article.authors, |
|
|
"publish_date": article.publish_date.isoformat() if article.publish_date else None, |
|
|
"keywords": keywords, |
|
|
"summary": summary, |
|
|
"top_image": article.top_image, |
|
|
"images": list(article.images), |
|
|
"method": "newspaper", |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
return {"url": url, "error": str(e), "method": "newspaper"} |
|
|
|
|
|
def _extract_with_trafilatura(self, url: str) -> Dict[str, Any]: |
|
|
"""Extract article using trafilatura.""" |
|
|
try: |
|
|
import trafilatura |
|
|
|
|
|
downloaded = trafilatura.fetch_url(url) |
|
|
text = trafilatura.extract(downloaded) |
|
|
metadata = trafilatura.extract_metadata(downloaded) |
|
|
|
|
|
return { |
|
|
"url": url, |
|
|
"title": metadata.title if metadata else "", |
|
|
"text": text or "", |
|
|
"authors": [metadata.author] if metadata and metadata.author else [], |
|
|
"publish_date": metadata.date if metadata else None, |
|
|
"description": metadata.description if metadata else "", |
|
|
"method": "trafilatura", |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
return {"url": url, "error": str(e), "method": "trafilatura"} |
|
|
|
|
|
    def _extract_basic(self, url: str) -> Dict[str, Any]:
        """Fallback extraction using the general-purpose WebScraper."""
|
|
scraper = WebScraper() |
|
|
content = scraper.scrape_url(url) |
|
|
|
|
|
return { |
|
|
"url": url, |
|
|
"title": content.get("title", ""), |
|
|
"text": content.get("text", ""), |
|
|
"meta": content.get("meta", {}), |
|
|
"method": "basic", |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
} |
|
|
|
|
|
    def batch_extract(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Extract multiple articles, skipping any URL that raises an unexpected error."""
|
|
articles = [] |
|
|
for url in urls: |
|
|
try: |
|
|
articles.append(self.extract_article(url)) |
|
|
except Exception as e: |
|
|
print(f"Error extracting {url}: {e}") |
|
|
return articles |
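

# A minimal usage sketch for ArticleExtractor, illustrative only: the URLs are
# placeholders, and which backend actually runs ("newspaper", "trafilatura", or
# "basic") depends on the optional libraries installed in the environment.
def _demo_article_extractor() -> None:
    extractor = ArticleExtractor(method="auto")
    results = extractor.batch_extract([
        "https://example.com/story-1",
        "https://example.com/story-2",
    ])
    for article in results:
        if article.get("error"):
            print(f"{article['url']}: extraction failed via {article['method']}")
        else:
            print(f"{article['url']}: {article.get('title', '')!r} via {article['method']}")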
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NewsAggregator: |
|
|
"""Aggregate RSS feeds + websites into normalized article objects.""" |
|
|
|
|
|
def __init__(self): |
|
|
self.extractor = ArticleExtractor() |
|
|
self.sources: List[Dict[str, Any]] = [] |
|
|
|
|
|
    def add_source(self, name: str, url: str, source_type: str = "rss") -> None:
        """Register a named source; source_type is 'rss' or 'website'."""
|
|
self.sources.append({"name": name, "url": url, "type": source_type}) |
|
|
|
|
|
    def fetch_news(self, keywords: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Fetch articles from every registered source, optionally filtered by keywords."""
|
|
articles = [] |
|
|
|
|
|
for source in self.sources: |
|
|
try: |
|
|
if source["type"] == "rss": |
|
|
pulled = self._fetch_rss(source["url"]) |
|
|
else: |
|
|
pulled = self._fetch_website(source["url"]) |
|
|
|
|
|
for a in pulled: |
|
|
a["source"] = source["name"] |
|
|
                    if keywords:
                        # Match keywords against both the title and the body text.
                        txt = (a.get("title", "") + " " + a.get("text", "")).lower()
                        if any(kw.lower() in txt for kw in keywords):
                            articles.append(a)
                    else:
                        articles.append(a)
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error fetching from {source['name']}: {e}") |
|
|
|
|
|
return articles |
|
|
|
|
|
    def _fetch_rss(self, rss_url: str) -> List[Dict[str, Any]]:
        """Fetch entries from an RSS feed and enrich each with the full article text."""
|
|
try: |
|
|
import feedparser |
|
|
|
|
|
feed = feedparser.parse(rss_url) |
|
|
articles = [] |
|
|
|
|
|
for entry in feed.entries: |
|
|
base = { |
|
|
"title": entry.get("title", ""), |
|
|
"url": entry.get("link", ""), |
|
|
"summary": entry.get("summary", ""), |
|
|
"publish_date": entry.get("published", ""), |
|
|
"authors": [a.get("name") for a in entry.get("authors", [])], |
|
|
} |
|
|
|
|
|
if base["url"]: |
|
|
try: |
|
|
full = self.extractor.extract_article(base["url"]) |
|
|
base["text"] = full.get("text", base["summary"]) |
|
|
except Exception: |
|
|
base["text"] = base["summary"] |
|
|
|
|
|
articles.append(base) |
|
|
|
|
|
return articles |
|
|
|
|
|
except ImportError: |
|
|
print("feedparser not installed: pip install feedparser") |
|
|
return [] |
|
|
|
|
|
    def _fetch_website(self, url: str) -> List[Dict[str, Any]]:
        """Treat a single web page as one article; returns an empty list on failure."""
|
|
article = self.extractor.extract_article(url) |
|
|
return [article] if not article.get("error") else [] |
|
|
|
|
|
def monitor_sources( |
|
|
self, |
|
|
keywords: List[str], |
|
|
callback: Optional[Callable[[List[Dict[str, Any]]], None]] = None, |
|
|
interval: int = 3600, |
|
|
) -> None: |
|
|
"""Continuously monitor sources for new articles.""" |
|
|
import time |
|
|
|
|
|
seen: set = set() |
|
|
|
|
|
while True: |
|
|
articles = self.fetch_news(keywords) |
|
|
new_articles = [a for a in articles if a["url"] not in seen] |
|
|
|
|
|
if new_articles and callback: |
|
|
callback(new_articles) |
|
|
|
|
|
seen.update(a["url"] for a in new_articles) |
|
|
|
|
|
time.sleep(interval) |
|
|
|
|
|
def get_trending_topics( |
|
|
self, |
|
|
articles: List[Dict[str, Any]], |
|
|
n_topics: int = 10 |
|
|
) -> List[Tuple[str, int]]: |
|
|
"""Compute most common keywords.""" |
|
|
from collections import Counter |
|
|
|
|
|
words = [] |
|
|
stop = { |
|
|
"the", "a", "an", "and", "or", "but", "in", "on", "at", |
|
|
"to", "for", "of", "with", "by", "from" |
|
|
} |
|
|
|
|
|
for art in articles: |
|
|
text = (art.get("text", "") + " " + art.get("title", "")).lower() |
|
|
            # Tokenize on alphabetic runs so trailing punctuation does not split counts.
            ws = [w for w in re.findall(r"[a-z]+", text) if len(w) > 3 and w not in stop]
|
|
words.extend(ws) |
|
|
|
|
|
return Counter(words).most_common(n_topics) |
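

# A minimal end-to-end sketch, runnable when this module is executed as a script.
# Illustrative only: the feed and page URLs plus the keywords are placeholders, and
# RSS sources additionally assume the optional feedparser dependency is installed.
if __name__ == "__main__":
    aggregator = NewsAggregator()
    aggregator.add_source("Example feed", "https://example.com/rss.xml", source_type="rss")
    aggregator.add_source("Example site", "https://example.com/analysis", source_type="website")

    news = aggregator.fetch_news(keywords=["sanctions", "election"])
    print(f"Fetched {len(news)} matching articles")

    for topic, count in aggregator.get_trending_topics(news, n_topics=5):
        print(f"{topic}: {count}")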
|
|
|