# Backend Core Services
Detailed reference for GeoQuery's core backend services.
---
## Service Overview
| Service | File | Purpose |
|---------|------|---------|
| **LLMGateway** | `core/llm_gateway.py` | Gemini API integration |
| **GeoEngine** | `core/geo_engine.py` | DuckDB Spatial wrapper |
| **DataCatalog** | `core/data_catalog.py` | Dataset metadata management |
| **SemanticSearch** | `core/semantic_search.py` | Embedding-based discovery |
| **SessionStore** | `core/session_store.py` | User session and layer management |
| **QueryPlanner** | `core/query_planner.py` | Multi-step query orchestration |
| **QueryExecutor** | `services/executor.py` | Main query pipeline |
---
## LLMGateway
**File**: `backend/core/llm_gateway.py`
Unified interface to Google Gemini API with streaming support.
### Initialization
```python
from backend.core.llm_gateway import LLMGateway
llm = LLMGateway()
```
**Configuration**:
- Reads `GEMINI_API_KEY` from environment
- Uses `gemini-2.0-flash-exp` model
- Enables "thinking" mode for reasoning transparency
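The configuration rules above can be sketched as a small helper. `resolve_config` is a hypothetical function for illustration only; the real gateway wires this up internally in `llm_gateway.py`:

```python
import os

DEFAULT_MODEL = "gemini-2.0-flash-exp"  # model named above

def resolve_config(env=None):
    """Hypothetical helper mirroring LLMGateway's configuration rules."""
    env = os.environ if env is None else env
    api_key = env.get("GEMINI_API_KEY")
    if not api_key:
        raise RuntimeError("GEMINI_API_KEY is not set")
    return {"api_key": api_key, "model": DEFAULT_MODEL}
```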
### Key Methods
#### `detect_intent(query, history) → str`
Classifies user query into intent category.
**Parameters**:
- `query` (str): User's natural language query
- `history` (List[Dict]): Conversation history
**Returns**: One of:
- `"GENERAL_CHAT"` - Conversational question
- `"DATA_QUERY"` - Data request
- `"MAP_REQUEST"` - Explicitly wants visualization
- `"SPATIAL_OP"` - Geometric operation (intersection, buffer, etc.)
- `"STAT_QUERY"` - Requests chart/graph
**Example**:
```python
intent = await llm.detect_intent("Show me hospitals in Panama", [])
# Returns: "MAP_REQUEST"
```
#### `generate_analytical_sql(query, schema, history) → str`
Generates DuckDB SQL query from natural language.
**Parameters**:
- `query` (str): User query
- `schema` (str): Available table schemas
- `history` (List[Dict]): Conversation context
**Returns**: SQL query string
**Special Cases**:
- Returns `"-- ERROR: DATA_UNAVAILABLE"` if data doesn't exist
- Includes `geom` column for map visualization
- Uses DuckDB spatial functions (ST_Intersects, etc.)
**Example**:
```python
schema = "Table: panama_healthsites_geojson\nColumns: name, amenity, geom..."
sql = await llm.generate_analytical_sql("hospitals in David", schema, [])
# Returns: "SELECT name, amenity, geom FROM panama_healthsites_geojson
# WHERE amenity = 'hospital' AND ST_Intersects(geom, ...)"
```
#### `generate_spatial_sql(query, context, history) → str`
Generates spatial operation SQL (difference, intersection, etc.).
**Parameters**:
- `query` (str): Spatial operation request
- `context` (str): Base tables + user layers
- `history` (List[Dict]): Conversation history
**Returns**: SQL with spatial functions
**Example**:
```python
context = "Base: pan_admin1\nUser Layers: layer_abc123 (Protected Areas)"
sql = await llm.generate_spatial_sql("subtract protected areas from Chiriquí", context, [])
# Returns: "WITH protected_union AS (SELECT ST_Union(geom) FROM layer_abc123)
# SELECT a.*, ST_Difference(a.geom, p.geom) as geom
# FROM pan_admin1 a, protected_union p WHERE a.adm1_name = 'Chiriquí'"
```
#### `generate_layer_name(query, sql) → Dict`
Generates descriptive name, emoji, and point style for map layer.
**Returns**:
```python
{
"name": "Hospitals in David",
"emoji": "🏥",
"pointStyle": "icon" # or "circle" or None
}
```
**Point Style Logic**:
- `"icon"`: Small to medium POI datasets (<500 points)
- `"circle"`: Large point datasets (>500 points)
- `None`: Polygon/line data (uses choropleth or line styling)
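The selection rule above reads as a simple function. This is a sketch only; in practice the choice is made by the LLM inside `generate_layer_name`:

```python
def choose_point_style(geometry_type: str, feature_count: int):
    # Sketch of the rule above; the actual decision is made by the LLM.
    if geometry_type != "Point":
        return None  # polygon/line data uses choropleth or line styling
    return "icon" if feature_count < 500 else "circle"
```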
#### `stream_explanation(query, sql, data_summary, history)`
Streams natural language explanation of results.
**Yields**: Dict with:
- `{"type": "thought", "text": "reasoning..."}` - LLM thinking
- `{"type": "content", "text": "response..."}` - Actual response
**Example**:
```python
async for chunk in llm.stream_explanation("show hospitals", sql, summary, []):
if chunk["type"] == "content":
print(chunk["text"], end="", flush=True)
```
### Prompt System
All prompts are centralized in `backend/core/prompts.py`:
- `SYSTEM_INSTRUCTION` - Base system context
- `INTENT_DETECTION_PROMPT` - Intent classification
- `SQL_GENERATION_PROMPT` - Text-to-SQL
- `SPATIAL_SQL_PROMPT` - Spatial operations
- `LAYER_NAME_PROMPT` - Layer metadata generation
- `EXPLANATION_PROMPT` - Result interpretation
---
## GeoEngine
**File**: `backend/core/geo_engine.py`
DuckDB Spatial database wrapper for geospatial queries.
### Initialization
```python
from backend.core.geo_engine import get_geo_engine
engine = get_geo_engine() # Singleton pattern
```
**Creates**:
- In-memory DuckDB database
- Loads Spatial extension
- Configures JSON serialization
### Key Methods
#### `ensure_table_loaded(table_name) → bool`
Lazily loads GeoJSON dataset into DuckDB.
**Parameters**:
- `table_name` (str): Table identifier from catalog
**Returns**: True if loaded successfully
**Behavior**:
- Checks if already loaded (no-op if yes)
- Looks up path in DataCatalog
- Reads GeoJSON file with GeoPandas
- Creates DuckDB table with spatial index
- Caches in `loaded_tables` dict
**Example**:
```python
success = engine.ensure_table_loaded("panama_healthsites_geojson")
if success:
print(f"Table has {len(engine.loaded_tables['panama_healthsites_geojson'])} rows")
```
#### `execute_spatial_query(sql) → Dict`
Executes SQL and returns GeoJSON.
**Parameters**:
- `sql` (str): DuckDB SQL query
**Returns**: GeoJSON FeatureCollection
**Example**:
```python
sql = "SELECT name, geom FROM panama_healthsites_geojson LIMIT 10"
geojson = engine.execute_spatial_query(sql)
# Returns: {"type": "FeatureCollection", "features": [...], "properties": {}}
```
**Error Handling**:
- Raises exception with detailed error message
- Logs SQL for debugging
#### `register_layer(layer_id, geojson) → str`
Registers user-created layer as temporary table.
**Parameters**:
- `layer_id` (str): Unique layer identifier
- `geojson` (Dict): GeoJSON FeatureCollection
**Returns**: Table name (`layer_{layer_id}`)
**Purpose**: Enables spatial operations on user-created layers
**Example**:
```python
# User creates layer by querying hospitals
hospitals_geojson = engine.execute_spatial_query("SELECT * FROM ... WHERE amenity='hospital'")
# Register for later spatial ops
table_name = engine.register_layer("abc123", hospitals_geojson)
# Returns: "layer_abc123"
# Now can use in spatial queries
sql = f"SELECT * FROM pan_admin1 WHERE ST_Intersects(geom, (SELECT ST_Union(geom) FROM {table_name}))"
```
#### `get_table_schemas() → str`
Generates schema descriptions for LLM context.
**Returns**: Formatted string with table/column info
**Example Output**:
```
Table: panama_healthsites_geojson
Columns: osm_id, name, amenity, operator, geom
Row count: 986
Table: pan_admin1
Columns: adm0_name, adm1_name, adm1_pcode, area_sqkm, geom
Row count: 10
```
### Supported Spatial Functions
DuckDB Spatial provides PostGIS-compatible functions:
| Function | Purpose | Example |
|----------|---------|---------|
| `ST_Intersects(a, b)` | Test intersection | `WHERE ST_Intersects(hospital.geom, province.geom)` |
| `ST_Within(a, b)` | Test containment | `WHERE ST_Within(point.geom, polygon.geom)` |
| `ST_Distance(a, b)` | Calculate distance | `SELECT ST_Distance(a.geom, b.geom) as dist` |
| `ST_Buffer(geom, dist)` | Create buffer | `SELECT ST_Buffer(geom, 0.1) FROM points` |
| `ST_Union(geom)` | Merge geometries | `SELECT ST_Union(geom) FROM provinces` |
| `ST_Difference(a, b)` | Subtract geometry | `SELECT ST_Difference(a.geom, b.geom)` |
| `ST_Intersection(a, b)` | Intersect geometries | `SELECT ST_Intersection(a.geom, b.geom)` |
---
## DataCatalog
**File**: `backend/core/data_catalog.py`
Manages dataset metadata from `catalog.json`.
### Initialization
```python
from backend.core.data_catalog import get_data_catalog
catalog = get_data_catalog() # Singleton
```
**Loads**:
- Reads `backend/data/catalog.json`
- Parses dataset metadata
- Builds searchable index
### Catalog Structure
```json
{
"table_name": {
"path": "relative/path/to/file.geojson",
"description": "Short description for display",
"semantic_description": "Detailed description for AI discovery",
"categories": ["infrastructure", "health"],
"tags": ["hospitals", "clinics", "healthcare"],
"schema": {
"columns": ["name", "type", "beds", "geom"],
"geometry_type": "Point"
}
}
}
```
### Key Methods
#### `get_all_table_summaries() → str`
Returns formatted summaries of all datasets for LLM context.
**Format**:
```
Table: panama_healthsites_geojson
Description: Healthcare facilities including hospitals, clinics...
Categories: health, infrastructure
```
#### `get_summaries_for_tables(table_names) → str`
Returns summaries for specific tables (used after semantic search).
#### `get_table_metadata(table_name) → Dict`
Returns full metadata for a single table.
---
## SemanticSearch
**File**: `backend/core/semantic_search.py`
Vector-based dataset discovery using sentence embeddings.
### How It Works
1. **Embedding Generation**: Convert dataset descriptions to 384-dim vectors
2. **Indexing**: Store embeddings in `embeddings.npy`
3. **Query**: Convert user query to vector
4. **Search**: Find top-k most similar datasets via cosine similarity
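Steps 3–4 amount to a cosine-similarity ranking, sketched here in plain Python (the real service uses the sentence-transformer model and a precomputed NumPy index):

```python
import math

def cosine(a, b):
    # Cosine similarity between two equal-length vectors.
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def rank_tables(query_vec, table_vecs, top_k=5):
    # table_vecs maps table name -> embedding; most similar first.
    ranked = sorted(table_vecs,
                    key=lambda name: cosine(query_vec, table_vecs[name]),
                    reverse=True)
    return ranked[:top_k]
```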
### Initialization
```python
from backend.core.semantic_search import get_semantic_search
search = get_semantic_search() # Singleton
```
**Loads**:
- Sentence transformer model (`all-MiniLM-L6-v2`)
- Pre-computed embeddings from file (or generates if missing)
### Key Methods
#### `search_table_names(query, top_k=15) → List[str]`
Finds most relevant datasets for a query.
**Example**:
```python
results = search.search_table_names("where are the doctors?", top_k=5)
# Returns: ["panama_healthsites_geojson", "osm_amenities", ...]
```
**Performance**: Sub-millisecond for 100+ datasets
### Regenerating Embeddings
When `catalog.json` changes:
```bash
rm backend/data/embeddings.npy
python -c "from backend.core.semantic_search import get_semantic_search; get_semantic_search()"
```
---
## SessionStore
**File**: `backend/core/session_store.py`
Manages user sessions and created map layers.
### Purpose
- Track layers created by each user
- Enable spatial operations between user layers
- Maintain session state
### Key Methods
```python
from backend.core.session_store import get_session_store
store = get_session_store()
# Add layer to session
store.add_layer("session-123", {
"id": "layer_abc",
"name": "Hospitals in Panama",
"table_name": "layer_abc",
"timestamp": "2026-01-10T12:00:00"
})
# Get session layers
layers = store.get_layers("session-123")
```
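A minimal in-memory sketch consistent with the usage above (the real store is in `session_store.py` and may differ in detail):

```python
from collections import defaultdict

class InMemorySessionStore:
    # Sketch only: maps session_id -> list of layer dicts.
    def __init__(self):
        self._layers = defaultdict(list)

    def add_layer(self, session_id, layer):
        self._layers[session_id].append(layer)

    def get_layers(self, session_id):
        # Return a copy so callers can't mutate internal state.
        return list(self._layers[session_id])
```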
---
## QueryPlanner
**File**: `backend/core/query_planner.py`
Decomposes complex queries into executable steps.
### Complexity Detection
```python
from backend.core.query_planner import get_query_planner
planner = get_query_planner()
complexity = planner.detect_complexity("compare hospital count vs school count by province")
# Returns: {"is_complex": True, "reason": "Multiple dataset comparison"}
```
**Complex Query Indicators**:
- Multiple datasets
- Aggregations across categories
- Comparisons or ratios
- Multi-condition filters
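The indicators above could be approximated with a keyword heuristic. This is a naive sketch; `detect_complexity` may use richer logic or the LLM itself:

```python
# Hypothetical hint list mirroring the indicators above
COMPLEX_HINTS = ("compare", " vs ", "versus", "ratio", "by province", "per ")

def looks_complex(query: str) -> bool:
    # Naive keyword check; pad with spaces so " vs " matches at edges.
    q = f" {query.lower()} "
    return any(hint in q for hint in COMPLEX_HINTS)
```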
### Query Planning
```python
plan = await planner.plan_query(query, available_tables, llm)
# Returns ExecutionPlan with:
# - steps: List of QueryStep objects
# - parallel_groups: Steps that can run concurrently
# - combination_logic: How to merge results
```
---
## QueryExecutor
**File**: `backend/services/executor.py`
Main orchestrator that coordinates all services.
### Query Pipeline
```python
from backend.services.executor import QueryExecutor
executor = QueryExecutor()
# Process query with streaming
async for event in executor.process_query_stream(query, history):
if event["event"] == "status":
print(f"Status: {event['data']}")
elif event["event"] == "chunk":
print(event["data"], end="")
elif event["event"] == "result":
geojson = event["data"]["geojson"]
```
### Execution Steps
1. **Intent Detection** → LLMGateway
2. **Semantic Search** → SemanticSearch
3. **Schema Loading** → DataCatalog + GeoEngine
4. **SQL Generation** → LLMGateway
5. **Query Execution** → GeoEngine
6. **Result Formatting** → ResponseFormatter
7. **Explanation** → LLMGateway (streaming)
8. **Layer Registration** → SessionStore
- **Dataset Sources**: see [../data/DATASET_SOURCES.md](../data/DATASET_SOURCES.md) for a detailed walkthrough of the underlying datasets.
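The core wiring of these steps can be condensed into a sketch (signatures assumed from the method docs above; streaming, error handling, formatting, and layer registration omitted):

```python
async def run_pipeline(query, history, llm, search, catalog, engine):
    # Sketch of steps 1-5; the real orchestration is QueryExecutor.
    intent = await llm.detect_intent(query, history)                 # 1. intent
    tables = search.search_table_names(query)                        # 2. discovery
    schema = catalog.get_summaries_for_tables(tables)                # 3. schemas
    sql = await llm.generate_analytical_sql(query, schema, history)  # 4. SQL
    geojson = engine.execute_spatial_query(sql)                      # 5. execute
    return intent, sql, geojson
```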
---
## Singleton Pattern
Most services use the singleton pattern for efficiency:
```python
# Internal cache
_instance = None
def get_service():
global _instance
if _instance is None:
_instance = Service()
return _instance
```
**Benefits**:
- Single database connection
- Cached embeddings
- Shared catalog
---
## Error Handling
### SQL Correction Loop
When generated SQL fails:
```python
try:
result = geo_engine.execute_spatial_query(sql)
except Exception as e:
# Try to repair
corrected_sql = await llm.correct_sql(query, sql, str(e), schema)
result = geo_engine.execute_spatial_query(corrected_sql)
```
### Data Unavailability
LLM returns special marker:
```sql
-- ERROR: DATA_UNAVAILABLE
-- Requested: crime statistics
-- Available: admin boundaries, hospitals, schools
```
Executor detects and returns helpful message to user.
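Detection reduces to checking the SQL's leading comment. `is_data_unavailable` is an assumed helper for illustration; the actual check lives inside the executor:

```python
def is_data_unavailable(sql: str) -> bool:
    # The marker is emitted as the first comment line of the generated SQL.
    return sql.lstrip().startswith("-- ERROR: DATA_UNAVAILABLE")
```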
---
## Next Steps
- **API Reference**: [API_ENDPOINTS.md](API_ENDPOINTS.md)
- **Frontend Components**: [../frontend/COMPONENTS.md](../frontend/COMPONENTS.md)