import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from prefect import task
from prefect.cache_policies import NO_CACHE
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session

from src.infrastructure.supabase.init_session import init_session
from src.models.article_models import ArticleItem, FeedItem
from src.models.sql_models import FeedArticle
from src.utils.logger_util import setup_logging


@task(
    task_run_name="fetch_rss_entries-{feed.name}",
    description="Fetch RSS entries from a Substack/Medium or top-publications feed.",
    retries=2,
    retry_delay_seconds=120,
    cache_policy=NO_CACHE,
)
def fetch_rss_entries(
    feed: FeedItem,
    engine: Engine,
    article_model: type[FeedArticle] = FeedArticle,
) -> list[ArticleItem]:
    """Fetch all RSS items from a Substack/Medium or top-publications feed and convert them to ArticleItem objects.

    Each task run uses its own SQLAlchemy session. Articles already stored in the database
    or with empty links/content are skipped. Errors while parsing individual items
    are logged but do not stop processing.

    Args:
        feed (FeedItem): Metadata for the feed (name, author, URL).
        engine (Engine): SQLAlchemy engine for the database connection.
        article_model (type[FeedArticle], optional): Model used to persist articles.
            Defaults to FeedArticle.

    Returns:
        list[ArticleItem]: List of new ArticleItem objects ready for parsing/ingestion.

    Raises:
        RuntimeError: If the RSS fetch fails.
        Exception: For unexpected errors during execution.
    """
    logger = setup_logging()
    session: Session = init_session(engine)
    items: list[ArticleItem] = []

    try:
        try:
            response = requests.get(feed.url, timeout=15)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error(f"Failed to fetch feed '{feed.name}': {e}")
            raise RuntimeError(f"RSS fetch failed for feed '{feed.name}'") from e

        soup = BeautifulSoup(response.content, "xml")
        rss_items = soup.find_all("item")
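
        # Convert each RSS <item> into an ArticleItem, skipping duplicates and items without usable content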
        for item in rss_items:
            try:
                link = item.find("link").get_text(strip=True) if item.find("link") else ""  # type: ignore
                if not link or session.query(article_model).filter_by(url=link).first():
                    logger.info(
                        f"Skipping already stored or empty-link article for feed '{feed.name}'"
                    )
                    continue

                title = (
                    item.find("title").get_text(strip=True) if item.find("title") else "Untitled"  # type: ignore
                )

                # Prefer full text in <content:encoded>
                content_elem = item.find("content:encoded") or item.find("description")  # type: ignore
                raw_html = content_elem.get_text() if content_elem else ""
                content_md = ""

                # 🚨 Skip if article contains a self-referencing "Read more" link
                if raw_html:
                    try:
                        html_soup = BeautifulSoup(raw_html, "html.parser")
                        for a in html_soup.find_all("a", href=True):
                            if (
                                a["href"].strip() == link  # type: ignore
                                and "read more" in a.get_text(strip=True).lower()
                            ):
                                logger.info(f"Paywalled/truncated article skipped: '{title}'")
                                raise StopIteration  # skip this item
                    except StopIteration:
                        continue
                    except Exception as e:
                        logger.warning(f"Failed to inspect links for '{title}': {e}")
                if raw_html:
                    try:
                        content_md = md(
                            raw_html,
                            strip=["script", "style"],
                            heading_style="ATX",
                            bullets="*",
                            autolinks=True,
                        )
                        content_md = "\n".join(
                            line.strip() for line in content_md.splitlines() if line.strip()
                        )
                    except Exception as e:
                        logger.warning(f"Markdown conversion failed for '{title}': {e}")
                        content_md = raw_html

                if not content_md:
                    logger.warning(f"Skipping article '{title}' with empty content")
                    continue
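
                # Author and publication date are optional; fall back to the feed-level author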
                author_elem = item.find("creator") or item.find("dc:creator")  # type: ignore
                author = author_elem.get_text(strip=True) if author_elem else feed.author

                pub_date_elem = item.find("pubDate")  # type: ignore
                pub_date_str = pub_date_elem.get_text(strip=True) if pub_date_elem else None

                article_item = ArticleItem(
                    feed_name=feed.name,
                    feed_author=feed.author,
                    title=title,
                    url=link,
                    content=content_md,
                    article_authors=[author] if author else [],
                    published_at=pub_date_str,
                )
                items.append(article_item)
            except Exception as e:
                logger.error(f"Error processing RSS item for feed '{feed.name}': {e}")
                continue

        logger.info(f"Fetched {len(items)} new articles for feed '{feed.name}'")
        return items
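
    # Unexpected failures are logged and re-raised so the Prefect retry policy (retries=2) can re-run the task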
    except Exception as e:
        logger.error(f"Unexpected error in fetch_rss_entries for feed '{feed.name}': {e}")
        raise
    finally:
        session.close()
        logger.info(f"Database session closed for feed '{feed.name}'")
# if __name__ == "__main__":
# from src.infrastructure.supabase.init_session import init_engine
# engine = init_engine()
# test_feed = FeedItem(
# name="AI Echoes",
# author="Benito Martin",
# url="https://aiechoes.substack.com/feed"
# )
# articles = fetch_rss_entries(test_feed, engine)
# print(f"Fetched {len(articles)} articles.")