from prefect import task
from prefect.cache_policies import NO_CACHE
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session

from src.config import settings
from src.infrastructure.supabase.init_session import init_session
from src.models.article_models import ArticleItem, FeedItem
from src.models.sql_models import FeedArticle
from src.utils.logger_util import setup_logging


@task(cache_policy=NO_CACHE)
def ingest_from_rss(
    fetched_articles: list[ArticleItem],
    feed: FeedItem,
    article_model: type[FeedArticle],
    engine: Engine,
) -> None:
| """Ingest articles fetched from RSS (already Markdownified). | |
| Articles are inserted in batches to optimize database writes. Errors during | |
| ingestion of individual batches are logged but do not stop subsequent batches. | |
| Args: | |
| fetched_articles: List of ArticleItem objects to ingest. | |
| feed: The FeedItem representing the source feed. | |
| article_model: The SQLAlchemy model class for articles. | |
| engine: SQLAlchemy Engine for database connection. | |
| Raises: | |
| RuntimeError: If ingestion completes with errors. | |
| """ | |
    logger = setup_logging()
    rss = settings.rss
    errors: list[str] = []
    batch: list[ArticleItem] = []
    session: Session = init_session(engine)
    try:
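        # Flush in fixed-size batches so one failed commit is rolled back and
        # logged without aborting the remaining batches.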
        for i, article in enumerate(fetched_articles, start=1):
            batch.append(article)
            if len(batch) >= rss.batch_size:
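                # i is an exact multiple of batch_size at this point, so
                # batches are numbered from 1.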
                batch_num = i // rss.batch_size
                try:
                    _persist_batch(session, batch, article_model)
                    session.commit()
                except Exception as e:
                    session.rollback()
                    logger.error(f"Failed to ingest batch {batch_num} for feed '{feed.name}': {e}")
                    errors.append(f"Batch {batch_num}")
                else:
                    logger.info(
                        f"Ingested batch {batch_num} with {len(batch)} articles "
                        f"for feed '{feed.name}'"
                    )
                batch = []
        # Flush any remaining articles that did not fill a complete batch.
        if batch:
            try:
                _persist_batch(session, batch, article_model)
                session.commit()
            except Exception as e:
                session.rollback()
                logger.error(f"Failed to ingest final batch for feed '{feed.name}': {e}")
                errors.append("Final batch")
            else:
                logger.info(
                    f"Ingested final batch of {len(batch)} articles for feed '{feed.name}'"
                )
    except Exception as e:
        logger.error(f"Unexpected error in ingest_from_rss for feed '{feed.name}': {e}")
        raise
    finally:
        session.close()
        logger.info(f"Database session closed for feed '{feed.name}'")
    # Raise after cleanup so batch failures reach the caller without being
    # re-logged above as unexpected errors.
    if errors:
        raise RuntimeError(f"Ingestion completed with errors: {errors}")


def _persist_batch(
    session: Session,
    batch: list[ArticleItem],
    article_model: type[FeedArticle],
) -> None:
    """Helper to bulk insert a batch of ArticleItems."""
    rows = [
        article_model(
            feed_name=article.feed_name,
            feed_author=article.feed_author,
            title=article.title,
            url=article.url,
            content=article.content,
            article_authors=article.article_authors,
            published_at=article.published_at,
        )
        for article in batch
    ]
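    # bulk_save_objects skips most unit-of-work bookkeeping for faster inserts;
    # it is considered a legacy API as of SQLAlchemy 2.0.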
    session.bulk_save_objects(rows)
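

# Usage sketch (hypothetical, not part of this module): ingest_from_rss is a
# Prefect task, so it would typically be called from a flow. The flow name,
# the fetch_rss_articles task, the FeedItem fields, and the connection URL
# below are illustrative placeholders only.
#
#   from prefect import flow
#   from sqlalchemy import create_engine
#
#   @flow
#   def rss_ingestion_flow() -> None:
#       engine = create_engine("postgresql://user:pass@localhost/db")  # placeholder DSN
#       feed = FeedItem(name="example-feed", url="https://example.com/rss")  # assumed fields
#       articles = fetch_rss_articles(feed)  # hypothetical upstream task
#       ingest_from_rss(articles, feed, FeedArticle, engine)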