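"""Prefect flow that fetches articles from configured RSS feeds and ingests them."""
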
from prefect import flow, unmapped

from src.config import settings
from src.infrastructure.supabase.init_session import init_engine
from src.models.article_models import FeedItem
from src.models.sql_models import FeedArticle
from src.pipelines.tasks.fetch_rss import fetch_rss_entries
from src.pipelines.tasks.ingest_rss import ingest_from_rss
from src.utils.logger_util import setup_logging


@flow(
    name="rss_ingest_flow",
    flow_run_name="rss_ingest_flow_run",
    description="Fetch and ingest articles from RSS feeds.",
    retries=2,
    retry_delay_seconds=120,
)
def rss_ingest_flow(article_model: type[FeedArticle] = FeedArticle) -> None:
    """Fetch and ingest articles from configured RSS feeds concurrently.

    Each feed is fetched in parallel and ingested into the database
    with error handling at each stage. Ensures the database engine is disposed
    after completion.

    Args:
        article_model (type[FeedArticle]): SQLAlchemy model for storing articles.

    Returns:
        None

    Raises:
        RuntimeError: If ingestion fails for all feeds.
        Exception: For unexpected errors during execution.
    """
    logger = setup_logging()
    engine = init_engine()
    errors: list[str] = []

    # Track per-feed article counts and the overall total submitted for ingestion
    per_feed_counts: dict[str, int] = {}
    total_submitted = 0

    try:
        if not settings.rss.feeds:
            logger.warning("No feeds found in configuration.")
            return

        # Convert configured feed entries into FeedItem objects for the fetch task
        feeds = [FeedItem(name=f.name, author=f.author, url=f.url) for f in settings.rss.feeds]
        logger.info(f"πŸš€ Processing {len(feeds)} feeds concurrently...")

        # 1. Fetch articles concurrently
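        # `.map` creates one fetch task run per feed; `unmapped` marks engine
        # and article_model as constants, so every task run receives them whole
        # instead of Prefect iterating over them.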
        fetched_articles_futures = fetch_rss_entries.map(
            feeds,
            engine=unmapped(engine),
            article_model=unmapped(article_model),
        )

        # 2. Ingest concurrently per feed
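        # `.map` returned one future per feed, so zip pairs each feed with its
        # fetch result; `.result()` blocks until that task run finishes and
        # re-raises its exception, letting a failing feed be recorded without
        # aborting the others.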
        results = []  # (feed name, ingestion future) pairs
        for feed, fetched_future in zip(feeds, fetched_articles_futures, strict=True):
            try:
                fetched_articles = fetched_future.result()
            except Exception as e:
                logger.error(f"❌ Error fetching articles for feed '{feed.name}': {e}")
                errors.append(f"Fetch error: {feed.name}")
                continue

            if not fetched_articles:
                logger.info(f"πŸ“­ No new articles for feed '{feed.name}'")
                per_feed_counts[feed.name] = 0
                continue

            try:
                count = len(fetched_articles)
                per_feed_counts[feed.name] = count
                total_submitted += count
                logger.info(f"βœ… Feed '{feed.name}': {count} articles ready for ingestion")

                # Submit ingestion as its own task run so feeds are ingested
                # concurrently; the future is collected and awaited below.
                task_result = ingest_from_rss.submit(
                    fetched_articles,
                    feed,
                    article_model=article_model,
                    engine=engine,
                )
                results.append((feed.name, task_result))
            except Exception as e:
                logger.error(f"❌ Error submitting ingest_from_rss for feed '{feed.name}': {e}")
                errors.append(f"Ingest error: {feed.name}")

        # 3. Wait for all ingestion tasks; `.result()` re-raises a task's
        # exception so ingestion failures are recorded rather than dropped.
        for feed_name, r in results:
            try:
                r.result()
            except Exception as e:
                logger.error(f"❌ Error in ingest_from_rss task for feed '{feed_name}': {e}")
                errors.append(f"Ingest task failure: {feed_name}")

        # ---- Summary logging ----
        # Counts reflect articles fetched and submitted for ingestion; feeds
        # whose ingestion task failed are recorded in `errors` above.
        logger.info("πŸ“Š Ingestion summary per feed:")
        for feed_name, count in per_feed_counts.items():
            logger.info(f"   β€’ {feed_name}: {count} article(s) submitted")

        logger.info(f"πŸ“ Total articles submitted across all feeds: {total_submitted}")

        if errors:
            # Surface partial failures so the flow run is marked failed and
            # Prefect's flow-level retries can re-run it.
            raise RuntimeError(f"Flow completed with errors: {errors}")

    except Exception as e:
        logger.error(f"πŸ’₯ Unexpected error in rss_ingest_flow: {e}")
        raise
    finally:
        engine.dispose()
        logger.info("πŸ”’ Database engine disposed.")


if __name__ == "__main__":
    rss_ingest_flow(article_model=FeedArticle)
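
    # A minimal sketch of running this on a schedule instead of once
    # (assumes Prefect >= 2.10, where Flow.serve is available; the deployment
    # name and interval below are illustrative, not part of this project):
    # rss_ingest_flow.serve(name="rss-ingest", interval=3600)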