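"""Prefect flow that ingests articles from SQL into a Qdrant vector store.

The ingestion cutoff date is resolved from the ``--from-date`` CLI argument,
the start time of the last successful run of this flow, or the configured
default start date, in that order of precedence.
"""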
import argparse
import asyncio
from datetime import UTC, datetime, timedelta

from dateutil import parser
from prefect import flow, get_client
from prefect.client.schemas.filters import FlowFilter, FlowRunFilter
from prefect.client.schemas.sorting import FlowRunSort

from src.config import settings
from src.pipelines.tasks.ingest_embeddings import ingest_qdrant
from src.utils.logger_util import setup_logging


async def get_last_successful_run(flow_name: str) -> datetime | None:
    """Get the start time of the last successfully completed run for a given flow.

    Queries the Prefect API for recent completed runs of the exact flow `flow_name`.
    Returns the start time of the most recent completed run, or None if no runs exist.

    Args:
        flow_name (str): Exact name of the Prefect flow.

    Returns:
        datetime | None: Start time of the last completed run, or None if no run exists.

    Raises:
        Exception: If Prefect API calls fail unexpectedly.
    """
    logger = setup_logging()
    logger.info(f"Looking for last successful run of flow: {flow_name}")

    try:
        async with get_client() as client:
            # Step 1: get flows matching the name
            flows = await client.read_flows(flow_filter=FlowFilter(name=dict(eq_=flow_name)))  # type: ignore
            logger.debug(f"Flows returned by Prefect API: {flows}")

            exact_flow = next((f for f in flows if f.name == flow_name), None)
            if not exact_flow:
                logger.info(f"No flow found with exact name: {flow_name}")
                return None
            logger.info(f"Exact flow found: {exact_flow.id} ({exact_flow.name})")

            # Step 2: get recent completed runs, restricted to this flow so the
            # limit is not consumed by other flows' runs
            flow_runs = await client.read_flow_runs(
                flow_filter=FlowFilter(id=dict(any_=[exact_flow.id])),  # type: ignore
                flow_run_filter=FlowRunFilter(state=dict(type=dict(any_=["COMPLETED"]))),  # type: ignore
                sort=FlowRunSort.START_TIME_DESC,
                limit=10,
            )
            logger.debug(f"Recent completed runs fetched: {[r.id for r in flow_runs]}")

            # Step 3: ensure only runs for this flow
            flow_runs = [r for r in flow_runs if r.flow_id == exact_flow.id]
            logger.debug(f"Filtered runs for exact flow: {[r.id for r in flow_runs]}")

            if not flow_runs:
                logger.info(f"No completed runs found for flow: {flow_name}")
                return None

            last_run_time = flow_runs[0].start_time
            logger.info(f"Last completed run for flow '{flow_name}' started at {last_run_time}")
            return last_run_time
    except Exception as e:
        logger.error(f"Error fetching last successful run for flow '{flow_name}': {e}")
        raise


@flow(
    name="qdrant_ingest_flow",
    flow_run_name="qdrant_ingest_flow_run",
    description="Orchestrates SQL → Qdrant ingestion",
    retries=2,
    retry_delay_seconds=120,
)
async def qdrant_ingest_flow(from_date: str | None = None) -> None:
    """Prefect Flow: Orchestrates ingestion of articles from SQL into Qdrant.

    Determines the starting cutoff date for ingestion (user-provided, last run date,
    or default fallback) and runs the Qdrant ingestion task.

    Args:
        from_date (str | None, optional): Start date in YYYY-MM-DD format. If None,
            falls back to the last successful run or the configured default.

    Returns:
        None

    Raises:
        RuntimeError: If ingestion fails.
        Exception: For unexpected errors during execution.
    """
    logger = setup_logging()
    rss = settings.rss

    try:
        if from_date:
            # Parse user-provided date and assume UTC midnight
            from_date_dt = parser.parse(from_date).replace(tzinfo=UTC)
            logger.info(f"Using user-provided from_date: {from_date_dt}")
        else:
            # Fallback order: last run date, then default_start_date, then 30 days ago
            last_run_date = await get_last_successful_run("qdrant_ingest_flow")
            if last_run_date:
                from_date_dt = last_run_date
            elif rss.default_start_date:
                from_date_dt = parser.parse(rss.default_start_date).replace(tzinfo=UTC)
            else:
                from_date_dt = datetime.now(UTC) - timedelta(days=30)
            logger.info(f"Using fallback from_date: {from_date_dt}")

        await ingest_qdrant(from_date=from_date_dt)
    except Exception as e:
        logger.error(f"Error during Qdrant ingestion flow: {e}")
        raise RuntimeError("Qdrant ingestion flow failed") from e
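

# The __main__ block below is for ad-hoc, local execution; in a scheduled setup
# this flow would typically be registered as a Prefect deployment instead. From
# async code the flow can also be awaited directly, e.g.
# `await qdrant_ingest_flow(from_date="2025-01-01")` (date value illustrative).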
if __name__ == "__main__":
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--from-date",
        type=str,
        default=None,
        help="From date in YYYY-MM-DD format",
    )
    args = arg_parser.parse_args()
    asyncio.run(qdrant_ingest_flow(from_date=args.from_date))  # type: ignore[arg-type]
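
# Example invocations (module filename assumed):
#   python qdrant_ingest_flow.py --from-date 2025-01-01
#   python qdrant_ingest_flow.py   # fall back to last successful run / default start date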