File size: 6,888 Bytes
266d7bc
 
 
 
 
 
804054e
266d7bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
804054e
 
266d7bc
 
 
804054e
266d7bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
804054e
266d7bc
 
 
 
 
 
 
 
 
 
804054e
266d7bc
 
 
 
 
804054e
 
266d7bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
804054e
266d7bc
 
 
 
 
 
 
 
 
804054e
266d7bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
804054e
 
266d7bc
 
 
804054e
266d7bc
 
 
 
 
 
 
 
 
 
 
 
 
 
804054e
266d7bc
 
 
 
 
 
 
 
 
 
 
804054e
266d7bc
 
 
 
 
804054e
 
266d7bc
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import pytest
import responses
from loguru import logger
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from test_models.test_sql_models import FeedTestArticle  # Test-specific table model

from src.models.article_models import FeedItem
from src.pipelines.tasks.fetch_rss import fetch_rss_entries
from src.pipelines.tasks.ingest_rss import ingest_from_rss


@pytest.mark.integration
@responses.activate
def test_rss_pipeline_end_to_end_mocked(db_session: Session, db_engine: Engine) -> None:
    """Run the RSS fetch-and-ingest pipeline against mocked HTTP responses.

    Every HTTP endpoint used here is registered with ``responses``, so the test
    never reaches a live RSS feed or article URL and is safe to run in CI.

    Steps:
        1. Empty the ``feed_test`` table and confirm it holds no rows.
        2. Register a mocked RSS feed containing a single item.
        3. Register a mocked HTML page for that item's link.
        4. Fetch entries with ``fetch_rss_entries`` and ingest them with
           ``ingest_from_rss``, both targeting ``FeedTestArticle``.
        5. Check that rows exist and that the first fetched title is among them.

    Args:
        db_session (Session): SQLAlchemy session used for cleanup and verification.
        db_engine (Engine): SQLAlchemy engine handed to the pipeline tasks.
    """

    # Start from a clean table so earlier runs cannot satisfy the assertions below.
    logger.info("Clearing test table 'feed_test'")
    db_session.execute(text("DELETE FROM feed_test"))
    db_session.commit()

    # Verify table is empty
    initial_count = db_session.query(FeedTestArticle).count()
    logger.info(f"Initial article count in test table: {initial_count}")
    assert initial_count == 0, "Test table was not cleared"

    # Mock RSS feed URL
    feed_url = "https://aiechoes.substack.com/feed"
    responses.add(
        responses.GET,
        feed_url,
        # The day name must agree with the date: 1 January 2025 was a Wednesday.
        body="""
        <rss version="2.0">
          <channel>
            <title>Test Feed</title>
            <item>
              <title>Test Article</title>
              <link>https://example.com/test-article</link>
              <description>Test description</description>
              <pubDate>Wed, 01 Jan 2025 00:00:00 +0000</pubDate>
            </item>
          </channel>
        </rss>
        """,
        status=200,
        content_type="application/rss+xml",
    )

    # Mock the article page linked from the feed item.
    # NOTE(review): the "post-body" div is presumably what the article parser
    # looks for; confirm against the parser implementation.
    responses.add(
        responses.GET,
        "https://example.com/test-article",
        body="""
        <html>
          <body>
            <div class="post-body">
              <p>This is the article content</p>
            </div>
          </body>
        </html>
        """,
        status=200,
        content_type="text/html",
    )

    # Define test feed pointing at the mocked feed URL
    test_feed = FeedItem(
        name="Test Feed",
        author="Test Author",
        url=feed_url,
    )

    # Fetch articles (mocked feed)
    fetched_articles = fetch_rss_entries(
        test_feed,
        engine=db_engine,
        article_model=FeedTestArticle,
    )
    logger.info(f"Fetched {len(fetched_articles)} articles for feed '{test_feed.name}'")

    # Fail early with a clear message if the fetch step produced nothing.
    assert fetched_articles, "No articles were fetched from mocked feed"

    # Ingest the fetched articles into the test table
    ingest_from_rss(
        fetched_articles,
        feed=test_feed,
        article_model=FeedTestArticle,
        engine=db_engine,
    )

    # Verify DB insertion (newest first)
    articles_in_db = (
        db_session.query(FeedTestArticle)
        .order_by(FeedTestArticle.published_at.desc())
        .all()
    )
    logger.info(f"Inserted article titles: {[a.title for a in articles_in_db]}")
    assert articles_in_db, "No articles were inserted into the test table"

    # Check at least the first fetched article was inserted
    first_fetched_title = fetched_articles[0].title
    titles_in_db = [a.title for a in articles_in_db]
    assert first_fetched_title in titles_in_db, (
        f"First fetched article '{first_fetched_title}' not found in DB"
    )


################################################################################
# The code below calls out to live URLs and is not suitable for CI,
# as Substack/Medium may block requests from CI environments.
# It is left here for reference and can be run manually if desired.
# Uncomment it to enable the live integration test.


# import pytest
# from loguru import logger
# from sqlalchemy import text
# from sqlalchemy.engine import Engine
# from sqlalchemy.orm import Session
# from test_models.test_sql_models import FeedTestArticle  # Test-specific table model

# from src.models.article_models import FeedItem
# from src.pipelines.tasks.batch_parse_ingest_articles import parse_and_ingest
# from src.pipelines.tasks.fetch_rss import fetch_rss_entries


# @pytest.mark.integration
# def test_rss_pipeline_end_to_end(db_session: Session, db_engine: Engine) -> None:
#     """Integration test for the end-to-end RSS pipeline:
#     1. Clears the test table.
#     2. Fetches articles from a live RSS feed.
#     3. Parses and ingests articles into the test table.
#     4. Verifies insertion and basic correctness.

#     Args:
#         db_session (Session): SQLAlchemy session for DB interactions.
#         db_engine (Engine): SQLAlchemy engine for task-level operations.

#     """
#     # Clear test table
#     logger.info("Clearing test table 'feed_test'")
#     db_session.execute(text("DELETE FROM feed_test"))
#     db_session.commit()

#     # Verify table is empty
#     initial_count = db_session.query(FeedTestArticle).count()
#     logger.info(f"Initial article count in test table: {initial_count}")
#     assert initial_count == 0, "Test table was not cleared"

#     # Define test feed
#     test_feed = FeedItem(
#         name="Test Feed",
#         author="Test Author",
#         url="https://aiechoes.substack.com/feed",
#     )

#     # Fetch articles
#     fetched_articles = fetch_rss_entries(
#         test_feed,
#         engine=db_engine,
#         article_model=FeedTestArticle,
#     )
#     logger.info(f"Fetched {len(fetched_articles)} articles for feed '{test_feed.name}'")

#     if not fetched_articles:
#         logger.warning("No articles fetched; skipping test due to empty RSS feed")
#         pytest.skip("No new articles available in the RSS feed")

#     # Parse and ingest
#     parse_and_ingest(
#         fetched_articles,
#         feed=test_feed,
#         article_model=FeedTestArticle,
#         engine=db_engine,
#     )

#     # Verify DB insertion
#     articles_in_db = (
#         db_session.query(FeedTestArticle)
#         .order_by(FeedTestArticle.published_at.desc())
#         .all()
#     )
#     logger.info(f"Inserted article titles: {[a.title for a in articles_in_db]}")
#     assert articles_in_db, "No articles were inserted into the test table"

#     # Check at least the first fetched article was inserted
#     first_fetched_title = fetched_articles[0].title
#     titles_in_db = [a.title for a in articles_in_db]
#     assert first_fetched_title in titles_in_db, (
#         f"First fetched article '{first_fetched_title}' not found in DB"
#     )