# GeoQuery / backend/core/geo_engine.py
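"""In-memory DuckDB geospatial engine for GeoQuery.

Exposes a singleton GeoEngine that loads the DuckDB spatial extension,
lazily registers catalog tables and ad-hoc GeoJSON layers, and executes
spatial SQL whose results are returned as GeoJSON FeatureCollections.
"""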
import duckdb
import json
import logging
import os
from typing import Dict, Any, Optional, List
from backend.core.data_catalog import get_data_catalog
logger = logging.getLogger(__name__)
class GeoEngine:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(GeoEngine, cls).__new__(cls)
cls._instance.initialized = False
return cls._instance
def __init__(self):
if self.initialized:
return
logger.info("Initializing GeoEngine (DuckDB)...")
try:
self.con = duckdb.connect(database=':memory:')
self.con.install_extension('spatial')
self.con.load_extension('spatial')
logger.info("GeoEngine initialized with Spatial extension.")
except Exception as e:
logger.error(f"Failed to initialize GeoEngine: {e}")
raise e
self.layers = {} # layer_id -> table_name
self.catalog = get_data_catalog()
self.base_tables_loaded = False
self.initialized = True
# Automatically load base tables
self.initialize_base_tables()
def initialize_base_tables(self):
"""
Load essential administrative boundary files into DuckDB tables.
"""
if self.base_tables_loaded:
return
logger.info("Loading base tables into DuckDB...")
        # Load every catalog entry marked with category 'base' (core administrative boundaries)
base_tables = [
name for name, meta in self.catalog.catalog.items()
if meta.get('category') == 'base'
]
for table_name in base_tables:
self.ensure_table_loaded(table_name)
self.base_tables_loaded = True
logger.info("Base tables loaded.")
def ensure_table_loaded(self, table_name: str) -> bool:
"""
Ensure a table is loaded in DuckDB. If not, load it from the catalog.
Returns True if successful, False otherwise.
"""
        # Check whether the table already exists in DuckDB
        try:
            self.con.execute(f"DESCRIBE {table_name}")
            return True
        except duckdb.Error:
            pass  # Not loaded yet; fall through and load it from the catalog
# Look up in catalog
file_path = self.catalog.get_file_path(table_name)
if not file_path or not file_path.exists():
logger.warning(f"Table {table_name} not found in catalog or file missing.")
return False
try:
logger.info(f"Lazy loading table: {table_name}")
self.con.execute(f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM ST_Read('{file_path}')")
return True
except Exception as e:
logger.error(f"Failed to load {table_name}: {e}")
return False
def get_table_schemas(self) -> str:
"""
Get schema of currently loaded tables for LLM context.
"""
result = "Currently Loaded Tables:\n\n"
try:
# Get all tables
tables = self.con.execute("SHOW TABLES").fetchall()
for table in tables:
table_name = table[0]
try:
columns = self.con.execute(f"DESCRIBE {table_name}").fetchall()
row_count = self.con.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
result += f"### {table_name} ({row_count} rows)\n"
result += "Columns:\n"
for col in columns:
col_name, col_type = col[0], col[1]
if col_name == 'geom':
result += f" - geom: GEOMETRY (spatial data)\n"
else:
result += f" - {col_name}: {col_type}\n"
result += "\n"
                except duckdb.Error:
                    continue  # Skip tables whose schema cannot be described
except Exception as e:
logger.error(f"Error getting schemas: {e}")
return result
    def get_table_list(self) -> List[str]:
        """Return the names of all available tables: catalog base tables plus registered layers."""
        tables = [
            name for name, meta in self.catalog.catalog.items()
            if meta.get('category') == 'base'
        ]
        tables.extend(self.layers.values())
        return tables
def register_layer(self, layer_id: str, geojson: Dict[str, Any]) -> str:
"""
Registers a GeoJSON object as a table in DuckDB.
Returns the table name.
"""
table_name = f"layer_{layer_id.replace('-', '_')}"
# If table exists, drop it
self.con.execute(f"DROP TABLE IF EXISTS {table_name}")
        # Passing nested GeoJSON to DuckDB in memory (replacement scans, JSON
        # columns) is awkward, so the simplest reliable approach is to dump the
        # object to a temporary file and ingest it with the spatial extension's ST_Read.
        try:
            import tempfile

            def json_serial(obj):
                """JSON serializer for objects (e.g. dates/times) not handled by the default encoder."""
                if hasattr(obj, 'isoformat'):
                    return obj.isoformat()
                raise TypeError(f"Type {type(obj)} not serializable")

            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp:
                json.dump(geojson, tmp, default=json_serial)
                tmp_path = tmp.name
            try:
                self.con.execute(f"CREATE TABLE {table_name} AS SELECT * FROM ST_Read('{tmp_path}')")
            finally:
                # Always clean up the temporary file (os is imported at module level)
                os.unlink(tmp_path)
self.layers[layer_id] = table_name
logger.info(f"Registered layer {layer_id} as table {table_name}")
return table_name
except Exception as e:
logger.error(f"Error registering layer {layer_id}: {e}")
raise e
def execute_spatial_query(self, sql: str) -> Dict[str, Any]:
"""
Executes a SQL query and returns the result as a GeoJSON FeatureCollection.
Expects the query to return a geometry column.
"""
try:
logger.info(f"Executing Spatial SQL: {sql}")
            # DuckDB Spatial has no built-in FeatureCollection export, so we
            # materialize the query into a temp table, convert the geometry
            # column with ST_AsGeoJSON, and assemble the FeatureCollection in Python.
            self.con.execute(f"CREATE OR REPLACE TEMP TABLE query_result AS {sql}")
            # Locate the geometry column in the result
            columns = self.con.execute("DESCRIBE query_result").fetchall()
            geom_col = next((c[0] for c in columns if c[0] in ['geom', 'geometry']), None)
            if not geom_col:
                raise ValueError("Query result has no 'geom' or 'geometry' column to convert to GeoJSON.")
            # Select the geometry as GeoJSON first, followed by the remaining columns
other_cols = [c[0] for c in columns if c[0] != geom_col]
other_cols_select = ", ".join(other_cols) if other_cols else ""
select_clause = f"ST_AsGeoJSON({geom_col})"
if other_cols_select:
select_clause += f", {other_cols_select}"
rows = self.con.execute(f"SELECT {select_clause} FROM query_result").fetchall()
features = []
for row in rows:
geometry = json.loads(row[0])
properties = {}
for i, col_name in enumerate(other_cols):
properties[col_name] = row[i+1]
features.append({
"type": "Feature",
"geometry": geometry,
"properties": properties
})
return {
"type": "FeatureCollection",
"features": features,
"properties": {}
}
except Exception as e:
logger.error(f"Spatial query failed: {e}")
raise e
def get_table_name(self, layer_id: str) -> Optional[str]:
return self.layers.get(layer_id)
_geo_engine = None
def get_geo_engine() -> GeoEngine:
global _geo_engine
if _geo_engine is None:
_geo_engine = GeoEngine()
return _geo_engine
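
# A minimal usage sketch, not part of the engine itself. It assumes the data
# catalog from backend.core.data_catalog is importable and resolvable; the
# layer id "demo-layer" and the inline GeoJSON below are hypothetical examples.
if __name__ == "__main__":
    engine = get_geo_engine()
    print(engine.get_table_schemas())

    # Register an ad-hoc GeoJSON layer (hand-written example data)
    example_layer = {
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "geometry": {"type": "Point", "coordinates": [2.17, 41.38]},
                "properties": {"name": "example_point"},
            }
        ],
    }
    table = engine.register_layer("demo-layer", example_layer)

    # Run a spatial query against the registered layer and print the GeoJSON result
    result = engine.execute_spatial_query(f"SELECT * FROM {table}")
    print(json.dumps(result, indent=2))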