"""
Query Executor Service

Handles query processing with intent detection, data querying, and response generation.
Uses semantic search for scalable dataset discovery and session-scoped layer storage.
"""

from backend.core.llm_gateway import LLMGateway
from backend.services.data_loader import get_data_loader
from backend.core.geo_engine import get_geo_engine
from backend.services.response_formatter import ResponseFormatter
from backend.core.session_store import get_session_store
from backend.core.semantic_search import get_semantic_search
from backend.core.data_catalog import get_data_catalog
from backend.core.query_planner import get_query_planner
from typing import List, Dict, Any, Optional
import json
import datetime
import uuid
import logging

logger = logging.getLogger(__name__)

# Default session ID for backward compatibility
DEFAULT_SESSION_ID = "default-session"

# Intent labels searched for (as substrings, in this order) in the raw text
# streamed back by the LLM during intent detection.
_KNOWN_INTENTS = ("GENERAL_CHAT", "DATA_QUERY", "MAP_REQUEST", "SPATIAL_OP", "STAT_QUERY")


class QueryExecutor:
    """Routes a user query by detected intent and answers it with LLM-generated SQL.

    Two entry points are offered:

    * ``process_query_with_context`` returns one response dict.
    * ``process_query_stream`` is an async generator yielding
      ``{"event": <name>, "data": <JSON string>}`` dicts.
    """

    def __init__(self):
        self.llm = LLMGateway()
        self.data_loader = get_data_loader()
        self.geo_engine = get_geo_engine()
        self.session_store = get_session_store()
        self.semantic_search = get_semantic_search()
        self.catalog = get_data_catalog()
        self.query_planner = get_query_planner()

    # ------------------------------------------------------------------
    # Shared private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _status_event(message: str) -> Dict[str, str]:
        """Build a ``status`` stream event carrying ``message``."""
        return {"event": "status", "data": json.dumps({"status": message})}

    @staticmethod
    def _chunk_event(kind: str, content: Any) -> Dict[str, str]:
        """Build a ``chunk`` stream event of the given type ("text" or "thought")."""
        return {"event": "chunk", "data": json.dumps({"type": kind, "content": content})}

    @staticmethod
    def _result_event(payload: Dict[str, Any]) -> Dict[str, str]:
        """Build the terminal ``result`` stream event from ``payload``."""
        return {"event": "result", "data": json.dumps(payload)}

    @staticmethod
    def _failure_payload(response: str, sql: str) -> Dict[str, Any]:
        """Result payload used by the streaming paths when no data can be returned."""
        return {
            "response": response,
            "sql_query": sql,
            "geojson": None,
            "data_citations": [],
            "chart_data": None,
            "raw_data": [],
        }

    @staticmethod
    def _strip_sql_fences(sql: str) -> str:
        """Remove Markdown code fences the LLM may wrap around generated SQL."""
        return sql.replace("```sql", "").replace("```", "").strip()

    @staticmethod
    def _sql_signals_unavailable(sql: str) -> bool:
        """Return True when the generated "SQL" is really an error marker.

        Covers the ``DATA_UNAVAILABLE`` marker and a leading ``-- ERROR`` /
        ``-- Error`` comment, so every code path recognises the same markers.
        """
        return "DATA_UNAVAILABLE" in sql or sql.upper().startswith("-- ERROR")

    @staticmethod
    def _unavailable_message(sql: str) -> str:
        """Turn a DATA_UNAVAILABLE reply into a user-facing explanation.

        ``Requested:`` and ``Available:`` lines found in ``sql`` override the
        generic defaults.
        """
        requested = "the requested data"
        available = "administrative boundaries (provinces, districts, corregimientos)"
        for line in sql.split("\n"):
            if "Requested:" in line:
                requested = line.split("Requested:")[-1].strip()
            elif "Available:" in line:
                available = line.split("Available:")[-1].strip()
        return (
            f"I couldn't find data for **{requested}** in the current database.\n\n"
            "**Available datasets include:**\n"
            f"- {available}\n\n"
            "If you need additional data, please let me know and I can help you "
            "understand what's currently available or suggest alternative queries."
        )

    def _candidate_summaries(self, query: str, top_k: int = 15):
        """Return catalog summaries narrowed by semantic search.

        Falls back to every table summary when the search returns nothing
        (legacy behavior for small catalogs).
        """
        candidate_tables = self.semantic_search.search_table_names(query, top_k=top_k)
        if candidate_tables:
            return self.catalog.get_summaries_for_tables(candidate_tables)
        return self.catalog.get_all_table_summaries()

    def _describe_session_layers(self, session_id: str) -> str:
        """Render the session's user-created layers as text for the LLM prompt."""
        session_layers = self.session_store.get_layers(session_id)
        lines = ["User-Created Layers:"]
        if not session_layers:
            lines.append("(No user layers created yet.)")
        else:
            lines.extend(
                f"Layer {position}: {layer['name']} (Table: {layer['table_name']})"
                for position, layer in enumerate(session_layers, start=1)
            )
        return "\n".join(lines) + "\n"

    async def _build_spatial_context(self, query: str, summaries, session_id: str) -> str:
        """Load the base tables relevant to ``query`` and build the spatial-SQL context.

        The context is the loaded tables' schema followed by the session's
        user-created layers.
        """
        relevant_tables = await self.llm.identify_relevant_tables(query, summaries)
        for table in relevant_tables:
            self.geo_engine.ensure_table_loaded(table)
        base_table_schema = self.geo_engine.get_table_schemas()
        layer_context = self._describe_session_layers(session_id)
        return f"{base_table_schema}\n\n{layer_context}"

    def _register_session_layer(
        self,
        session_id: str,
        layer_id: str,
        layer_name: str,
        geojson: Dict[str, Any],
        description: str = "layer",
    ) -> None:
        """Register a result layer in the geo engine and record it on the session.

        Best-effort: a failure is logged and swallowed so that an otherwise
        successful answer is still returned to the user.
        """
        try:
            table_name = self.geo_engine.register_layer(layer_id, geojson)
            self.session_store.add_layer(session_id, {
                "id": layer_id,
                "name": layer_name,
                "table_name": table_name,
                "timestamp": datetime.datetime.now().isoformat(),
            })
        except Exception as exc:
            logger.warning(f"Failed to register {description}: {exc}")

    async def _build_map_layer(
        self,
        query: str,
        sql: str,
        geojson: Dict[str, Any],
        features: List[Any],
        session_id: str,
        description: str = "layer",
    ) -> Dict[str, Any]:
        """Name, format and register a map layer; return the formatted GeoJSON."""
        layer_info = await self.llm.generate_layer_name(query, sql)
        layer_name_ai = layer_info.get("name", "Map Layer")
        layer_emoji = layer_info.get("emoji", "📍")
        point_style = layer_info.get("pointStyle", None)
        geojson, layer_id, layer_name = ResponseFormatter.format_geojson_layer(
            query, geojson, features, layer_name_ai, layer_emoji, point_style
        )
        self._register_session_layer(session_id, layer_id, layer_name, geojson, description)
        return geojson

    def _get_schema_context(self) -> str:
        """Returns the database schema for the LLM context."""
        return self.data_loader.get_schema_context()

    # ------------------------------------------------------------------
    # One-shot API
    # ------------------------------------------------------------------

    async def process_query_with_context(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]:
        """
        Orchestrates the full query processing flow with conversation context.

        Detects the intent, then delegates to the matching handler. Unknown
        intents are treated as general chat.
        """
        # 1. Detect intent
        intent = await self.llm.detect_intent(query, history)
        print(f"[GeoQuery] Detected intent: {intent}")

        # 2. Route based on intent
        if intent in ("DATA_QUERY", "MAP_REQUEST"):
            # Always include map for data queries - the visual is helpful
            return await self._handle_data_query(query, history, include_map=True)
        if intent == "SPATIAL_OP":
            return await self._handle_spatial_op(query, history)
        if intent == "STAT_QUERY":
            return await self._handle_stat_query(query, history)
        # GENERAL_CHAT and anything unrecognised
        return await self._handle_general_chat(query, history)

    # ------------------------------------------------------------------
    # Streaming API
    # ------------------------------------------------------------------

    async def process_query_stream(self, query: str, history: List[Dict[str, str]]):
        """
        Streamable version of process_query_with_context.

        Yields: {"event": "status"|"intent"|"chunk"|"result", "data": <JSON string>}
        """
        # 1. Intent Detection with Thoughts
        yield self._status_event("🧠 Understanding intent...")

        intent_buffer = ""
        try:
            async for chunk in self.llm.stream_intent(query, history):
                if chunk["type"] == "thought":
                    yield self._chunk_event("thought", chunk["text"])
                elif chunk["type"] == "content":
                    intent_buffer += chunk["text"]
        except Exception as e:
            # Best-effort: whatever was buffered so far is still used below.
            print(f"Intent stream error: {e}")

        intent = intent_buffer.strip().upper() or "GENERAL_CHAT"

        # Clean up intent string: the LLM may wrap the label in extra text
        for valid in _KNOWN_INTENTS:
            if valid in intent:
                intent = valid
                break

        yield {"event": "intent", "data": json.dumps({"intent": intent})}
        print(f"[GeoQuery] Detected intent: {intent}")

        if intent == "GENERAL_CHAT":
            handler = self._stream_general_chat(query, history)
        elif intent in ("DATA_QUERY", "MAP_REQUEST", "STAT_QUERY"):
            # Handle Data/Map/Stat Queries together via a unified stream handler
            handler = self._stream_data_query(query, history, intent)
        elif intent == "SPATIAL_OP":
            handler = self._stream_spatial_op(query, history)
        else:
            handler = self._stream_unknown_intent()

        async for event in handler:
            yield event

    async def _stream_general_chat(self, query: str, history: List[Dict[str, str]]):
        """Stream a conversational answer, translated to the frontend protocol."""
        async for chunk in self.llm.generate_response_stream(query, history):
            chunk_type = chunk.get("type")
            if chunk_type == "content":
                yield self._chunk_event("text", chunk.get("text"))
            elif chunk_type == "thought":
                # Every other stream in this class carries thought text under
                # "text"; "content" is kept as a fallback for compatibility.
                yield self._chunk_event("thought", chunk.get("text", chunk.get("content")))

        # Send final result to clear loading status
        yield self._result_event({"response": ""})

    async def _stream_unknown_intent(self):
        """Fallback for an intent that matched none of the known labels."""
        yield self._chunk_event("text", "I'm not sure how to handle this query yet.")
        # Terminate the stream with a result event like every other branch
        # does, so the client can clear its loading status.
        yield self._result_event({"response": ""})

    async def _stream_data_query(self, query: str, history: List[Dict[str, str]], intent: str):
        """Stream the DATA_QUERY / MAP_REQUEST / STAT_QUERY flow.

        STAT_QUERY suppresses the map layer and forces a chart attempt when
        the first chart generation yields nothing.
        """
        include_map = intent != "STAT_QUERY"
        session_id = DEFAULT_SESSION_ID  # TODO: Get from request context

        # 0. Check query complexity
        complexity = self.query_planner.detect_complexity(query)
        if complexity["is_complex"]:
            yield self._status_event("🔄 Complex query detected, planning steps...")
            logger.info(f"Complex query detected: {complexity['reason']}")

            # Use multi-step executor
            async for event in self._execute_multi_step_query(query, history, include_map, session_id):
                yield event
            return

        # Simple query - continue with existing flow
        # 0. Semantic Discovery (scalable pre-filter)
        yield self._status_event("📚 Searching data catalog...")
        candidate_summaries = self._candidate_summaries(query, top_k=15)

        # 1. LLM refines from candidates
        yield self._status_event("🔍 Identifying relevant tables...")
        relevant_tables = await self.llm.identify_relevant_tables(query, candidate_summaries)

        # 2. Lazy Load
        if relevant_tables:
            yield self._status_event(f"💾 Loading tables: {', '.join(relevant_tables)}...")
        for table in relevant_tables:
            self.geo_engine.ensure_table_loaded(table)

        # 3. Schema
        table_schema = self.geo_engine.get_table_schemas()

        # 4. Generate SQL (Streaming Thoughts!)
        yield self._status_event("✏️ Writing SQL query...")
        sql_buffer = ""
        async for chunk in self.llm.stream_analytical_sql(query, table_schema, history):
            if chunk["type"] == "thought":
                yield self._chunk_event("thought", chunk["text"])
            elif chunk["type"] == "content":
                sql_buffer += chunk["text"]
        sql = self._strip_sql_fences(sql_buffer)

        # 5. Check for DATA_UNAVAILABLE error from LLM
        if self._sql_signals_unavailable(sql):
            yield self._status_event("ℹ️ Data not available")
            yield self._result_event(self._failure_payload(self._unavailable_message(sql), sql))
            return

        # 6. Execute query (with one LLM-assisted repair attempt)
        yield self._status_event("⚡ Executing query...")
        geojson = None
        features = []
        error_message = None
        try:
            geojson = self.geo_engine.execute_spatial_query(sql)
            features = geojson.get("features", [])
            yield self._status_event(f"✅ Found {len(features)} results")
        except Exception as e:
            error_message = str(e)
            yield self._status_event("⚠️ Query error, attempting repair...")
            try:
                sql = await self.llm.correct_sql(query, sql, error_message, str(table_schema))
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
                error_message = None
            except Exception as e2:
                # error_message keeps the ORIGINAL failure for the user.
                print(f"Repair failed: {e2}")

        if error_message:
            yield self._result_event(self._failure_payload(
                "I was unable to process your request because the data query failed. "
                f"\n\nError details: {error_message}",
                sql,
            ))
            return

        # 7. Post-process using ResponseFormatter
        citations = ResponseFormatter.generate_citations(relevant_tables, features)

        # Chart
        chart_data = ResponseFormatter.generate_chart_data(sql, features)
        if intent == "STAT_QUERY" and not chart_data and features:
            chart_data = ResponseFormatter.generate_chart_data("GROUP BY forced", features)

        # Raw Data
        raw_data = ResponseFormatter.prepare_raw_data(features)

        # Map Config
        if include_map and features and geojson:
            geojson = await self._build_map_layer(query, sql, geojson, features, session_id)

        # 8. Explanation (Streaming!)
        yield self._status_event("💬 Generating explanation...")
        data_summary = ResponseFormatter.generate_data_summary(features)

        explanation_parts = []
        async for chunk in self.llm.stream_explanation(query, sql, data_summary, history):
            if chunk["type"] == "thought":
                yield self._chunk_event("thought", chunk["text"])
            elif chunk["type"] == "content":
                explanation_parts.append(chunk["text"])
                yield self._chunk_event("text", chunk["text"])

        # 9. Final Result Event
        yield self._result_event({
            "response": "".join(explanation_parts),
            "sql_query": sql,
            "geojson": geojson if include_map and features else None,
            "chart_data": chart_data,
            "raw_data": raw_data,
            "data_citations": citations,
        })

    async def _stream_spatial_op(self, query: str, history: List[Dict[str, str]]):
        """Stream the SPATIAL_OP flow (operations over base tables and session layers)."""
        yield self._status_event("📐 Preparing spatial operation...")
        session_id = DEFAULT_SESSION_ID  # TODO: Get from request context

        # 0-5. Discover and load base tables, then combine their schema with
        # the user-created layers of this session.
        candidate_summaries = self._candidate_summaries(query, top_k=15)
        full_context = await self._build_spatial_context(query, candidate_summaries, session_id)

        # 6. Generate Spatial SQL
        yield self._status_event("✏️ Writing spatial SQL...")
        sql = await self.llm.generate_spatial_sql(query, full_context, history)

        # 7. Execute (with one LLM-assisted repair attempt)
        yield self._status_event("⚙️ Processing geometry...")
        geojson = None
        features = []
        try:
            geojson = self.geo_engine.execute_spatial_query(sql)
            features = geojson.get("features", [])
            yield self._status_event(f"✅ Result contains {len(features)} features")
        except Exception as e:
            error_message = str(e)
            yield self._status_event("⚠️ Spatial error, attempting repair...")
            try:
                sql = await self.llm.correct_sql(query, sql, error_message, full_context)
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
            except Exception:
                # The ORIGINAL error is reported, next to the last SQL tried.
                yield self._result_event(self._failure_payload(
                    "I tried to perform the spatial operation but encountered an error: "
                    f"{error_message}\n\nQuery: {sql}",
                    sql,
                ))
                return

        # 8. Result Processing
        if features:
            geojson = await self._build_map_layer(query, sql, geojson, features, session_id)

        # 9. Explanation
        yield self._status_event("💬 Explaining results...")
        data_summary = f"Spatial operation resulted in {len(features)} features."

        explanation_parts = []
        async for chunk in self.llm.stream_explanation(query, sql, data_summary, history):
            if chunk["type"] == "thought":
                yield self._chunk_event("thought", chunk["text"])
            elif chunk["type"] == "content":
                explanation_parts.append(chunk["text"])
                yield self._chunk_event("text", chunk["text"])

        # 10. Final Result
        yield self._result_event({
            "response": "".join(explanation_parts),
            "sql_query": sql,
            "geojson": geojson,
            "chart_data": None,
            "raw_data": [],  # Spatial ops usually visual
            "data_citations": [],
        })

    # ------------------------------------------------------------------
    # One-shot handlers
    # ------------------------------------------------------------------

    async def _handle_general_chat(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]:
        """Handles general conversational queries."""
        # Add schema context to help the LLM answer questions about the data
        enhanced_query = (
            "The user is asking about Panama geographic data.\n\n"
            f"Available data: {len(self.data_loader.admin1)} provinces, "
            f"{len(self.data_loader.admin2)} districts, "
            f"{len(self.data_loader.admin3)} corregimientos.\n\n"
            f"User question: {query}\n\n"
            "Respond helpfully as GeoQuery, the territorial intelligence assistant."
        )

        response = await self.llm.generate_response(enhanced_query, history)

        return {
            "response": response,
            "sql_query": None,
            "geojson": None,
            "data_citations": [],
            "intent": "GENERAL_CHAT",
        }

    async def _handle_data_query(self, query: str, history: List[Dict[str, str]], include_map: bool = True) -> Dict[str, Any]:
        """
        Handles data queries using text-to-SQL with SOTA Smart Discovery.

        Returns the response dict; on SQL-generation or execution failure the
        dict only carries "response", "sql_query" and "intent".
        """
        print(f"[GeoQuery] Starting Data Query: {query}")

        # 0. Get Catalog
        catalog = get_data_catalog()

        # 1. Smart Discovery: Identify relevant tables
        summaries = catalog.get_all_table_summaries()

        # Ask LLM which tables are relevant
        relevant_tables = await self.llm.identify_relevant_tables(query, summaries)

        # 2. Lazy Loading
        feature_tables = []
        for table in relevant_tables:
            if self.geo_engine.ensure_table_loaded(table):
                feature_tables.append(table)
            else:
                print(f"[GeoQuery] Warning: Could not load relevant table '{table}'")

        # 3. Get schema context (now includes the newly loaded tables)
        table_schema = self.geo_engine.get_table_schemas()

        # Fallback for empty schema
        if len(table_schema) < 50:
            print("[GeoQuery] GeoEngine schema empty. Fetching from Catalog Metadata.")
            fallback_tables = list(set(feature_tables + ["pan_admin1", "pan_admin2", "pan_admin3"]))
            table_schema = catalog.get_specific_table_schemas(fallback_tables)

        # 4. Generate real SQL using LLM
        print(f"[GeoQuery] Generating SQL with context size: {len(table_schema)} chars")
        sql = await self.llm.generate_analytical_sql(query, table_schema, history)

        # Check for SQL generation errors (same markers as the streaming path)
        if self._sql_signals_unavailable(sql):
            available_data = ", ".join(feature_tables) if feature_tables else "Administrative Boundaries"
            return {
                "response": f"I couldn't find the specific data you asked for. I have access to: {available_data}. \n\nOriginal request: {query}",
                "sql_query": sql,
                "intent": "DATA_QUERY",
            }

        # 5. Execute SQL in DuckDB
        try:
            geojson = self.geo_engine.execute_spatial_query(sql)
            features = geojson.get("features", [])
            print(f"[GeoQuery] Query returned {len(features)} features")
        except Exception as e:
            error_message = str(e)
            print(f"[GeoQuery] SQL execution error: {error_message}")

            # Self-Correction Loop
            try:
                sql = await self.llm.correct_sql(query, sql, error_message, str(table_schema))
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
            except Exception as e2:
                return {
                    "response": f"The SQL query failed to execute even after an automatic repair attempt.\nOriginal Error: {error_message}\nRepair Error: {str(e2)}",
                    "sql_query": sql,
                    "intent": "DATA_QUERY",
                }

        # 6. Post-Process via ResponseFormatter
        citations = ResponseFormatter.generate_citations(relevant_tables, features)
        data_summary = ResponseFormatter.generate_data_summary(features)

        # 7. Generate explanation
        explanation = await self.llm.generate_explanation(query, sql, data_summary, history)

        # 8. Add Layer Metadata to GeoJSON and REGISTER in GeoEngine
        if include_map and features:
            geojson = await self._build_map_layer(
                query, sql, geojson, features, DEFAULT_SESSION_ID,
                description="layer in GeoEngine",
            )

        # 9. Auto-generate Chart
        chart_data = ResponseFormatter.generate_chart_data(sql, features)

        # 10. Prepare Raw Data
        raw_data = ResponseFormatter.prepare_raw_data(features)

        return {
            "response": explanation,
            "sql_query": sql,
            "geojson": geojson if include_map and features else None,
            "data_citations": citations,
            "chart_data": chart_data,
            "raw_data": raw_data,
            "intent": "DATA_QUERY" if not include_map else "MAP_REQUEST",
        }

    async def _handle_spatial_op(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]:
        """Handles spatial operations (Difference, Intersection, etc) using GeoEngine."""
        # 0. Get data catalog for relevant tables
        summaries = get_data_catalog().get_all_table_summaries()

        # 1-5. Identify and load base tables, then combine their schema with
        # the user-created layers of the session.
        full_context = await self._build_spatial_context(query, summaries, DEFAULT_SESSION_ID)

        # 6. Generate Spatial SQL
        sql = await self.llm.generate_spatial_sql(query, full_context, history)

        # 7. Execute (with one LLM-assisted repair attempt)
        geojson = None
        features = []
        try:
            geojson = self.geo_engine.execute_spatial_query(sql)
            features = geojson.get("features", [])
        except Exception as e:
            error_message = str(e)
            try:
                sql = await self.llm.correct_sql(query, sql, error_message, full_context)
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
            except Exception:
                # The ORIGINAL error is reported, next to the last SQL tried.
                return {
                    "response": f"I tried to perform the spatial operation but encountered an error: {error_message}\n\nQuery: {sql}",
                    "sql_query": sql,
                    "intent": "SPATIAL_OP",
                }

        # 8. Result Processing. Registration is best-effort, as in every
        # other handler: a failure there must not discard a computed result.
        if features:
            geojson = await self._build_map_layer(query, sql, geojson, features, DEFAULT_SESSION_ID)

        data_summary = f"Spatial operation resulted in {len(features)} features."
        explanation = await self.llm.generate_explanation(query, sql, data_summary, history)

        return {
            "response": explanation,
            "sql_query": sql,
            "geojson": geojson,
            "data_citations": [],
            "intent": "SPATIAL_OP",
        }

    async def _handle_stat_query(self, query: str, history: List[Dict[str, str]]) -> Dict[str, Any]:
        """
        Handles statistical queries where charts/tables are more important than maps.
        """
        # Reuse data query logic but without map emphasis
        result = await self._handle_data_query(query, history, include_map=False)
        result["intent"] = "STAT_QUERY"

        # Ensure chart data is present if possible
        if not result.get("chart_data") and result.get("raw_data"):
            # Force chart attempt: re-wrap raw rows in the feature shape
            features_mock = [{"properties": row} for row in result["raw_data"]]
            result["chart_data"] = ResponseFormatter.generate_chart_data(
                result.get("sql_query", ""), features_mock
            )

        return result

    # ------------------------------------------------------------------
    # Multi-step execution
    # ------------------------------------------------------------------

    async def _execute_multi_step_query(
        self,
        query: str,
        history: List[Dict[str, str]],
        include_map: bool,
        session_id: str
    ):
        """
        Execute a complex query by breaking it into multiple steps.

        Yields streaming events throughout the multi-step process. If the
        planner does not produce a multi-step plan, the query is executed as
        a single SQL statement instead.
        """
        # 1. Get candidate tables for planning
        yield self._status_event("📚 Discovering relevant datasets...")

        candidate_tables = self.semantic_search.search_table_names(query, top_k=20)
        if not candidate_tables:
            candidate_tables = list(self.catalog.catalog.keys())

        # 2. Plan the query
        yield self._status_event("📋 Creating execution plan...")

        plan = await self.query_planner.plan_query(query, candidate_tables, self.llm)

        if not plan.is_complex or not plan.steps:
            # Fallback to simple execution
            yield self._status_event("📚 Executing as simple query...")

            # Re-route to simple path by manually calling the logic
            candidate_summaries = self.catalog.get_summaries_for_tables(candidate_tables)
            relevant_tables = await self.llm.identify_relevant_tables(query, candidate_summaries)

            for table in relevant_tables:
                self.geo_engine.ensure_table_loaded(table)

            table_schema = self.geo_engine.get_table_schemas()

            yield self._status_event("✏️ Writing SQL query...")
            sql = await self.llm.generate_analytical_sql(query, table_schema, history)
            sql = self._strip_sql_fences(sql)

            try:
                geojson = self.geo_engine.execute_spatial_query(sql)
                features = geojson.get("features", [])
            except Exception as e:
                yield self._result_event({
                    "response": f"Query execution failed: {str(e)}",
                    "sql_query": sql,
                })
                return

            data_summary = ResponseFormatter.generate_data_summary(features)
            explanation = await self.llm.generate_explanation(query, sql, data_summary, history)

            yield self._result_event({
                "response": explanation,
                "sql_query": sql,
                "geojson": geojson if include_map and features else None,
                "chart_data": ResponseFormatter.generate_chart_data(sql, features),
                "raw_data": ResponseFormatter.prepare_raw_data(features),
                "data_citations": [],
            })
            return

        # 3. Show plan to user
        step_descriptions = [
            f"Step {number}: {step.description}"
            for number, step in enumerate(plan.steps, start=1)
        ]
        yield self._chunk_event(
            "thought",
            "Planning multi-step execution:\n" + "\n".join(step_descriptions),
        )

        # 4. Load all needed tables
        all_tables = set()
        for step in plan.steps:
            all_tables.update(step.tables_needed)

        if all_tables:
            yield self._status_event(f"💾 Loading {len(all_tables)} datasets...")
            for table in all_tables:
                self.geo_engine.ensure_table_loaded(table)

        # 5. Execute steps by parallel groups
        intermediate_results = {}
        all_features = []
        all_sql = []

        for group_idx, group in enumerate(plan.parallel_groups):
            group_steps = [s for s in plan.steps if s.step_id in group]

            yield self._status_event(
                f"⚡ Executing step group {group_idx + 1}/{len(plan.parallel_groups)}..."
            )

            # Execute steps in this group (could be parallel, but sequential for simplicity)
            for step in group_steps:
                yield self._status_event(f"🔄 {step.description}...")

                # Generate SQL for this step
                table_schema = self.geo_engine.get_table_schemas()

                # Build step-specific prompt
                step_query = (
                    f"Execute this step: {step.description}\n\n"
                    f"Original user request: {query}\n\n"
                    f"SQL Hint: {step.sql_template or 'None'}\n\n"
                    f"Previous step results available: {list(intermediate_results.keys())}"
                )

                sql = await self.llm.generate_analytical_sql(step_query, table_schema, history)
                sql = self._strip_sql_fences(sql)

                # Skip if LLM returned an error
                if self._sql_signals_unavailable(sql):
                    logger.warning(f"Step {step.step_id} indicated data unavailable")
                    intermediate_results[step.result_name] = {"features": [], "sql": sql}
                    continue

                try:
                    geojson = self.geo_engine.execute_spatial_query(sql)
                    features = geojson.get("features", [])

                    intermediate_results[step.result_name] = {
                        "features": features,
                        "sql": sql,
                        "geojson": geojson,
                    }
                    all_features.extend(features)
                    all_sql.append(f"-- {step.description}\n{sql}")

                    yield self._status_event(f"✅ Step got {len(features)} results")
                except Exception as e:
                    logger.error(f"Step {step.step_id} failed: {e}")
                    # Try to repair
                    try:
                        sql = await self.llm.correct_sql(step_query, sql, str(e), str(table_schema))
                        geojson = self.geo_engine.execute_spatial_query(sql)
                        features = geojson.get("features", [])
                        intermediate_results[step.result_name] = {
                            "features": features,
                            "sql": sql,
                            "geojson": geojson,
                        }
                        all_features.extend(features)
                        all_sql.append(f"-- {step.description} (repaired)\n{sql}")
                    except Exception as e2:
                        # A failed step is recorded as empty; later steps still run.
                        logger.error(f"Step repair also failed: {e2}")
                        intermediate_results[step.result_name] = {
                            "features": [],
                            "sql": sql,
                            "error": str(e2),
                        }

        # 6. Generate final combined result
        yield self._status_event("💬 Generating combined analysis...")

        # Summarize intermediate results for explanation
        result_summary = [
            f"{name}: {len(result.get('features', []))} records"
            for name, result in intermediate_results.items()
        ]
        summary_lines = "\n".join(result_summary)

        combined_summary = (
            f"Multi-step query completed with {len(plan.steps)} steps.\n\n"
            "Results:\n"
            f"{summary_lines}\n\n"
            f"Combination logic: {plan.final_combination_logic}"
        )
        combined_sql = "\n\n".join(all_sql)

        # Get combined explanation (only content chunks are forwarded here)
        explanation_parts = []
        async for chunk in self.llm.stream_explanation(query, combined_sql, combined_summary, history):
            if chunk["type"] == "content":
                explanation_parts.append(chunk["text"])
                yield self._chunk_event("text", chunk["text"])

        # Find the best geojson to display (use the one with most features).
        # Strict ">" keeps the earliest step on ties and never selects an
        # empty result.
        best_geojson = None
        best_features = []
        for result in intermediate_results.values():
            features = result.get("features", [])
            if len(features) > len(best_features):
                best_features = features
                best_geojson = result.get("geojson")

        # Generate layer if we have features
        if include_map and best_features and best_geojson:
            layer_info = await self.llm.generate_layer_name(query, all_sql[0] if all_sql else "")
            layer_name_ai = layer_info.get("name", "Multi-Step Result")
            layer_emoji = layer_info.get("emoji", "📊")

            best_geojson, layer_id, layer_name = ResponseFormatter.format_geojson_layer(
                query, best_geojson, best_features, layer_name_ai, layer_emoji
            )
            self._register_session_layer(
                session_id, layer_id, layer_name, best_geojson,
                description="multi-step layer",
            )

        # Generate chart from combined results
        chart_data = ResponseFormatter.generate_chart_data("\n".join(all_sql), best_features)
        raw_data = ResponseFormatter.prepare_raw_data(best_features)

        # Final result
        yield self._result_event({
            "response": "".join(explanation_parts),
            "sql_query": combined_sql,
            "geojson": best_geojson if include_map and best_features else None,
            "chart_data": chart_data,
            "raw_data": raw_data,
            "data_citations": [],
            "multi_step": True,
            "steps_executed": len(plan.steps),
        })