|
|
"""
Response Formatter Service

Handles formatting of query results into citations, charts, GeoJSON layers, and raw data for the frontend.
Separates presentation logic from execution logic.
"""
|
|
|
|
|
import logging
import uuid
import zlib
from datetime import date, datetime
from typing import List, Dict, Any, Optional
|
|
|
|
|
class ResponseFormatter:
    """Turn raw query results into payloads the frontend can render.

    Every public method is a ``@staticmethod``; the class is a stateless
    namespace that produces citations, chart data, raw table rows, a styled
    GeoJSON layer and a short text summary from a list of GeoJSON-like
    feature dicts (``{"properties": {...}, ...}``).
    """

    _logger = logging.getLogger(__name__)

    # Property keys that hold geometry or rendering metadata rather than data
    # values; they are hidden from charts and from the raw-data table.
    _NON_DATA_KEYS = frozenset(
        {
            "geom",
            "geometry",
            "style",
            "layer_name",
            "layer_id",
            "choropleth",
            "fillColor",
            "color",
        }
    )

    _ADMIN_CITATION = "Panama Administrative Boundaries (HDX COD-AB, 2021)"

    # (substrings, citation) pairs checked in order; the first rule with a
    # substring found in the lower-cased table name wins.
    _CITATION_RULES = (
        (("universit",), "Universities Data (OpenStreetMap, 2024)"),
        (("school", "education"), "Education Facilities (OpenStreetMap, 2024)"),
        (("hospital", "health"), "Health Facilities (OpenStreetMap, 2024)"),
        (("airport",), "Airports Data (OpenStreetMap, 2024)"),
        (("road",), "Road Network (OpenStreetMap, 2024)"),
        (("population", "census"), "Panama Census Data (INEC, 2023)"),
        (("admin", "boundar"), _ADMIN_CITATION),
    )

    # Colours a layer can be assigned; one is picked per query string.
    _LAYER_PALETTE = (
        "#E63946",
        "#F4A261",
        "#2A9D8F",
        "#E9C46A",
        "#9C6644",
        "#D62828",
        "#8338EC",
        "#3A86FF",
        "#FB5607",
        "#FF006E",
    )

    # Column-name fragments tried in order when choosing a choropleth column.
    _CHOROPLETH_PRIORITY = (
        "population",
        "pop",
        "count",
        "num",
        "density",
        "area_sqkm",
        "area",
    )

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values, excluding bool (an int subclass)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _properties_of(feature: Any) -> Dict[str, Any]:
        """Return the feature's ``properties`` dict, or ``{}`` if absent or not a dict."""
        props = feature.get("properties") if isinstance(feature, dict) else None
        return props if isinstance(props, dict) else {}

    @staticmethod
    def generate_citations(tables: List[str], features: Optional[List[Dict]] = None) -> List[str]:
        """Generate readable citations from table names and returned features.

        Args:
            tables: Names of the tables the query touched. Matching is by
                case-insensitive substring (e.g. ``"school"``, ``"admin"``).
            features: Optional result features. Only consulted when no table
                produced a citation: if the first feature has a property whose
                key starts with ``"adm"``, the administrative-boundaries
                citation is used.

        Returns:
            Unique citation strings in first-seen order.
        """
        citations: List[str] = []
        for table in tables or []:
            if not isinstance(table, str):
                continue
            lowered = table.lower()
            for needles, citation in ResponseFormatter._CITATION_RULES:
                if any(needle in lowered for needle in needles):
                    # Several tables can map to one source; cite it once.
                    if citation not in citations:
                        citations.append(citation)
                    break

        if not citations and features:
            first_props = ResponseFormatter._properties_of(features[0])
            if any(isinstance(key, str) and key.startswith("adm") for key in first_props):
                citations.append(ResponseFormatter._ADMIN_CITATION)

        return citations

    @staticmethod
    def generate_chart_data(sql: str, features: List[Dict]) -> Optional[Dict[str, Any]]:
        """Build a bar-chart payload from the features, if they carry a metric.

        The first feature's properties decide the axes: the first numeric,
        non-identifier column (not ending in ``_id``/``_code``) becomes the
        value, and the first string column with ``"name"`` in its key becomes
        the label. Without such columns the keys ``"value"`` and ``"name"``
        are used.

        Args:
            sql: The executed SQL. Currently unused; kept for interface
                compatibility.
            features: Result features with a ``properties`` dict each.

        Returns:
            A dict with ``type``, ``title``, ``data`` (top 15 items by value,
            descending), ``xKey``, ``yKey``, ``xAxisLabel`` and ``yAxisLabel``,
            or ``None`` when there is nothing to chart or building fails.
        """
        if not features:
            return None

        try:
            sample_props = ResponseFormatter._properties_of(features[0])
            candidate_keys = [
                key for key in sample_props
                if key not in ResponseFormatter._NON_DATA_KEYS
            ]

            x_key, x_label = "name", "Feature"
            y_key, y_label = "value", "Value"

            for key in candidate_keys:
                if ResponseFormatter._is_number(sample_props[key]) and not key.endswith(("_id", "_code")):
                    y_key = key
                    if "sqkm" in key:
                        y_label = "Area (km²)"
                    elif "pop" in key:
                        y_label = "Population"
                    elif "count" in key:
                        y_label = "Count"
                    else:
                        y_label = key.replace("_", " ").title()
                    break

            for key in candidate_keys:
                if isinstance(sample_props[key], str) and "name" in key:
                    x_key = key
                    # "adm1_name" -> "Adm1"; a bare "name" column -> "Region".
                    x_label = key.replace("_", " ").title().replace("Name", "").strip() or "Region"
                    break

            chart_items = []
            for feature in features:
                props = ResponseFormatter._properties_of(feature)
                label = props.get(x_key)
                value = props.get(y_key)
                if label is not None and value is not None:
                    chart_items.append({"name": str(label), "value": value})

            if not chart_items:
                return None

            chart_items.sort(key=lambda item: item["value"], reverse=True)
            return {
                "type": "bar",
                "title": f"{y_label} by {x_label}",
                "data": chart_items[:15],
                "xKey": "name",
                "yKey": "value",
                "xAxisLabel": x_label,
                "yAxisLabel": y_label,
            }
        except Exception as exc:
            # Charts are best-effort: a malformed result must not fail the
            # whole response, so record the problem and return no chart.
            ResponseFormatter._logger.exception("Error generating chart data: %s", exc)
            return None

    @staticmethod
    def prepare_raw_data(features: List[Dict]) -> List[Dict]:
        """Clean feature properties for display in the raw data table.

        Args:
            features: Result features with a ``properties`` dict each.

        Returns:
            One new dict per feature with dates converted to ISO strings and
            geometry/rendering keys removed. Input features are not modified.
        """
        rows: List[Dict] = []
        for feature in features or []:
            props = ResponseFormatter._serialize_properties(
                ResponseFormatter._properties_of(feature)
            )
            rows.append(
                {
                    key: value
                    for key, value in props.items()
                    if key not in ResponseFormatter._NON_DATA_KEYS
                }
            )
        return rows

    @staticmethod
    def _pick_choropleth_column(features: List[Dict]) -> Optional[str]:
        """Choose the numeric property to colour by, based on the first feature."""
        if not features:
            return None

        sample = ResponseFormatter._properties_of(features[0])
        numeric_cols = [
            key
            for key, value in sample.items()
            if ResponseFormatter._is_number(value)
            and key not in ("layer_id", "style")
            and not key.endswith(("_code", "_id"))
        ]

        for fragment in ResponseFormatter._CHOROPLETH_PRIORITY:
            for col in numeric_cols:
                if fragment in col:
                    return col

        return numeric_cols[0] if numeric_cols else None

    @staticmethod
    def format_geojson_layer(query: str, geojson: Dict[str, Any], features: List[Dict], layer_name: str, layer_emoji: str = "📍", point_style: Optional[str] = None, admin_levels: Optional[List[str]] = None) -> tuple[Dict[str, Any], str, str]:
        """Style the GeoJSON layer and attach metadata (ID, name, choropleth).

        ``geojson`` and the feature dicts in ``features`` are modified in
        place: feature properties are made JSON-serializable, and
        ``geojson["properties"]`` (created if missing) receives either a
        ``choropleth`` config or a plain ``style``, plus ``layer_name``,
        ``layer_id`` and ``pointMarker``.

        Args:
            query: The user query; selects the layer colour. The same query
                always maps to the same palette colour.
            geojson: The layer object to annotate.
            features: The layer's features.
            layer_name: Display name stored on the layer.
            layer_emoji: Emoji used as the marker icon for icon-style points.
            point_style: "icon" for emoji markers, "circle" for simple colored
                circles, None for auto-detect (currently the same as "icon").
            admin_levels: Currently unused; kept for interface compatibility.

        Returns:
            ``(geojson, layer_id, layer_name)`` where ``layer_id`` is a new
            8-character hex identifier.
        """
        feature_list = features or []
        for feature in feature_list:
            if isinstance(feature, dict) and isinstance(feature.get("properties"), dict):
                feature["properties"] = ResponseFormatter._serialize_properties(feature["properties"])

        # The built-in hash() of a str is randomised per process, so use a
        # CRC to keep a query's colour stable across restarts.
        palette = ResponseFormatter._LAYER_PALETTE
        digest = zlib.crc32(str(query).encode("utf-8", errors="replace"))
        layer_color = palette[digest % len(palette)]

        layer_props = geojson.get("properties")
        if not isinstance(layer_props, dict):
            layer_props = {}
            geojson["properties"] = layer_props

        choropleth_col = ResponseFormatter._pick_choropleth_column(feature_list)
        has_spread = False
        if choropleth_col is not None:
            values = [
                ResponseFormatter._properties_of(feature).get(choropleth_col, 0)
                for feature in feature_list
            ]
            # A choropleth only makes sense when the values actually differ.
            has_spread = any(value != values[0] for value in values[1:])

        if has_spread:
            layer_props["choropleth"] = {
                "enabled": True,
                "palette": "viridis",
                "column": choropleth_col,
                "scale": "log" if "pop" in choropleth_col or "density" in choropleth_col else "linear",
            }
        else:
            # No usable metric: fall back to a single flat colour so the
            # layer is never left unstyled.
            layer_props["style"] = {
                "color": layer_color,
                "fillColor": layer_color,
                "opacity": 0.8,
                "fillOpacity": 0.4,
            }

        layer_id = uuid.uuid4().hex[:8]
        layer_props["layer_name"] = layer_name
        layer_props["layer_id"] = layer_id

        # Only an explicit "circle" request disables the emoji icon.
        use_circle = point_style == "circle"
        layer_props["pointMarker"] = {
            "icon": None if use_circle else layer_emoji,
            "style": "circle" if use_circle else "icon",
            "color": layer_color,
            "size": 32,
        }

        return geojson, layer_id, layer_name

    @staticmethod
    def generate_data_summary(features: List[Dict]) -> str:
        """Generate a text summary of the features for the LLM explanation context.

        Args:
            features: Result features with a ``properties`` dict each.

        Returns:
            ``"Found N features. Sample: ..."`` listing up to five feature
            names (the most specific of ``adm3_name``, ``adm2_name``,
            ``adm1_name``, ``name``; ``"Feature"`` if none), each followed by
            its ``area_sqkm`` when present; or a fixed message when
            ``features`` is empty.
        """
        if not features:
            return "No features found matching the query."

        sample_names = []
        for feature in features[:5]:
            props = ResponseFormatter._properties_of(feature)
            name = str(
                props.get("adm3_name")
                or props.get("adm2_name")
                or props.get("adm1_name")
                or props.get("name")
                or "Feature"
            )
            area = props.get("area_sqkm")
            if area:
                try:
                    name = f"{name} ({float(area):.1f} km²)"
                except (TypeError, ValueError):
                    # Non-numeric area: list the name without an area suffix.
                    pass
            sample_names.append(name)

        return f"Found {len(features)} features. Sample: {', '.join(sample_names)}"

    @staticmethod
    def _serialize_value(value: Any) -> Any:
        """Convert one value, descending into nested dicts and lists."""
        if isinstance(value, (datetime, date)):
            return value.isoformat()
        if isinstance(value, dict):
            return ResponseFormatter._serialize_properties(value)
        if isinstance(value, list):
            return [ResponseFormatter._serialize_value(item) for item in value]
        return value

    @staticmethod
    def _serialize_properties(properties: Dict[str, Any]) -> Dict[str, Any]:
        """Recursively convert datetime/date objects to strings for JSON serialization.

        Args:
            properties: A properties dict, possibly containing nested dicts
                and lists.

        Returns:
            A new dict with the same keys in which every ``datetime``/``date``
            (at any depth inside dicts and lists) is replaced by its
            ``isoformat()`` string. The input dict is not modified.
        """
        return {
            key: ResponseFormatter._serialize_value(value)
            for key, value in properties.items()
        }
|
|
|