File size: 9,192 Bytes
4851501
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import duckdb
import json
import logging
import os
from typing import Dict, Any, Optional, List
from backend.core.data_catalog import get_data_catalog

logger = logging.getLogger(__name__)

class GeoEngine:
    """Singleton wrapper around an in-memory DuckDB connection with the spatial extension.

    ``__new__`` always hands back the same instance and ``__init__`` only does
    real work the first time it runs. On first construction the engine
    connects to DuckDB, installs and loads the ``spatial`` extension, and
    loads every catalog entry whose ``category`` is ``'base'``.

    Attributes set by ``__init__``:
        con: The DuckDB connection.
        layers: Mapping of ``layer_id`` -> DuckDB table name for layers added
            through :meth:`register_layer`.
        catalog: The object returned by ``get_data_catalog()``.
        base_tables_loaded: True once :meth:`initialize_base_tables` has run.
        initialized: Set to True after the connection and catalog are ready
            (just before the base tables are loaded).
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(GeoEngine, cls).__new__(cls)
            # Flag read by __init__, which Python calls on *every*
            # GeoEngine() even though the same object is returned.
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        if self.initialized:
            return

        logger.info("Initializing GeoEngine (DuckDB)...")
        try:
            self.con = duckdb.connect(database=':memory:')
            self.con.install_extension('spatial')
            self.con.load_extension('spatial')
            logger.info("GeoEngine initialized with Spatial extension.")
        except Exception as e:
            logger.error(f"Failed to initialize GeoEngine: {e}")
            raise

        self.layers: Dict[str, str] = {}  # layer_id -> table_name
        self.catalog = get_data_catalog()
        self.base_tables_loaded = False
        self.initialized = True

        # Automatically load base tables
        self.initialize_base_tables()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _quote_identifier(name: Any) -> str:
        """Return *name* as a double-quoted SQL identifier (embedded quotes doubled)."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _quote_literal(value: Any) -> str:
        """Return *value* as a single-quoted SQL string literal (embedded quotes doubled)."""
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _layer_table_name(layer_id: Any) -> str:
        """Derive the table name for a layer: ``layer_`` + id with non ``[A-Za-z0-9_]`` chars as ``_``."""
        safe = "".join(
            ch if (ch == "_" or (ord(ch) < 128 and ch.isalnum())) else "_"
            for ch in str(layer_id)
        )
        return f"layer_{safe}"

    @staticmethod
    def _find_geometry_column(columns: List[Any]) -> Optional[str]:
        """Pick the geometry column from ``DESCRIBE`` rows: by name first, then by GEOMETRY type."""
        by_name = next((c[0] for c in columns if c[0] in ('geom', 'geometry')), None)
        if by_name is not None:
            return by_name
        return next(
            (c[0] for c in columns if str(c[1]).upper().startswith('GEOMETRY')),
            None,
        )

    def _base_table_names(self) -> List[str]:
        """Return the names of catalog entries whose ``category`` is ``'base'``."""
        return [
            name for name, meta in self.catalog.catalog.items()
            if meta.get('category') == 'base'
        ]

    # ------------------------------------------------------------------
    # Table loading
    # ------------------------------------------------------------------

    def initialize_base_tables(self):
        """
        Load every catalog table in the 'base' category into DuckDB.

        Runs at most once per engine; later calls return immediately.
        Individual load failures are logged by ``ensure_table_loaded`` and do
        not abort the remaining tables.
        """
        if self.base_tables_loaded:
            return

        logger.info("Loading base tables into DuckDB...")

        for table_name in self._base_table_names():
            self.ensure_table_loaded(table_name)

        self.base_tables_loaded = True
        logger.info("Base tables loaded.")

    def ensure_table_loaded(self, table_name: str) -> bool:
        """
        Ensure a table is loaded in DuckDB. If not, load it from the catalog.

        Args:
            table_name: Name of the table; also the key used to look up the
                source file via ``catalog.get_file_path``.

        Returns:
            True if the table already exists or was loaded, False if the
            catalog has no existing file for it or loading failed.
        """
        quoted_name = self._quote_identifier(table_name)

        # A failing DESCRIBE is the "not loaded yet" signal, so the error is
        # intentionally not propagated.
        try:
            self.con.execute(f"DESCRIBE {quoted_name}")
            return True
        except Exception:
            pass

        # Look up in catalog
        file_path = self.catalog.get_file_path(table_name)
        if not file_path or not file_path.exists():
            logger.warning(f"Table {table_name} not found in catalog or file missing.")
            return False

        try:
            logger.info(f"Lazy loading table: {table_name}")
            self.con.execute(
                f"CREATE OR REPLACE TABLE {quoted_name} AS "
                f"SELECT * FROM ST_Read({self._quote_literal(file_path)})"
            )
            return True
        except Exception as e:
            logger.error(f"Failed to load {table_name}: {e}")
            return False

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    def get_table_schemas(self) -> str:
        """
        Get schema of currently loaded tables for LLM context.

        Returns:
            A text block starting with "Currently Loaded Tables:" followed by
            one section per table (name, row count, column list). Tables that
            cannot be described are skipped; if listing tables fails only the
            header is returned.
        """
        parts = ["Currently Loaded Tables:\n\n"]

        try:
            tables = self.con.execute("SHOW TABLES").fetchall()
        except Exception as e:
            logger.error(f"Error getting schemas: {e}")
            return "".join(parts)

        for table in tables:
            table_name = table[0]
            quoted_name = self._quote_identifier(table_name)
            try:
                columns = self.con.execute(f"DESCRIBE {quoted_name}").fetchall()
                row_count = self.con.execute(
                    f"SELECT COUNT(*) FROM {quoted_name}"
                ).fetchone()[0]
            except Exception as e:
                # Best effort: one unreadable table must not hide the others.
                logger.warning(f"Skipping schema for {table_name}: {e}")
                continue

            section = [f"### {table_name} ({row_count} rows)\n", "Columns:\n"]
            for col in columns:
                col_name, col_type = col[0], col[1]
                if col_name == 'geom':
                    section.append("  - geom: GEOMETRY (spatial data)\n")
                else:
                    section.append(f"  - {col_name}: {col_type}\n")
            section.append("\n")
            parts.extend(section)

        return "".join(parts)

    def get_table_list(self) -> List[str]:
        """Return the 'base' catalog table names followed by the registered layer table names."""
        tables = self._base_table_names()
        tables.extend(self.layers.values())
        return tables

    # ------------------------------------------------------------------
    # Layers and queries
    # ------------------------------------------------------------------

    def register_layer(self, layer_id: str, geojson: Dict[str, Any]) -> str:
        """
        Registers a GeoJSON object as a table in DuckDB.

        The GeoJSON is written to a temporary ``.json`` file and loaded with
        ``ST_Read``; the temp file is removed afterwards whether or not the
        load succeeded. An existing table for the same layer is replaced only
        once the new data has loaded.

        Args:
            layer_id: Identifier of the layer. Characters outside
                ``[A-Za-z0-9_]`` (e.g. ``-``) become ``_`` in the table name.
            geojson: GeoJSON object. Values exposing ``isoformat()`` are
                serialized through it.

        Returns:
            The table name (``layer_<sanitized id>``).

        Raises:
            Exception: Whatever serialization or DuckDB raised, after logging.
        """
        import tempfile

        table_name = self._layer_table_name(layer_id)

        def json_serial(obj):
            """JSON serializer for objects not serializable by default json code"""
            if hasattr(obj, 'isoformat'):
                return obj.isoformat()
            raise TypeError (f"Type {type(obj)} not serializable")

        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp:
                tmp_path = tmp.name
                json.dump(geojson, tmp, default=json_serial)

            # CREATE OR REPLACE keeps the previous table if ST_Read fails.
            self.con.execute(
                f"CREATE OR REPLACE TABLE {self._quote_identifier(table_name)} AS "
                f"SELECT * FROM ST_Read({self._quote_literal(tmp_path)})"
            )

            self.layers[layer_id] = table_name
            logger.info(f"Registered layer {layer_id} as table {table_name}")
            return table_name

        except Exception as e:
            logger.error(f"Error registering layer {layer_id}: {e}")
            raise
        finally:
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError as cleanup_error:
                    logger.warning(f"Could not remove temp file {tmp_path}: {cleanup_error}")

    def execute_spatial_query(self, sql: str) -> Dict[str, Any]:
        """
        Executes a SQL query and returns the result as a GeoJSON FeatureCollection.

        The query result is materialized into the temp table ``query_result``.
        The geometry column is the first column named ``geom``/``geometry``,
        or failing that the first column of GEOMETRY type; it is converted
        with ``ST_AsGeoJSON``. All other columns become feature properties.
        When the result has no geometry column, or a row's geometry is NULL,
        the feature's ``geometry`` is ``None``.

        NOTE(security): ``sql`` is executed verbatim. Callers must only pass
        SQL they trust; nothing here restricts what the statement can do.

        Args:
            sql: A SELECT statement usable after ``CREATE ... TABLE ... AS``.

        Returns:
            A dict with ``type`` "FeatureCollection", a ``features`` list and
            an empty ``properties`` dict.

        Raises:
            Exception: Whatever DuckDB or JSON decoding raised, after logging.
        """
        try:
            logger.info(f"Executing Spatial SQL: {sql}")

            self.con.execute(f"CREATE OR REPLACE TEMP TABLE query_result AS {sql}")

            columns = self.con.execute("DESCRIBE query_result").fetchall()
            geom_col = self._find_geometry_column(columns)
            other_cols = [c[0] for c in columns if c[0] != geom_col]

            # First selected expression is always the geometry slot so that
            # properties start at row index 1 in both cases.
            if geom_col is not None:
                select_parts = [f"ST_AsGeoJSON({self._quote_identifier(geom_col)})"]
            else:
                select_parts = ["NULL"]
            select_parts.extend(self._quote_identifier(c) for c in other_cols)

            rows = self.con.execute(
                f"SELECT {', '.join(select_parts)} FROM query_result"
            ).fetchall()

            features = [
                {
                    "type": "Feature",
                    "geometry": json.loads(row[0]) if row[0] is not None else None,
                    "properties": dict(zip(other_cols, row[1:])),
                }
                for row in rows
            ]

            return {
                "type": "FeatureCollection",
                "features": features,
                "properties": {}
            }

        except Exception as e:
            logger.error(f"Spatial query failed: {e}")
            raise

    def get_table_name(self, layer_id: str) -> Optional[str]:
        """Return the table name registered for *layer_id*, or None if unknown."""
        return self.layers.get(layer_id)

# Module-level cache for the engine handed out by get_geo_engine().
_geo_engine = None


def get_geo_engine() -> GeoEngine:
    """Return the shared GeoEngine, constructing it on the first call."""
    global _geo_engine
    engine = _geo_engine
    if engine is None:
        # Only cache after construction succeeds, so a failed attempt is
        # retried on the next call.
        engine = _geo_engine = GeoEngine()
    return engine