import json
import logging
import os
import tempfile
from typing import Dict, Any, Optional, List

import duckdb

from backend.core.data_catalog import get_data_catalog

logger = logging.getLogger(__name__)


class GeoEngine:
    """Singleton wrapper around an in-memory DuckDB connection with the
    spatial extension loaded.

    Responsibilities:
      - lazily load catalog-registered tables (via ``ST_Read``),
      - register ad-hoc GeoJSON layers as tables,
      - execute spatial SQL and return results as GeoJSON FeatureCollections.
    """

    _instance = None

    def __new__(cls):
        # Classic singleton: every instantiation returns the same object.
        # ``initialized`` guards __init__ from re-running on repeat calls.
        if cls._instance is None:
            cls._instance = super(GeoEngine, cls).__new__(cls)
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        if self.initialized:
            return
        logger.info("Initializing GeoEngine (DuckDB)...")
        try:
            self.con = duckdb.connect(database=':memory:')
            self.con.install_extension('spatial')
            self.con.load_extension('spatial')
            logger.info("GeoEngine initialized with Spatial extension.")
        except Exception as e:
            logger.error(f"Failed to initialize GeoEngine: {e}")
            # Bare raise preserves the original traceback (``raise e`` resets it).
            raise
        self.layers: Dict[str, str] = {}  # layer_id -> DuckDB table name
        self.catalog = get_data_catalog()
        self.base_tables_loaded = False
        self.initialized = True
        # Automatically load base tables
        self.initialize_base_tables()

    def initialize_base_tables(self) -> None:
        """Load essential administrative boundary files into DuckDB tables.

        Idempotent: a second call is a no-op once base tables are loaded.
        """
        if self.base_tables_loaded:
            return
        logger.info("Loading base tables into DuckDB...")
        # Load every catalog entry tagged with category 'base'.
        base_tables = [
            name for name, meta in self.catalog.catalog.items()
            if meta.get('category') == 'base'
        ]
        for table_name in base_tables:
            self.ensure_table_loaded(table_name)
        self.base_tables_loaded = True
        logger.info("Base tables loaded.")

    def ensure_table_loaded(self, table_name: str) -> bool:
        """Ensure a table is loaded in DuckDB. If not, load it from the catalog.

        Returns True if successful, False otherwise.
        """
        # Check if already loaded (DESCRIBE raises if the table is absent).
        try:
            self.con.execute(f"DESCRIBE {table_name}")
            return True
        except Exception:
            pass  # Not loaded yet — fall through to lazy load.
        # Look up in catalog
        file_path = self.catalog.get_file_path(table_name)
        if not file_path or not file_path.exists():
            logger.warning(f"Table {table_name} not found in catalog or file missing.")
            return False
        try:
            logger.info(f"Lazy loading table: {table_name}")
            # NOTE: table_name/file_path come from the trusted local catalog,
            # not user input, so f-string SQL is acceptable here.
            self.con.execute(
                f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM ST_Read('{file_path}')"
            )
            return True
        except Exception as e:
            logger.error(f"Failed to load {table_name}: {e}")
            return False

    def get_table_schemas(self) -> str:
        """Get schema of currently loaded tables for LLM context.

        Returns a human-readable multi-table summary string; tables that
        fail to describe are silently skipped (best-effort output).
        """
        result = "Currently Loaded Tables:\n\n"
        try:
            tables = self.con.execute("SHOW TABLES").fetchall()
            for table in tables:
                table_name = table[0]
                try:
                    columns = self.con.execute(f"DESCRIBE {table_name}").fetchall()
                    row_count = self.con.execute(
                        f"SELECT COUNT(*) FROM {table_name}"
                    ).fetchone()[0]
                    result += f"### {table_name} ({row_count} rows)\n"
                    result += "Columns:\n"
                    for col in columns:
                        col_name, col_type = col[0], col[1]
                        if col_name == 'geom':
                            result += f"  - geom: GEOMETRY (spatial data)\n"
                        else:
                            result += f"  - {col_name}: {col_type}\n"
                    result += "\n"
                except Exception:
                    # Best-effort: skip tables that cannot be described.
                    pass
        except Exception as e:
            logger.error(f"Error getting schemas: {e}")
        return result

    def get_table_list(self) -> List[str]:
        """Return list of all available table names (loaded tables + layers).

        BUG FIX: previous version read ``self.BASE_TABLES``, an attribute that
        was never defined, so every call raised AttributeError. We now ask
        DuckDB directly for the loaded tables.
        """
        try:
            tables = [row[0] for row in self.con.execute("SHOW TABLES").fetchall()]
        except Exception as e:
            logger.error(f"Error listing tables: {e}")
            tables = []
        # Include registered layer tables that may not be loaded yet.
        for layer_table in self.layers.values():
            if layer_table not in tables:
                tables.append(layer_table)
        return tables

    def register_layer(self, layer_id: str, geojson: Dict[str, Any]) -> str:
        """Registers a GeoJSON object as a table in DuckDB.

        Returns the table name. Raises on serialization or load failure.
        """
        table_name = f"layer_{layer_id.replace('-', '_')}"
        # If table exists, drop it
        self.con.execute(f"DROP TABLE IF EXISTS {table_name}")

        # Strategy: dump the GeoJSON to a temp file and load it with ST_Read
        # (the spatial extension's GeoJSON driver handles nested geometry;
        # feeding nested Python dicts through a replacement scan does not).
        def json_serial(obj):
            """JSON serializer for objects not serializable by default json code"""
            if hasattr(obj, 'isoformat'):
                return obj.isoformat()
            raise TypeError(f"Type {type(obj)} not serializable")

        tmp_path: Optional[str] = None
        try:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp:
                json.dump(geojson, tmp, default=json_serial)
                tmp_path = tmp.name
            self.con.execute(
                f"CREATE TABLE {table_name} AS SELECT * FROM ST_Read('{tmp_path}')"
            )
            self.layers[layer_id] = table_name
            logger.info(f"Registered layer {layer_id} as table {table_name}")
            return table_name
        except Exception as e:
            logger.error(f"Error registering layer {layer_id}: {e}")
            raise
        finally:
            # FIX: always remove the temp file (previously leaked when
            # ST_Read failed, because unlink only ran on the success path).
            if tmp_path and os.path.exists(tmp_path):
                os.unlink(tmp_path)

    def execute_spatial_query(self, sql: str) -> Dict[str, Any]:
        """Executes a SQL query and returns the result as a GeoJSON FeatureCollection.

        The query result's geometry column (named ``geom`` or ``geometry``)
        is converted per-row via ST_AsGeoJSON; remaining columns become the
        feature's properties. If the result has no geometry column, features
        are emitted with ``"geometry": null`` instead of failing.
        Raises on SQL errors (after logging).
        """
        try:
            logger.info(f"Executing Spatial SQL: {sql}")
            # Materialize the result once so we can inspect its schema.
            self.con.execute(f"CREATE OR REPLACE TEMP TABLE query_result AS {sql}")
            columns = self.con.execute("DESCRIBE query_result").fetchall()
            geom_col = next(
                (c[0] for c in columns if c[0] in ('geom', 'geometry')), None
            )
            other_cols = [c[0] for c in columns if c[0] != geom_col]

            if geom_col is not None:
                # Geometry first, then the attribute columns.
                select_clause = f"ST_AsGeoJSON({geom_col})"
                if other_cols:
                    select_clause += ", " + ", ".join(other_cols)
                prop_offset = 1  # properties start after the geometry column
            else:
                # FIX: old code built "ST_AsGeoJSON(None)" (invalid SQL) when
                # no geometry column was present. Fall back to attributes only.
                select_clause = ", ".join(other_cols)
                prop_offset = 0

            rows = self.con.execute(
                f"SELECT {select_clause} FROM query_result"
            ).fetchall()

            features = []
            for row in rows:
                geometry = json.loads(row[0]) if geom_col is not None else None
                properties = {
                    col_name: row[i + prop_offset]
                    for i, col_name in enumerate(other_cols)
                }
                features.append({
                    "type": "Feature",
                    "geometry": geometry,
                    "properties": properties
                })

            return {
                "type": "FeatureCollection",
                "features": features,
                "properties": {}
            }
        except Exception as e:
            logger.error(f"Spatial query failed: {e}")
            raise

    def get_table_name(self, layer_id: str) -> Optional[str]:
        """Return the DuckDB table name for a registered layer, or None."""
        return self.layers.get(layer_id)


_geo_engine: Optional[GeoEngine] = None


def get_geo_engine() -> GeoEngine:
    """Module-level accessor for the GeoEngine singleton."""
    global _geo_engine
    if _geo_engine is None:
        _geo_engine = GeoEngine()
    return _geo_engine