# Backend Core Services
Detailed reference for GeoQuery's core backend services.
---
## Service Overview
| Service | File | Purpose |
|---------|------|---------|
| **LLMGateway** | `core/llm_gateway.py` | Gemini API integration |
| **GeoEngine** | `core/geo_engine.py` | DuckDB Spatial wrapper |
| **DataCatalog** | `core/data_catalog.py` | Dataset metadata management |
| **SemanticSearch** | `core/semantic_search.py` | Embedding-based discovery |
| **SessionStore** | `core/session_store.py` | User session and layer management |
| **QueryPlanner** | `core/query_planner.py` | Multi-step query orchestration |
| **QueryExecutor** | `services/executor.py` | Main query pipeline |
---
## LLMGateway
**File**: `backend/core/llm_gateway.py`
Unified interface to the Google Gemini API with streaming support.
### Initialization
```python
from backend.core.llm_gateway import LLMGateway
llm = LLMGateway()
```
**Configuration**:
- Reads `GEMINI_API_KEY` from environment
- Uses `gemini-2.0-flash-exp` model
- Enables "thinking" mode for reasoning transparency
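As a rough sketch, initialization presumably boils down to configuring the Gemini client. This assumes the `google-generativeai` package; the exact wiring in `llm_gateway.py` may differ:
```python
import os

import google.generativeai as genai

class LLMGateway:
    def __init__(self):
        # Read the API key from the environment, as documented above
        genai.configure(api_key=os.environ["GEMINI_API_KEY"])
        # Model name per the configuration notes; "thinking" output is
        # surfaced later as {"type": "thought"} chunks in stream_explanation
        self.model = genai.GenerativeModel("gemini-2.0-flash-exp")
```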
### Key Methods
#### `detect_intent(query, history) → str`
Classifies user query into intent category.
**Parameters**:
- `query` (str): User's natural language query
- `history` (List[Dict]): Conversation history
**Returns**: One of:
- `"GENERAL_CHAT"` - Conversational question
- `"DATA_QUERY"` - Data request
- `"MAP_REQUEST"` - Explicitly wants visualization
- `"SPATIAL_OP"` - Geometric operation (intersection, buffer, etc.)
- `"STAT_QUERY"` - Requests chart/graph
**Example**:
```python
intent = await llm.detect_intent("Show me hospitals in Panama", [])
# Returns: "MAP_REQUEST"
```
#### `generate_analytical_sql(query, schema, history) → str`
Generates DuckDB SQL query from natural language.
**Parameters**:
- `query` (str): User query
- `schema` (str): Available table schemas
- `history` (List[Dict]): Conversation context
**Returns**: SQL query string
**Special Cases**:
- Returns `"-- ERROR: DATA_UNAVAILABLE"` if data doesn't exist
- Includes `geom` column for map visualization
- Uses DuckDB spatial functions (ST_Intersects, etc.)
**Example**:
```python
schema = "Table: panama_healthsites_geojson\nColumns: name, amenity, geom..."
sql = await llm.generate_analytical_sql("hospitals in David", schema, [])
# Returns: "SELECT name, amenity, geom FROM panama_healthsites_geojson
#           WHERE amenity = 'hospital' AND ST_Intersects(geom, ...)"
```
#### `generate_spatial_sql(query, context, history) → str`
Generates spatial operation SQL (difference, intersection, etc.).
**Parameters**:
- `query` (str): Spatial operation request
- `context` (str): Base tables + user layers
- `history` (List[Dict]): Conversation history
**Returns**: SQL with spatial functions
**Example**:
```python
context = "Base: pan_admin1\nUser Layers: layer_abc123 (Protected Areas)"
sql = await llm.generate_spatial_sql("subtract protected areas from Chiriquí", context, [])
# Returns: "WITH protected_union AS (SELECT ST_Union(geom) FROM layer_abc123)
#           SELECT a.*, ST_Difference(a.geom, p.geom) as geom
#           FROM pan_admin1 a, protected_union p WHERE a.adm1_name = 'Chiriquí'"
```
#### `generate_layer_name(query, sql) → Dict`
Generates descriptive name, emoji, and point style for map layer.
**Returns**:
```python
{
    "name": "Hospitals in David",
    "emoji": "🏥",
    "pointStyle": "icon"  # or "circle" or None
}
```
**Point Style Logic**:
- `"icon"`: Small-to-medium POI datasets (fewer than ~500 points)
- `"circle"`: Large point datasets (~500 points or more)
- `None`: Polygon/line data (uses choropleth or line styling)
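A minimal sketch of this selection logic; the helper name and the exact ~500-point cutoff are assumptions for illustration:
```python
def choose_point_style(geometry_type: str, feature_count: int) -> str | None:
    """Pick how features are rendered on the map."""
    if geometry_type != "Point":
        # Polygons/lines fall through to choropleth or line styling
        return None
    # Icons stay legible for small-to-medium POI sets; large sets get circles
    return "icon" if feature_count < 500 else "circle"
```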
#### `stream_explanation(query, sql, data_summary, history)`
Streams natural language explanation of results.
**Yields**: Dict with:
- `{"type": "thought", "text": "reasoning..."}` - LLM thinking
- `{"type": "content", "text": "response..."}` - Actual response
**Example**:
```python
async for chunk in llm.stream_explanation("show hospitals", sql, summary, []):
    if chunk["type"] == "content":
        print(chunk["text"], end="", flush=True)
```
### Prompt System
All prompts are centralized in `backend/core/prompts.py`:
- `SYSTEM_INSTRUCTION` - Base system context
- `INTENT_DETECTION_PROMPT` - Intent classification
- `SQL_GENERATION_PROMPT` - Text-to-SQL
- `SPATIAL_SQL_PROMPT` - Spatial operations
- `LAYER_NAME_PROMPT` - Layer metadata generation
- `EXPLANATION_PROMPT` - Result interpretation
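For a feel of the shape these templates take, a hypothetical `INTENT_DETECTION_PROMPT` might look like the following; this is illustrative only, grounded in the intent categories documented above, and is not the real prompt:
```python
INTENT_DETECTION_PROMPT = """Classify the user's request into exactly one of:
GENERAL_CHAT, DATA_QUERY, MAP_REQUEST, SPATIAL_OP, STAT_QUERY.

Conversation so far:
{history}

User query: {query}

Respond with the category name only."""
```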
---
## GeoEngine
**File**: `backend/core/geo_engine.py`
DuckDB Spatial database wrapper for geospatial queries.
### Initialization
```python
from backend.core.geo_engine import get_geo_engine
engine = get_geo_engine()  # Singleton pattern
```
**Creates**:
- In-memory DuckDB database
- Loads Spatial extension
- Configures JSON serialization
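Under the hood this likely amounts to a few lines of DuckDB setup; a sketch under that assumption, with an illustrative function name:
```python
import duckdb

def _create_connection() -> duckdb.DuckDBPyConnection:
    # In-memory database: tables are rebuilt lazily via ensure_table_loaded
    con = duckdb.connect(database=":memory:")
    # The spatial extension supplies the ST_* functions documented below
    con.execute("INSTALL spatial;")
    con.execute("LOAD spatial;")
    return con
```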
### Key Methods
#### `ensure_table_loaded(table_name) → bool`
Lazily loads GeoJSON dataset into DuckDB.
**Parameters**:
- `table_name` (str): Table identifier from catalog
**Returns**: True if loaded successfully
**Behavior**:
- Checks if already loaded (no-op if yes)
- Looks up path in DataCatalog
- Reads GeoJSON file with GeoPandas
- Creates DuckDB table with spatial index
- Caches in `loaded_tables` dict
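A plausible sketch of that behavior, assuming a GeoPandas-plus-WKT ingestion route and a `self.catalog` attribute; the actual implementation may differ (it could use DuckDB's `ST_Read` directly, for example):
```python
import geopandas as gpd

def ensure_table_loaded(self, table_name: str) -> bool:
    if table_name in self.loaded_tables:  # already cached: no-op
        return True
    path = self.catalog.get_table_metadata(table_name)["path"]
    gdf = gpd.read_file(path)
    # DuckDB can't ingest shapely objects directly, so round-trip via WKT
    df = gdf.drop(columns=gdf.geometry.name)
    df["geom_wkt"] = gdf.geometry.to_wkt()
    self.con.register("_staging", df)
    self.con.execute(
        f"CREATE TABLE {table_name} AS "
        f"SELECT * EXCLUDE (geom_wkt), ST_GeomFromText(geom_wkt) AS geom "
        f"FROM _staging"
    )
    self.loaded_tables[table_name] = gdf
    return True
```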
**Example**:
```python
success = engine.ensure_table_loaded("panama_healthsites_geojson")
if success:
    print(f"Table has {len(engine.loaded_tables['panama_healthsites_geojson'])} rows")
```
#### `execute_spatial_query(sql) → Dict`
Executes SQL and returns GeoJSON.
**Parameters**:
- `sql` (str): DuckDB SQL query
**Returns**: GeoJSON FeatureCollection
**Example**:
```python
sql = "SELECT name, geom FROM panama_healthsites_geojson LIMIT 10"
geojson = engine.execute_spatial_query(sql)
# Returns: {"type": "FeatureCollection", "features": [...], "properties": {}}
```
**Error Handling**:
- Raises exception with detailed error message
- Logs SQL for debugging
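The result-to-GeoJSON conversion is probably a thin wrapper around `ST_AsGeoJSON`; a self-contained sketch under that assumption (the function name and `_geometry` alias are illustrative):
```python
import json

def query_to_geojson(con, sql: str) -> dict:
    # Serialize geometries inside DuckDB; keep other columns as properties
    df = con.execute(
        f"SELECT * EXCLUDE (geom), ST_AsGeoJSON(geom) AS _geometry FROM ({sql}) AS q"
    ).fetchdf()
    features = []
    for row in df.to_dict(orient="records"):
        geometry = json.loads(row.pop("_geometry"))
        features.append({"type": "Feature", "geometry": geometry, "properties": row})
    return {"type": "FeatureCollection", "features": features, "properties": {}}
```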
#### `register_layer(layer_id, geojson) → str`
Registers user-created layer as temporary table.
**Parameters**:
- `layer_id` (str): Unique layer identifier
- `geojson` (Dict): GeoJSON FeatureCollection
**Returns**: Table name (`layer_{layer_id}`)
**Purpose**: Enables spatial operations on user-created layers
**Example**:
```python
# User creates layer by querying hospitals
hospitals_geojson = engine.execute_spatial_query("SELECT * FROM ... WHERE amenity='hospital'")
# Register for later spatial ops
table_name = engine.register_layer("abc123", hospitals_geojson)
# Returns: "layer_abc123"
# Now can use in spatial queries
sql = f"SELECT * FROM pan_admin1 WHERE ST_Intersects(geom, (SELECT ST_Union(geom) FROM {table_name}))"
```
#### `get_table_schemas() → str`
Generates schema descriptions for LLM context.
**Returns**: Formatted string with table/column info
**Example Output**:
```
Table: panama_healthsites_geojson
Columns: osm_id, name, amenity, operator, geom
Row count: 986

Table: pan_admin1
Columns: adm0_name, adm1_name, adm1_pcode, area_sqkm, geom
Row count: 10
```
### Supported Spatial Functions
DuckDB Spatial provides PostGIS-compatible functions:
| Function | Purpose | Example |
|----------|---------|---------|
| `ST_Intersects(a, b)` | Test intersection | `WHERE ST_Intersects(hospital.geom, province.geom)` |
| `ST_Within(a, b)` | Test containment | `WHERE ST_Within(point.geom, polygon.geom)` |
| `ST_Distance(a, b)` | Calculate distance | `SELECT ST_Distance(a.geom, b.geom) as dist` |
| `ST_Buffer(geom, dist)` | Create buffer | `SELECT ST_Buffer(geom, 0.1) FROM points` |
| `ST_Union(geom)` | Merge geometries | `SELECT ST_Union(geom) FROM provinces` |
| `ST_Difference(a, b)` | Subtract geometry | `SELECT ST_Difference(a.geom, b.geom)` |
| `ST_Intersection(a, b)` | Intersect geometries | `SELECT ST_Intersection(a.geom, b.geom)` |
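These compose naturally. For instance, a buffer-then-intersect query over the tables used earlier on this page; `layer_abc123` is the example layer registered above, and the 0.05° buffer is an arbitrary illustrative distance in degrees:
```python
# Hospitals within roughly 5 km (~0.05°) of any protected area
sql = """
SELECT h.name, h.geom
FROM panama_healthsites_geojson h
WHERE ST_Intersects(
    h.geom,
    (SELECT ST_Buffer(ST_Union(geom), 0.05) FROM layer_abc123)
)
"""
geojson = engine.execute_spatial_query(sql)
```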
---
## DataCatalog
**File**: `backend/core/data_catalog.py`
Manages dataset metadata from `catalog.json`.
### Initialization
```python
from backend.core.data_catalog import get_data_catalog
catalog = get_data_catalog()  # Singleton
```
**Loads**:
- Reads `backend/data/catalog.json`
- Parses dataset metadata
- Builds searchable index
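Loading is presumably just a JSON parse into a dict keyed by table name; a sketch with an assumed constructor signature:
```python
import json
from pathlib import Path

class DataCatalog:
    def __init__(self, catalog_path: str = "backend/data/catalog.json"):
        # Maps table_name -> metadata dict (see the structure below)
        self.datasets: dict = json.loads(Path(catalog_path).read_text())
```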
### Catalog Structure
```json
{
  "table_name": {
    "path": "relative/path/to/file.geojson",
    "description": "Short description for display",
    "semantic_description": "Detailed description for AI discovery",
    "categories": ["infrastructure", "health"],
    "tags": ["hospitals", "clinics", "healthcare"],
    "schema": {
      "columns": ["name", "type", "beds", "geom"],
      "geometry_type": "Point"
    }
  }
}
```
### Key Methods
#### `get_all_table_summaries() → str`
Returns formatted summaries of all datasets for LLM context.
**Format**:
```
Table: panama_healthsites_geojson
Description: Healthcare facilities including hospitals, clinics...
Categories: health, infrastructure
```
#### `get_summaries_for_tables(table_names) → str`
Returns summaries for specific tables (used after semantic search).
#### `get_table_metadata(table_name) → Dict`
Returns full metadata for a single table.
---
## SemanticSearch
**File**: `backend/core/semantic_search.py`
Vector-based dataset discovery using sentence embeddings.
### How It Works
1. **Embedding Generation**: Convert dataset descriptions to 384-dim vectors
2. **Indexing**: Store embeddings in `embeddings.npy`
3. **Query**: Convert user query to vector
4. **Search**: Find top-k most similar datasets via cosine similarity (sketched below)
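A minimal sketch of that pipeline with `sentence-transformers` and NumPy, assuming the stored embeddings are L2-normalized and that `table_names` is the catalog's key list in embedding-row order:
```python
import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")
table_names: list[str] = [...]  # parallel to the embedding rows, from catalog.json
embeddings = np.load("backend/data/embeddings.npy")  # shape: (n_datasets, 384)

def search_table_names(query: str, top_k: int = 15) -> list[str]:
    # With normalized vectors, cosine similarity is a plain dot product
    q = model.encode(query, normalize_embeddings=True)
    scores = embeddings @ q
    best = np.argsort(scores)[::-1][:top_k]
    return [table_names[i] for i in best]
```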
### Initialization
```python
from backend.core.semantic_search import get_semantic_search
search = get_semantic_search()  # Singleton
```
**Loads**:
- Sentence transformer model (`all-MiniLM-L6-v2`)
- Pre-computed embeddings from file (or generates if missing)
### Key Methods
#### `search_table_names(query, top_k=15) → List[str]`
Finds most relevant datasets for a query.
**Example**:
```python
results = search.search_table_names("where are the doctors?", top_k=5)
# Returns: ["panama_healthsites_geojson", "osm_amenities", ...]
```
**Performance**: Sub-millisecond for 100+ datasets
### Regenerating Embeddings
When `catalog.json` changes:
```bash
rm backend/data/embeddings.npy
python -c "from backend.core.semantic_search import get_semantic_search; get_semantic_search()"
```
---
## SessionStore
**File**: `backend/core/session_store.py`
Manages user sessions and created map layers.
### Purpose
- Track layers created by each user
- Enable spatial operations between user layers
- Maintain session state
### Key Methods
```python
from backend.core.session_store import get_session_store
store = get_session_store()

# Add layer to session
store.add_layer("session-123", {
    "id": "layer_abc",
    "name": "Hospitals in Panama",
    "table_name": "layer_abc",
    "timestamp": "2026-01-10T12:00:00"
})

# Get session layers
layers = store.get_layers("session-123")
```
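Internally this is plausibly a session-keyed dict; a sketch where the layer fields follow the example above and the storage details are assumed:
```python
from collections import defaultdict
from typing import Dict, List

class SessionStore:
    def __init__(self) -> None:
        # session_id -> list of layer metadata dicts, in creation order
        self._layers: Dict[str, List[dict]] = defaultdict(list)

    def add_layer(self, session_id: str, layer: dict) -> None:
        self._layers[session_id].append(layer)

    def get_layers(self, session_id: str) -> List[dict]:
        # Copy so callers can't mutate session state accidentally
        return list(self._layers[session_id])
```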
---
## QueryPlanner
**File**: `backend/core/query_planner.py`
Decomposes complex queries into executable steps.
### Complexity Detection
```python
from backend.core.query_planner import get_query_planner
planner = get_query_planner()
complexity = planner.detect_complexity("compare hospital count vs school count by province")
# Returns: {"is_complex": True, "reason": "Multiple dataset comparison"}
```
**Complex Query Indicators**:
- Multiple datasets
- Aggregations across categories
- Comparisons or ratios
- Multi-condition filters
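One way such indicators could be detected is a simple keyword heuristic. This is purely illustrative; the real detector may be LLM-based or considerably more sophisticated:
```python
# Hypothetical marker list; tune against real queries
COMPLEX_MARKERS = ("compare", " vs ", "versus", "ratio", " per ")

def detect_complexity(query: str) -> dict:
    q = f" {query.lower()} "
    hits = [m.strip() for m in COMPLEX_MARKERS if m in q]
    if hits:
        return {"is_complex": True, "reason": f"Matched indicators: {hits}"}
    return {"is_complex": False, "reason": "Single-step query"}
```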
### Query Planning
```python
plan = await planner.plan_query(query, available_tables, llm)
# Returns ExecutionPlan with:
# - steps: List of QueryStep objects
# - parallel_groups: Steps that can run concurrently
# - combination_logic: How to merge results
```
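The plan types might be modeled as dataclasses along these lines; the field names follow the comment above, everything else is an assumption:
```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class QueryStep:
    step_id: str
    description: str
    sql: Optional[str] = None  # filled in during SQL generation

@dataclass
class ExecutionPlan:
    steps: List[QueryStep] = field(default_factory=list)
    # Groups of step_ids that are safe to run concurrently
    parallel_groups: List[List[str]] = field(default_factory=list)
    combination_logic: str = ""  # e.g. "join step results on adm1_name"
```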
---
## QueryExecutor
**File**: `backend/services/executor.py`
Main orchestrator that coordinates all services.
### Query Pipeline
```python
from backend.services.executor import QueryExecutor

executor = QueryExecutor()

# Process query with streaming
async for event in executor.process_query_stream(query, history):
    if event["event"] == "status":
        print(f"Status: {event['data']}")
    elif event["event"] == "chunk":
        print(event["data"], end="")
    elif event["event"] == "result":
        geojson = event["data"]["geojson"]
```
### Execution Steps
1. **Intent Detection** → LLMGateway
2. **Semantic Search** → SemanticSearch
3. **Schema Loading** → DataCatalog + GeoEngine
4. **SQL Generation** → LLMGateway
5. **Query Execution** → GeoEngine
6. **Result Formatting** → ResponseFormatter
7. **Explanation** → LLMGateway (streaming)
8. **Layer Registration** → SessionStore
- **Dataset Sources**: see [../data/DATASET_SOURCES.md](../data/DATASET_SOURCES.md) for a detailed walkthrough of the underlying datasets.
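Wired together, the streaming pipeline plausibly follows this skeleton. Event names match the Query Pipeline example above; the service attribute names (`self.llm`, `self.search`, etc.) and the `data_summary` stand-in are assumptions:
```python
async def process_query_stream(self, query: str, history: list):
    yield {"event": "status", "data": "Detecting intent..."}
    intent = await self.llm.detect_intent(query, history)   # step 1 (branching elided)

    tables = self.search.search_table_names(query)          # step 2
    schema = self.catalog.get_summaries_for_tables(tables)  # step 3

    yield {"event": "status", "data": "Generating SQL..."}
    sql = await self.llm.generate_analytical_sql(query, schema, history)  # step 4
    geojson = self.geo_engine.execute_spatial_query(sql)    # step 5

    # Steps 6-7: format results, then stream the explanation as "chunk" events
    summary = geojson.get("properties", {})  # assumed stand-in for data_summary
    async for chunk in self.llm.stream_explanation(query, sql, summary, history):
        if chunk["type"] == "content":
            yield {"event": "chunk", "data": chunk["text"]}

    yield {"event": "result", "data": {"geojson": geojson}}  # final payload
```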
---
## Singleton Pattern
Most services use the singleton pattern for efficiency:
```python
# Internal cache
_instance = None

def get_service():
    global _instance
    if _instance is None:
        _instance = Service()
    return _instance
```
**Benefits**:
- Single database connection
- Cached embeddings
- Shared catalog
## Error Handling
### SQL Correction Loop
When generated SQL fails:
```python
try:
    result = geo_engine.execute_spatial_query(sql)
except Exception as e:
    # Try to repair
    corrected_sql = await llm.correct_sql(query, sql, str(e), schema)
    result = geo_engine.execute_spatial_query(corrected_sql)
```
### Data Unavailability
LLM returns special marker:
```sql
-- ERROR: DATA_UNAVAILABLE
-- Requested: crime statistics
-- Available: admin boundaries, hospitals, schools
```
Executor detects and returns helpful message to user.
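Detection is presumably a prefix check on the generated text; the helper name here is assumed for illustration:
```python
DATA_UNAVAILABLE_MARKER = "-- ERROR: DATA_UNAVAILABLE"

def is_data_unavailable(sql: str) -> bool:
    # The marker is emitted as the first line of the generated "SQL"
    return sql.lstrip().startswith(DATA_UNAVAILABLE_MARKER)
```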
---
## Next Steps
- **API Reference**: [API_ENDPOINTS.md](API_ENDPOINTS.md)
- **Frontend Components**: [../frontend/COMPONENTS.md](../frontend/COMPONENTS.md)