Spaces:
Sleeping
Sleeping
File size: 6,888 Bytes
266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
import pytest
import responses
from loguru import logger
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from test_models.test_sql_models import FeedTestArticle # Test-specific table model
from src.models.article_models import FeedItem
from src.pipelines.tasks.fetch_rss import fetch_rss_entries
from src.pipelines.tasks.ingest_rss import ingest_from_rss
@pytest.mark.integration
@responses.activate
def test_rss_pipeline_end_to_end_mocked(db_session: Session, db_engine: Engine) -> None:
"""Integration test for the RSS pipeline using mocked HTTP requests.
This avoids hitting live RSS feeds or article URLs, making the test CI-safe.
1. Clears the test table.
2. Mocks fetching articles from an RSS feed.
3. Mocks parsing article content.
4. Ingests articles into the test table.
5. Verifies insertion and basic correctness.
Args:
db_session (Session): SQLAlchemy session for DB interactions.
db_engine (Engine): SQLAlchemy engine for task-level operations.
"""
# Clear test table
logger.info("Clearing test table 'feed_test'")
db_session.execute(text("DELETE FROM feed_test"))
db_session.commit()
# Verify table is empty
initial_count = db_session.query(FeedTestArticle).count()
logger.info(f"Initial article count in test table: {initial_count}")
assert initial_count == 0, "Test table was not cleared"
# Mock RSS feed URL
feed_url = "https://aiechoes.substack.com/feed"
responses.add(
responses.GET,
feed_url,
body="""
<rss version="2.0">
<channel>
<title>Test Feed</title>
<item>
<title>Test Article</title>
<link>https://example.com/test-article</link>
<description>Test description</description>
<pubDate>Mon, 01 Jan 2025 00:00:00 +0000</pubDate>
</item>
</channel>
</rss>
""",
status=200,
content_type="application/rss+xml",
)
# Mock the article page with the div your parser expects
responses.add(
responses.GET,
"https://example.com/test-article",
body="""
<html>
<body>
<div class="post-body">
<p>This is the article content</p>
</div>
</body>
</html>
""",
status=200,
content_type="text/html",
)
# Define test feed
test_feed = FeedItem(
name="Test Feed",
author="Test Author",
url=feed_url,
)
# Fetch articles (mocked feed)
fetched_articles = fetch_rss_entries(
test_feed,
engine=db_engine,
article_model=FeedTestArticle,
)
logger.info(f"Fetched {len(fetched_articles)} articles for feed '{test_feed.name}'")
# Ensure we have articles
assert fetched_articles, "No articles were fetched from mocked feed"
# Ingest parsed articles
ingest_from_rss(
fetched_articles,
feed=test_feed,
article_model=FeedTestArticle,
engine=db_engine,
)
# Verify DB insertion
articles_in_db = (
db_session.query(FeedTestArticle)
.order_by(FeedTestArticle.published_at.desc())
.all()
)
logger.info(f"Inserted article titles: {[a.title for a in articles_in_db]}")
assert articles_in_db, "No articles were inserted into the test table"
# Check at least the first fetched article was inserted
first_fetched_title = fetched_articles[0].title
titles_in_db = [a.title for a in articles_in_db]
assert first_fetched_title in titles_in_db, (
f"First fetched article '{first_fetched_title}' not found in DB"
)
################################################################################
# The code below calls out to live URLs and is not suitable for CI,
# as Substack/medium may block requests from CI environments.
# It is left here for reference and can be run manually if desired.
# Uncomment to enable live integration test
# import pytest
# from loguru import logger
# from sqlalchemy import text
# from sqlalchemy.engine import Engine
# from sqlalchemy.orm import Session
# from test_models.test_sql_models import FeedTestArticle # Test-specific table model
# from src.models.article_models import FeedItem
# from src.pipelines.tasks.batch_parse_ingest_articles import parse_and_ingest
# from src.pipelines.tasks.fetch_rss import fetch_rss_entries
# @pytest.mark.integration
# def test_rss_pipeline_end_to_end(db_session: Session, db_engine: Engine) -> None:
# """Integration test for the end-to-end RSS pipeline:
# 1. Clears the test table.
# 2. Fetches articles from a live RSS feed.
# 3. Parses and ingests articles into the test table.
# 4. Verifies insertion and basic correctness.
# Args:
# db_session (Session): SQLAlchemy session for DB interactions.
# db_engine (Engine): SQLAlchemy engine for task-level operations.
# """
# # Clear test table
# logger.info("Clearing test table 'feed_test'")
# db_session.execute(text("DELETE FROM feed_test"))
# db_session.commit()
# # Verify table is empty
# initial_count = db_session.query(FeedTestArticle).count()
# logger.info(f"Initial article count in test table: {initial_count}")
# assert initial_count == 0, "Test table was not cleared"
# # Define test feed
# test_feed = FeedItem(
# name="Test Feed",
# author="Test Author",
# url="https://aiechoes.substack.com/feed",
# )
# # Fetch articles
# fetched_articles = fetch_rss_entries(
# test_feed,
# engine=db_engine,
# article_model=FeedTestArticle,
# )
# logger.info(f"Fetched {len(fetched_articles)} articles for feed '{test_feed.name}'")
# if not fetched_articles:
# logger.warning("No articles fetched; skipping test due to empty RSS feed")
# pytest.skip("No new articles available in the RSS feed")
# # Parse and ingest
# parse_and_ingest(
# fetched_articles,
# feed=test_feed,
# article_model=FeedTestArticle,
# engine=db_engine,
# )
# # Verify DB insertion
# articles_in_db = (
# db_session.query(FeedTestArticle)
# .order_by(FeedTestArticle.published_at.desc())
# .all()
# )
# logger.info(f"Inserted article titles: {[a.title for a in articles_in_db]}")
# assert articles_in_db, "No articles were inserted into the test table"
# # Check at least the first fetched article was inserted
# first_fetched_title = fetched_articles[0].title
# titles_in_db = [a.title for a in articles_in_db]
# assert first_fetched_title in titles_in_db, (
# f"First fetched article '{first_fetched_title}' not found in DB"
# )
|