import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from prefect import task
from prefect.cache_policies import NO_CACHE
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session

from src.infrastructure.supabase.init_session import init_session
from src.models.article_models import ArticleItem, FeedItem
from src.models.sql_models import FeedArticle
from src.utils.logger_util import setup_logging


@task(
    task_run_name="fetch_rss_entries-{feed.name}",
    description="Fetch RSS entries from a substack/Medium and top publications feed.",
    retries=2,
    retry_delay_seconds=120,
    cache_policy=NO_CACHE,
)
def fetch_rss_entries(
    feed: FeedItem,
    engine: Engine,
    article_model: type[FeedArticle] = FeedArticle,
) -> list[ArticleItem]:
    """Fetch all RSS items from a substack/Medium and top publications feed and convert them to ArticleItem objects.

    Each task uses its own SQLAlchemy session. Articles already stored in the database
    or with empty links/content are skipped. Errors during parsing individual items
    are logged but do not stop processing.

    Args:
        feed (FeedItem): Metadata for the feed (name, author, URL).
        engine (Engine): SQLAlchemy engine for database connection.
        article_model (type[FeedArticle], optional): Model used to persist articles.
            Defaults to FeedArticle.

    Returns:
        list[ArticleItem]: List of new ArticleItem objects ready for parsing/ingestion.

    Raises:
        RuntimeError: If the RSS fetch fails.
        Exception: For unexpected errors during execution.
    """

    logger = setup_logging()
    session: Session = init_session(engine)
    items: list[ArticleItem] = []

    try:
        try:
            response = requests.get(feed.url, timeout=15)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error(f"Failed to fetch feed '{feed.name}': {e}")
            raise RuntimeError(f"RSS fetch failed for feed '{feed.name}'") from e

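        # Parse the feed as XML; each <item> element is one article entry.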
        soup = BeautifulSoup(response.content, "xml")
        rss_items = soup.find_all("item")

        for item in rss_items:
            try:
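                # Skip items with no link and items already persisted in the database.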
                link = item.find("link").get_text(strip=True) if item.find("link") else ""  # type: ignore
                if not link or session.query(article_model).filter_by(url=link).first():
                    logger.info(
                        f"Skipping already stored or empty-link article for feed '{feed.name}'"
                    )
                    continue

                title = (
                    item.find("title").get_text(strip=True) if item.find("title") else "Untitled"  # type: ignore
                )

                # Prefer full text in <content:encoded>
                content_elem = item.find("content:encoded") or item.find("description")  # type: ignore
                raw_html = content_elem.get_text() if content_elem else ""
                content_md = ""

                # 🚨 Skip if article contains a self-referencing "Read more" link
                if raw_html:
                    try:
                        html_soup = BeautifulSoup(raw_html, "html.parser")
                        for a in html_soup.find_all("a", href=True):
                            if (
                                a["href"].strip() == link  # type: ignore
                                and "read more" in a.get_text(strip=True).lower()
                            ):
                                logger.info(f"Paywalled/truncated article skipped: '{title}'")
                                raise StopIteration  # skip this item
                    except StopIteration:
                        continue
                    except Exception as e:
                        logger.warning(f"Failed to inspect links for '{title}': {e}")

                if raw_html:
                    try:
                        content_md = md(
                            raw_html,
                            strip=["script", "style"],
                            heading_style="ATX",
                            bullets="*",
                            autolinks=True,
                        )
                        content_md = "\n".join(
                            line.strip() for line in content_md.splitlines() if line.strip()
                        )
                    except Exception as e:
                        logger.warning(f"Markdown conversion failed for '{title}': {e}")
                        content_md = raw_html

                if not content_md:
                    logger.warning(f"Skipping article '{title}' with empty content")
                    continue

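                # Prefer the per-item author (<creator>/<dc:creator>); fall back to the feed-level author.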
                author_elem = item.find("creator") or item.find("dc:creator")  # type: ignore
                author = author_elem.get_text(strip=True) if author_elem else feed.author

                pub_date_elem = item.find("pubDate")  # type: ignore
                pub_date_str = pub_date_elem.get_text(strip=True) if pub_date_elem else None

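                # Assemble the ArticleItem handed to downstream parsing/ingestion.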
                article_item = ArticleItem(
                    feed_name=feed.name,
                    feed_author=feed.author,
                    title=title,
                    url=link,
                    content=content_md,
                    article_authors=[author] if author else [],
                    published_at=pub_date_str,
                )
                items.append(article_item)

            except Exception as e:
                logger.error(f"Error processing RSS item for feed '{feed.name}': {e}")
                continue

        logger.info(f"Fetched {len(items)} new articles for feed '{feed.name}'")
        return items

    except Exception as e:
        logger.error(f"Unexpected error in fetch_rss_entries for feed '{feed.name}': {e}")
        raise
    finally:
        session.close()
        logger.info(f"Database session closed for feed '{feed.name}'")


# if __name__ == "__main__":
#     from src.infrastructure.supabase.init_session import init_engine

#     engine = init_engine()
#     test_feed = FeedItem(
#         name="AI Echoes",
#         author="Benito Martin",
#         url="https://aiechoes.substack.com/feed"
#     )
#     articles = fetch_rss_entries(test_feed, engine)
#     print(f"Fetched {len(articles)} articles.")