""" Catalog Management Endpoints Provides API for viewing and enriching the data catalog. """ from fastapi import APIRouter, HTTPException, BackgroundTasks from pydantic import BaseModel from typing import List, Optional, Dict, Any router = APIRouter() class CatalogStatsResponse(BaseModel): total_datasets: int enriched_datasets: int by_category: Dict[str, int] by_tag: Dict[str, int] class TableMetadataResponse(BaseModel): name: str path: str description: str semantic_description: Optional[str] tags: List[str] data_type: str columns: List[str] row_count: Optional[int] category: str last_indexed: Optional[str] last_enriched: Optional[str] class EnrichmentRequest(BaseModel): table_names: Optional[List[str]] = None # None = all tables force_refresh: bool = False class EnrichmentResponse(BaseModel): status: str message: str tables_queued: int @router.get("/stats", response_model=CatalogStatsResponse) async def get_catalog_stats(): """Get statistics about the data catalog.""" from backend.core.data_catalog import get_data_catalog catalog = get_data_catalog() stats = catalog.get_stats() return CatalogStatsResponse( total_datasets=stats["total_datasets"], enriched_datasets=stats.get("enriched_datasets", 0), by_category=stats["by_category"], by_tag=stats["by_tag"] ) @router.get("/tables", response_model=List[TableMetadataResponse]) async def list_catalog_tables(): """List all tables in the catalog with their metadata.""" from backend.core.data_catalog import get_data_catalog catalog = get_data_catalog() tables = [] for name, meta in catalog.catalog.items(): tables.append(TableMetadataResponse( name=name, path=meta.get("path", ""), description=meta.get("description", ""), semantic_description=meta.get("semantic_description"), tags=meta.get("tags", []), data_type=meta.get("data_type", "static"), columns=meta.get("columns", []), row_count=meta.get("row_count"), category=meta.get("category", "unknown"), last_indexed=meta.get("last_indexed"), last_enriched=meta.get("last_enriched") )) return tables @router.get("/tables/{table_name}", response_model=TableMetadataResponse) async def get_table_metadata(table_name: str): """Get metadata for a specific table.""" from backend.core.data_catalog import get_data_catalog catalog = get_data_catalog() meta = catalog.get_table_metadata(table_name) if not meta: raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found") return TableMetadataResponse( name=table_name, path=meta.get("path", ""), description=meta.get("description", ""), semantic_description=meta.get("semantic_description"), tags=meta.get("tags", []), data_type=meta.get("data_type", "static"), columns=meta.get("columns", []), row_count=meta.get("row_count"), category=meta.get("category", "unknown"), last_indexed=meta.get("last_indexed"), last_enriched=meta.get("last_enriched") ) @router.post("/enrich", response_model=EnrichmentResponse) async def enrich_catalog(request: EnrichmentRequest, background_tasks: BackgroundTasks): """ Trigger LLM enrichment for catalog tables. Enrichment generates semantic descriptions and refined tags. Runs in the background to avoid blocking. """ from backend.core.data_catalog import get_data_catalog catalog = get_data_catalog() if request.table_names: # Validate table names invalid = [t for t in request.table_names if t not in catalog.catalog] if invalid: raise HTTPException( status_code=400, detail=f"Unknown tables: {invalid}" ) tables_to_enrich = request.table_names else: tables_to_enrich = list(catalog.catalog.keys()) # Queue enrichment in background async def run_enrichment(): for table_name in tables_to_enrich: await catalog.enrich_table(table_name, request.force_refresh) background_tasks.add_task(run_enrichment) return EnrichmentResponse( status="queued", message=f"Enrichment started for {len(tables_to_enrich)} tables", tables_queued=len(tables_to_enrich) ) @router.post("/enrich/{table_name}") async def enrich_single_table(table_name: str, force: bool = False): """ Immediately enrich a single table (synchronous). Use for testing or when you need the result right away. """ from backend.core.data_catalog import get_data_catalog catalog = get_data_catalog() if table_name not in catalog.catalog: raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found") success = await catalog.enrich_table(table_name, force) if success: meta = catalog.get_table_metadata(table_name) return { "status": "success", "table": table_name, "semantic_description": meta.get("semantic_description"), "tags": meta.get("tags", []) } else: raise HTTPException(status_code=500, detail=f"Failed to enrich table '{table_name}'") @router.get("/search") async def search_tables(query: str, top_k: int = 10): """ Search for tables using semantic search. Returns the most relevant tables for a natural language query. """ from backend.core.semantic_search import get_semantic_search from backend.core.data_catalog import get_data_catalog semantic = get_semantic_search() catalog = get_data_catalog() results = semantic.search(query, top_k=top_k) response = [] for table_name, score in results: meta = catalog.get_table_metadata(table_name) if meta: response.append({ "table": table_name, "score": round(score, 4), "description": meta.get("semantic_description") or meta.get("description"), "tags": meta.get("tags", []) }) return {"query": query, "results": response} @router.post("/rebuild-embeddings") async def rebuild_embeddings(): """ Rebuild all semantic search embeddings from current catalog. Use after bulk enrichment or catalog updates. """ from backend.core.semantic_search import get_semantic_search from backend.core.data_catalog import get_data_catalog semantic = get_semantic_search() catalog = get_data_catalog() # Force re-embed all tables count = 0 for table_name, metadata in catalog.catalog.items(): if semantic.embed_table(table_name, metadata): count += 1 semantic._save_embeddings() return { "status": "success", "message": f"Rebuilt embeddings for {count} tables", "total_embeddings": len(semantic.embeddings) }