Spaces:
Sleeping
Sleeping
File size: 1,623 Bytes
266d7bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
import gc
import os
from datetime import datetime
import dotenv
from prefect import task
from src.infrastructure.qdrant.qdrant_vectorstore import AsyncQdrantVectorStore
from src.infrastructure.supabase.init_session import init_engine, init_session
from src.utils.logger_util import setup_logging
dotenv.load_dotenv()
@task(
task_run_name="ingest_qdrant",
description="Ingest articles from SQL to Qdrant.",
retries=2,
retry_delay_seconds=120,
)
async def ingest_qdrant(from_date: datetime | None = None):
"""Ingest articles from SQL database into Qdrant vector store.
Args:
from_date (datetime | None, optional): Only ingest articles published after this date.
Defaults to None (ingest all articles).
Returns:
None
Raises:
RuntimeError: If ingestion fails.
Exception: For unexpected errors during execution.
"""
logger = setup_logging()
logger.info(f"Starting Qdrant ingestion task from_date={from_date}")
logger.info(f"QDRANT_URL: {os.getenv('QDRANT__URL')}")
vectorstore = AsyncQdrantVectorStore()
engine = init_engine()
session = init_session(engine)
try:
await vectorstore.ingest_from_sql(session=session, from_date=from_date)
except Exception as e:
logger.error(f"Unexpected error during Qdrant ingestion: {e}")
raise RuntimeError("Qdrant ingestion failed") from e
finally:
# Cleanup resources
session.close()
await vectorstore.client.close()
gc.collect()
logger.info("Qdrant ingestion task complete and resources cleaned up")
|