"""
Web Scraping and Article Extraction Module
Comprehensive web scraping capabilities for:
- News articles
- Analysis pieces
- Intelligence reports
- Research papers
- Real-time news feeds
Supports multiple extraction methods for robustness.
"""
import re
import requests
from datetime import datetime
from urllib.parse import urlparse
from typing import (
List,
Dict,
Any,
Tuple,
Optional,
Callable,
)
# -----------------------------------------------------
# WEB SCRAPER
# -----------------------------------------------------
class WebScraper:
"""
General-purpose web scraper for geopolitical content.
Handles various website structures and content types.
"""
def __init__(self, user_agent: Optional[str] = None):
self.user_agent = user_agent or "GeoBotv1/1.0 (Geopolitical Analysis)"
self.session = requests.Session()
self.session.headers.update({"User-Agent": self.user_agent})
def fetch_url(self, url: str, timeout: int = 30) -> Dict[str, Any]:
"""Fetch raw HTML from a URL."""
try:
response = self.session.get(url, timeout=timeout)
response.raise_for_status()
return {
"url": url,
"status_code": response.status_code,
"content": response.text,
"headers": dict(response.headers),
"encoding": response.encoding,
"timestamp": datetime.now().isoformat(),
}
except requests.RequestException as e:
return {
"url": url,
"error": str(e),
"status_code": None,
"content": None,
"timestamp": datetime.now().isoformat(),
}
def parse_html(self, html_content: str) -> Dict[str, Any]:
"""Parse HTML using BeautifulSoup if available."""
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_content, "html.parser")
parsed = {
"title": soup.title.string if soup.title else "",
"text": soup.get_text(),
"links": [a.get("href") for a in soup.find_all("a", href=True)],
"images": [img.get("src") for img in soup.find_all("img", src=True)],
"meta": {},
}
for meta in soup.find_all("meta"):
name = meta.get("name") or meta.get("property")
content = meta.get("content")
if name and content:
parsed["meta"][name] = content
return parsed
except ImportError:
            # Fallback when BeautifulSoup is not installed
return {
"title": "",
"text": self._simple_html_strip(html_content),
"links": [],
"images": [],
"meta": {},
}
def _simple_html_strip(self, html: str) -> str:
"""Simple fallback for removing HTML tags."""
return re.sub(r"<[^>]+>", "", html)
def scrape_url(self, url: str) -> Dict[str, Any]:
"""Fetch + parse a URL."""
response = self.fetch_url(url)
if response.get("error"):
return response
parsed = self.parse_html(response["content"])
return {
"url": url,
"domain": urlparse(url).netloc,
"title": parsed["title"],
"text": parsed["text"],
"meta": parsed["meta"],
"links": parsed["links"],
"images": parsed["images"],
"timestamp": response["timestamp"],
"status_code": response["status_code"],
}
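# Usage sketch for WebScraper (illustrative only; the URL is a placeholder,
# not a GeoBot source):
#
#   scraper = WebScraper()
#   page = scraper.scrape_url("https://example.com/some-article")
#   if not page.get("error"):
#       print(page["domain"], page["title"])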
# -----------------------------------------------------
# ARTICLE EXTRACTION
# -----------------------------------------------------
class ArticleExtractor:
"""
Wrapper for newspaper3k / trafilatura / fallback extraction.
"""
def __init__(self, method: str = "auto"):
self.method = method
self._check_dependencies()
    def _check_dependencies(self) -> None:
        """Detect which optional extraction libraries are installed."""
self.has_newspaper = False
self.has_trafilatura = False
try:
import newspaper # noqa
self.has_newspaper = True
except ImportError:
pass
try:
import trafilatura # noqa
self.has_trafilatura = True
except ImportError:
pass
def extract_article(self, url: str) -> Dict[str, Any]:
"""Choose extraction method based on dependencies."""
method = self.method
if method == "auto":
if self.has_newspaper:
method = "newspaper"
elif self.has_trafilatura:
method = "trafilatura"
else:
method = "basic"
if method == "newspaper":
return self._extract_with_newspaper(url)
elif method == "trafilatura":
return self._extract_with_trafilatura(url)
else:
return self._extract_basic(url)
def _extract_with_newspaper(self, url: str) -> Dict[str, Any]:
"""Extract article using newspaper3k."""
try:
from newspaper import Article
article = Article(url)
article.download()
article.parse()
try:
article.nlp()
keywords = article.keywords
summary = article.summary
except Exception:
keywords = []
summary = ""
return {
"url": url,
"title": article.title,
"text": article.text,
"authors": article.authors,
"publish_date": article.publish_date.isoformat() if article.publish_date else None,
"keywords": keywords,
"summary": summary,
"top_image": article.top_image,
"images": list(article.images),
"method": "newspaper",
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
return {"url": url, "error": str(e), "method": "newspaper"}
def _extract_with_trafilatura(self, url: str) -> Dict[str, Any]:
"""Extract article using trafilatura."""
try:
import trafilatura
downloaded = trafilatura.fetch_url(url)
text = trafilatura.extract(downloaded)
metadata = trafilatura.extract_metadata(downloaded)
return {
"url": url,
"title": metadata.title if metadata else "",
"text": text or "",
"authors": [metadata.author] if metadata and metadata.author else [],
"publish_date": metadata.date if metadata else None,
"description": metadata.description if metadata else "",
"method": "trafilatura",
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
return {"url": url, "error": str(e), "method": "trafilatura"}
    def _extract_basic(self, url: str) -> Dict[str, Any]:
        """Fallback extraction using the plain WebScraper."""
scraper = WebScraper()
content = scraper.scrape_url(url)
return {
"url": url,
"title": content.get("title", ""),
"text": content.get("text", ""),
"meta": content.get("meta", {}),
"method": "basic",
"timestamp": datetime.now().isoformat(),
}
    def batch_extract(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Extract multiple articles, skipping URLs that fail."""
articles = []
for url in urls:
try:
articles.append(self.extract_article(url))
except Exception as e:
print(f"Error extracting {url}: {e}")
return articles
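# Usage sketch for ArticleExtractor; "auto" prefers newspaper3k, then
# trafilatura, then the basic scraper (the URL is a placeholder):
#
#   extractor = ArticleExtractor(method="auto")
#   article = extractor.extract_article("https://example.com/news/story")
#   if not article.get("error"):
#       print(article["method"], article["title"])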
# -----------------------------------------------------
# NEWS AGGREGATOR
# -----------------------------------------------------
class NewsAggregator:
"""Aggregate RSS feeds + websites into normalized article objects."""
def __init__(self):
self.extractor = ArticleExtractor()
self.sources: List[Dict[str, Any]] = []
    def add_source(self, name: str, url: str, source_type: str = "rss") -> None:
        """Register a source; source_type "rss" is parsed as a feed, anything else as a website."""
self.sources.append({"name": name, "url": url, "type": source_type})
    def fetch_news(self, keywords: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Fetch articles from all registered sources, optionally filtered by keyword."""
articles = []
for source in self.sources:
try:
if source["type"] == "rss":
pulled = self._fetch_rss(source["url"])
else:
pulled = self._fetch_website(source["url"])
for a in pulled:
a["source"] = source["name"]
if keywords:
txt = a.get("text", "").lower()
if any(kw.lower() in txt for kw in keywords):
articles.append(a)
else:
articles.append(a)
except Exception as e:
print(f"Error fetching from {source['name']}: {e}")
return articles
    def _fetch_rss(self, rss_url: str) -> List[Dict[str, Any]]:
        """Parse an RSS feed and pull full article text for each entry."""
try:
import feedparser
feed = feedparser.parse(rss_url)
articles = []
for entry in feed.entries:
base = {
"title": entry.get("title", ""),
"url": entry.get("link", ""),
"summary": entry.get("summary", ""),
"publish_date": entry.get("published", ""),
"authors": [a.get("name") for a in entry.get("authors", [])],
}
if base["url"]:
try:
full = self.extractor.extract_article(base["url"])
base["text"] = full.get("text", base["summary"])
except Exception:
base["text"] = base["summary"]
articles.append(base)
return articles
except ImportError:
print("feedparser not installed: pip install feedparser")
return []
    def _fetch_website(self, url: str) -> List[Dict[str, Any]]:
        """Treat a website URL as a single article."""
article = self.extractor.extract_article(url)
return [article] if not article.get("error") else []
def monitor_sources(
self,
keywords: List[str],
callback: Optional[Callable[[List[Dict[str, Any]]], None]] = None,
interval: int = 3600,
) -> None:
"""Continuously monitor sources for new articles."""
import time
seen: set = set()
while True:
articles = self.fetch_news(keywords)
new_articles = [a for a in articles if a["url"] not in seen]
if new_articles and callback:
callback(new_articles)
seen.update(a["url"] for a in new_articles)
time.sleep(interval)
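    # Usage sketch for monitor_sources (blocks indefinitely; typically run in a
    # dedicated thread). The callback and keywords are illustrative:
    #
    #   def on_new(articles):
    #       for a in articles:
    #           print(a["source"], a["title"])
    #
    #   aggregator.monitor_sources(["sanctions", "ceasefire"], callback=on_new)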
def get_trending_topics(
self,
articles: List[Dict[str, Any]],
n_topics: int = 10
) -> List[Tuple[str, int]]:
"""Compute most common keywords."""
from collections import Counter
words = []
stop = {
"the", "a", "an", "and", "or", "but", "in", "on", "at",
"to", "for", "of", "with", "by", "from"
}
for art in articles:
text = (art.get("text", "") + " " + art.get("title", "")).lower()
ws = [w for w in text.split() if len(w) > 3 and w not in stop]
words.extend(ws)
return Counter(words).most_common(n_topics)
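# -----------------------------------------------------
# DEMO
# -----------------------------------------------------
# Minimal end-to-end sketch. The feed URL and keywords are placeholders, not
# sources shipped with this module; feedparser and network access are needed
# for the RSS path to return results.
if __name__ == "__main__":
    aggregator = NewsAggregator()
    aggregator.add_source("Example Feed", "https://example.com/rss.xml", source_type="rss")
    articles = aggregator.fetch_news(keywords=["election", "sanctions"])
    print(f"Fetched {len(articles)} matching articles")
    for topic, count in aggregator.get_trending_topics(articles, n_topics=5):
        print(f"{topic}: {count}")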