# Backend Core Services

Detailed reference for GeoQuery's core backend services.

---

## Service Overview

| Service | File | Purpose |
|---------|------|---------|
| **LLMGateway** | `core/llm_gateway.py` | Gemini API integration |
| **GeoEngine** | `core/geo_engine.py` | DuckDB Spatial wrapper |
| **DataCatalog** | `core/data_catalog.py` | Dataset metadata management |
| **SemanticSearch** | `core/semantic_search.py` | Embedding-based discovery |
| **SessionStore** | `core/session_store.py` | User session and layer management |
| **QueryPlanner** | `core/query_planner.py` | Multi-step query orchestration |
| **QueryExecutor** | `services/executor.py` | Main query pipeline |

---

## LLMGateway

**File**: `backend/core/llm_gateway.py`

Unified interface to the Google Gemini API with streaming support.

### Initialization

```python
from backend.core.llm_gateway import LLMGateway

llm = LLMGateway()
```

**Configuration**:
- Reads `GEMINI_API_KEY` from the environment
- Uses the `gemini-2.0-flash-exp` model
- Enables "thinking" mode for reasoning transparency

### Key Methods

#### `detect_intent(query, history) → str`

Classifies a user query into an intent category.

**Parameters**:
- `query` (str): User's natural language query
- `history` (List[Dict]): Conversation history

**Returns**: One of:
- `"GENERAL_CHAT"` - Conversational question
- `"DATA_QUERY"` - Data request
- `"MAP_REQUEST"` - Explicitly wants visualization
- `"SPATIAL_OP"` - Geometric operation (intersection, buffer, etc.)
- `"STAT_QUERY"` - Requests chart/graph

**Example**:
```python
intent = await llm.detect_intent("Show me hospitals in Panama", [])
# Returns: "MAP_REQUEST"
```

#### `generate_analytical_sql(query, schema, history) → str`

Generates a DuckDB SQL query from natural language.
**Parameters**:
- `query` (str): User query
- `schema` (str): Available table schemas
- `history` (List[Dict]): Conversation context

**Returns**: SQL query string

**Special Cases**:
- Returns `"-- ERROR: DATA_UNAVAILABLE"` if the requested data doesn't exist
- Includes the `geom` column for map visualization
- Uses DuckDB spatial functions (`ST_Intersects`, etc.)

**Example**:
```python
schema = "Table: panama_healthsites_geojson\nColumns: name, amenity, geom..."
sql = await llm.generate_analytical_sql("hospitals in David", schema, [])
# Returns: "SELECT name, amenity, geom FROM panama_healthsites_geojson
#           WHERE amenity = 'hospital' AND ST_Intersects(geom, ...)"
```

#### `generate_spatial_sql(query, context, history) → str`

Generates spatial operation SQL (difference, intersection, etc.).

**Parameters**:
- `query` (str): Spatial operation request
- `context` (str): Base tables + user layers
- `history` (List[Dict]): Conversation history

**Returns**: SQL with spatial functions

**Example**:
```python
context = "Base: pan_admin1\nUser Layers: layer_abc123 (Protected Areas)"
sql = await llm.generate_spatial_sql("subtract protected areas from Chiriquí", context, [])
# Returns: "WITH protected_union AS (SELECT ST_Union(geom) FROM layer_abc123)
#           SELECT a.*, ST_Difference(a.geom, p.geom) as geom
#           FROM pan_admin1 a, protected_union p WHERE a.adm1_name = 'Chiriquí'"
```

#### `generate_layer_name(query, sql) → Dict`

Generates a descriptive name, emoji, and point style for a map layer.

**Returns**:
```python
{
    "name": "Hospitals in David",
    "emoji": "🏥",
    "pointStyle": "icon"  # or "circle" or None
}
```

**Point Style Logic**:
- `"icon"`: Small to medium POI datasets (<500 points)
- `"circle"`: Large point datasets (>500 points)
- `None`: Polygon/line data (uses choropleth or line styling)

#### `stream_explanation(query, sql, data_summary, history)`

Streams a natural language explanation of the results.
**Yields**: Dicts of the form:
- `{"type": "thought", "text": "reasoning..."}` - LLM thinking
- `{"type": "content", "text": "response..."}` - Actual response

**Example**:
```python
async for chunk in llm.stream_explanation("show hospitals", sql, summary, []):
    if chunk["type"] == "content":
        print(chunk["text"], end="", flush=True)
```

### Prompt System

All prompts are centralized in `backend/core/prompts.py`:

- `SYSTEM_INSTRUCTION` - Base system context
- `INTENT_DETECTION_PROMPT` - Intent classification
- `SQL_GENERATION_PROMPT` - Text-to-SQL
- `SPATIAL_SQL_PROMPT` - Spatial operations
- `LAYER_NAME_PROMPT` - Layer metadata generation
- `EXPLANATION_PROMPT` - Result interpretation

---

## GeoEngine

**File**: `backend/core/geo_engine.py`

DuckDB Spatial database wrapper for geospatial queries.

### Initialization

```python
from backend.core.geo_engine import get_geo_engine

engine = get_geo_engine()  # Singleton pattern
```

**Creates**:
- An in-memory DuckDB database
- Loads the Spatial extension
- Configures JSON serialization

### Key Methods

#### `ensure_table_loaded(table_name) → bool`

Lazily loads a GeoJSON dataset into DuckDB.

**Parameters**:
- `table_name` (str): Table identifier from the catalog

**Returns**: `True` if loaded successfully

**Behavior**:
- Checks if already loaded (no-op if yes)
- Looks up the path in DataCatalog
- Reads the GeoJSON file with GeoPandas
- Creates a DuckDB table with a spatial index
- Caches the table in the `loaded_tables` dict

**Example**:
```python
success = engine.ensure_table_loaded("panama_healthsites_geojson")
if success:
    print(f"Table has {len(engine.loaded_tables['panama_healthsites_geojson'])} rows")
```

#### `execute_spatial_query(sql) → Dict`

Executes SQL and returns GeoJSON.
**Parameters**:
- `sql` (str): DuckDB SQL query

**Returns**: GeoJSON FeatureCollection

**Example**:
```python
sql = "SELECT name, geom FROM panama_healthsites_geojson LIMIT 10"
geojson = engine.execute_spatial_query(sql)
# Returns: {"type": "FeatureCollection", "features": [...], "properties": {}}
```

**Error Handling**:
- Raises an exception with a detailed error message
- Logs the SQL for debugging

#### `register_layer(layer_id, geojson) → str`

Registers a user-created layer as a temporary table.

**Parameters**:
- `layer_id` (str): Unique layer identifier
- `geojson` (Dict): GeoJSON FeatureCollection

**Returns**: Table name (`layer_{layer_id}`)

**Purpose**: Enables spatial operations on user-created layers

**Example**:
```python
# User creates a layer by querying hospitals
hospitals_geojson = engine.execute_spatial_query("SELECT * FROM ... WHERE amenity='hospital'")

# Register it for later spatial ops
table_name = engine.register_layer("abc123", hospitals_geojson)
# Returns: "layer_abc123"

# It can now be used in spatial queries
sql = f"SELECT * FROM pan_admin1 WHERE ST_Intersects(geom, (SELECT ST_Union(geom) FROM {table_name}))"
```

#### `get_table_schemas() → str`

Generates schema descriptions for LLM context.
**Returns**: Formatted string with table/column info

**Example Output**:
```
Table: panama_healthsites_geojson
Columns: osm_id, name, amenity, operator, geom
Row count: 986

Table: pan_admin1
Columns: adm0_name, adm1_name, adm1_pcode, area_sqkm, geom
Row count: 10
```

### Supported Spatial Functions

DuckDB Spatial provides PostGIS-compatible functions:

| Function | Purpose | Example |
|----------|---------|---------|
| `ST_Intersects(a, b)` | Test intersection | `WHERE ST_Intersects(hospital.geom, province.geom)` |
| `ST_Within(a, b)` | Test containment | `WHERE ST_Within(point.geom, polygon.geom)` |
| `ST_Distance(a, b)` | Calculate distance | `SELECT ST_Distance(a.geom, b.geom) as dist` |
| `ST_Buffer(geom, dist)` | Create buffer | `SELECT ST_Buffer(geom, 0.1) FROM points` |
| `ST_Union(geom)` | Merge geometries | `SELECT ST_Union(geom) FROM provinces` |
| `ST_Difference(a, b)` | Subtract geometry | `SELECT ST_Difference(a.geom, b.geom)` |
| `ST_Intersection(a, b)` | Intersect geometries | `SELECT ST_Intersection(a.geom, b.geom)` |

---

## DataCatalog

**File**: `backend/core/data_catalog.py`

Manages dataset metadata from `catalog.json`.

### Initialization

```python
from backend.core.data_catalog import get_data_catalog

catalog = get_data_catalog()  # Singleton
```

**Loads**:
- Reads `backend/data/catalog.json`
- Parses dataset metadata
- Builds a searchable index

### Catalog Structure

```json
{
  "table_name": {
    "path": "relative/path/to/file.geojson",
    "description": "Short description for display",
    "semantic_description": "Detailed description for AI discovery",
    "categories": ["infrastructure", "health"],
    "tags": ["hospitals", "clinics", "healthcare"],
    "schema": {
      "columns": ["name", "type", "beds", "geom"],
      "geometry_type": "Point"
    }
  }
}
```

### Key Methods

#### `get_all_table_summaries() → str`

Returns formatted summaries of all datasets for LLM context.
**Format**:
```
Table: panama_healthsites_geojson
Description: Healthcare facilities including hospitals, clinics...
Categories: health, infrastructure
```

#### `get_summaries_for_tables(table_names) → str`

Returns summaries for specific tables (used after semantic search).

#### `get_table_metadata(table_name) → Dict`

Returns full metadata for a single table.

---

## SemanticSearch

**File**: `backend/core/semantic_search.py`

Vector-based dataset discovery using sentence embeddings.

### How It Works

1. **Embedding Generation**: Convert dataset descriptions to 384-dim vectors
2. **Indexing**: Store embeddings in `embeddings.npy`
3. **Query**: Convert the user query to a vector
4. **Search**: Find the top-k most similar datasets via cosine similarity

### Initialization

```python
from backend.core.semantic_search import get_semantic_search

search = get_semantic_search()  # Singleton
```

**Loads**:
- Sentence transformer model (`all-MiniLM-L6-v2`)
- Pre-computed embeddings from file (or generates them if missing)

### Key Methods

#### `search_table_names(query, top_k=15) → List[str]`

Finds the most relevant datasets for a query.

**Example**:
```python
results = search.search_table_names("where are the doctors?", top_k=5)
# Returns: ["panama_healthsites_geojson", "osm_amenities", ...]
```

**Performance**: Sub-millisecond for 100+ datasets

### Regenerating Embeddings

When `catalog.json` changes:

```bash
rm backend/data/embeddings.npy
python -c "from backend.core.semantic_search import get_semantic_search; get_semantic_search()"
```

---

## SessionStore

**File**: `backend/core/session_store.py`

Manages user sessions and created map layers.
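Conceptually, the store is an in-memory mapping from session IDs to lists of layer metadata. A minimal sketch of that idea, matching the `add_layer`/`get_layers` calls documented for this service (internals are assumed, not taken from the actual source):

```python
from typing import Dict, List


class SessionStore:
    """Illustrative in-memory session/layer registry (not the actual implementation)."""

    def __init__(self) -> None:
        # session_id -> list of layer metadata dicts
        self._sessions: Dict[str, List[dict]] = {}

    def add_layer(self, session_id: str, layer: dict) -> None:
        """Append a layer record to the session, creating the session if needed."""
        self._sessions.setdefault(session_id, []).append(layer)

    def get_layers(self, session_id: str) -> List[dict]:
        """Return all layers for a session (empty list for unknown sessions)."""
        return self._sessions.get(session_id, [])


# Module-level singleton accessor, mirroring the other services
_store = None


def get_session_store() -> SessionStore:
    global _store
    if _store is None:
        _store = SessionStore()
    return _store
```

A real implementation would likely also handle session expiry and deregister the corresponding DuckDB tables, but the shape above is enough to follow the examples that use it.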
### Purpose

- Track layers created by each user
- Enable spatial operations between user layers
- Maintain session state

### Key Methods

```python
from backend.core.session_store import get_session_store

store = get_session_store()

# Add a layer to a session
store.add_layer("session-123", {
    "id": "layer_abc",
    "name": "Hospitals in Panama",
    "table_name": "layer_abc",
    "timestamp": "2026-01-10T12:00:00"
})

# Get a session's layers
layers = store.get_layers("session-123")
```

---

## QueryPlanner

**File**: `backend/core/query_planner.py`

Decomposes complex queries into executable steps.

### Complexity Detection

```python
from backend.core.query_planner import get_query_planner

planner = get_query_planner()
complexity = planner.detect_complexity("compare hospital count vs school count by province")
# Returns: {"is_complex": True, "reason": "Multiple dataset comparison"}
```

**Complex Query Indicators**:
- Multiple datasets
- Aggregations across categories
- Comparisons or ratios
- Multi-condition filters

### Query Planning

```python
plan = await planner.plan_query(query, available_tables, llm)
# Returns ExecutionPlan with:
# - steps: List of QueryStep objects
# - parallel_groups: Steps that can run concurrently
# - combination_logic: How to merge results
```

---

## QueryExecutor

**File**: `backend/services/executor.py`

Main orchestrator that coordinates all services.

### Query Pipeline

```python
from backend.services.executor import QueryExecutor

executor = QueryExecutor()

# Process a query with streaming
async for event in executor.process_query_stream(query, history):
    if event["event"] == "status":
        print(f"Status: {event['data']}")
    elif event["event"] == "chunk":
        print(event["data"], end="")
    elif event["event"] == "result":
        geojson = event["data"]["geojson"]
```

### Execution Steps

1. **Intent Detection** → LLMGateway
2. **Semantic Search** → SemanticSearch
3. **Schema Loading** → DataCatalog + GeoEngine
4. **SQL Generation** → LLMGateway
5. **Query Execution** → GeoEngine
6. **Result Formatting** → ResponseFormatter
7. **Explanation** → LLMGateway (streaming)
8. **Layer Registration** → SessionStore

- **Dataset Sources**: see [../data/DATASET_SOURCES.md](../data/DATASET_SOURCES.md) for a detailed walkthrough.

---

## Singleton Pattern

Most services use the singleton pattern for efficiency:

```python
# Internal cache
_instance = None

def get_service():
    global _instance
    if _instance is None:
        _instance = Service()
    return _instance
```

**Benefits**:
- Single database connection
- Cached embeddings
- Shared catalog

---

## Error Handling

### SQL Correction Loop

When generated SQL fails:

```python
try:
    result = geo_engine.execute_spatial_query(sql)
except Exception as e:
    # Try to repair the query
    corrected_sql = await llm.correct_sql(query, sql, str(e), schema)
    result = geo_engine.execute_spatial_query(corrected_sql)
```

### Data Unavailability

The LLM returns a special marker:

```sql
-- ERROR: DATA_UNAVAILABLE
-- Requested: crime statistics
-- Available: admin boundaries, hospitals, schools
```

The executor detects this marker and returns a helpful message to the user.

---

## Next Steps

- **API Reference**: [API_ENDPOINTS.md](API_ENDPOINTS.md)
- **Frontend Components**: [../frontend/COMPONENTS.md](../frontend/COMPONENTS.md)