"""
Data Catalog Service

Manages metadata for all datasets available in the platform.
Supports semantic search integration for scalable discovery.
"""

import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import duckdb

logger = logging.getLogger(__name__)


# Maps filename keywords to the tags applied during auto-tagging; matching is
# a simple substring test against the lower-cased table name (see
# DataCatalog._infer_tags).
TAG_RULES = {
    "health": ["health", "facilities", "infrastructure"],
    "hospital": ["health", "facilities", "medical"],
    "clinic": ["health", "facilities", "medical"],
    "school": ["education", "facilities", "infrastructure"],
    "university": ["education", "facilities", "higher-education"],
    "education": ["education", "facilities"],
    "road": ["transportation", "infrastructure", "roads"],
    "street": ["transportation", "infrastructure", "roads"],
    "highway": ["transportation", "infrastructure", "roads"],
    "airport": ["transportation", "infrastructure", "aviation"],
    "port": ["transportation", "infrastructure", "maritime"],
    "population": ["demographics", "census", "population"],
    "census": ["demographics", "census", "statistics"],
    "admin": ["administrative", "boundaries", "government"],
    "district": ["administrative", "boundaries"],
    "province": ["administrative", "boundaries"],
    "corregimiento": ["administrative", "boundaries"],
    "park": ["recreation", "green-space", "amenities"],
    "water": ["hydrology", "natural-resources"],
    "river": ["hydrology", "water"],
    "forest": ["environment", "natural-resources", "land-cover"],
    "building": ["infrastructure", "built-environment"],
    "poi": ["points-of-interest", "amenities"],
}
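
# Example (hypothetical file): "hospitals_panama.geojson" becomes the table
# name "hospitals_panama"; the "hospital" keyword above matches as a
# substring, so name-based tagging yields {"health", "facilities", "medical"},
# plus column-derived tags such as "spatial" when a geometry column exists.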


class DataCatalog:
    """
    Singleton service managing dataset metadata.

    Features:
    - Auto-discovery of GeoJSON files in the data directories
    - Schema inference from the first record of each file
    - Auto-tagging based on naming conventions
    - Integration with semantic search for scalable discovery
    """

    _instance = None

    DATA_DIR = Path(__file__).parent.parent / "data"
    CATALOG_FILE = DATA_DIR / "catalog.json"

    def __new__(cls):
        # Classic singleton: create the one instance lazily and flag it as
        # uninitialized so __init__ runs its body only once.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        if self.initialized:
            return

        self.catalog: Dict[str, Any] = {}
        self.load_catalog()
        self.scan_and_update()
        self._init_semantic_search()
        self.initialized = True

    def load_catalog(self):
        """Load the catalog from its JSON file, if one exists."""
        if self.CATALOG_FILE.exists():
            try:
                with open(self.CATALOG_FILE, 'r') as f:
                    self.catalog = json.load(f)
            except Exception as e:
                logger.error(f"Failed to load catalog: {e}")
                self.catalog = {}
        else:
            self.catalog = {}

    def save_catalog(self):
        """Save the catalog to its JSON file."""
        try:
            with open(self.CATALOG_FILE, 'w') as f:
                json.dump(self.catalog, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save catalog: {e}")

    def _infer_tags(self, table_name: str, columns: List[str]) -> List[str]:
        """Auto-generate tags from the table name and column names."""
        tags = set()
        name_lower = table_name.lower()

        # Name-based tags: every TAG_RULES keyword found in the table name
        # contributes its associated tags.
        for keyword, keyword_tags in TAG_RULES.items():
            if keyword in name_lower:
                tags.update(keyword_tags)

        # Column-based tags.
        columns_lower = [c.lower() for c in columns]
        if any('pop' in c for c in columns_lower):
            tags.add("population")
        if any('area' in c for c in columns_lower):
            tags.add("geographic")
        if 'geom' in columns_lower or 'geometry' in columns_lower:
            tags.add("spatial")

        return list(tags)

    def _infer_data_type(self, category: str, table_name: str) -> str:
        """Infer a dataset's update cadence: "static", "semi-static", or "realtime"."""
        # Base layers effectively never change.
        if category == "base":
            return "static"

        # OSM and HDX extracts are refreshed periodically.
        if category in ("osm", "hdx"):
            return "semi-static"

        # Census data is a fixed snapshot, which is also the conservative
        # default for anything unrecognized; nothing is currently inferred
        # as "realtime".
        return "static"

    def scan_and_update(self):
        """Scan data directories and add any new files to the catalog."""
        logger.info("Scanning data directories...")

        subdirs = ['base', 'osm', 'inec', 'hdx', 'custom', 'overture', 'ms_buildings']

        # A throwaway in-memory DuckDB connection with the spatial extension
        # is used purely for schema inference and row counts.
        con = duckdb.connect(':memory:')
        con.install_extension('spatial')
        con.load_extension('spatial')

        updated = False

        for subdir in subdirs:
            dir_path = self.DATA_DIR / subdir
            if not dir_path.exists():
                continue

            files = list(dir_path.glob('**/*.geojson')) + list(dir_path.glob('**/*.geojson.gz'))
            for file_path in files:
                # Derive a SQL-friendly table name from the file name.
                table_name = (
                    file_path.name
                    .replace('.geojson.gz', '')
                    .replace('.geojson', '')
                    .lower()
                    .replace('-', '_')
                    .replace(' ', '_')
                )

                existing = self.catalog.get(table_name)
                rel_path = str(file_path.relative_to(self.DATA_DIR))

                # Skip files that are already fully indexed.
                if existing and existing.get('path') == rel_path:
                    if 'tags' in existing and 'data_type' in existing:
                        continue

                try:
                    logger.info(f"Indexing {table_name}...")

                    # Fetch a single record to infer the schema.
                    query = f"SELECT * FROM ST_Read('{file_path}') LIMIT 1"
                    df = con.execute(query).fetchdf()
                    columns = list(df.columns)

                    row_count_query = f"SELECT COUNT(*) FROM ST_Read('{file_path}')"
                    row_count = con.execute(row_count_query).fetchone()[0]

                    tags = self._infer_tags(table_name, columns)
                    data_type = self._infer_data_type(subdir, table_name)

                    self.catalog[table_name] = {
                        "path": rel_path,
                        "description": f"Data from {subdir}/{file_path.name}",
                        "semantic_description": None,
                        "tags": tags,
                        "data_type": data_type,
                        "update_frequency": None,
                        "columns": columns,
                        "row_count": row_count,
                        "category": subdir,
                        "format": "geojson",
                        "last_indexed": datetime.now().isoformat(),
                    }
                    updated = True

                except Exception as e:
                    logger.warning(f"Failed to index {file_path}: {e}")

        con.close()

        if updated:
            self.save_catalog()
            logger.info("Catalog updated.")

    def _init_semantic_search(self):
        """Initialize semantic search with the current catalog."""
        try:
            from backend.core.semantic_search import get_semantic_search
            semantic = get_semantic_search()

            # embed_all_tables returns the number of newly created embeddings.
            new_embeddings = semantic.embed_all_tables(self.catalog)
            if new_embeddings > 0:
                logger.info(f"Created {new_embeddings} new semantic embeddings.")
        except Exception as e:
            logger.warning(f"Semantic search initialization failed: {e}")

    def get_table_metadata(self, table_name: str) -> Optional[Dict]:
        """Get metadata for a specific table."""
        return self.catalog.get(table_name)

    def get_all_table_summaries(self) -> str:
        """
        Return a concise summary of all tables, grouped by category.

        WARNING: this can grow very large with many datasets. Prefer
        semantic_search.search() for discovery.
        """
        summary = "Available Data Tables:\n"

        # Group tables by category for a structured listing.
        by_category: Dict[str, List] = {}
        for name, meta in self.catalog.items():
            cat = meta.get('category', 'other')
            by_category.setdefault(cat, []).append((name, meta))

        for cat, items in by_category.items():
            summary += f"\n## {cat.upper()}\n"
            for name, meta in items:
                desc = meta.get('semantic_description') or meta.get('description', 'No description')
                tags = meta.get('tags', [])
                tag_str = f" [{', '.join(tags[:3])}]" if tags else ""
                summary += f"- {name}: {desc}{tag_str}\n"

        return summary

    def get_summaries_for_tables(self, table_names: List[str]) -> str:
        """
        Get summaries only for the specified tables.

        Used after semantic pre-filtering to build a focused LLM context.
        """
        summary = "Relevant Data Tables:\n\n"

        for name in table_names:
            meta = self.catalog.get(name)
            if not meta:
                continue

            desc = meta.get('semantic_description') or meta.get('description', 'No description')
            tags = meta.get('tags', [])
            columns = meta.get('columns', [])[:10]
            row_count = meta.get('row_count', 'unknown')

            summary += f"### {name}\n"
            summary += f"Description: {desc}\n"
            if tags:
                summary += f"Tags: {', '.join(tags)}\n"
            summary += f"Columns: {', '.join(columns)}\n"
            summary += f"Rows: {row_count}\n\n"

        return summary
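
    # Typical discovery flow (sketch; the exact search() signature lives in
    # backend.core.semantic_search and is assumed here):
    #   candidates = get_semantic_search().search("hospitals near rivers")
    #   context = get_data_catalog().get_summaries_for_tables(candidates)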

    def get_specific_table_schemas(self, table_names: List[str]) -> str:
        """Return detailed schemas for the specified tables."""
        output = ""
        for name in table_names:
            meta = self.catalog.get(name)
            if not meta:
                continue

            output += f"### {name}\n"
            output += f"Description: {meta.get('description')}\n"
            output += "Columns: " + ", ".join(meta.get('columns', [])) + "\n\n"
        return output

    def get_file_path(self, table_name: str) -> Optional[Path]:
        """Get the absolute path of a table's data file."""
        meta = self.catalog.get(table_name)
        if meta and 'path' in meta:
            return self.DATA_DIR / meta['path']
        return None

    def get_tables_by_tag(self, tag: str) -> List[str]:
        """Get all table names that carry a specific tag."""
        return [
            name for name, meta in self.catalog.items()
            if tag in meta.get('tags', [])
        ]

    def get_tables_by_category(self, category: str) -> List[str]:
        """Get all table names in a specific category."""
        return [
            name for name, meta in self.catalog.items()
            if meta.get('category') == category
        ]

    def get_stats(self) -> Dict[str, Any]:
        """Return summary statistics about the catalog."""
        categories: Dict[str, int] = {}
        tags: Dict[str, int] = {}
        enriched_count = 0

        for meta in self.catalog.values():
            cat = meta.get('category', 'other')
            categories[cat] = categories.get(cat, 0) + 1

            if meta.get('semantic_description'):
                enriched_count += 1

            for tag in meta.get('tags', []):
                tags[tag] = tags.get(tag, 0) + 1

        return {
            "total_datasets": len(self.catalog),
            "enriched_datasets": enriched_count,
            "by_category": categories,
            # The 20 most common tags, by dataset count.
            "by_tag": dict(sorted(tags.items(), key=lambda x: -x[1])[:20]),
            "catalog_file": str(self.CATALOG_FILE),
        }

    async def enrich_table(self, table_name: str, force_refresh: bool = False) -> bool:
        """
        Enrich a single table with LLM-generated metadata.

        Returns True if enrichment succeeded or the table was already enriched.
        """
        if table_name not in self.catalog:
            logger.warning(f"Table {table_name} not found in catalog")
            return False

        metadata = self.catalog[table_name]

        # Skip tables that are already enriched unless a refresh is forced.
        if not force_refresh and metadata.get('semantic_description'):
            logger.info(f"Table {table_name} already enriched, skipping")
            return True

        try:
            from backend.core.catalog_enricher import get_catalog_enricher
            enricher = get_catalog_enricher()

            # Sample values give the LLM concrete context for descriptions.
            sample_values = await self._get_sample_values(table_name)

            enriched = await enricher.enrich_table(table_name, metadata, sample_values, force_refresh)

            enriched['last_enriched'] = datetime.now().isoformat()
            self.catalog[table_name] = enriched
            self.save_catalog()

            # Keep the semantic index in sync with the new description.
            self._update_embedding(table_name, enriched)

            logger.info(f"Successfully enriched {table_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to enrich {table_name}: {e}")
            return False

    async def enrich_all_tables(self, force_refresh: bool = False) -> Dict[str, bool]:
        """
        Enrich every table in the catalog.

        Returns a dict mapping table_name -> success status.
        """
        results = {}

        # Iterate over a snapshot of the keys: enrich_table replaces catalog
        # entries as it goes.
        for table_name in list(self.catalog):
            results[table_name] = await self.enrich_table(table_name, force_refresh)

        return results

    async def _get_sample_values(self, table_name: str) -> Optional[Dict[str, str]]:
        """Get sample values from a table to give enrichment more context."""
        try:
            from backend.core.geo_engine import get_geo_engine
            geo_engine = get_geo_engine()

            geo_engine.ensure_table_loaded(table_name)

            result = geo_engine.con.execute(f"SELECT * FROM {table_name} LIMIT 1").fetchdf()

            if len(result) > 0:
                sample = {}
                for col in result.columns:
                    if col != 'geom':
                        val = result[col].iloc[0]
                        # Skip nulls; `val == val` is False for NaN values.
                        if val is not None and val == val:
                            sample[col] = str(val)[:50]
                return sample

        except Exception as e:
            logger.debug(f"Could not get sample values for {table_name}: {e}")

        return None

    def _update_embedding(self, table_name: str, metadata: Dict[str, Any]) -> None:
        """Update the semantic search embedding for a table."""
        try:
            from backend.core.semantic_search import get_semantic_search
            semantic = get_semantic_search()
            semantic.embed_table(table_name, metadata)
            semantic._save_embeddings()
        except Exception as e:
            logger.warning(f"Could not update embedding for {table_name}: {e}")


_data_catalog = None


def get_data_catalog() -> DataCatalog:
    """Get the singleton data catalog instance."""
    global _data_catalog
    if _data_catalog is None:
        _data_catalog = DataCatalog()
    return _data_catalog
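

if __name__ == "__main__":
    # Minimal usage sketch for a local smoke test. Assumes the data/
    # directory layout scanned above actually exists; the semantic search
    # and LLM enrichment integrations are optional and fail soft.
    import pprint

    logging.basicConfig(level=logging.INFO)

    catalog = get_data_catalog()
    pprint.pprint(catalog.get_stats())

    # Tag- and category-based lookups need no LLM enrichment.
    print(catalog.get_tables_by_tag("health"))
    print(catalog.get_summaries_for_tables(catalog.get_tables_by_category("osm")[:3]))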