# Backend Core Services

Detailed reference for GeoQuery's core backend services.

## Service Overview
| Service | File | Purpose |
|---|---|---|
| LLMGateway | `core/llm_gateway.py` | Gemini API integration |
| GeoEngine | `core/geo_engine.py` | DuckDB Spatial wrapper |
| DataCatalog | `core/data_catalog.py` | Dataset metadata management |
| SemanticSearch | `core/semantic_search.py` | Embedding-based discovery |
| SessionStore | `core/session_store.py` | User session and layer management |
| QueryPlanner | `core/query_planner.py` | Multi-step query orchestration |
| QueryExecutor | `services/executor.py` | Main query pipeline |
## LLMGateway

**File:** `backend/core/llm_gateway.py`

Unified interface to the Google Gemini API with streaming support.

### Initialization

```python
from backend.core.llm_gateway import LLMGateway

llm = LLMGateway()
```

**Configuration:**

- Reads `GEMINI_API_KEY` from the environment
- Uses the `gemini-2.0-flash-exp` model
- Enables "thinking" mode for reasoning transparency
### Key Methods

#### detect_intent(query, history) → str

Classifies the user query into an intent category.

**Parameters:**

- `query` (str): User's natural language query
- `history` (List[Dict]): Conversation history

**Returns:** One of:

- `"GENERAL_CHAT"` - Conversational question
- `"DATA_QUERY"` - Data request
- `"MAP_REQUEST"` - Explicitly wants visualization
- `"SPATIAL_OP"` - Geometric operation (intersection, buffer, etc.)
- `"STAT_QUERY"` - Requests chart/graph
**Example:**

```python
intent = await llm.detect_intent("Show me hospitals in Panama", [])
# Returns: "MAP_REQUEST"
```
#### generate_analytical_sql(query, schema, history) → str

Generates a DuckDB SQL query from natural language.

**Parameters:**

- `query` (str): User query
- `schema` (str): Available table schemas
- `history` (List[Dict]): Conversation context

**Returns:** SQL query string

**Special cases:**

- Returns `"-- ERROR: DATA_UNAVAILABLE"` if the data doesn't exist
- Includes the `geom` column for map visualization
- Uses DuckDB spatial functions (`ST_Intersects`, etc.)
**Example:**

```python
schema = "Table: panama_healthsites_geojson\nColumns: name, amenity, geom..."
sql = await llm.generate_analytical_sql("hospitals in David", schema, [])
# Returns: "SELECT name, amenity, geom FROM panama_healthsites_geojson
#           WHERE amenity = 'hospital' AND ST_Intersects(geom, ...)"
```
#### generate_spatial_sql(query, context, history) → str

Generates spatial operation SQL (difference, intersection, etc.).

**Parameters:**

- `query` (str): Spatial operation request
- `context` (str): Base tables + user layers
- `history` (List[Dict]): Conversation history

**Returns:** SQL with spatial functions
**Example:**

```python
context = "Base: pan_admin1\nUser Layers: layer_abc123 (Protected Areas)"
sql = await llm.generate_spatial_sql("subtract protected areas from Chiriquí", context, [])
# Returns: "WITH protected_union AS (SELECT ST_Union(geom) FROM layer_abc123)
#           SELECT a.*, ST_Difference(a.geom, p.geom) as geom
#           FROM pan_admin1 a, protected_union p WHERE a.adm1_name = 'Chiriquí'"
```
#### generate_layer_name(query, sql) → Dict

Generates a descriptive name, emoji, and point style for the map layer.

**Returns:**

```python
{
    "name": "Hospitals in David",
    "emoji": "🏥",
    "pointStyle": "icon"  # or "circle" or None
}
```

**Point Style Logic:**

- `"icon"`: Small to medium POI datasets (<500 points)
- `"circle"`: Large point datasets (>500 points)
- `None`: Polygon/line data (uses choropleth or line styling)
#### stream_explanation(query, sql, data_summary, history)

Streams a natural language explanation of the results.

**Yields:** Dicts of the form:

- `{"type": "thought", "text": "reasoning..."}` - LLM thinking
- `{"type": "content", "text": "response..."}` - Actual response
**Example:**

```python
async for chunk in llm.stream_explanation("show hospitals", sql, summary, []):
    if chunk["type"] == "content":
        print(chunk["text"], end="", flush=True)
```
### Prompt System

All prompts are centralized in `backend/core/prompts.py`:

- `SYSTEM_INSTRUCTION` - Base system context
- `INTENT_DETECTION_PROMPT` - Intent classification
- `SQL_GENERATION_PROMPT` - Text-to-SQL
- `SPATIAL_SQL_PROMPT` - Spatial operations
- `LAYER_NAME_PROMPT` - Layer metadata generation
- `EXPLANATION_PROMPT` - Result interpretation
## GeoEngine

**File:** `backend/core/geo_engine.py`

DuckDB Spatial database wrapper for geospatial queries.

### Initialization

```python
from backend.core.geo_engine import get_geo_engine

engine = get_geo_engine()  # Singleton pattern
```

On creation it:

- Opens an in-memory DuckDB database
- Loads the Spatial extension
- Configures JSON serialization
### Key Methods

#### ensure_table_loaded(table_name) → bool

Lazily loads a GeoJSON dataset into DuckDB.

**Parameters:**

- `table_name` (str): Table identifier from the catalog

**Returns:** `True` if loaded successfully

**Behavior:**

- Checks if already loaded (no-op if yes)
- Looks up the path in DataCatalog
- Reads the GeoJSON file with GeoPandas
- Creates a DuckDB table with a spatial index
- Caches the result in the `loaded_tables` dict
**Example:**

```python
success = engine.ensure_table_loaded("panama_healthsites_geojson")
if success:
    print(f"Table has {len(engine.loaded_tables['panama_healthsites_geojson'])} rows")
```
#### execute_spatial_query(sql) → Dict

Executes SQL and returns GeoJSON.

**Parameters:**

- `sql` (str): DuckDB SQL query

**Returns:** GeoJSON FeatureCollection

**Example:**

```python
sql = "SELECT name, geom FROM panama_healthsites_geojson LIMIT 10"
geojson = engine.execute_spatial_query(sql)
# Returns: {"type": "FeatureCollection", "features": [...], "properties": {}}
```

**Error handling:**

- Raises an exception with a detailed error message
- Logs the SQL for debugging
#### register_layer(layer_id, geojson) → str

Registers a user-created layer as a temporary table.

**Parameters:**

- `layer_id` (str): Unique layer identifier
- `geojson` (Dict): GeoJSON FeatureCollection

**Returns:** Table name (`layer_{layer_id}`)

**Purpose:** Enables spatial operations on user-created layers

**Example:**

```python
# User creates a layer by querying hospitals
hospitals_geojson = engine.execute_spatial_query("SELECT * FROM ... WHERE amenity='hospital'")

# Register for later spatial ops
table_name = engine.register_layer("abc123", hospitals_geojson)
# Returns: "layer_abc123"

# Now usable in spatial queries
sql = f"SELECT * FROM pan_admin1 WHERE ST_Intersects(geom, (SELECT ST_Union(geom) FROM {table_name}))"
```
#### get_table_schemas() → str

Generates schema descriptions for LLM context.

**Returns:** Formatted string with table/column info

**Example output:**

```
Table: panama_healthsites_geojson
Columns: osm_id, name, amenity, operator, geom
Row count: 986

Table: pan_admin1
Columns: adm0_name, adm1_name, adm1_pcode, area_sqkm, geom
Row count: 10
```
### Supported Spatial Functions

DuckDB Spatial provides PostGIS-compatible functions:

| Function | Purpose | Example |
|---|---|---|
| `ST_Intersects(a, b)` | Test intersection | `WHERE ST_Intersects(hospital.geom, province.geom)` |
| `ST_Within(a, b)` | Test containment | `WHERE ST_Within(point.geom, polygon.geom)` |
| `ST_Distance(a, b)` | Calculate distance | `SELECT ST_Distance(a.geom, b.geom) as dist` |
| `ST_Buffer(geom, dist)` | Create buffer | `SELECT ST_Buffer(geom, 0.1) FROM points` |
| `ST_Union(geom)` | Merge geometries | `SELECT ST_Union(geom) FROM provinces` |
| `ST_Difference(a, b)` | Subtract geometry | `SELECT ST_Difference(a.geom, b.geom)` |
| `ST_Intersection(a, b)` | Intersect geometries | `SELECT ST_Intersection(a.geom, b.geom)` |
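Because these are stock DuckDB Spatial functions, they can be sanity-checked against a bare in-memory connection, independent of GeoEngine:

```python
import duckdb

con = duckdb.connect()  # in-memory database
con.execute("INSTALL spatial; LOAD spatial;")

# Planar distance between two points: sqrt(3^2 + 4^2) = 5.0
dist = con.execute(
    "SELECT ST_Distance(ST_Point(0, 0), ST_Point(3, 4))"
).fetchone()[0]
print(dist)  # 5.0
```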
## DataCatalog

**File:** `backend/core/data_catalog.py`

Manages dataset metadata from `catalog.json`.

### Initialization

```python
from backend.core.data_catalog import get_data_catalog

catalog = get_data_catalog()  # Singleton
```

On load it:

- Reads `backend/data/catalog.json`
- Parses dataset metadata
- Builds a searchable index
### Catalog Structure

```json
{
  "table_name": {
    "path": "relative/path/to/file.geojson",
    "description": "Short description for display",
    "semantic_description": "Detailed description for AI discovery",
    "categories": ["infrastructure", "health"],
    "tags": ["hospitals", "clinics", "healthcare"],
    "schema": {
      "columns": ["name", "type", "beds", "geom"],
      "geometry_type": "Point"
    }
  }
}
```
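For orientation, a minimal sketch of consuming this structure with the standard library (DataCatalog itself layers summaries and a search index on top):

```python
import json

with open("backend/data/catalog.json") as f:
    catalog = json.load(f)

for table_name, meta in catalog.items():
    print(f"{table_name}: {meta['description']} (tags: {', '.join(meta['tags'])})")
```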
### Key Methods

#### get_all_table_summaries() → str

Returns formatted summaries of all datasets for LLM context.

**Format:**

```
Table: panama_healthsites_geojson
Description: Healthcare facilities including hospitals, clinics...
Categories: health, infrastructure
```

#### get_summaries_for_tables(table_names) → str

Returns summaries for specific tables (used after semantic search).

#### get_table_metadata(table_name) → Dict

Returns full metadata for a single table.
## SemanticSearch

**File:** `backend/core/semantic_search.py`

Vector-based dataset discovery using sentence embeddings.

### How It Works

1. **Embedding generation**: Convert dataset descriptions to 384-dim vectors
2. **Indexing**: Store embeddings in `embeddings.npy`
3. **Query**: Convert the user query to a vector
4. **Search**: Find the top-k most similar datasets via cosine similarity
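A minimal sketch of steps 3-4, assuming `embeddings` is an `(N, 384)` array whose rows line up with `table_names` (illustrative variable names, not the module's actual internals):

```python
import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = np.load("backend/data/embeddings.npy")

def search(query: str, table_names: list[str], top_k: int = 5) -> list[str]:
    q = model.encode(query)  # 384-dim query vector
    # Cosine similarity = dot product of L2-normalized vectors
    sims = embeddings @ q / (
        np.linalg.norm(embeddings, axis=1) * np.linalg.norm(q)
    )
    return [table_names[i] for i in np.argsort(-sims)[:top_k]]
```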
### Initialization

```python
from backend.core.semantic_search import get_semantic_search

search = get_semantic_search()  # Singleton
```

On load it:

- Loads the sentence transformer model (`all-MiniLM-L6-v2`)
- Loads pre-computed embeddings from file (or generates them if missing)
### Key Methods

#### search_table_names(query, top_k=15) → List[str]

Finds the most relevant datasets for a query.

**Example:**

```python
results = search.search_table_names("where are the doctors?", top_k=5)
# Returns: ["panama_healthsites_geojson", "osm_amenities", ...]
```

**Performance:** Sub-millisecond for 100+ datasets
### Regenerating Embeddings

When `catalog.json` changes:

```bash
rm backend/data/embeddings.npy
python -c "from backend.core.semantic_search import get_semantic_search; get_semantic_search()"
```
## SessionStore

**File:** `backend/core/session_store.py`

Manages user sessions and created map layers.

### Purpose

- Track layers created by each user
- Enable spatial operations between user layers
- Maintain session state
### Key Methods

```python
from backend.core.session_store import get_session_store

store = get_session_store()

# Add a layer to a session
store.add_layer("session-123", {
    "id": "layer_abc",
    "name": "Hospitals in Panama",
    "table_name": "layer_abc",
    "timestamp": "2026-01-10T12:00:00"
})

# Get session layers
layers = store.get_layers("session-123")
```
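A hypothetical in-memory implementation matching the calls above (the real store may add expiry or persistence):

```python
from collections import defaultdict

class SessionStore:
    def __init__(self):
        # session_id -> list of layer dicts
        self._layers: defaultdict[str, list[dict]] = defaultdict(list)

    def add_layer(self, session_id: str, layer: dict) -> None:
        self._layers[session_id].append(layer)

    def get_layers(self, session_id: str) -> list[dict]:
        return self._layers[session_id]
```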
## QueryPlanner

**File:** `backend/core/query_planner.py`

Decomposes complex queries into executable steps.

### Complexity Detection

```python
from backend.core.query_planner import get_query_planner

planner = get_query_planner()
complexity = planner.detect_complexity("compare hospital count vs school count by province")
# Returns: {"is_complex": True, "reason": "Multiple dataset comparison"}
```
**Complex query indicators:**

- Multiple datasets
- Aggregations across categories
- Comparisons or ratios
- Multi-condition filters
### Query Planning

```python
plan = await planner.plan_query(query, available_tables, llm)
# Returns an ExecutionPlan with:
# - steps: List of QueryStep objects
# - parallel_groups: Steps that can run concurrently
# - combination_logic: How to merge results
```
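The shapes below are hypothetical, inferred only from the comments above; the actual classes in `query_planner.py` may carry more fields:

```python
from dataclasses import dataclass

@dataclass
class QueryStep:
    id: str
    description: str
    sql: str | None = None  # filled in once SQL is generated

@dataclass
class ExecutionPlan:
    steps: list[QueryStep]
    parallel_groups: list[list[str]]  # groups of step ids safe to run concurrently
    combination_logic: str            # how to merge step results
```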
## QueryExecutor

**File:** `backend/services/executor.py`

Main orchestrator that coordinates all services.

### Query Pipeline

```python
from backend.services.executor import QueryExecutor

executor = QueryExecutor()

# Process a query with streaming
async for event in executor.process_query_stream(query, history):
    if event["event"] == "status":
        print(f"Status: {event['data']}")
    elif event["event"] == "chunk":
        print(event["data"], end="")
    elif event["event"] == "result":
        geojson = event["data"]["geojson"]
```
### Execution Steps

1. Intent Detection → LLMGateway
2. Semantic Search → SemanticSearch
3. Schema Loading → DataCatalog + GeoEngine
4. SQL Generation → LLMGateway
5. Query Execution → GeoEngine
6. Result Formatting → ResponseFormatter
7. Explanation → LLMGateway (streaming)
8. Layer Registration → SessionStore
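A condensed, happy-path sketch of those steps using only the methods documented above (planning, SQL correction, result formatting, and layer registration are omitted for brevity; this is not the actual executor body):

```python
async def process_query_stream(self, query, history):
    intent = await self.llm.detect_intent(query, history)       # 1. selects the branch; DATA_QUERY path shown
    tables = self.search.search_table_names(query)              # 2. semantic search
    for t in tables:                                            # 3. load data + schemas
        self.geo_engine.ensure_table_loaded(t)
    schema = self.catalog.get_summaries_for_tables(tables)
    sql = await self.llm.generate_analytical_sql(query, schema, history)  # 4
    geojson = self.geo_engine.execute_spatial_query(sql)        # 5
    yield {"event": "result", "data": {"geojson": geojson}}     # 6 (formatting omitted)
    async for chunk in self.llm.stream_explanation(query, sql, geojson, history):  # 7
        if chunk["type"] == "content":
            yield {"event": "chunk", "data": chunk["text"]}
```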
See [../data/DATASET_SOURCES.md](../data/DATASET_SOURCES.md) for a detailed walkthrough of the datasets these steps draw on.
## Singleton Pattern

Most services use the singleton pattern for efficiency:

```python
# Internal cache, shared by every caller in the process
_instance = None

def get_service():
    global _instance
    if _instance is None:
        _instance = Service()  # constructed once, on first use
    return _instance
```
**Benefits:**

- Single database connection
- Cached embeddings
- Shared catalog
## Error Handling

### SQL Correction Loop

When generated SQL fails:

```python
try:
    result = geo_engine.execute_spatial_query(sql)
except Exception as e:
    # Ask the LLM to repair the failing query, then retry once
    corrected_sql = await llm.correct_sql(query, sql, str(e), schema)
    result = geo_engine.execute_spatial_query(corrected_sql)
```
### Data Unavailability

When no dataset can answer the request, the LLM returns a special marker instead of SQL:

```sql
-- ERROR: DATA_UNAVAILABLE
-- Requested: crime statistics
-- Available: admin boundaries, hospitals, schools
```

The executor detects this marker and returns a helpful message to the user.
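A minimal detection sketch, assuming the marker always appears on the first line of the generated SQL (as in the example above; the function name is illustrative):

```python
def extract_unavailable_message(sql: str) -> str | None:
    lines = sql.strip().splitlines()
    if lines and lines[0].startswith("-- ERROR: DATA_UNAVAILABLE"):
        # Surface the remaining comment lines as a hint to the user
        return "\n".join(line.removeprefix("--").strip() for line in lines[1:])
    return None
```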
## Next Steps

- API Reference: [API_ENDPOINTS.md](API_ENDPOINTS.md)
- Frontend Components: [../frontend/COMPONENTS.md](../frontend/COMPONENTS.md)