File size: 9,192 Bytes
4851501 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
import duckdb
import json
import logging
import os
from typing import Dict, Any, Optional, List
from backend.core.data_catalog import get_data_catalog
logger = logging.getLogger(__name__)
class GeoEngine:
    """Singleton wrapper around an in-memory DuckDB connection with the
    spatial extension loaded.

    Responsibilities:
      - lazily load catalog-registered tables (admin boundaries etc.),
      - register ad-hoc GeoJSON layers as DuckDB tables,
      - execute spatial SQL and serialize results as GeoJSON FeatureCollections.
    """

    _instance = None  # class-level singleton holder

    def __new__(cls):
        # Classic singleton: create the instance once and flag it as
        # uninitialized so __init__ runs its body exactly one time.
        if cls._instance is None:
            cls._instance = super(GeoEngine, cls).__new__(cls)
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        # __init__ is invoked on every GeoEngine() call; bail out after
        # the first successful initialization.
        if self.initialized:
            return
        logger.info("Initializing GeoEngine (DuckDB)...")
        try:
            self.con = duckdb.connect(database=':memory:')
            self.con.install_extension('spatial')
            self.con.load_extension('spatial')
            logger.info("GeoEngine initialized with Spatial extension.")
        except Exception as e:
            logger.error(f"Failed to initialize GeoEngine: {e}")
            # Bare raise preserves the original traceback (raise e would not).
            raise
        self.layers = {}  # layer_id -> DuckDB table name
        self.catalog = get_data_catalog()
        self.base_tables_loaded = False
        self.initialized = True
        # Automatically load base tables
        self.initialize_base_tables()

    def initialize_base_tables(self):
        """
        Load essential administrative boundary files into DuckDB tables.

        Idempotent: a second call is a no-op once base tables are loaded.
        """
        if self.base_tables_loaded:
            return
        logger.info("Loading base tables into DuckDB...")
        # Load every catalog entry marked category == 'base'.
        base_tables = [
            name for name, meta in self.catalog.catalog.items()
            if meta.get('category') == 'base'
        ]
        for table_name in base_tables:
            self.ensure_table_loaded(table_name)
        self.base_tables_loaded = True
        logger.info("Base tables loaded.")

    def ensure_table_loaded(self, table_name: str) -> bool:
        """
        Ensure a table is loaded in DuckDB. If not, load it from the catalog.
        Returns True if successful, False otherwise.
        """
        # Check if already loaded: DESCRIBE succeeds only for existing tables.
        try:
            self.con.execute(f"DESCRIBE {table_name}")
            return True
        except Exception:
            pass  # Not loaded yet -- fall through to lazy load.
        # Look up the backing file in the catalog.
        file_path = self.catalog.get_file_path(table_name)
        if not file_path or not file_path.exists():
            logger.warning(f"Table {table_name} not found in catalog or file missing.")
            return False
        try:
            logger.info(f"Lazy loading table: {table_name}")
            # NOTE(review): table_name/file_path are interpolated into SQL.
            # They originate from the trusted local catalog, not user input;
            # if that ever changes, this must be sanitized.
            self.con.execute(
                f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM ST_Read('{file_path}')"
            )
            return True
        except Exception as e:
            logger.error(f"Failed to load {table_name}: {e}")
            return False

    def get_table_schemas(self) -> str:
        """
        Get schema of currently loaded tables for LLM context.

        Returns a human-readable, markdown-ish description of each table's
        columns and row count.
        """
        result = "Currently Loaded Tables:\n\n"
        try:
            # Get all tables
            tables = self.con.execute("SHOW TABLES").fetchall()
            for table in tables:
                table_name = table[0]
                try:
                    columns = self.con.execute(f"DESCRIBE {table_name}").fetchall()
                    row_count = self.con.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
                    result += f"### {table_name} ({row_count} rows)\n"
                    result += "Columns:\n"
                    for col in columns:
                        col_name, col_type = col[0], col[1]
                        if col_name == 'geom':
                            result += f" - geom: GEOMETRY (spatial data)\n"
                        else:
                            result += f" - {col_name}: {col_type}\n"
                    result += "\n"
                except Exception:
                    # Best-effort: skip tables that fail to describe
                    # (e.g. dropped between SHOW and DESCRIBE).
                    pass
        except Exception as e:
            logger.error(f"Error getting schemas: {e}")
        return result

    def get_table_list(self) -> List[str]:
        """Return list of all available table names."""
        # Bug fix: previously referenced self.BASE_TABLES, which does not
        # exist on this class and raised AttributeError. Ask DuckDB for the
        # live table list instead, falling back to registered layers.
        try:
            return [row[0] for row in self.con.execute("SHOW TABLES").fetchall()]
        except Exception:
            return list(self.layers.values())

    def register_layer(self, layer_id: str, geojson: Dict[str, Any]) -> str:
        """
        Registers a GeoJSON object as a table in DuckDB.
        Returns the table name.

        Implementation note: DuckDB's spatial ST_Read reads files, not
        in-memory Python objects, so the GeoJSON is written to a temp file
        and loaded from there.
        """
        table_name = f"layer_{layer_id.replace('-', '_')}"
        # If table exists, drop it
        self.con.execute(f"DROP TABLE IF EXISTS {table_name}")
        import tempfile
        import os

        def json_serial(obj):
            """JSON serializer for objects not serializable by default json code"""
            # Covers date/datetime and anything else exposing isoformat().
            if hasattr(obj, 'isoformat'):
                return obj.isoformat()
            raise TypeError(f"Type {type(obj)} not serializable")

        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp:
                json.dump(geojson, tmp, default=json_serial)
                tmp_path = tmp.name
            self.con.execute(f"CREATE TABLE {table_name} AS SELECT * FROM ST_Read('{tmp_path}')")
            self.layers[layer_id] = table_name
            logger.info(f"Registered layer {layer_id} as table {table_name}")
            return table_name
        except Exception as e:
            logger.error(f"Error registering layer {layer_id}: {e}")
            raise
        finally:
            # Bug fix: the temp file used to leak when ST_Read failed;
            # always clean it up.
            if tmp_path and os.path.exists(tmp_path):
                os.unlink(tmp_path)

    def execute_spatial_query(self, sql: str) -> Dict[str, Any]:
        """
        Executes a SQL query and returns the result as a GeoJSON FeatureCollection.
        Expects the query to return a geometry column named 'geom' or 'geometry'.

        Raises:
            ValueError: if the result has no recognizable geometry column.
        """
        try:
            logger.info(f"Executing Spatial SQL: {sql}")
            # Materialize the result first so we can inspect its schema
            # before deciding how to serialize each column.
            self.con.execute(f"CREATE OR REPLACE TEMP TABLE query_result AS {sql}")
            columns = self.con.execute("DESCRIBE query_result").fetchall()
            geom_col = next((c[0] for c in columns if c[0] in ('geom', 'geometry')), None)
            if geom_col is None:
                # Bug fix: previously this fell through and generated invalid
                # SQL (ST_AsGeoJSON(None)); fail fast with a clear message.
                raise ValueError(
                    "Spatial query result has no 'geom' or 'geometry' column."
                )
            # Select ST_AsGeoJSON(geom) first, then the remaining columns in
            # schema order so row[i+1] lines up with other_cols[i].
            other_cols = [c[0] for c in columns if c[0] != geom_col]
            select_clause = f"ST_AsGeoJSON({geom_col})"
            if other_cols:
                select_clause += ", " + ", ".join(other_cols)
            rows = self.con.execute(f"SELECT {select_clause} FROM query_result").fetchall()
            features = [
                {
                    "type": "Feature",
                    "geometry": json.loads(row[0]),
                    "properties": dict(zip(other_cols, row[1:])),
                }
                for row in rows
            ]
            return {
                "type": "FeatureCollection",
                "features": features,
                "properties": {}
            }
        except Exception as e:
            logger.error(f"Spatial query failed: {e}")
            raise

    def get_table_name(self, layer_id: str) -> Optional[str]:
        """Return the DuckDB table name for a registered layer, or None."""
        return self.layers.get(layer_id)
# Process-wide GeoEngine instance, created on first access.
_geo_engine = None


def get_geo_engine() -> GeoEngine:
    """Return the shared GeoEngine, constructing it lazily on first call."""
    global _geo_engine
    if _geo_engine is not None:
        return _geo_engine
    _geo_engine = GeoEngine()
    return _geo_engine
|