Spaces:
Sleeping
Sleeping
File size: 2,192 Bytes
266d7bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
import asyncio
from contextlib import AsyncExitStack
from datetime import datetime

from src.infrastructure.qdrant.qdrant_vectorstore import AsyncQdrantVectorStore
from src.infrastructure.supabase.init_session import init_engine, init_session
from src.utils.logger_util import setup_logging
# Module-level logger obtained once from the project's setup_logging() helper;
# used by main() for progress and error messages.
logger = setup_logging()
# Start date used when main() is called without one; equal to the
# "2021-01-01" date this script has always ingested from.
DEFAULT_FROM_DATE = datetime(2021, 1, 1)


def _close_session(session) -> None:
    """Close the SQLAlchemy session and log that it was closed."""
    session.close()
    logger.info("SQLAlchemy session closed")


async def _close_vectorstore(vectorstore) -> None:
    """Close the Qdrant client held by ``vectorstore`` and log it."""
    await vectorstore.client.close()
    logger.info("Qdrant client closed")


def _dispose_engine(engine) -> None:
    """Dispose the database engine and log that it was disposed."""
    engine.dispose()
    logger.info("Database engine disposed")


async def main(from_date: datetime = DEFAULT_FROM_DATE) -> None:
    """Ingest articles from Supabase Postgres to the Qdrant vector store.

    Initializes a SQLAlchemy engine and session to connect to Supabase Postgres,
    and an AsyncQdrantVectorStore, then ingests articles from ``from_date`` into
    Qdrant. The Qdrant client, the session and the engine are released
    afterwards even if ingestion, or one of the other cleanup steps, fails.

    Args:
        from_date: Start date passed to ``ingest_from_sql``. Defaults to
            ``DEFAULT_FROM_DATE`` (2021-01-01), the previously hard-coded value.

    Returns:
        None

    Raises:
        RuntimeError: If a RuntimeError occurs during Qdrant ingestion, SQL
            operations or cleanup; the original error is chained as the cause.
        Exception: Any other unexpected error is logged and re-raised as-is.
    """
    logger.info("Starting ingestion of articles from SQL to Qdrant")
    try:
        # Each resource is registered for cleanup right after it is created, so
        # only resources that actually exist get released. AsyncExitStack runs
        # every registered cleanup in reverse order even if an earlier one
        # raises -- the session is closed before the engine it uses is disposed.
        async with AsyncExitStack() as stack:
            engine = init_engine()
            stack.callback(_dispose_engine, engine)

            session = init_session(engine)
            stack.callback(_close_session, session)

            vectorstore = AsyncQdrantVectorStore()
            stack.push_async_callback(_close_vectorstore, vectorstore)

            await vectorstore.ingest_from_sql(session=session, from_date=from_date)
            logger.info("Ingestion task completed successfully")
    except RuntimeError as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Failed to ingest articles to Qdrant: {e}")
        raise RuntimeError("Error during SQL to Qdrant ingestion") from e
    except Exception as e:
        logger.exception(f"Unexpected error during ingestion: {e}")
        raise
if __name__ == "__main__":
    # Build the ingestion coroutine, then drive it to completion on a fresh
    # event loop created (and closed) by asyncio.run.
    ingestion = main()
    asyncio.run(ingestion)
|