from prefect import flow, unmapped

from src.config import settings
from src.infrastructure.supabase.init_session import init_engine
from src.models.article_models import FeedItem
from src.models.sql_models import FeedArticle
from src.pipelines.tasks.fetch_rss import fetch_rss_entries
from src.pipelines.tasks.ingest_rss import ingest_from_rss
from src.utils.logger_util import setup_logging


@flow(
    name="rss_ingest_flow",
    flow_run_name="rss_ingest_flow_run",
    description="Fetch and ingest articles from RSS feeds.",
    retries=2,
    retry_delay_seconds=120,
)
def rss_ingest_flow(article_model: type[FeedArticle] = FeedArticle) -> None:
"""Fetch and ingest articles from configured RSS feeds concurrently.
Each feed is fetched in parallel and ingested into the database
with error handling at each stage. Ensures the database engine is disposed
after completion.
Args:
article_model (type[FeedArticle]): SQLAlchemy model for storing articles.
Returns:
None
Raises:
RuntimeError: If ingestion fails for all feeds.
Exception: For unexpected errors during execution.
"""
    logger = setup_logging()
    engine = init_engine()
    errors = []

    # Tracking counters
    per_feed_counts: dict[str, int] = {}
    total_ingested = 0

    try:
        if not settings.rss.feeds:
            logger.warning("No feeds found in configuration.")
            return

        feeds = [FeedItem(name=f.name, author=f.author, url=f.url) for f in settings.rss.feeds]
        logger.info(f"Processing {len(feeds)} feeds concurrently...")
        # 1. Fetch articles concurrently
        fetched_articles_futures = fetch_rss_entries.map(
            feeds,
            engine=unmapped(engine),
            article_model=unmapped(article_model),
        )
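        # `unmapped()` broadcasts the engine and model unchanged to every mapped
        # task run, instead of iterating over them alongside `feeds`.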
        # 2. Ingest concurrently per feed
        results = []
        for feed, fetched_future in zip(feeds, fetched_articles_futures, strict=False):
            try:
                fetched_articles = fetched_future.result()
            except Exception as e:
                logger.error(f"Error fetching articles for feed '{feed.name}': {e}")
                errors.append(f"Fetch error: {feed.name}")
                continue

            if not fetched_articles:
                logger.info(f"No new articles for feed '{feed.name}'")
                per_feed_counts[feed.name] = 0
                continue
            try:
                count = len(fetched_articles)
                per_feed_counts[feed.name] = count
                total_ingested += count
                logger.info(f"Feed '{feed.name}': {count} articles ready for ingestion")

                task_result = ingest_from_rss.submit(
                    fetched_articles,
                    feed,
                    article_model=article_model,
                    engine=engine,
                )
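                # `.submit()` returns a PrefectFuture immediately, so ingestion tasks
                # run concurrently on the task runner while this loop continues;
                # their results are awaited in step 3 below.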
                results.append(task_result)
            except Exception as e:
                logger.error(f"Error submitting ingest_from_rss for feed '{feed.name}': {e}")
                errors.append(f"Ingest error: {feed.name}")
        # 3. Wait for all ingestion tasks
        for r in results:
            try:
                r.result()
            except Exception as e:
                logger.error(f"Error in ingest_from_rss task: {e}")
                errors.append("Task failure")

        # ---- Summary logging ----
        logger.info("Ingestion summary per feed:")
        for feed_name, count in per_feed_counts.items():
            logger.info(f"  - {feed_name}: {count} article(s) ingested")
        logger.info(f"Total ingested across all feeds: {total_ingested}")

        if errors:
            raise RuntimeError(f"Flow completed with errors: {errors}")
    except Exception as e:
        logger.error(f"Unexpected error in rss_ingest_flow: {e}")
        raise

    finally:
        engine.dispose()
        logger.info("Database engine disposed.")


if __name__ == "__main__":
    rss_ingest_flow(article_model=FeedArticle)
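# For scheduled runs instead of a one-off execution, the flow could be served,
# e.g. rss_ingest_flow.serve(name="rss-ingest", cron="0 * * * *")
# (illustrative only; deployment setup is outside this module).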