"""
CoJournalist Data - Swiss Parliamentary Data & Statistics Chatbot
Powered by Llama-3.1-70B-Instruct with the OpenParlData and BFS MCP servers
"""
import asyncio
import json
import os
import re
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any

import gradio as gr
from dotenv import load_dotenv
from huggingface_hub import InferenceClient

from datasets.bfs.constants import BFS_EXAMPLES
from datasets.parliament.constants import OPENPARLDATA_EXAMPLES, TOOL_PARAMS as PARLIAMENT_TOOL_PARAMS
from mcp_integration import execute_mcp_query, execute_mcp_query_bfs
from ui.helpers import prefer_language, strip_html, pick_external_url
from usage_tracker import UsageTracker
# Load environment variables
load_dotenv()
# Load system prompts from files
PROMPTS_DIR = Path(__file__).parent / "prompts"
def load_prompt(dataset_name: str) -> str:
"""Load system prompt from file."""
prompt_file = PROMPTS_DIR / f"{dataset_name}.txt"
if not prompt_file.exists():
raise FileNotFoundError(f"Prompt file not found: {prompt_file}")
return prompt_file.read_text(encoding='utf-8')
# Load prompts at startup
PARLIAMENT_PROMPT = load_prompt("parliament")
BFS_PROMPT = load_prompt("bfs")
# Initialize Hugging Face Inference Client
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
print("Warning: HF_TOKEN not found. Please set it in .env file or Hugging Face Space secrets.")
client = InferenceClient(token=HF_TOKEN)
def translate_to_german(text: str) -> str:
"""
Translate user-facing keywords into German to improve OpenParlData recall.
Falls back to the original text if translation fails or input is empty.
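
    Example (illustrative; the actual output depends on the model):
        translate_to_german("climate protection motions")  # -> e.g. "Klimaschutz Motionen"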
"""
cleaned = text.strip()
if not cleaned:
return cleaned
prompt = (
"Übersetze die folgenden Suchbegriffe ins Deutsche. "
"Gib nur die deutschen Stichwörter zurück, ohne Zusatztext.\n"
f"Original: {cleaned}"
)
try:
response = client.chat_completion(
model="meta-llama/Llama-3.1-70B-Instruct",
messages=[
{"role": "system", "content": "Du bist ein präziser Übersetzer ins Deutsche."},
{"role": "user", "content": prompt},
],
max_tokens=64,
temperature=0.0,
)
translated = response.choices[0].message.content.strip()
return translated or cleaned
except Exception as exc:
print(f"⚠️ [translate_to_german] Translation failed ({exc}); falling back to original text.")
return cleaned
class DatasetEngine:
"""Dataset-specific orchestrator for LLM prompting and tool execution."""
def __init__(
self,
name: str,
display_name: str,
system_prompt: str,
routing_instruction: str,
allowed_tools: set[str],
):
self.name = name
self.display_name = display_name
self.system_prompt = system_prompt
self.routing_instruction = routing_instruction
self.allowed_tools = allowed_tools
self._last_request: dict[str, Any] | None = None
def build_messages(self, user_message: str, language_label: str, language_code: str) -> list[dict]:
"""Construct chat completion messages with dataset-specific guardrails."""
routing_guardrails = (
f"TARGET_DATA_SOURCE: {self.display_name}\n"
f"{self.routing_instruction}\n"
'If the request requires a different data source, respond with '
'{"response": "Explain that the other dataset should be selected in the app."}'
)
# Get current date for dynamic date handling
current_date = datetime.now().strftime("%Y-%m-%d")
return [
{"role": "system", "content": self.system_prompt},
{"role": "system", "content": routing_guardrails},
{
"role": "user",
"content": (
f"Current date: {current_date}\n"
f"Selected dataset: {self.display_name}\n"
f"Language preference: {language_label} ({language_code})\n"
f"Question: {user_message}"
),
},
]
@staticmethod
def _parse_model_response(raw_response: str) -> dict:
"""Parse JSON (with cleanup) returned by the LLM."""
clean_response = raw_response.strip()
if clean_response.startswith("```json"):
clean_response = clean_response[7:]
if clean_response.startswith("```"):
clean_response = clean_response[3:]
if clean_response.endswith("```"):
clean_response = clean_response[:-3]
clean_response = clean_response.strip()
json_start_candidates = []
for ch in ("{", "["):
idx = clean_response.find(ch)
if idx != -1:
json_start_candidates.append(idx)
if json_start_candidates:
clean_response = clean_response[min(json_start_candidates):]
return json.loads(clean_response)
def query_model(self, user_message: str, language_label: str, language_code: str) -> dict:
"""Call the LLM with dataset-constrained instructions."""
try:
messages = self.build_messages(user_message, language_label, language_code)
response = client.chat_completion(
model="meta-llama/Llama-3.1-70B-Instruct",
messages=messages,
max_tokens=500,
temperature=0.3,
)
assistant_message = response.choices[0].message.content
return self._parse_model_response(assistant_message)
except json.JSONDecodeError:
# Surface malformed responses to the user so they can retry.
return {"response": assistant_message}
except Exception as exc:
return {"error": f"Error querying model: {str(exc)}"}
def execute_tool(
self,
user_message: str,
tool_name: str,
arguments: dict,
show_debug: bool,
) -> tuple[str, str | None]:
"""Run the MCP tool for the dataset."""
raise NotImplementedError("execute_tool must be implemented by subclasses.")
def sanitize_arguments(self, tool_name: str, arguments: dict) -> dict:
"""
Sanitize and validate tool arguments before execution.
Args:
tool_name: Name of the tool being called
arguments: Raw arguments from LLM
Returns:
Sanitized arguments dict with proper types and valid values
"""
raise NotImplementedError("sanitize_arguments must be implemented by subclasses.")
def _compose_response_text(
self,
explanation: str,
debug_info: str | None,
show_debug: bool,
body: str,
) -> str:
parts = []
if explanation:
parts.append(f"*{explanation}*")
if show_debug and debug_info:
parts.append(f"### 🔧 Debug Information\n{debug_info}\n\n---")
parts.append(body)
return "\n\n".join(parts)
def postprocess_tool_response(
self,
*,
response: str,
tool_name: str,
explanation: str,
debug_info: str | None,
show_debug: bool,
language_code: str,
) -> tuple[str, str | None, dict, list]:
"""Default dataset response handler."""
body = f"### 📊 Results\n{response}"
final_response = self._compose_response_text(explanation, debug_info, show_debug, body)
return final_response, None, {}, []
def respond(
self,
user_message: str,
language_label: str,
language_code: str,
show_debug: bool,
    ) -> tuple[str, str | None, dict, list | str]:
"""Entry point used by the Gradio handler."""
model_response = self.query_model(user_message, language_label, language_code)
if "response" in model_response:
return model_response["response"], None, {}, []
if "error" in model_response:
return f"❌ {model_response['error']}", None, {}, []
tool_name = model_response.get("tool")
arguments = model_response.get("arguments")
if not tool_name or not isinstance(arguments, dict):
return (
"I couldn't determine how to process your request. Please try rephrasing your question.",
None,
{},
[],
)
if tool_name not in self.allowed_tools:
allowed_list = ", ".join(sorted(self.allowed_tools))
warning = (
f"❌ Tool '{tool_name}' is not available for {self.display_name}. "
f"Allowed tools: {allowed_list}. Please adjust your request."
)
return warning, None, {}, []
if "language" not in arguments:
arguments["language"] = language_code
# Force JSON response format for parliament tools to ensure consistent card rendering
if isinstance(self, ParliamentEngine):
arguments["response_format"] = "json"
# Sanitize arguments before execution
arguments = self.sanitize_arguments(tool_name, arguments)
print(f"✅ [DatasetEngine] Sanitized arguments: {arguments}")
# Remember latest request context for downstream post-processing
self._last_request = {
"tool": tool_name,
"arguments": dict(arguments),
}
explanation = model_response.get("explanation", "")
response, debug_info = self.execute_tool(user_message, tool_name, arguments, show_debug)
return self.postprocess_tool_response(
response=response,
tool_name=tool_name,
explanation=explanation,
debug_info=debug_info,
show_debug=show_debug,
language_code=language_code,
)
class ParliamentEngine(DatasetEngine):
def __init__(self):
super().__init__(
name="parliament",
display_name="Swiss Parliament Data (OpenParlData)",
system_prompt=PARLIAMENT_PROMPT,
routing_instruction="Use only tools that begin with 'openparldata_'. Never mention BFS tools.",
allowed_tools={
"openparldata_search_parliamentarians",
"openparldata_search_votes",
"openparldata_search_motions",
"openparldata_search_debates",
"openparldata_search_meetings",
},
)
def sanitize_arguments(self, tool_name: str, arguments: dict) -> dict:
"""Sanitize arguments for OpenParlData tools."""
sanitized = {}
valid_params = PARLIAMENT_TOOL_PARAMS.get(tool_name, set())
requested_language = str(arguments.get("language", "")).lower()
original_arguments = dict(arguments)
optional_string_params = {
"canton",
"party",
"parliament_id",
"vote_type",
"submitter_id",
"speaker_id",
"topic",
"status",
"body_key",
"level",
}
for key, value in arguments.items():
# Skip extra fields not in the tool schema
if key not in valid_params:
print(f"⚠️ [ParliamentEngine] Skipping invalid parameter '{key}' for {tool_name}")
continue
# Normalize strings and drop empty values for optional params
if isinstance(value, str):
value = value.strip()
if value == "" and key in optional_string_params:
print(f"⚠️ [ParliamentEngine] Dropping empty string for '{key}'")
continue
# Type conversions
if key == "limit":
# Convert to int and clamp to 1-100
try:
limit_val = int(value) if isinstance(value, str) else value
sanitized[key] = max(1, min(100, limit_val))
except (ValueError, TypeError):
sanitized[key] = 20 # Default
elif key == "offset":
# Convert to int and ensure >= 0
try:
offset_val = int(value) if isinstance(value, str) else value
sanitized[key] = max(0, offset_val)
except (ValueError, TypeError):
sanitized[key] = 0 # Default
elif key == "language":
# Validate language enum (case-insensitive)
lang_upper = str(value).upper()
if lang_upper in ["DE", "FR", "IT", "EN"]:
sanitized[key] = lang_upper.lower()
else:
sanitized[key] = "en" # Default to English
elif key == "active_only":
# Convert to bool
sanitized[key] = bool(value)
elif key == "status":
status_val = str(value).strip().lower()
if status_val in {"", "all", "any", "*", "none"}:
print("⚠️ [ParliamentEngine] Removing non-specific status filter")
continue
status_map = {
"pending": "Eingereicht",
"submitted": "Eingereicht",
"in_progress": "Eingereicht",
"open": "Eingereicht",
"accepted": "Angenommen",
"approved": "Angenommen",
"rejected": "Abgelehnt",
"declined": "Abgelehnt",
"completed": "Erledigt",
"closed": "Erledigt",
}
if status_val.isdigit():
sanitized[key] = status_val
else:
mapped = status_map.get(status_val)
if mapped:
sanitized[key] = mapped
else:
print(f"⚠️ [ParliamentEngine] Unknown status '{value}' dropped")
continue
elif key == "body_key":
sanitized[key] = str(value).upper()
elif key == "level":
sanitized[key] = str(value).lower()
elif key == "query" and tool_name == "openparldata_search_parliamentarians":
query_text = str(value)
tokens = [tok for tok in query_text.replace(",", " ").split() if tok]
if len(tokens) >= 2 and all(tok[0].isupper() for tok in tokens if tok):
# Use last token (family name) for broader matching
sanitized[key] = tokens[-1]
else:
sanitized[key] = value
else:
# Keep other values as-is
sanitized[key] = value
# Enforce German language for English UI users
if requested_language == "en":
sanitized["language"] = "de"
elif "language" in sanitized:
sanitized["language"] = sanitized["language"].lower()
# Translate key textual filters into German for better recall
if sanitized.get("language") == "de":
for text_key in ("query", "topic"):
if text_key in sanitized:
text_value = str(sanitized[text_key]).strip()
if text_value:
translated = translate_to_german(text_value)
if translated:
sanitized[text_key] = translated
else:
# Restore original if translation failed
sanitized[text_key] = text_value
# Avoid empty required query strings by falling back to original input
if "query" in sanitized:
if not str(sanitized["query"]).strip():
fallback = str(original_arguments.get("query", "")).strip()
if fallback:
sanitized["query"] = translate_to_german(fallback) if sanitized.get("language") == "de" else fallback
else:
sanitized.pop("query", None)
return sanitized
def execute_tool(
self,
user_message: str,
tool_name: str,
arguments: dict,
show_debug: bool,
) -> tuple[str, str | None]:
# DEBUG: Capture arguments before MCP call
print(f"\n🔍 [ParliamentEngine] execute_tool called:")
print(f" Tool: {tool_name}")
print(f" Arguments: {arguments}")
print(f" Argument types: {dict((k, type(v).__name__) for k, v in arguments.items())}")
return asyncio.run(execute_mcp_query(user_message, tool_name, arguments, show_debug))
def postprocess_tool_response(
self,
*,
response: str,
tool_name: str,
explanation: str,
debug_info: str | None,
show_debug: bool,
language_code: str,
) -> tuple[str, str | None, dict, str]:
"""Pass through the response for parsing in respond() function."""
# Simplified: just return the raw JSON response
# The respond() function will handle parsing and card extraction
# Don't embed raw JSON in message - use clean placeholder instead
body = "Searching parliament data..."
final_response = self._compose_response_text(explanation, debug_info, show_debug, body)
return final_response, None, {}, response
class BFSEngine(DatasetEngine):
# Valid parameter names per tool
TOOL_PARAMS = {
"bfs_search": {
"keywords", "language" # NO format parameter!
},
"bfs_query_data": {
"datacube_id", "filters", "format", "language"
},
}
def __init__(self):
super().__init__(
name="statistics",
display_name="Swiss Statistics (BFS)",
system_prompt=BFS_PROMPT,
routing_instruction="Use only tools that begin with 'bfs_'. Never mention OpenParlData tools.",
allowed_tools={
"bfs_search",
"bfs_query_data",
},
)
def sanitize_arguments(self, tool_name: str, arguments: dict) -> dict:
"""Sanitize arguments for BFS tools."""
sanitized = {}
valid_params = self.TOOL_PARAMS.get(tool_name, set())
for key, value in arguments.items():
# Skip extra fields not in the tool schema
if key not in valid_params:
print(f"⚠️ [BFSEngine] Skipping invalid parameter '{key}' for {tool_name}")
continue
# Type conversions
if key == "language":
# Validate language enum (case-insensitive)
lang_upper = str(value).upper()
if lang_upper in ["DE", "FR", "IT", "EN"]:
sanitized[key] = lang_upper.lower()
else:
sanitized[key] = "en" # Default to English
elif key == "format":
# Validate and normalize format enum (only for bfs_query_data)
if tool_name == "bfs_query_data":
format_upper = str(value).upper().replace("-", "_")
# Map common values to DataFormat enum
format_map = {
"CSV": "csv",
"JSON": "json",
"JSON_STAT": "json-stat",
"JSON_STAT2": "json-stat2",
"PX": "px",
}
sanitized[key] = format_map.get(format_upper, "csv") # Default to CSV
else:
# Keep other values as-is
sanitized[key] = value
# Add default format for bfs_query_data if not present
if tool_name == "bfs_query_data" and "format" not in sanitized:
sanitized["format"] = "csv"
return sanitized
def execute_tool(
self,
user_message: str,
tool_name: str,
arguments: dict,
show_debug: bool,
) -> tuple[str, str | None]:
# DEBUG: Capture arguments after sanitization
print(f"\n🔍 [BFSEngine] execute_tool called:")
print(f" Tool: {tool_name}")
print(f" Arguments (sanitized): {arguments}")
print(f" Argument types: {dict((k, type(v).__name__) for k, v in arguments.items())}")
return asyncio.run(execute_mcp_query_bfs(user_message, tool_name, arguments, show_debug))
@staticmethod
def _parse_datacube_choices(response: str) -> tuple[dict, list]:
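        """Extract datacube ids and display labels from a bfs_search reply.

        Expected response shape (illustrative; the id is a made-up example):
            1. **px-x-0102020000_201**
               Permanent resident population by canton
        The bolded token is treated as the datacube id; the following line,
        unless it starts with '↳', becomes the human-readable description.
        """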
datacube_map: dict[str, str] = {}
datacube_choices: list[str] = []
lines = response.split('\n')
i = 0
while i < len(lines):
line = lines[i]
match = re.search(r'^\s*\d+\.\s+\*\*([^*]+)\*\*\s*$', line)
if match:
datacube_id = match.group(1).strip()
description = datacube_id
if i + 1 < len(lines):
next_line = lines[i + 1].strip()
if not next_line.startswith('↳') and next_line:
description = next_line
elif i + 2 < len(lines):
description = lines[i + 2].strip() or datacube_id
if len(description) > 80:
description = description[:77] + "..."
label = f"{description} ({datacube_id})"
datacube_choices.append(label)
datacube_map[label] = datacube_id
i += 1
return datacube_map, datacube_choices
@staticmethod
def _detect_csv(response: str) -> bool:
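        """Heuristically decide whether a tool response looks like CSV.

        Requires at least two lines that both contain commas, and rejects
        responses whose first 200 characters contain an error marker such as
        "no data" or "no datacubes found".
        """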
lines = response.strip().split('\n')
if len(lines) < 2:
return False
if ',' not in lines[0] or ',' not in lines[1]:
return False
prefix = response.lower()[:200]
error_tokens = ["error", "no data", "no datacubes found", "try broader"]
return not any(token in prefix for token in error_tokens)
def postprocess_tool_response(
self,
*,
response: str,
tool_name: str,
explanation: str,
debug_info: str | None,
show_debug: bool,
language_code: str,
) -> tuple[str, str | None, dict, list]:
csv_file_path = None
datacube_map: dict[str, str] = {}
datacube_choices: list[str] = []
body = ""
if tool_name == "bfs_query_data" and self._detect_csv(response):
rows = response.count('\n')
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
csv_filename = f"bfs_data_{timestamp}.csv"
csv_file_path = os.path.join(tempfile.gettempdir(), csv_filename)
with open(csv_file_path, 'w', encoding='utf-8') as f:
f.write(response)
body = (
"### 📊 Data Ready\n"
f"✅ CSV file generated with {rows} rows\n\n"
"💾 **Download your data using the button below**"
)
else:
if tool_name == "bfs_search" and "matching datacube" in response.lower():
datacube_map, datacube_choices = self._parse_datacube_choices(response)
# If we found datacubes, show a simple message instead of the full response
if datacube_choices:
# Extract the search term from explanation
match = re.search(r'related to (.+)', explanation, re.IGNORECASE)
search_term = match.group(1).strip() if match else "your search"
body = f"### 📊 Available Datasets\n\nHere is the data available for **{search_term}**. Please select a dataset below to download:"
else:
# No datacubes found, show the full error message
body = f"### 📊 Results\n{response}"
else:
body = f"### 📊 Results\n{response}"
final_response = self._compose_response_text(explanation, debug_info, show_debug, body)
return final_response, csv_file_path, datacube_map, datacube_choices
def fetch_datacube_data(
self,
datacube_id: str,
language_code: str,
show_debug: bool,
) -> tuple[str, str | None]:
response, debug_info = self.execute_tool(
user_message=f"Get data for datacube {datacube_id}",
tool_name="bfs_query_data",
arguments={"datacube_id": datacube_id, "language": language_code},
show_debug=show_debug,
)
if self._detect_csv(response):
rows = response.count('\n')
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
csv_filename = f"bfs_data_{timestamp}.csv"
csv_file_path = os.path.join(tempfile.gettempdir(), csv_filename)
with open(csv_file_path, 'w', encoding='utf-8') as f:
f.write(response)
message = (
"### 📊 Data Ready\n"
f"✅ CSV file generated with {rows} rows for datacube: `{datacube_id}`\n\n"
"💾 **Download your data using the button below**"
)
if show_debug and debug_info:
message = f"### 🔧 Debug Information\n{debug_info}\n\n---\n\n{message}"
return message, csv_file_path
error_message = f"❌ Error retrieving data:\n\n{response}"
return error_message, None
DATASET_ENGINES: dict[str, DatasetEngine] = {
"parliament": ParliamentEngine(),
"statistics": BFSEngine(),
}
# Initialize usage tracker with 50 requests per day limit
tracker = UsageTracker(daily_limit=50)
# Available languages
LANGUAGES = {
"English": "en",
"Deutsch": "de",
"Français": "fr",
"Italiano": "it"
}
# Constants imported from datasets/ modules above
def chat_response(message: str, history: list, language: str, show_debug: bool, dataset: str = "parliament") -> tuple[str, str | None, dict, list | str]:
"""
Main chat response function routed through dataset-specific engines.
"""
try:
engine = DATASET_ENGINES.get(dataset)
if not engine:
return f"❌ Unknown dataset selected: {dataset}", None, {}, []
language_code = LANGUAGES.get(language, "en")
return engine.respond(message, language, language_code, show_debug)
except Exception as e:
return f"❌ An error occurred: {str(e)}", None, {}, []
# Load custom CSS
with open("ui/styles.css", "r") as f:
custom_css = f.read()
# Build Gradio interface
with gr.Blocks(css=custom_css, title="Swiss and European Government Data LLM") as demo:
# State to track datacube search results
datacube_state = gr.State({}) # Maps display text → datacube_id
# State to track parliament cards
parliament_cards_state = gr.State([]) # List of card dicts
parliament_page_state = gr.State(1) # Current page number
gr.Markdown(
"""
"""
)
with gr.Row():
with gr.Column(scale=3):
# Simple query input form
with gr.Row():
msg = gr.Textbox(
placeholder="Ask a question about Swiss parliamentary data or statistics...",
show_label=False,
scale=4,
container=False
)
submit = gr.Button("🔍 Search", variant="primary", scale=1)
# Status/explanation text
status_text = gr.Markdown("", visible=False)
# CSV download file component
download_file = gr.File(
label="📥 Download Data",
visible=False,
interactive=False
)
# Datacube selection (hidden by default, shown when search returns results)
with gr.Row(visible=False) as datacube_selection_row:
with gr.Column(scale=4):
datacube_radio = gr.Radio(
label="📋 Select Datacube for Download",
choices=[],
visible=True
)
with gr.Column(scale=1):
get_data_btn = gr.Button("📥 Get Data", variant="primary", size="lg")
# Parliament cards display (hidden by default, shown when parliament results return)
with gr.Column(visible=False) as parliament_cards_row:
parliament_cards_html = gr.HTML("")
with gr.Row():
prev_page_btn = gr.Button("◀ Previous", size="sm")
page_info = gr.Markdown("Page 1")
next_page_btn = gr.Button("Next ▶", size="sm")
with gr.Column(scale=1):
gr.Markdown("### ⚙️ Settings")
dataset = gr.Radio(
choices=[
"Swiss Parliament Data",
"Swiss Statistics (BFS)"
],
value="Swiss Parliament Data",
label="Data Source",
info="Choose which API to query"
)
            # NOTE: this markup was reconstructed after the original tags were
            # stripped; the wrapper element and class name are assumptions and
            # must match the selectors defined in ui/styles.css.
            gr.HTML(
                """
                <div class="coming-soon">
                    <p>ParlTalk • Coming Soon</p>
                    <p>Eurostat • Coming Soon</p>
                </div>
                """
            )
language = gr.Radio(
choices=list(LANGUAGES.keys()),
value="English",
label="Language",
info="Select response language"
)
# Example queries display
gr.Markdown("### 💡 Example Queries")
examples_display = gr.Markdown()
def ensure_message_history(history):
"""Normalize chat history to the format expected by gr.Chatbot(type='messages')."""
normalized: list[dict] = []
if not history:
return normalized
for entry in history:
if isinstance(entry, dict):
role = entry.get("role")
content = entry.get("content", "")
if role:
normalized.append({"role": role, "content": "" if content is None else str(content)})
elif isinstance(entry, (tuple, list)) and len(entry) == 2:
user, assistant = entry
if user is not None:
normalized.append({"role": "user", "content": str(user)})
if assistant is not None:
normalized.append({"role": "assistant", "content": str(assistant)})
return normalized
def create_examples_text(dataset_choice: str, language: str) -> str:
"""Create formatted example queries text."""
lang_code = LANGUAGES.get(language, "en")
if dataset_choice == "Swiss Parliament Data":
examples = OPENPARLDATA_EXAMPLES.get(lang_code, OPENPARLDATA_EXAMPLES["en"])
elif dataset_choice == "Swiss Statistics (BFS)":
examples = BFS_EXAMPLES.get(lang_code, BFS_EXAMPLES["en"])
else:
examples = OPENPARLDATA_EXAMPLES.get(lang_code, OPENPARLDATA_EXAMPLES["en"])
examples_md = "\n".join([f"- {example}" for example in examples])
return examples_md
# Helper functions imported from ui.helpers
def build_parliament_card(item: dict, lang_code: str) -> dict:
"""Normalize OpenParlData rows into unified card metadata."""
card = {
"title": "Untitled",
"url": "#",
"date": "",
"category": "Result",
"summary": ""
}
if not isinstance(item, dict):
return card
# People directory
if any(key in item for key in ("firstname", "lastname", "fullname")):
card["category"] = "Person"
fullname = item.get("fullname") or f"{item.get('firstname', '')} {item.get('lastname', '')}".strip()
card["title"] = fullname or "Parliamentarian"
website = prefer_language(item.get("website_parliament_url"), lang_code)
card["url"] = website or item.get("url_api", "#")
party_display = None
if item.get("party"):
party_display = prefer_language(item.get("party"), lang_code)
if not party_display and isinstance(item["party"], dict):
party_display = prefer_language(item["party"], "de")
if not party_display and item.get("party_harmonized"):
party_display = prefer_language(item.get("party_harmonized"), lang_code)
body_key = item.get("body_key")
summary_parts = []
if party_display:
summary_parts.append(f"Party: {party_display}")
if body_key:
summary_parts.append(f"Body: {body_key}")
if summary_parts:
card["summary"] = " · ".join(summary_parts)
updated = item.get("updated_at") or item.get("created_at")
if updated:
card["date"] = updated[:10]
return card
# Meetings
if item.get("begin_date") and (item.get("name") or item.get("location") or item.get("type") == "meeting"):
card["category"] = "Meeting"
card["title"] = prefer_language(item.get("name"), lang_code) or item.get("number") or "Meeting"
card["date"] = (item.get("begin_date") or "")[:10]
card["url"] = prefer_language(item.get("url_external"), lang_code) or item.get("url_api", "#")
details = []
if item.get("location"):
details.append(item["location"])
if item.get("body_key"):
details.append(f"Body: {item['body_key']}")
if item.get("number"):
details.append(f"Meeting #{item['number']}")
if details:
card["summary"] = " · ".join(details)
return card
# Votes
if "results_yes" in item or "results_no" in item:
card["category"] = "Vote"
card["title"] = prefer_language(item.get("title"), lang_code) or "Vote"
card["date"] = (item.get("date") or "")[:10]
card["url"] = prefer_language(item.get("url_external"), lang_code) or item.get("url_api", "#")
affair_title = prefer_language(item.get("affair_title"), lang_code)
if affair_title:
card["summary"] = affair_title
else:
totals = []
if item.get("results_yes") is not None:
totals.append(f"Yes {item.get('results_yes')}")
if item.get("results_no") is not None:
totals.append(f"No {item.get('results_no')}")
if item.get("results_abstention") is not None:
totals.append(f"Abst {item.get('results_abstention')}")
if totals:
card["summary"] = " · ".join(totals)
return card
# Affairs / motions
if "type_name" in item or "number" in item or "state_name" in item:
card["category"] = "Affair"
card["title"] = prefer_language(item.get("title"), lang_code) or item.get("number") or "Affair"
card["url"] = prefer_language(item.get("url_external"), lang_code) or item.get("url_api", "#")
begin = item.get("begin_date") or item.get("created_at")
if begin:
card["date"] = begin[:10]
details = []
type_name = prefer_language(item.get("type_name"), lang_code)
state_name = prefer_language(item.get("state_name"), lang_code)
if type_name:
details.append(type_name)
if state_name:
details.append(state_name)
if item.get("number"):
details.append(item["number"])
if details:
card["summary"] = " · ".join(details)
return card
# Speeches / debates
if any(key in item for key in ("transcript", "speech_text", "speech_text_content", "speaker_name", "person_name", "person")):
card["category"] = "Speech"
# Extract person from nested expand structure: person = {"data": [...], "meta": {...}}
person_data = item.get("person", {})
if isinstance(person_data, dict) and "data" in person_data and person_data["data"]:
person = person_data["data"][0]
elif isinstance(person_data, dict):
person = person_data
else:
person = {}
speaker = (
prefer_language(person.get("fullname"), lang_code)
or prefer_language(item.get("person_name"), lang_code)
or person.get("fullname")
or item.get("speaker_name")
)
# Extract affair from nested expand structure
affair_data = item.get("affair", {})
if isinstance(affair_data, dict) and "data" in affair_data and affair_data["data"]:
affair = affair_data["data"][0]
elif isinstance(affair_data, dict):
affair = affair_data
else:
affair = {}
affair_title = prefer_language(affair.get("title"), lang_code)
card["title"] = (
prefer_language(item.get("title"), lang_code)
or affair_title
or (f"Rede von {speaker}" if speaker else "Rede")
)
card["date"] = (item.get("date") or item.get("date_start") or "")[:10]
# Extract meeting from nested expand structure
meeting_data = item.get("meeting")
if isinstance(meeting_data, dict) and "data" in meeting_data and meeting_data["data"]:
meeting = meeting_data["data"][0]
else:
meeting = {}
# Speeches use "url" field (plain string), not "url_external" (dict)
external_url = pick_external_url(
item.get("url"), # Speeches have direct url field
item.get("url_external"),
affair.get("url_external") if isinstance(affair, dict) else None,
meeting.get("url_external") if isinstance(meeting, dict) else None,
)
# Never use url_api for clickable links
card["url"] = external_url or "#"
text_content = item.get("speech_text_content")
summary = None
if isinstance(text_content, dict):
summary = prefer_language(text_content, lang_code) or prefer_language(text_content, "de")
elif isinstance(text_content, str):
summary = text_content
elif item.get("transcript"):
summary = item.get("transcript")
elif item.get("speech_text"):
summary = item.get("speech_text")
if summary:
summary = strip_html(summary)[:200]
summary_parts = []
if speaker:
summary_parts.append(speaker)
if summary:
summary_parts.append(summary)
if affair_title and affair_title != card["title"]:
summary_parts.append(affair_title)
if summary_parts:
card["summary"] = " — ".join(summary_parts[:2])
return card
# Fallback generic
if item.get("title"):
card["title"] = prefer_language(item.get("title"), lang_code) or item["title"]
external = prefer_language(item.get("url_external"), lang_code)
card["url"] = external or item.get("url_api", "#")
if item.get("date"):
card["date"] = item["date"][:10]
return card
def render_parliament_cards(cards: list[dict], page: int, items_per_page: int = 10) -> tuple[str, str, int, bool]:
"""Render parliament cards as HTML with pagination."""
if not cards:
return "", "No results", 1, False
total_pages = (len(cards) + items_per_page - 1) // items_per_page
page = max(1, min(page, total_pages)) # Clamp page to valid range
show_pagination = len(cards) > items_per_page
start_idx = (page - 1) * items_per_page
end_idx = min(start_idx + items_per_page, len(cards))
page_cards = cards[start_idx:end_idx]
        # Generate HTML for cards.
        # NOTE: the markup below was reconstructed after the original tags were
        # stripped; the element structure and class names ("parliament-cards",
        # "parliament-card", "card-category", "card-summary", "card-date") are
        # assumptions and must match the selectors defined in ui/styles.css.
        cards_html = '<div class="parliament-cards">'
        for card in page_cards:
            title = card.get("title", "Untitled")
            url = card.get("url", "#")
            date = card.get("date", "")
            category = card.get("category", "Result")
            summary = card.get("summary", "")
            # Truncate title if too long
            if len(title) > 120:
                title = title[:117] + "..."
            date_badge = f'<span class="card-date">{date}</span>' if date else ''
            cards_html += f'''
            <div class="parliament-card">
                <span class="card-category">{category}</span>
                <h4><a href="{url}" target="_blank" rel="noopener">{title}</a></h4>
                {f'<p class="card-summary">{summary}</p>' if summary else ''}
                {date_badge}
            </div>'''
        cards_html += '</div>'
page_info = f"Page {page} of {total_pages} ({len(cards)} total results)"
return cards_html, page_info, page, show_pagination
# Handle message submission
def respond(message, language, dataset_choice, current_datacube_state, current_parliament_cards, current_page, request: gr.Request):
show_debug = False # Debug mode disabled in UI
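        # Every branch below returns 14 values, matching the outputs wired in
        # msg.submit()/submit.click(): msg, status_text, download_file (value),
        # download_file (visibility), datacube_state, datacube_radio,
        # datacube_selection_row, parliament_cards_state, parliament_page_state,
        # parliament_cards_html, page_info, parliament_cards_row,
        # prev_page_btn, next_page_btn.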
if not message.strip():
return "", gr.update(visible=False), None, gr.update(visible=False), current_datacube_state, gr.update(), gr.update(visible=False), current_parliament_cards, current_page, "", "", gr.update(visible=False), gr.update(), gr.update()
# Check usage limit
user_id = request.client.host if request and hasattr(request, 'client') else "unknown"
if not tracker.check_limit(user_id):
status_msg = (
"⚠️ **Daily request limit reached.** You have used all 50 requests for today. "
"Please try again tomorrow.\n\nThis limit helps us keep the service free and available for everyone."
)
return "", gr.update(value=status_msg, visible=True), None, gr.update(visible=False), current_datacube_state, gr.update(), gr.update(visible=False), current_parliament_cards, current_page, "", "", gr.update(visible=False), gr.update(), gr.update()
# Map dataset choice to engine type
dataset_map = {
"Swiss Parliament Data": "parliament",
"Swiss Statistics (BFS)": "statistics"
}
dataset_type = dataset_map.get(dataset_choice, "parliament")
# Get bot response (returns tuple with optional CSV file and results data)
# Create temporary chat history for API call
temp_chat = []
bot_message, csv_file, datacube_map, results_data = chat_response(
message, temp_chat, language, show_debug, dataset_type
)
engine_instance = DATASET_ENGINES.get(dataset_type)
last_request = getattr(engine_instance, "_last_request", None) if engine_instance else None
# Parse JSON and extract cards for Parliament dataset
parliament_cards: list[dict] = []
if dataset_type == "parliament" and results_data and isinstance(results_data, str):
try:
print(f"\n🔍 [respond] Parsing JSON results_data...")
data = json.loads(results_data, strict=False)
print(f"✅ [respond] JSON parsed successfully")
if isinstance(data, dict) and data.get("status") == "error":
error_msg = data.get("message") or data.get("detail") or "Die OpenParlData-API meldet einen Fehler."
endpoint = data.get("endpoint")
if endpoint:
error_msg += f"\n\nEndpoint: `{endpoint}`"
bot_message = f"❌ {error_msg}"
return (
"",
gr.update(value=bot_message, visible=True),
None,
gr.update(visible=False),
current_datacube_state,
gr.update(),
gr.update(visible=False),
current_parliament_cards,
current_page,
"",
"",
gr.update(visible=False),
gr.update(),
gr.update()
)
if isinstance(data, dict) and isinstance(data.get("data"), list):
items = data["data"]
print(f"✅ [respond] Found data array with {len(items)} items")
lang_code = LANGUAGES.get(language, "en")
# Filter out error objects before building cards
valid_items = [
item for item in items
if isinstance(item, dict) and item.get("status") != "error"
]
if len(valid_items) < len(items):
print(f"⚠️ [respond] Filtered out {len(items) - len(valid_items)} error objects")
for item in valid_items:
parliament_cards.append(build_parliament_card(item, lang_code))
# Optional date filtering for meetings (client-side)
if last_request and last_request.get("tool") == "openparldata_search_meetings":
args = last_request.get("arguments", {})
date_from = args.get("date_from")
date_to = args.get("date_to")
if date_from or date_to:
def within_window(date_value: str | None) -> bool:
if not date_value:
return False
try:
card_date = datetime.fromisoformat(date_value).date()
except ValueError:
try:
card_date = datetime.strptime(date_value, "%Y-%m-%d").date()
except ValueError:
return False
if date_from:
start = datetime.strptime(date_from, "%Y-%m-%d").date()
if card_date < start:
return False
if date_to:
end = datetime.strptime(date_to, "%Y-%m-%d").date()
if card_date > end:
return False
return True
before = len(parliament_cards)
parliament_cards = [card for card in parliament_cards if within_window(card.get("date"))]
print(f"✅ [respond] Filtered meetings by date window ({before} → {len(parliament_cards)})")
# Limit display to avoid overwhelming the UI
MAX_RESULTS = 50
truncated = False
if len(parliament_cards) > MAX_RESULTS:
print(f"⚠️ [respond] Truncating card list from {len(parliament_cards)} to {MAX_RESULTS}")
parliament_cards = parliament_cards[:MAX_RESULTS]
truncated = True
if parliament_cards:
total = data.get("meta", {}).get("total_records") or len(parliament_cards)
display_count = len(parliament_cards)
bot_message = f"**Found {total} result(s).** Showing {display_count} items below:"
if LANGUAGES.get(language, "en") == "en":
bot_message += "\n\n*Note: English content is not available from the API. Results are displayed in German.*"
if truncated:
bot_message += f"\n\n_Only the first {MAX_RESULTS} items are displayed. Refine your search for more specific results._"
elif last_request and last_request.get("tool") == "openparldata_search_meetings":
bot_message = "No meetings found that match the requested filters. Try adjusting the date range or search keywords."
else:
print("❌ [respond] Data structure does not contain a 'data' array.")
except json.JSONDecodeError as e:
print(f"❌ [respond] JSON parsing failed: {e}")
except Exception as e:
print(f"❌ [respond] Unexpected error during card extraction: {e}")
# Handle parliament cards (for Parliament dataset)
if dataset_type == "parliament" and parliament_cards:
cards_html, page_info, page_num, show_pagination = render_parliament_cards(parliament_cards, 1)
return (
"",
gr.update(value=bot_message, visible=True),
None,
gr.update(visible=False),
current_datacube_state,
gr.update(),
gr.update(visible=False),
parliament_cards, # parliament_cards_state
page_num, # parliament_page_state
cards_html, # parliament_cards_html
page_info, # page_info
gr.update(visible=True), # parliament_cards_row
gr.update(visible=show_pagination), # prev_page_btn
gr.update(visible=show_pagination) # next_page_btn
)
# Handle datacube search results (for BFS dataset)
if dataset_type == "statistics" and results_data:
return (
"",
gr.update(value=bot_message, visible=True),
None,
gr.update(visible=False),
datacube_map,
gr.update(choices=results_data, value=None),
gr.update(visible=True),
current_parliament_cards,
current_page,
"",
"",
gr.update(visible=False),
gr.update(),
gr.update()
)
# Handle CSV download
if csv_file:
return (
"",
gr.update(value=bot_message, visible=True),
csv_file,
gr.update(visible=True),
current_datacube_state,
gr.update(),
gr.update(visible=False),
current_parliament_cards,
current_page,
"",
"",
gr.update(visible=False),
gr.update(),
gr.update()
)
return (
"",
gr.update(value=bot_message, visible=True),
None,
gr.update(visible=False),
current_datacube_state,
gr.update(),
gr.update(visible=False),
current_parliament_cards,
current_page,
"",
"",
gr.update(visible=False),
gr.update(),
gr.update()
)
# Handle parliament pagination
def prev_page(cards, current_page):
"""Go to previous page of parliament results."""
new_page = max(1, current_page - 1)
cards_html, page_info, page_num, show_pagination = render_parliament_cards(cards, new_page)
return cards_html, page_info, page_num
def next_page(cards, current_page):
"""Go to next page of parliament results."""
if not cards:
return "", "No results", current_page
total_pages = (len(cards) + 9) // 10 # 10 items per page
new_page = min(total_pages, current_page + 1)
cards_html, page_info, page_num, show_pagination = render_parliament_cards(cards, new_page)
return cards_html, page_info, page_num
# Handle "Get Data" button click for datacube selection
def fetch_datacube_data(selected_choice, current_datacube_state, language, request: gr.Request):
show_debug = False # Debug mode disabled in UI
if not selected_choice or not current_datacube_state:
error_msg = "⚠️ Please select a datacube first."
return gr.update(value=error_msg, visible=True), None, gr.update(visible=False), gr.update(visible=False)
# Check usage limit
user_id = request.client.host if request and hasattr(request, 'client') else "unknown"
if not tracker.check_limit(user_id):
bot_message = (
"⚠️ Daily request limit reached. You have used all 50 requests for today. "
"Please try again tomorrow.\n\nThis limit helps us keep the service free and available for everyone."
)
return gr.update(value=bot_message, visible=True), None, gr.update(visible=False), gr.update(visible=False)
# Get datacube ID from mapping
datacube_id = current_datacube_state.get(selected_choice)
if not datacube_id:
error_msg = "❌ Error: Could not find datacube ID for selected option."
return gr.update(value=error_msg, visible=True), None, gr.update(visible=False), gr.update(visible=False)
# Get language code
lang_code = LANGUAGES.get(language, "en")
bfs_engine = DATASET_ENGINES.get("statistics")
if not isinstance(bfs_engine, BFSEngine):
error_msg = "❌ Error: BFS engine unavailable."
return gr.update(value=error_msg, visible=True), None, gr.update(visible=False), gr.update(visible=False)
bot_message, csv_file_path = bfs_engine.fetch_datacube_data(datacube_id, lang_code, show_debug)
if csv_file_path:
return gr.update(value=bot_message, visible=True), csv_file_path, gr.update(visible=True), gr.update(visible=False)
return gr.update(value=bot_message, visible=True), None, gr.update(visible=False), gr.update(visible=False)
msg.submit(
respond,
[msg, language, dataset, datacube_state, parliament_cards_state, parliament_page_state],
[msg, status_text, download_file, download_file, datacube_state, datacube_radio, datacube_selection_row,
parliament_cards_state, parliament_page_state, parliament_cards_html, page_info, parliament_cards_row,
prev_page_btn, next_page_btn]
)
submit.click(
respond,
[msg, language, dataset, datacube_state, parliament_cards_state, parliament_page_state],
[msg, status_text, download_file, download_file, datacube_state, datacube_radio, datacube_selection_row,
parliament_cards_state, parliament_page_state, parliament_cards_html, page_info, parliament_cards_row,
prev_page_btn, next_page_btn]
)
get_data_btn.click(
fetch_datacube_data,
[datacube_radio, datacube_state, language],
[status_text, download_file, download_file, datacube_selection_row]
)
prev_page_btn.click(
prev_page,
[parliament_cards_state, parliament_page_state],
[parliament_cards_html, page_info, parliament_page_state]
)
next_page_btn.click(
next_page,
[parliament_cards_state, parliament_page_state],
[parliament_cards_html, page_info, parliament_page_state]
)
# Update examples when dataset or language changes
dataset.change(
create_examples_text,
[dataset, language],
[examples_display]
)
language.change(
create_examples_text,
[dataset, language],
[examples_display]
)
# Initialize examples on load
demo.load(
create_examples_text,
[dataset, language],
[examples_display]
)
gr.Markdown(
"""
---
**Data Sources:**
    - **Swiss Parliament Data:** queried via the OpenParlData.ch API, with thanks to Christian, Florin, and the many contributors who built it
    - **Swiss Statistics (BFS):** Federal Statistical Office data via the PxWeb API
    **Rate Limit:** 50 requests per day per user (shared across both datasets) to keep the service affordable and accessible.
    Powered by [Llama-3.1-70B-Instruct](https://huggingface.co/meta-llama/Llama-3.1-70B-Instruct) via HF Inference Providers and the [Model Context Protocol (MCP)](https://modelcontextprotocol.io/)
"""
)
# Launch the app
if __name__ == "__main__":
demo.launch()