from prefect import task
from prefect.cache_policies import NO_CACHE
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session

from src.config import settings
from src.infrastructure.supabase.init_session import init_session
from src.models.article_models import ArticleItem, FeedItem
from src.models.sql_models import FeedArticle
from src.utils.logger_util import setup_logging


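# Prefect task configuration: retry a failed run twice, two minutes apart, and
# disable result caching so every invocation re-ingests its input.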
@task(
    task_run_name="batch_ingest-{feed.name}",
    description="Ingest already parsed RSS articles in batches.",
    retries=2,
    retry_delay_seconds=120,
    cache_policy=NO_CACHE,
)
def ingest_from_rss(
    fetched_articles: list[ArticleItem],
    feed: FeedItem,
    article_model: type[FeedArticle],
    engine: Engine,
) -> None:
    """Ingest articles fetched from RSS (already Markdownified).

    Articles are inserted in batches to optimize database writes. Errors during
    ingestion of individual batches are logged but do not stop subsequent batches.

    Args:
        fetched_articles: List of ArticleItem objects to ingest.
        feed: The FeedItem representing the source feed.
        article_model: The SQLAlchemy model class for articles.
        engine: SQLAlchemy Engine for database connection.

    Raises:
        RuntimeError: If ingestion completes with errors.
    """

    logger = setup_logging()
    rss = settings.rss
    errors: list[str] = []
    batch: list[ArticleItem] = []

    session: Session = init_session(engine)

    try:
        for i, article in enumerate(fetched_articles, start=1):
            batch.append(article)

            if len(batch) >= rss.batch_size:
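                # The batch only reaches batch_size on multiples of batch_size,
                # so i // batch_size is the 1-based number of the batch being flushed.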
                batch_num = i // rss.batch_size
                try:
                    _persist_batch(session, batch, article_model)
                    session.commit()
                except Exception as e:
                    session.rollback()
                    logger.error(f"Failed to ingest batch {batch_num} for feed '{feed.name}': {e}")
                    errors.append(f"Batch {batch_num}")
                else:
                    logger.info(
                        f"πŸ” Ingested batch {batch_num} with {len(batch)} articles "
                        f"for feed '{feed.name}'"
                    )
                batch = []

        # Flush any remaining articles that did not fill a complete batch.
        if batch:
            try:
                _persist_batch(session, batch, article_model)
                session.commit()
            except Exception as e:
                session.rollback()
                logger.error(f"Failed to ingest final batch for feed '{feed.name}': {e}")
                errors.append("Final batch")
            else:
                logger.info(
                    f"πŸ‘‰ Ingested final batch of {len(batch)} articles for feed '{feed.name}'"
                )

        if errors:
            raise RuntimeError(f"Ingestion completed with errors: {errors}")

    except Exception as e:
        logger.error(f"Unexpected error in ingest_from_rss for feed '{feed.name}': {e}")
        raise
    finally:
        session.close()
        logger.info(f"Database session closed for feed '{feed.name}'")


def _persist_batch(
    session: Session,
    batch: list[ArticleItem],
    article_model: type[FeedArticle],
) -> None:
    """Helper to bulk insert a batch of ArticleItems."""
    rows = [
        article_model(
            feed_name=article.feed_name,
            feed_author=article.feed_author,
            title=article.title,
            url=article.url,
            content=article.content,
            article_authors=article.article_authors,
            published_at=article.published_at,
        )
        for article in batch
    ]
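    # bulk_save_objects emits the INSERT statements within the session's current
    # transaction; committing (or rolling back) is left to the caller.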
    session.bulk_save_objects(rows)
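

# Example usage (sketch): how this task might be wired into a Prefect flow.
# The flow, the `get_engine` helper, and the upstream `fetch_articles` task are
# illustrative assumptions and are not defined in this module.
#
#     from prefect import flow
#
#     @flow
#     def rss_ingestion_flow(feed: FeedItem) -> None:
#         engine = get_engine()            # hypothetical Engine factory
#         articles = fetch_articles(feed)  # hypothetical upstream fetch task
#         ingest_from_rss(articles, feed, FeedArticle, engine)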