Spaces:
Sleeping
Sleeping
File size: 901 Bytes
266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc 804054e 266d7bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
from loguru import logger
from sqlalchemy import text
from sqlalchemy.engine import Connection
from src.utils.logger_util import setup_logging
# Run the project's logging setup once, as a module-level side effect at
# import time, so it is in place before the test below executes. What exactly
# gets configured is defined in src.utils.logger_util (not visible here).
setup_logging()
def test_connect_to_test_table(db_session: Connection) -> None:
    """Check that the 'feed_test' table can be queried.

    Executes ``SELECT * FROM feed_test LIMIT 1`` on the supplied connection,
    logs whatever rows come back, and asserts that ``fetchall()`` produced a
    list.

    Args:
        db_session (Connection): SQLAlchemy Connection object used to run the
            query (presumably provided by a pytest fixture; confirm in the
            project's conftest).

    Raises:
        AssertionError: If the query result is not a list.
        Exception: Any error raised while executing the query (for example
            because the table does not exist) is logged and re-raised
            unchanged.
    """
    logger.info("Testing connection to 'feed_test' table...")

    # Only the database round-trip is guarded: a failing assertion below must
    # not be reported as "Failed to query", since the query itself succeeded.
    try:
        result = db_session.execute(
            text("SELECT * FROM feed_test LIMIT 1")
        ).fetchall()
    except Exception as e:
        logger.error(f"Failed to query 'feed_test' table: {e}")
        raise

    logger.info(f"Query result: {result}")
    assert isinstance(result, list), "Query result is not a list"
|