import gc
import os
from datetime import datetime

import dotenv
from prefect import task

from src.infrastructure.qdrant.qdrant_vectorstore import AsyncQdrantVectorStore
from src.infrastructure.supabase.init_session import init_engine, init_session
from src.utils.logger_util import setup_logging

dotenv.load_dotenv()


@task(
    task_run_name="ingest_qdrant",
    description="Ingest articles from SQL to Qdrant.",
    retries=2,
    retry_delay_seconds=120,
)
async def ingest_qdrant(from_date: datetime | None = None) -> None:
    """Ingest articles from SQL database into Qdrant vector store.

    Args:
        from_date (datetime | None, optional): Only ingest articles published after this date.
            Defaults to None (ingest all articles).

    Returns:
        None

    Raises:
        RuntimeError: If ingestion fails for any reason. The original
            exception is chained as the cause.

    """
    logger = setup_logging()
    logger.info(f"Starting Qdrant ingestion task from_date={from_date}")

    # Log label matches the environment variable that is actually read.
    logger.info(f"QDRANT__URL: {os.getenv('QDRANT__URL')}")

    vectorstore = AsyncQdrantVectorStore()
    engine = init_engine()
    session = init_session(engine)

    try:
        await vectorstore.ingest_from_sql(session=session, from_date=from_date)
    except Exception as e:
        logger.error(f"Unexpected error during Qdrant ingestion: {e}")
        raise RuntimeError("Qdrant ingestion failed") from e
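    finally:
        # Release resources whether ingestion succeeded or failed. The nested
        # try/finally ensures the Qdrant client is still closed if closing
        # the SQL session raises.
        try:
            session.close()
        finally:
            await vectorstore.client.close()
            gc.collect()
        logger.info("Qdrant ingestion task finished; resources cleaned up")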