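"""Prefect flow that ingests articles from SQL into Qdrant.

Example invocation (the module path below is an assumption; adjust to your layout):

    python -m src.pipelines.flows.qdrant_ingest_flow --from-date 2024-01-01
"""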
import argparse
import asyncio
from datetime import UTC, datetime, timedelta

from dateutil import parser
from prefect import flow, get_client
from prefect.client.schemas.filters import FlowFilter, FlowRunFilter
from prefect.client.schemas.sorting import FlowRunSort

from src.config import settings
from src.pipelines.tasks.ingest_embeddings import ingest_qdrant
from src.utils.logger_util import setup_logging


async def get_last_successful_run(flow_name: str) -> datetime | None:
    """Get the start time of the last successfully completed run for a given flow.

    Queries the Prefect API for recent completed runs of the exact flow `flow_name`.
    Returns the start time of the most recent completed run, or None if no runs exist.

    Args:
        flow_name (str): Exact name of the Prefect flow.

    Returns:
        datetime | None: Start time of the last completed run, or None if no run exists.

    Raises:
        Exception: If Prefect API calls fail unexpectedly.

    """
    logger = setup_logging()
    logger.info(f"Looking for last successful run of flow: {flow_name}")

    try:
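        # get_client() talks to whichever Prefect API this environment is
        # configured for (typically via PREFECT_API_URL)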
        async with get_client() as client:
            # Step 1: get flows matching the name
            flows = await client.read_flows(flow_filter=FlowFilter(name=dict(any_=[flow_name])))  # type: ignore
            logger.debug(f"Flows returned by Prefect API: {flows}")

            exact_flow = next((f for f in flows if f.name == flow_name), None)
            if not exact_flow:
                logger.info(f"No flow found with exact name: {flow_name}")
                return None

            logger.info(f"Exact flow found: {exact_flow.id} ({exact_flow.name})")

            # Step 2: get recent completed runs, scoped server-side to this flow.
            # (Filtering client-side after an unscoped query could miss this flow
            # entirely whenever other flows dominate the most recent completions.)
            flow_runs = await client.read_flow_runs(
                flow_filter=FlowFilter(id=dict(any_=[exact_flow.id])),  # type: ignore
                flow_run_filter=FlowRunFilter(state=dict(type=dict(any_=["COMPLETED"]))),  # type: ignore
                sort=FlowRunSort.START_TIME_DESC,
                limit=10,
            )
            logger.debug(f"Recent completed runs fetched: {[r.id for r in flow_runs]}")

            if not flow_runs:
                logger.info(f"No completed runs found for flow: {flow_name}")
                return None

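            # Runs are sorted newest-first, so the first entry is the latest completion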
            last_run_time = flow_runs[0].start_time
            logger.info(f"Last completed run for flow '{flow_name}' started at {last_run_time}")

            return last_run_time

    except Exception as e:
        logger.error(f"Error fetching last successful run for flow '{flow_name}': {e}")
        raise

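# Ad-hoc usage sketch for the helper above (illustrative only; assumes a
# reachable Prefect API):
#
#   last = asyncio.run(get_last_successful_run("qdrant_ingest_flow"))
#   print(last or "no completed runs yet")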

@flow(
    name="qdrant_ingest_flow",
    flow_run_name="qdrant_ingest_flow_run",
    description="Orchestrates SQL → Qdrant ingestion",
    retries=2,
    retry_delay_seconds=120,
)
async def qdrant_ingest_flow(from_date: str | None = None) -> None:
    """Prefect Flow: Orchestrates ingestion of articles from SQL into Qdrant.

    Determines the starting cutoff date for ingestion (user-provided, last run date,
    or default fallback) and runs the Qdrant ingestion task.

    Args:
        from_date (str | None, optional): Start date in YYYY-MM-DD format. If None,
            falls back to the last successful run, the configured default start date,
            or 30 days ago.

    Returns:
        None

    Raises:
        RuntimeError: If ingestion fails.
        Exception: For unexpected errors during execution.

    """
    logger = setup_logging()
    rss = settings.rss

    try:
        if from_date:
            # Parse the user-provided date; the result is naive, so pin it to UTC
            from_date_dt = parser.parse(from_date).replace(tzinfo=UTC)
            logger.info(f"Using user-provided from_date: {from_date_dt}")
        else:
            # Fall back, in order: last successful run, configured default_start_date,
            # then 30 days ago. (A plain `or` chain cannot express this: parser.parse()
            # returns a truthy datetime or raises, so a final `or` arm would be
            # unreachable.)
            last_run_date = await get_last_successful_run("qdrant_ingest_flow")
            if last_run_date:
                from_date_dt = last_run_date
            elif rss.default_start_date:
                from_date_dt = parser.parse(rss.default_start_date).replace(tzinfo=UTC)
            else:
                from_date_dt = datetime.now(UTC) - timedelta(days=30)
            logger.info(f"Using fallback from_date: {from_date_dt}")

        await ingest_qdrant(from_date=from_date_dt)

    except Exception as e:
        logger.error(f"Error during Qdrant ingestion flow: {e}")
        raise RuntimeError("Qdrant ingestion flow failed") from e

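# Scheduling sketch (hypothetical; `Flow.serve` is one of several ways to run
# this flow on a cadence with a recent Prefect release):
#
#   qdrant_ingest_flow.serve(name="qdrant-ingest", cron="0 2 * * *")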

if __name__ == "__main__":
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--from-date",
        type=str,
        default=None,
        help="From date in YYYY-MM-DD format",
    )
    args = arg_parser.parse_args()

    asyncio.run(qdrant_ingest_flow(from_date=args.from_date))  # type: ignore[arg-type]