|
|
""" |
|
|
Catalog Management Endpoints |
|
|
|
|
|
Provides API for viewing and enriching the data catalog. |
|
|
""" |
|
|
|
|
|
from fastapi import APIRouter, HTTPException, BackgroundTasks |
|
|
from pydantic import BaseModel |
|
|
from typing import List, Optional, Dict, Any |
|
|
|
|
|
# Router that collects every catalog endpoint declared in this module via the
# @router.get / @router.post decorators. The URL prefix under which it is
# mounted is chosen by whichever application includes it (not visible here).
router = APIRouter()
|
|
|
|
|
|
|
|
class CatalogStatsResponse(BaseModel):
    """Response body of ``GET /stats``.

    Each field is copied from the same-named key of the dict returned by the
    catalog's ``get_stats()`` call.
    """

    # Value of stats["total_datasets"].
    total_datasets: int
    # Value of stats["enriched_datasets"]; the handler substitutes 0 when the
    # key is absent.
    enriched_datasets: int
    # Value of stats["by_category"] -- presumably a dataset count per category
    # name; confirm against the catalog implementation.
    by_category: Dict[str, int]
    # Value of stats["by_tag"] -- presumably a dataset count per tag; confirm
    # against the catalog implementation.
    by_tag: Dict[str, int]
|
|
|
|
|
|
|
|
class TableMetadataResponse(BaseModel):
    """Metadata for a single catalog table.

    Returned by ``GET /tables`` (as a list) and ``GET /tables/{table_name}``.
    Apart from ``name``, every field is read from the table's metadata entry
    with ``.get``; the defaults noted below are the ones the handlers in this
    module apply when a key is missing.
    """

    # Catalog key of the table (or the path parameter for the single lookup).
    name: str
    # Metadata "path"; defaults to "".
    path: str
    # Metadata "description"; defaults to "".
    description: str
    # Metadata "semantic_description"; None when the key is absent.
    semantic_description: Optional[str]
    # Metadata "tags"; defaults to an empty list.
    tags: List[str]
    # Metadata "data_type"; defaults to "static".
    data_type: str
    # Metadata "columns"; defaults to an empty list.
    columns: List[str]
    # Metadata "row_count"; None when the key is absent.
    row_count: Optional[int]
    # Metadata "category"; defaults to "unknown".
    category: str
    # Metadata "last_indexed"; None when absent. The string format is not
    # visible in this module -- presumably a timestamp, confirm upstream.
    last_indexed: Optional[str]
    # Metadata "last_enriched"; None when absent. Same caveat as last_indexed.
    last_enriched: Optional[str]
|
|
|
|
|
|
|
|
class EnrichmentRequest(BaseModel):
    """Request body of ``POST /enrich``."""

    # Tables to enrich. When omitted, None, or an empty list, the handler
    # enriches every table currently in the catalog. Any name that is not a
    # catalog key makes the handler reject the request with HTTP 400.
    table_names: Optional[List[str]] = None
    # Passed through unchanged as the second argument of
    # catalog.enrich_table(); its exact effect is defined by the catalog.
    force_refresh: bool = False
|
|
|
|
|
|
|
|
class EnrichmentResponse(BaseModel):
    """Response body of ``POST /enrich``."""

    # The handler in this module always sets this to "queued".
    status: str
    # Human-readable summary, e.g. "Enrichment started for 3 tables".
    message: str
    # Number of tables handed to the background enrichment task.
    tables_queued: int
|
|
|
|
|
|
|
|
@router.get("/stats", response_model=CatalogStatsResponse)
async def get_catalog_stats():
    """Return aggregate statistics for the data catalog.

    The numbers come straight from the catalog's ``get_stats()`` result.
    ``total_datasets``, ``by_category`` and ``by_tag`` are read by subscript,
    so a missing key propagates as an error; ``enriched_datasets`` falls back
    to 0 when absent.
    """
    # Imported at call time, like every other handler in this module.
    from backend.core.data_catalog import get_data_catalog

    summary = get_data_catalog().get_stats()

    payload = {
        "total_datasets": summary["total_datasets"],
        "enriched_datasets": summary.get("enriched_datasets", 0),
        "by_category": summary["by_category"],
        "by_tag": summary["by_tag"],
    }
    return CatalogStatsResponse(**payload)
|
|
|
|
|
|
|
|
def _to_table_response(name: str, meta) -> TableMetadataResponse:
    """Build a TableMetadataResponse from one catalog metadata entry.

    Shared by the list and single-table endpoints so both apply exactly the
    same defaults for keys that are missing from ``meta``.

    Args:
        name: Table name to report in the response.
        meta: The table's metadata entry; only its ``.get`` method is used.

    Returns:
        The populated response model.
    """
    return TableMetadataResponse(
        name=name,
        path=meta.get("path", ""),
        description=meta.get("description", ""),
        semantic_description=meta.get("semantic_description"),
        tags=meta.get("tags", []),
        data_type=meta.get("data_type", "static"),
        columns=meta.get("columns", []),
        row_count=meta.get("row_count"),
        category=meta.get("category", "unknown"),
        last_indexed=meta.get("last_indexed"),
        last_enriched=meta.get("last_enriched"),
    )


@router.get("/tables", response_model=List[TableMetadataResponse])
async def list_catalog_tables():
    """List all tables in the catalog with their metadata.

    Returns:
        One TableMetadataResponse per entry of ``catalog.catalog``, in the
        mapping's iteration order.
    """
    # Imported at call time, like every other handler in this module.
    from backend.core.data_catalog import get_data_catalog

    catalog = get_data_catalog()
    return [
        _to_table_response(name, meta)
        for name, meta in catalog.catalog.items()
    ]


@router.get("/tables/{table_name}", response_model=TableMetadataResponse)
async def get_table_metadata(table_name: str):
    """Get metadata for a specific table.

    Args:
        table_name: Name of the table to look up in the catalog.

    Returns:
        The table's metadata as a TableMetadataResponse.

    Raises:
        HTTPException: 404 when the catalog returns no (or empty) metadata
            for ``table_name``.
    """
    from backend.core.data_catalog import get_data_catalog

    catalog = get_data_catalog()
    meta = catalog.get_table_metadata(table_name)

    if not meta:
        raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found")

    return _to_table_response(table_name, meta)
|
|
|
|
|
|
|
|
@router.post("/enrich", response_model=EnrichmentResponse)
async def enrich_catalog(request: EnrichmentRequest, background_tasks: BackgroundTasks):
    """
    Trigger LLM enrichment for catalog tables.

    Enrichment generates semantic descriptions and refined tags.
    Runs in the background to avoid blocking.

    Args:
        request: Which tables to enrich and whether to force a refresh. An
            omitted or empty ``table_names`` selects every catalog table.
        background_tasks: FastAPI task queue that runs the enrichment after
            the response has been sent.

    Returns:
        An EnrichmentResponse with status "queued" and the number of tables
        handed to the background task.

    Raises:
        HTTPException: 400 when any requested name is not in the catalog.
    """
    import logging

    from backend.core.data_catalog import get_data_catalog

    logger = logging.getLogger(__name__)
    catalog = get_data_catalog()

    if request.table_names:
        # Reject the whole request up front rather than partially enriching.
        invalid = [t for t in request.table_names if t not in catalog.catalog]
        if invalid:
            raise HTTPException(
                status_code=400,
                detail=f"Unknown tables: {invalid}"
            )
        tables_to_enrich = list(request.table_names)
    else:
        tables_to_enrich = list(catalog.catalog)

    force_refresh = request.force_refresh

    async def run_enrichment():
        """Enrich each queued table, isolating failures per table."""
        for table_name in tables_to_enrich:
            try:
                succeeded = await catalog.enrich_table(table_name, force_refresh)
            except Exception:
                # The response has already been sent, so the log is the only
                # place a failure can surface. Keep going so one bad table
                # does not silently drop the rest of the batch.
                logger.exception("Enrichment raised for table %r", table_name)
                continue
            if not succeeded:
                # A falsy result is how enrich_table reports failure.
                logger.warning("Enrichment failed for table %r", table_name)

    background_tasks.add_task(run_enrichment)

    return EnrichmentResponse(
        status="queued",
        message=f"Enrichment started for {len(tables_to_enrich)} tables",
        tables_queued=len(tables_to_enrich)
    )
|
|
|
|
|
|
|
|
@router.post("/enrich/{table_name}")
async def enrich_single_table(table_name: str, force: bool = False):
    """
    Enrich one table within the request and return the outcome.

    Unlike the bulk endpoint, the enrichment is awaited before responding,
    which makes this handy for testing or when the result is needed at once.

    Args:
        table_name: Catalog key of the table to enrich.
        force: Forwarded as the second argument of ``catalog.enrich_table``.

    Returns:
        A dict with ``status``, ``table``, ``semantic_description`` and
        ``tags`` taken from the table's metadata after enrichment.

    Raises:
        HTTPException: 404 when the table is not in the catalog; 500 when
            the catalog reports that enrichment did not succeed.
    """
    from backend.core.data_catalog import get_data_catalog

    catalog = get_data_catalog()

    if table_name not in catalog.catalog:
        raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found")

    enriched = await catalog.enrich_table(table_name, force)
    if not enriched:
        raise HTTPException(status_code=500, detail=f"Failed to enrich table '{table_name}'")

    # Re-read the metadata so the response reflects the enrichment result.
    refreshed = catalog.get_table_metadata(table_name)
    outcome = {"status": "success", "table": table_name}
    outcome["semantic_description"] = refreshed.get("semantic_description")
    outcome["tags"] = refreshed.get("tags", [])
    return outcome
|
|
|
|
|
|
|
|
@router.get("/search")
async def search_tables(query: str, top_k: int = 10):
    """
    Find catalog tables relevant to a natural language query.

    The query is passed to the semantic search component; each hit is then
    joined with its catalog metadata. Hits with no (or empty) catalog
    metadata are left out of the response.

    Args:
        query: Free-text search query.
        top_k: Forwarded to the search component as ``top_k``.

    Returns:
        ``{"query": ..., "results": [...]}`` where each result carries the
        table name, the score rounded to 4 places, a description (the
        semantic description when set, otherwise the plain one) and tags.
    """
    from backend.core.semantic_search import get_semantic_search
    from backend.core.data_catalog import get_data_catalog

    searcher = get_semantic_search()
    catalog = get_data_catalog()

    hits = []
    for name, relevance in searcher.search(query, top_k=top_k):
        info = catalog.get_table_metadata(name)
        if not info:
            # Search index knows the table but the catalog does not: skip it.
            continue
        summary = info.get("semantic_description") or info.get("description")
        hits.append({
            "table": name,
            "score": round(relevance, 4),
            "description": summary,
            "tags": info.get("tags", []),
        })

    return {"query": query, "results": hits}
|
|
|
|
|
|
|
|
@router.post("/rebuild-embeddings")
async def rebuild_embeddings():
    """
    Re-embed every catalog table and persist the embeddings.

    Intended to be called after bulk enrichment or other catalog updates.

    Returns:
        A dict with ``status``, a ``message`` stating how many tables were
        embedded (those for which ``embed_table`` returned a truthy value),
        and ``total_embeddings``, the size of the search component's
        ``embeddings`` collection afterwards.
    """
    from backend.core.semantic_search import get_semantic_search
    from backend.core.data_catalog import get_data_catalog

    index = get_semantic_search()
    catalog = get_data_catalog()

    # Count only the tables the search component reports as embedded.
    rebuilt = sum(
        1
        for name, meta in catalog.catalog.items()
        if index.embed_table(name, meta)
    )

    index._save_embeddings()

    return {
        "status": "success",
        "message": f"Rebuilt embeddings for {rebuilt} tables",
        "total_embeddings": len(index.embeddings),
    }
|
|
|
|
|
|