IndraneelKumar
Initial search engine commit
266d7bc
import asyncio
from datetime import datetime
from src.infrastructure.qdrant.qdrant_vectorstore import AsyncQdrantVectorStore
from src.infrastructure.supabase.init_session import init_engine, init_session
from src.utils.logger_util import setup_logging
logger = setup_logging()
async def main() -> None:
"""Ingest articles from Supabase Postgres to Qdrant vector store.
Initializes a SQLAlchemy engine and session to connect to Supabase Postgres,
and an AsyncQdrantVectorStore to ingest articles from a specified date into
Qdrant. Closes the session and Qdrant client after completion. Logs errors
and ensures proper execution.
Args:
None
Returns:
None
Raises:
RuntimeError: If an error occurs during Qdrant ingestion or SQL operations.
Exception: For unexpected errors during execution.
"""
logger.info("Starting ingestion of articles from SQL to Qdrant")
try:
# Initialize database engine and session
engine = init_engine()
session = init_session(engine)
# Initialize Qdrant vector store
vectorstore = AsyncQdrantVectorStore()
# Set the start date for ingestion
from_date = datetime.strptime("2021-01-01", "%Y-%m-%d")
# Ingest articles from SQL to Qdrant
await vectorstore.ingest_from_sql(session=session, from_date=from_date)
logger.info("Ingestion task completed successfully")
except RuntimeError as e:
logger.error(f"Failed to ingest articles to Qdrant: {e}")
raise RuntimeError("Error during SQL to Qdrant ingestion") from e
except Exception as e:
logger.error(f"Unexpected error during ingestion: {e}")
raise
finally:
# Close session and Qdrant client
if "session" in locals():
session.close()
logger.info("SQLAlchemy session closed")
if "vectorstore" in locals():
await vectorstore.client.close()
logger.info("Qdrant client closed")
if "engine" in locals():
engine.dispose()
logger.info("Database engine disposed")
if __name__ == "__main__":
asyncio.run(main())