""" CoJournalist Data - Swiss Parliamentary Data & Statistics Chatbot Powered by Llama-3.1-8B-Instruct with OpenParlData and BFS MCP """ import os import json import tempfile from datetime import datetime from pathlib import Path import gradio as gr from huggingface_hub import InferenceClient from dotenv import load_dotenv from mcp_integration import execute_mcp_query, execute_mcp_query_bfs import asyncio from usage_tracker import UsageTracker from typing import Any from ui.helpers import prefer_language, strip_html, pick_external_url from datasets.parliament.constants import OPENPARLDATA_EXAMPLES, TOOL_PARAMS as PARLIAMENT_TOOL_PARAMS from datasets.bfs.constants import BFS_EXAMPLES # Load environment variables load_dotenv() # Load system prompts from files PROMPTS_DIR = Path(__file__).parent / "prompts" def load_prompt(dataset_name: str) -> str: """Load system prompt from file.""" prompt_file = PROMPTS_DIR / f"{dataset_name}.txt" if not prompt_file.exists(): raise FileNotFoundError(f"Prompt file not found: {prompt_file}") return prompt_file.read_text(encoding='utf-8') # Load prompts at startup PARLIAMENT_PROMPT = load_prompt("parliament") BFS_PROMPT = load_prompt("bfs") # Initialize Hugging Face Inference Client HF_TOKEN = os.getenv("HF_TOKEN") if not HF_TOKEN: print("Warning: HF_TOKEN not found. Please set it in .env file or Hugging Face Space secrets.") client = InferenceClient(token=HF_TOKEN) def translate_to_german(text: str) -> str: """ Translate user-facing keywords into German to improve OpenParlData recall. Falls back to the original text if translation fails or input is empty. """ cleaned = text.strip() if not cleaned: return cleaned prompt = ( "Übersetze die folgenden Suchbegriffe ins Deutsche. 
" "Gib nur die deutschen Stichwörter zurück, ohne Zusatztext.\n" f"Original: {cleaned}" ) try: response = client.chat_completion( model="meta-llama/Llama-3.1-70B-Instruct", messages=[ {"role": "system", "content": "Du bist ein präziser Übersetzer ins Deutsche."}, {"role": "user", "content": prompt}, ], max_tokens=64, temperature=0.0, ) translated = response.choices[0].message.content.strip() return translated or cleaned except Exception as exc: print(f"⚠️ [translate_to_german] Translation failed ({exc}); falling back to original text.") return cleaned class DatasetEngine: """Dataset-specific orchestrator for LLM prompting and tool execution.""" def __init__( self, name: str, display_name: str, system_prompt: str, routing_instruction: str, allowed_tools: set[str], ): self.name = name self.display_name = display_name self.system_prompt = system_prompt self.routing_instruction = routing_instruction self.allowed_tools = allowed_tools self._last_request: dict[str, Any] | None = None def build_messages(self, user_message: str, language_label: str, language_code: str) -> list[dict]: """Construct chat completion messages with dataset-specific guardrails.""" routing_guardrails = ( f"TARGET_DATA_SOURCE: {self.display_name}\n" f"{self.routing_instruction}\n" 'If the request requires a different data source, respond with ' '{"response": "Explain that the other dataset should be selected in the app."}' ) # Get current date for dynamic date handling current_date = datetime.now().strftime("%Y-%m-%d") return [ {"role": "system", "content": self.system_prompt}, {"role": "system", "content": routing_guardrails}, { "role": "user", "content": ( f"Current date: {current_date}\n" f"Selected dataset: {self.display_name}\n" f"Language preference: {language_label} ({language_code})\n" f"Question: {user_message}" ), }, ] @staticmethod def _parse_model_response(raw_response: str) -> dict: """Parse JSON (with cleanup) returned by the LLM.""" clean_response = raw_response.strip() if 
clean_response.startswith("```json"): clean_response = clean_response[7:] if clean_response.startswith("```"): clean_response = clean_response[3:] if clean_response.endswith("```"): clean_response = clean_response[:-3] clean_response = clean_response.strip() json_start_candidates = [] for ch in ("{", "["): idx = clean_response.find(ch) if idx != -1: json_start_candidates.append(idx) if json_start_candidates: clean_response = clean_response[min(json_start_candidates):] return json.loads(clean_response) def query_model(self, user_message: str, language_label: str, language_code: str) -> dict: """Call the LLM with dataset-constrained instructions.""" try: messages = self.build_messages(user_message, language_label, language_code) response = client.chat_completion( model="meta-llama/Llama-3.1-70B-Instruct", messages=messages, max_tokens=500, temperature=0.3, ) assistant_message = response.choices[0].message.content return self._parse_model_response(assistant_message) except json.JSONDecodeError: # Surface malformed responses to the user so they can retry. return {"response": assistant_message} except Exception as exc: return {"error": f"Error querying model: {str(exc)}"} def execute_tool( self, user_message: str, tool_name: str, arguments: dict, show_debug: bool, ) -> tuple[str, str | None]: """Run the MCP tool for the dataset.""" raise NotImplementedError("execute_tool must be implemented by subclasses.") def sanitize_arguments(self, tool_name: str, arguments: dict) -> dict: """ Sanitize and validate tool arguments before execution. 
Args: tool_name: Name of the tool being called arguments: Raw arguments from LLM Returns: Sanitized arguments dict with proper types and valid values """ raise NotImplementedError("sanitize_arguments must be implemented by subclasses.") def _compose_response_text( self, explanation: str, debug_info: str | None, show_debug: bool, body: str, ) -> str: parts = [] if explanation: parts.append(f"*{explanation}*") if show_debug and debug_info: parts.append(f"### 🔧 Debug Information\n{debug_info}\n\n---") parts.append(body) return "\n\n".join(parts) def postprocess_tool_response( self, *, response: str, tool_name: str, explanation: str, debug_info: str | None, show_debug: bool, language_code: str, ) -> tuple[str, str | None, dict, list]: """Default dataset response handler.""" body = f"### 📊 Results\n{response}" final_response = self._compose_response_text(explanation, debug_info, show_debug, body) return final_response, None, {}, [] def respond( self, user_message: str, language_label: str, language_code: str, show_debug: bool, ) -> tuple[str, str | None, dict, list]: """Entry point used by the Gradio handler.""" model_response = self.query_model(user_message, language_label, language_code) if "response" in model_response: return model_response["response"], None, {}, [] if "error" in model_response: return f"❌ {model_response['error']}", None, {}, [] tool_name = model_response.get("tool") arguments = model_response.get("arguments") if not tool_name or not isinstance(arguments, dict): return ( "I couldn't determine how to process your request. Please try rephrasing your question.", None, {}, [], ) if tool_name not in self.allowed_tools: allowed_list = ", ".join(sorted(self.allowed_tools)) warning = ( f"❌ Tool '{tool_name}' is not available for {self.display_name}. " f"Allowed tools: {allowed_list}. Please adjust your request." 
) return warning, None, {}, [] if "language" not in arguments: arguments["language"] = language_code # Force JSON response format for parliament tools to ensure consistent card rendering if isinstance(self, ParliamentEngine): arguments["response_format"] = "json" # Sanitize arguments before execution arguments = self.sanitize_arguments(tool_name, arguments) print(f"✅ [DatasetEngine] Sanitized arguments: {arguments}") # Remember latest request context for downstream post-processing self._last_request = { "tool": tool_name, "arguments": dict(arguments), } explanation = model_response.get("explanation", "") response, debug_info = self.execute_tool(user_message, tool_name, arguments, show_debug) return self.postprocess_tool_response( response=response, tool_name=tool_name, explanation=explanation, debug_info=debug_info, show_debug=show_debug, language_code=language_code, ) class ParliamentEngine(DatasetEngine): def __init__(self): super().__init__( name="parliament", display_name="Swiss Parliament Data (OpenParlData)", system_prompt=PARLIAMENT_PROMPT, routing_instruction="Use only tools that begin with 'openparldata_'. 
Never mention BFS tools.", allowed_tools={ "openparldata_search_parliamentarians", "openparldata_search_votes", "openparldata_search_motions", "openparldata_search_debates", "openparldata_search_meetings", }, ) def sanitize_arguments(self, tool_name: str, arguments: dict) -> dict: """Sanitize arguments for OpenParlData tools.""" sanitized = {} valid_params = PARLIAMENT_TOOL_PARAMS.get(tool_name, set()) requested_language = str(arguments.get("language", "")).lower() original_arguments = dict(arguments) optional_string_params = { "canton", "party", "parliament_id", "vote_type", "submitter_id", "speaker_id", "topic", "status", "body_key", "level", } for key, value in arguments.items(): # Skip extra fields not in the tool schema if key not in valid_params: print(f"⚠️ [ParliamentEngine] Skipping invalid parameter '{key}' for {tool_name}") continue # Normalize strings and drop empty values for optional params if isinstance(value, str): value = value.strip() if value == "" and key in optional_string_params: print(f"⚠️ [ParliamentEngine] Dropping empty string for '{key}'") continue # Type conversions if key == "limit": # Convert to int and clamp to 1-100 try: limit_val = int(value) if isinstance(value, str) else value sanitized[key] = max(1, min(100, limit_val)) except (ValueError, TypeError): sanitized[key] = 20 # Default elif key == "offset": # Convert to int and ensure >= 0 try: offset_val = int(value) if isinstance(value, str) else value sanitized[key] = max(0, offset_val) except (ValueError, TypeError): sanitized[key] = 0 # Default elif key == "language": # Validate language enum (case-insensitive) lang_upper = str(value).upper() if lang_upper in ["DE", "FR", "IT", "EN"]: sanitized[key] = lang_upper.lower() else: sanitized[key] = "en" # Default to English elif key == "active_only": # Convert to bool sanitized[key] = bool(value) elif key == "status": status_val = str(value).strip().lower() if status_val in {"", "all", "any", "*", "none"}: print("⚠️ [ParliamentEngine] 
Removing non-specific status filter") continue status_map = { "pending": "Eingereicht", "submitted": "Eingereicht", "in_progress": "Eingereicht", "open": "Eingereicht", "accepted": "Angenommen", "approved": "Angenommen", "rejected": "Abgelehnt", "declined": "Abgelehnt", "completed": "Erledigt", "closed": "Erledigt", } if status_val.isdigit(): sanitized[key] = status_val else: mapped = status_map.get(status_val) if mapped: sanitized[key] = mapped else: print(f"⚠️ [ParliamentEngine] Unknown status '{value}' dropped") continue elif key == "body_key": sanitized[key] = str(value).upper() elif key == "level": sanitized[key] = str(value).lower() elif key == "query" and tool_name == "openparldata_search_parliamentarians": query_text = str(value) tokens = [tok for tok in query_text.replace(",", " ").split() if tok] if len(tokens) >= 2 and all(tok[0].isupper() for tok in tokens if tok): # Use last token (family name) for broader matching sanitized[key] = tokens[-1] else: sanitized[key] = value else: # Keep other values as-is sanitized[key] = value # Enforce German language for English UI users if requested_language == "en": sanitized["language"] = "de" elif "language" in sanitized: sanitized["language"] = sanitized["language"].lower() # Translate key textual filters into German for better recall if sanitized.get("language") == "de": for text_key in ("query", "topic"): if text_key in sanitized: text_value = str(sanitized[text_key]).strip() if text_value: translated = translate_to_german(text_value) if translated: sanitized[text_key] = translated else: # Restore original if translation failed sanitized[text_key] = text_value # Avoid empty required query strings by falling back to original input if "query" in sanitized: if not str(sanitized["query"]).strip(): fallback = str(original_arguments.get("query", "")).strip() if fallback: sanitized["query"] = translate_to_german(fallback) if sanitized.get("language") == "de" else fallback else: sanitized.pop("query", None) return 
sanitized def execute_tool( self, user_message: str, tool_name: str, arguments: dict, show_debug: bool, ) -> tuple[str, str | None]: # DEBUG: Capture arguments before MCP call print(f"\n🔍 [ParliamentEngine] execute_tool called:") print(f" Tool: {tool_name}") print(f" Arguments: {arguments}") print(f" Argument types: {dict((k, type(v).__name__) for k, v in arguments.items())}") return asyncio.run(execute_mcp_query(user_message, tool_name, arguments, show_debug)) def postprocess_tool_response( self, *, response: str, tool_name: str, explanation: str, debug_info: str | None, show_debug: bool, language_code: str, ) -> tuple[str, str | None, dict, str]: """Pass through the response for parsing in respond() function.""" # Simplified: just return the raw JSON response # The respond() function will handle parsing and card extraction # Don't embed raw JSON in message - use clean placeholder instead body = "Searching parliament data..." final_response = self._compose_response_text(explanation, debug_info, show_debug, body) return final_response, None, {}, response class BFSEngine(DatasetEngine): # Valid parameter names per tool TOOL_PARAMS = { "bfs_search": { "keywords", "language" # NO format parameter! }, "bfs_query_data": { "datacube_id", "filters", "format", "language" }, } def __init__(self): super().__init__( name="statistics", display_name="Swiss Statistics (BFS)", system_prompt=BFS_PROMPT, routing_instruction="Use only tools that begin with 'bfs_'. 
Never mention OpenParlData tools.", allowed_tools={ "bfs_search", "bfs_query_data", }, ) def sanitize_arguments(self, tool_name: str, arguments: dict) -> dict: """Sanitize arguments for BFS tools.""" sanitized = {} valid_params = self.TOOL_PARAMS.get(tool_name, set()) for key, value in arguments.items(): # Skip extra fields not in the tool schema if key not in valid_params: print(f"⚠️ [BFSEngine] Skipping invalid parameter '{key}' for {tool_name}") continue # Type conversions if key == "language": # Validate language enum (case-insensitive) lang_upper = str(value).upper() if lang_upper in ["DE", "FR", "IT", "EN"]: sanitized[key] = lang_upper.lower() else: sanitized[key] = "en" # Default to English elif key == "format": # Validate and normalize format enum (only for bfs_query_data) if tool_name == "bfs_query_data": format_upper = str(value).upper().replace("-", "_") # Map common values to DataFormat enum format_map = { "CSV": "csv", "JSON": "json", "JSON_STAT": "json-stat", "JSON_STAT2": "json-stat2", "PX": "px", } sanitized[key] = format_map.get(format_upper, "csv") # Default to CSV else: # Keep other values as-is sanitized[key] = value # Add default format for bfs_query_data if not present if tool_name == "bfs_query_data" and "format" not in sanitized: sanitized["format"] = "csv" return sanitized def execute_tool( self, user_message: str, tool_name: str, arguments: dict, show_debug: bool, ) -> tuple[str, str | None]: # DEBUG: Capture arguments after sanitization print(f"\n🔍 [BFSEngine] execute_tool called:") print(f" Tool: {tool_name}") print(f" Arguments (sanitized): {arguments}") print(f" Argument types: {dict((k, type(v).__name__) for k, v in arguments.items())}") return asyncio.run(execute_mcp_query_bfs(user_message, tool_name, arguments, show_debug)) @staticmethod def _parse_datacube_choices(response: str) -> tuple[dict, list]: datacube_map: dict[str, str] = {} datacube_choices: list[str] = [] import re lines = response.split('\n') i = 0 while i < len(lines): 
line = lines[i] match = re.search(r'^\s*\d+\.\s+\*\*([^*]+)\*\*\s*$', line) if match: datacube_id = match.group(1).strip() description = datacube_id if i + 1 < len(lines): next_line = lines[i + 1].strip() if not next_line.startswith('↳') and next_line: description = next_line elif i + 2 < len(lines): description = lines[i + 2].strip() or datacube_id if len(description) > 80: description = description[:77] + "..." label = f"{description} ({datacube_id})" datacube_choices.append(label) datacube_map[label] = datacube_id i += 1 return datacube_map, datacube_choices @staticmethod def _detect_csv(response: str) -> bool: lines = response.strip().split('\n') if len(lines) < 2: return False if ',' not in lines[0] or ',' not in lines[1]: return False prefix = response.lower()[:200] error_tokens = ["error", "no data", "no datacubes found", "try broader"] return not any(token in prefix for token in error_tokens) def postprocess_tool_response( self, *, response: str, tool_name: str, explanation: str, debug_info: str | None, show_debug: bool, language_code: str, ) -> tuple[str, str | None, dict, list]: csv_file_path = None datacube_map: dict[str, str] = {} datacube_choices: list[str] = [] body = "" if tool_name == "bfs_query_data" and self._detect_csv(response): rows = response.count('\n') timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") csv_filename = f"bfs_data_{timestamp}.csv" csv_file_path = os.path.join(tempfile.gettempdir(), csv_filename) with open(csv_file_path, 'w', encoding='utf-8') as f: f.write(response) body = ( "### 📊 Data Ready\n" f"✅ CSV file generated with {rows} rows\n\n" "💾 **Download your data using the button below**" ) else: if tool_name == "bfs_search" and "matching datacube" in response.lower(): datacube_map, datacube_choices = self._parse_datacube_choices(response) # If we found datacubes, show a simple message instead of the full response if datacube_choices: # Extract the search term from explanation import re match = re.search(r'related to (.+)', 
explanation, re.IGNORECASE) search_term = match.group(1).strip() if match else "your search" body = f"### 📊 Available Datasets\n\nHere is the data available for **{search_term}**. Please select a dataset below to download:" else: # No datacubes found, show the full error message body = f"### 📊 Results\n{response}" else: body = f"### 📊 Results\n{response}" final_response = self._compose_response_text(explanation, debug_info, show_debug, body) return final_response, csv_file_path, datacube_map, datacube_choices def fetch_datacube_data( self, datacube_id: str, language_code: str, show_debug: bool, ) -> tuple[str, str | None]: response, debug_info = self.execute_tool( user_message=f"Get data for datacube {datacube_id}", tool_name="bfs_query_data", arguments={"datacube_id": datacube_id, "language": language_code}, show_debug=show_debug, ) if self._detect_csv(response): rows = response.count('\n') timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") csv_filename = f"bfs_data_{timestamp}.csv" csv_file_path = os.path.join(tempfile.gettempdir(), csv_filename) with open(csv_file_path, 'w', encoding='utf-8') as f: f.write(response) message = ( "### 📊 Data Ready\n" f"✅ CSV file generated with {rows} rows for datacube: `{datacube_id}`\n\n" "💾 **Download your data using the button below**" ) if show_debug and debug_info: message = f"### 🔧 Debug Information\n{debug_info}\n\n---\n\n{message}" return message, csv_file_path error_message = f"❌ Error retrieving data:\n\n{response}" return error_message, None DATASET_ENGINES: dict[str, DatasetEngine] = { "parliament": ParliamentEngine(), "statistics": BFSEngine(), } # Initialize usage tracker with 50 requests per day limit tracker = UsageTracker(daily_limit=50) # Available languages LANGUAGES = { "English": "en", "Deutsch": "de", "Français": "fr", "Italiano": "it" } # Constants imported from datasets/ modules above def chat_response(message: str, history: list, language: str, show_debug: bool, dataset: str = "parliament") -> 
tuple[str, str | None, dict, list]: """ Main chat response function routed through dataset-specific engines. """ try: engine = DATASET_ENGINES.get(dataset) if not engine: return f"❌ Unknown dataset selected: {dataset}", None, {}, [] language_code = LANGUAGES.get(language, "en") return engine.respond(message, language, language_code, show_debug) except Exception as e: return f"❌ An error occurred: {str(e)}", None, {}, [] # Load custom CSS with open("ui/styles.css", "r") as f: custom_css = f.read() # Build Gradio interface with gr.Blocks(css=custom_css, title="Swiss and European Government Data LLM") as demo: # State to track datacube search results datacube_state = gr.State({}) # Maps display text → datacube_id # State to track parliament cards parliament_cards_state = gr.State([]) # List of card dicts parliament_page_state = gr.State(1) # Current page number gr.Markdown( """

🇨🇭 Swiss & European Government Data LLM

Explore Swiss parliament records and BFS statistics, with more datasets on the way.

""" ) with gr.Row(): with gr.Column(scale=3): # Simple query input form with gr.Row(): msg = gr.Textbox( placeholder="Ask a question about Swiss parliamentary data or statistics...", show_label=False, scale=4, container=False ) submit = gr.Button("🔍 Search", variant="primary", scale=1) # Status/explanation text status_text = gr.Markdown("", visible=False) # CSV download file component download_file = gr.File( label="📥 Download Data", visible=False, interactive=False ) # Datacube selection (hidden by default, shown when search returns results) with gr.Row(visible=False) as datacube_selection_row: with gr.Column(scale=4): datacube_radio = gr.Radio( label="📋 Select Datacube for Download", choices=[], visible=True ) with gr.Column(scale=1): get_data_btn = gr.Button("📥 Get Data", variant="primary", size="lg") # Parliament cards display (hidden by default, shown when parliament results return) with gr.Column(visible=False) as parliament_cards_row: parliament_cards_html = gr.HTML("") with gr.Row(): prev_page_btn = gr.Button("◀ Previous", size="sm") page_info = gr.Markdown("Page 1") next_page_btn = gr.Button("Next ▶", size="sm") with gr.Column(scale=1): gr.Markdown("### ⚙️ Settings") dataset = gr.Radio( choices=[ "Swiss Parliament Data", "Swiss Statistics (BFS)" ], value="Swiss Parliament Data", label="Data Source", info="Choose which API to query" ) gr.HTML( """
ParlTalk • Coming Soon Eurostat • Coming Soon
""" ) language = gr.Radio( choices=list(LANGUAGES.keys()), value="English", label="Language", info="Select response language" ) # Example queries display gr.Markdown("### 💡 Example Queries") examples_display = gr.Markdown() def ensure_message_history(history): """Normalize chat history to the format expected by gr.Chatbot(type='messages').""" normalized: list[dict] = [] if not history: return normalized for entry in history: if isinstance(entry, dict): role = entry.get("role") content = entry.get("content", "") if role: normalized.append({"role": role, "content": "" if content is None else str(content)}) elif isinstance(entry, (tuple, list)) and len(entry) == 2: user, assistant = entry if user is not None: normalized.append({"role": "user", "content": str(user)}) if assistant is not None: normalized.append({"role": "assistant", "content": str(assistant)}) return normalized def create_examples_text(dataset_choice: str, language: str) -> str: """Create formatted example queries text.""" lang_code = LANGUAGES.get(language, "en") if dataset_choice == "Swiss Parliament Data": examples = OPENPARLDATA_EXAMPLES.get(lang_code, OPENPARLDATA_EXAMPLES["en"]) elif dataset_choice == "Swiss Statistics (BFS)": examples = BFS_EXAMPLES.get(lang_code, BFS_EXAMPLES["en"]) else: examples = OPENPARLDATA_EXAMPLES.get(lang_code, OPENPARLDATA_EXAMPLES["en"]) examples_md = "\n".join([f"- {example}" for example in examples]) return examples_md # Helper functions imported from ui.helpers def build_parliament_card(item: dict, lang_code: str) -> dict: """Normalize OpenParlData rows into unified card metadata.""" card = { "title": "Untitled", "url": "#", "date": "", "category": "Result", "summary": "" } if not isinstance(item, dict): return card # People directory if any(key in item for key in ("firstname", "lastname", "fullname")): card["category"] = "Person" fullname = item.get("fullname") or f"{item.get('firstname', '')} {item.get('lastname', '')}".strip() card["title"] = fullname or 
"Parliamentarian" website = prefer_language(item.get("website_parliament_url"), lang_code) card["url"] = website or item.get("url_api", "#") party_display = None if item.get("party"): party_display = prefer_language(item.get("party"), lang_code) if not party_display and isinstance(item["party"], dict): party_display = prefer_language(item["party"], "de") if not party_display and item.get("party_harmonized"): party_display = prefer_language(item.get("party_harmonized"), lang_code) body_key = item.get("body_key") summary_parts = [] if party_display: summary_parts.append(f"Party: {party_display}") if body_key: summary_parts.append(f"Body: {body_key}") if summary_parts: card["summary"] = " · ".join(summary_parts) updated = item.get("updated_at") or item.get("created_at") if updated: card["date"] = updated[:10] return card # Meetings if item.get("begin_date") and (item.get("name") or item.get("location") or item.get("type") == "meeting"): card["category"] = "Meeting" card["title"] = prefer_language(item.get("name"), lang_code) or item.get("number") or "Meeting" card["date"] = (item.get("begin_date") or "")[:10] card["url"] = prefer_language(item.get("url_external"), lang_code) or item.get("url_api", "#") details = [] if item.get("location"): details.append(item["location"]) if item.get("body_key"): details.append(f"Body: {item['body_key']}") if item.get("number"): details.append(f"Meeting #{item['number']}") if details: card["summary"] = " · ".join(details) return card # Votes if "results_yes" in item or "results_no" in item: card["category"] = "Vote" card["title"] = prefer_language(item.get("title"), lang_code) or "Vote" card["date"] = (item.get("date") or "")[:10] card["url"] = prefer_language(item.get("url_external"), lang_code) or item.get("url_api", "#") affair_title = prefer_language(item.get("affair_title"), lang_code) if affair_title: card["summary"] = affair_title else: totals = [] if item.get("results_yes") is not None: totals.append(f"Yes 
{item.get('results_yes')}") if item.get("results_no") is not None: totals.append(f"No {item.get('results_no')}") if item.get("results_abstention") is not None: totals.append(f"Abst {item.get('results_abstention')}") if totals: card["summary"] = " · ".join(totals) return card # Affairs / motions if "type_name" in item or "number" in item or "state_name" in item: card["category"] = "Affair" card["title"] = prefer_language(item.get("title"), lang_code) or item.get("number") or "Affair" card["url"] = prefer_language(item.get("url_external"), lang_code) or item.get("url_api", "#") begin = item.get("begin_date") or item.get("created_at") if begin: card["date"] = begin[:10] details = [] type_name = prefer_language(item.get("type_name"), lang_code) state_name = prefer_language(item.get("state_name"), lang_code) if type_name: details.append(type_name) if state_name: details.append(state_name) if item.get("number"): details.append(item["number"]) if details: card["summary"] = " · ".join(details) return card # Speeches / debates if any(key in item for key in ("transcript", "speech_text", "speech_text_content", "speaker_name", "person_name", "person")): card["category"] = "Speech" # Extract person from nested expand structure: person = {"data": [...], "meta": {...}} person_data = item.get("person", {}) if isinstance(person_data, dict) and "data" in person_data and person_data["data"]: person = person_data["data"][0] elif isinstance(person_data, dict): person = person_data else: person = {} speaker = ( prefer_language(person.get("fullname"), lang_code) or prefer_language(item.get("person_name"), lang_code) or person.get("fullname") or item.get("speaker_name") ) # Extract affair from nested expand structure affair_data = item.get("affair", {}) if isinstance(affair_data, dict) and "data" in affair_data and affair_data["data"]: affair = affair_data["data"][0] elif isinstance(affair_data, dict): affair = affair_data else: affair = {} affair_title = 
prefer_language(affair.get("title"), lang_code) card["title"] = ( prefer_language(item.get("title"), lang_code) or affair_title or (f"Rede von {speaker}" if speaker else "Rede") ) card["date"] = (item.get("date") or item.get("date_start") or "")[:10] # Extract meeting from nested expand structure meeting_data = item.get("meeting") if isinstance(meeting_data, dict) and "data" in meeting_data and meeting_data["data"]: meeting = meeting_data["data"][0] else: meeting = {} # Speeches use "url" field (plain string), not "url_external" (dict) external_url = pick_external_url( item.get("url"), # Speeches have direct url field item.get("url_external"), affair.get("url_external") if isinstance(affair, dict) else None, meeting.get("url_external") if isinstance(meeting, dict) else None, ) # Never use url_api for clickable links card["url"] = external_url or "#" text_content = item.get("speech_text_content") summary = None if isinstance(text_content, dict): summary = prefer_language(text_content, lang_code) or prefer_language(text_content, "de") elif isinstance(text_content, str): summary = text_content elif item.get("transcript"): summary = item.get("transcript") elif item.get("speech_text"): summary = item.get("speech_text") if summary: summary = strip_html(summary)[:200] summary_parts = [] if speaker: summary_parts.append(speaker) if summary: summary_parts.append(summary) if affair_title and affair_title != card["title"]: summary_parts.append(affair_title) if summary_parts: card["summary"] = " — ".join(summary_parts[:2]) return card # Fallback generic if item.get("title"): card["title"] = prefer_language(item.get("title"), lang_code) or item["title"] external = prefer_language(item.get("url_external"), lang_code) card["url"] = external or item.get("url_api", "#") if item.get("date"): card["date"] = item["date"][:10] return card def render_parliament_cards(cards: list[dict], page: int, items_per_page: int = 10) -> tuple[str, str, int, bool]: """Render parliament cards as HTML 
with pagination.""" if not cards: return "", "No results", 1, False total_pages = (len(cards) + items_per_page - 1) // items_per_page page = max(1, min(page, total_pages)) # Clamp page to valid range show_pagination = len(cards) > items_per_page start_idx = (page - 1) * items_per_page end_idx = min(start_idx + items_per_page, len(cards)) page_cards = cards[start_idx:end_idx] # Generate HTML for cards cards_html = '
' for card in page_cards: title = card.get("title", "Untitled") url = card.get("url", "#") date = card.get("date", "") category = card.get("category", "Result") summary = card.get("summary", "") # Truncate title if too long if len(title) > 120: title = title[:117] + "..." date_badge = f'{date}' if date else '' cards_html += f'''
{category}

{title}

{f'

{summary}

' if summary else ''}
{date_badge}
''' cards_html += '
# Handle message submission
_MAX_PARLIAMENT_CARDS = 50  # Cap on rendered cards so the UI is not overwhelmed


def _parse_card_date(value):
    """Return the calendar date encoded in *value*, or ``None`` if it cannot be parsed.

    Accepts anything ``datetime.fromisoformat`` understands; otherwise falls
    back to the leading ``YYYY-MM-DD`` portion so that timestamps carrying a
    suffix ``fromisoformat`` rejects still yield a usable date.
    """
    if not isinstance(value, str):
        return None
    text = value.strip()
    if not text:
        return None
    try:
        return datetime.fromisoformat(text).date()
    except ValueError:
        pass
    try:
        return datetime.strptime(text[:10], "%Y-%m-%d").date()
    except ValueError:
        return None


def _filter_cards_by_date_window(cards, date_from, date_to):
    """Keep the cards whose ``"date"`` lies within the inclusive date window.

    Bounds are parsed once. A bound that was supplied but cannot be parsed is
    ignored (with a log line) instead of aborting the whole card pipeline.
    When at least one bound is usable, cards without a parseable date are
    dropped.
    """
    start = _parse_card_date(date_from) if date_from else None
    end = _parse_card_date(date_to) if date_to else None
    if date_from and start is None:
        print(f"⚠️ [respond] Ignoring unparseable date_from: {date_from!r}")
    if date_to and end is None:
        print(f"⚠️ [respond] Ignoring unparseable date_to: {date_to!r}")
    if start is None and end is None:
        return cards

    kept = []
    for card in cards:
        card_date = _parse_card_date(card.get("date"))
        if card_date is None:
            continue
        if start is not None and card_date < start:
            continue
        if end is not None and card_date > end:
            continue
        kept.append(card)
    return kept


def respond(message, language, dataset_choice, current_datacube_state, current_parliament_cards, current_page, request: gr.Request):
    """Handle a submitted question and return the 14 values for the UI outputs.

    The returned tuple is positional and matches the ``outputs`` list used by
    ``msg.submit`` / ``submit.click``: message box, status text, download file
    value, download file visibility, datacube state, datacube radio, datacube
    selection row, parliament cards state, parliament page state, cards HTML,
    page info, cards row, previous-page button, next-page button.

    Args:
        message: Text entered by the user; blank input is a no-op.
        language: Language label, mapped to a code through ``LANGUAGES``.
        dataset_choice: Dataset label shown in the UI; unknown labels fall
            back to the parliament dataset.
        current_datacube_state: Existing datacube mapping, passed through
            unless a statistics search produces a new one.
        current_parliament_cards: Existing card list, passed through unless
            new cards are produced.
        current_page: Existing page number, passed through unless new cards
            are produced.
        request: Gradio request object, used only to derive a per-client key
            for the usage limiter.

    Returns:
        A 14-tuple of values / ``gr.update`` objects as described above.
    """
    show_debug = False  # Debug mode disabled in UI

    def status_only(status_update, download=None):
        """Outputs that only touch the status text (and optionally the download)."""
        return (
            "",
            status_update,
            download,
            gr.update(visible=bool(download)),
            current_datacube_state,
            gr.update(),
            gr.update(visible=False),
            current_parliament_cards,
            current_page,
            "",
            "",
            gr.update(visible=False),
            gr.update(),
            gr.update(),
        )

    if not message or not message.strip():
        return status_only(gr.update(visible=False))

    # Check usage limit. `request.client` may exist but be None, so look the
    # host up defensively instead of relying on hasattr().
    peer = getattr(request, "client", None)
    user_id = getattr(peer, "host", None) or "unknown"
    if not tracker.check_limit(user_id):
        status_msg = (
            "⚠️ **Daily request limit reached.** You have used all 50 requests for today. "
            "Please try again tomorrow.\n\nThis limit helps us keep the service free and available for everyone."
        )
        return status_only(gr.update(value=status_msg, visible=True))

    # Map dataset choice to engine type
    dataset_map = {
        "Swiss Parliament Data": "parliament",
        "Swiss Statistics (BFS)": "statistics",
    }
    dataset_type = dataset_map.get(dataset_choice, "parliament")

    # Get bot response (returns tuple with optional CSV file and results data).
    # A fresh, empty chat history is passed for the API call.
    temp_chat = []
    bot_message, csv_file, datacube_map, results_data = chat_response(
        message, temp_chat, language, show_debug, dataset_type
    )

    engine_instance = DATASET_ENGINES.get(dataset_type)
    last_request = getattr(engine_instance, "_last_request", None) if engine_instance else None

    # Parse JSON and extract cards for Parliament dataset
    parliament_cards: list[dict] = []
    if dataset_type == "parliament" and results_data and isinstance(results_data, str):
        try:
            print("\n🔍 [respond] Parsing JSON results_data...")
            data = json.loads(results_data, strict=False)
            print("✅ [respond] JSON parsed successfully")

            if isinstance(data, dict) and data.get("status") == "error":
                error_msg = data.get("message") or data.get("detail") or "Die OpenParlData-API meldet einen Fehler."
                endpoint = data.get("endpoint")
                if endpoint:
                    error_msg += f"\n\nEndpoint: `{endpoint}`"
                bot_message = f"❌ {error_msg}"
                return status_only(gr.update(value=bot_message, visible=True))

            if isinstance(data, dict) and isinstance(data.get("data"), list):
                items = data["data"]
                print(f"✅ [respond] Found data array with {len(items)} items")
                lang_code = LANGUAGES.get(language, "en")
                is_meeting_search = (
                    bool(last_request)
                    and last_request.get("tool") == "openparldata_search_meetings"
                )

                # Filter out error objects before building cards
                valid_items = [
                    item for item in items
                    if isinstance(item, dict) and item.get("status") != "error"
                ]
                if len(valid_items) < len(items):
                    print(f"⚠️ [respond] Filtered out {len(items) - len(valid_items)} error objects")

                parliament_cards = [build_parliament_card(item, lang_code) for item in valid_items]

                # Optional date filtering for meetings (client-side)
                if is_meeting_search:
                    args = last_request.get("arguments") or {}
                    date_from = args.get("date_from")
                    date_to = args.get("date_to")
                    if date_from or date_to:
                        before = len(parliament_cards)
                        parliament_cards = _filter_cards_by_date_window(parliament_cards, date_from, date_to)
                        print(f"✅ [respond] Filtered meetings by date window ({before} → {len(parliament_cards)})")

                # Limit display to avoid overwhelming the UI
                truncated = len(parliament_cards) > _MAX_PARLIAMENT_CARDS
                if truncated:
                    print(f"⚠️ [respond] Truncating card list from {len(parliament_cards)} to {_MAX_PARLIAMENT_CARDS}")
                    parliament_cards = parliament_cards[:_MAX_PARLIAMENT_CARDS]

                if parliament_cards:
                    # "meta" may be missing or explicitly null in the payload.
                    meta = data.get("meta")
                    reported_total = meta.get("total_records") if isinstance(meta, dict) else None
                    total = reported_total or len(parliament_cards)
                    display_count = len(parliament_cards)
                    bot_message = f"**Found {total} result(s).** Showing {display_count} items below:"
                    if lang_code == "en":
                        bot_message += "\n\n*Note: English content is not available from the API. Results are displayed in German.*"
                    if truncated:
                        bot_message += f"\n\n_Only the first {_MAX_PARLIAMENT_CARDS} items are displayed. Refine your search for more specific results._"
                elif is_meeting_search:
                    bot_message = "No meetings found that match the requested filters. Try adjusting the date range or search keywords."
            else:
                print("❌ [respond] Data structure does not contain a 'data' array.")
        except json.JSONDecodeError as e:
            print(f"❌ [respond] JSON parsing failed: {e}")
        except Exception as e:
            # Deliberate best-effort boundary: card extraction must never
            # prevent the textual answer from being shown.
            print(f"❌ [respond] Unexpected error during card extraction: {e}")

    # Handle parliament cards (for Parliament dataset)
    if dataset_type == "parliament" and parliament_cards:
        cards_html, page_info, page_num, show_pagination = render_parliament_cards(parliament_cards, 1)
        return (
            "",
            gr.update(value=bot_message, visible=True),
            None,
            gr.update(visible=False),
            current_datacube_state,
            gr.update(),
            gr.update(visible=False),
            parliament_cards,  # parliament_cards_state
            page_num,  # parliament_page_state
            cards_html,  # parliament_cards_html
            page_info,  # page_info
            gr.update(visible=True),  # parliament_cards_row
            gr.update(visible=show_pagination),  # prev_page_btn
            gr.update(visible=show_pagination),  # next_page_btn
        )

    # Handle datacube search results (for BFS dataset)
    if dataset_type == "statistics" and results_data:
        return (
            "",
            gr.update(value=bot_message, visible=True),
            None,
            gr.update(visible=False),
            datacube_map,
            gr.update(choices=results_data, value=None),
            gr.update(visible=True),
            current_parliament_cards,
            current_page,
            "",
            "",
            gr.update(visible=False),
            gr.update(),
            gr.update(),
        )

    # Handle CSV download
    if csv_file:
        return status_only(gr.update(value=bot_message, visible=True), download=csv_file)

    return status_only(gr.update(value=bot_message, visible=True))
# Handle parliament pagination
# NOTE(review): presumably mirrors the page size used by
# render_parliament_cards — confirm if that function's page size changes.
_CARDS_PER_PAGE = 10


def _render_page(cards, page):
    """Render *page* of *cards* and return ``(cards_html, page_info, page_number)``.

    The pagination-visibility flag returned by ``render_parliament_cards`` is
    not needed by the page buttons and is discarded.
    """
    cards_html, page_info, page_num, _ = render_parliament_cards(cards, page)
    return cards_html, page_info, page_num


def prev_page(cards, current_page):
    """Go to previous page of parliament results.

    Args:
        cards: Card list held in the parliament cards state.
        current_page: Page number currently displayed (1-based).

    Returns:
        ``(cards_html, page_info, page_number)``. With no cards, nothing is
        rendered and the page number is returned unchanged, matching
        ``next_page``.
    """
    if not cards:
        return "", "No results", current_page
    # Never go below the first page.
    return _render_page(cards, max(1, current_page - 1))


def next_page(cards, current_page):
    """Go to next page of parliament results.

    Args:
        cards: Card list held in the parliament cards state.
        current_page: Page number currently displayed (1-based).

    Returns:
        ``(cards_html, page_info, page_number)``. With no cards, nothing is
        rendered and the page number is returned unchanged.
    """
    if not cards:
        return "", "No results", current_page
    # Ceiling division: a partially filled last page still counts as a page.
    total_pages = (len(cards) + _CARDS_PER_PAGE - 1) // _CARDS_PER_PAGE
    return _render_page(cards, min(total_pages, current_page + 1))
# Handle "Get Data" button click for datacube selection
def fetch_datacube_data(selected_choice, current_datacube_state, language, request: gr.Request):
    """Download the datacube selected in the radio list.

    The returned 4-tuple is positional and matches the ``outputs`` list of
    ``get_data_btn.click``: status text, download file value, download file
    visibility, datacube selection row.

    Args:
        selected_choice: Label chosen in the datacube radio, or empty.
        current_datacube_state: Mapping from radio label to datacube ID.
        language: Language label, mapped to a code through ``LANGUAGES``.
        request: Gradio request object, used only to derive a per-client key
            for the usage limiter.

    Returns:
        A 4-tuple of values / ``gr.update`` objects as described above.
    """
    show_debug = False  # Debug mode disabled in UI

    def outputs(message, csv_path=None, selection_row=None):
        """Build the 4 outputs; the selection row is hidden unless overridden."""
        return (
            gr.update(value=message, visible=True),
            csv_path or None,
            gr.update(visible=bool(csv_path)),
            gr.update(visible=False) if selection_row is None else selection_row,
        )

    if not selected_choice or not current_datacube_state:
        # Leave the selection row as it is: the message asks the user to pick
        # a datacube, so the picker must not be hidden here.
        return outputs("⚠️ Please select a datacube first.", selection_row=gr.update())

    # Check usage limit. `request.client` may exist but be None, so look the
    # host up defensively instead of relying on hasattr().
    peer = getattr(request, "client", None)
    user_id = getattr(peer, "host", None) or "unknown"
    if not tracker.check_limit(user_id):
        bot_message = (
            "⚠️ Daily request limit reached. You have used all 50 requests for today. "
            "Please try again tomorrow.\n\nThis limit helps us keep the service free and available for everyone."
        )
        return outputs(bot_message)

    # Get datacube ID from mapping
    datacube_id = current_datacube_state.get(selected_choice)
    if not datacube_id:
        return outputs("❌ Error: Could not find datacube ID for selected option.")

    # Get language code
    lang_code = LANGUAGES.get(language, "en")

    bfs_engine = DATASET_ENGINES.get("statistics")
    if not isinstance(bfs_engine, BFSEngine):
        return outputs("❌ Error: BFS engine unavailable.")

    bot_message, csv_file_path = bfs_engine.fetch_datacube_data(datacube_id, lang_code, show_debug)
    return outputs(bot_message, csv_file_path)


# NOTE(review): the registrations below presumably belong inside the
# gr.Blocks context that defines these components; the original indentation
# was not recoverable from this view — confirm placement when applying.

# Inputs/outputs shared by both ways of submitting a question. `download_file`
# appears twice on purpose: once for its value and once for its visibility.
respond_inputs = [msg, language, dataset, datacube_state, parliament_cards_state, parliament_page_state]
respond_outputs = [
    msg,
    status_text,
    download_file,
    download_file,
    datacube_state,
    datacube_radio,
    datacube_selection_row,
    parliament_cards_state,
    parliament_page_state,
    parliament_cards_html,
    page_info,
    parliament_cards_row,
    prev_page_btn,
    next_page_btn,
]

msg.submit(respond, respond_inputs, respond_outputs)
submit.click(respond, respond_inputs, respond_outputs)

get_data_btn.click(
    fetch_datacube_data,
    [datacube_radio, datacube_state, language],
    [status_text, download_file, download_file, datacube_selection_row]
)

prev_page_btn.click(
    prev_page,
    [parliament_cards_state, parliament_page_state],
    [parliament_cards_html, page_info, parliament_page_state]
)

next_page_btn.click(
    next_page,
    [parliament_cards_state, parliament_page_state],
    [parliament_cards_html, page_info, parliament_page_state]
)

# Update examples when dataset or language changes
dataset.change(
    create_examples_text,
    [dataset, language],
    [examples_display]
)

language.change(
    create_examples_text,
    [dataset, language],
    [examples_display]
)

# Initialize examples on load
demo.load(
    create_examples_text,
    [dataset, language],
    [examples_display]
)

gr.Markdown(
    """
---
**Data Sources:**
- **Swiss Parliament Data:** with thanks to Christian, Florin and the many contributors for creating OpenParlData.ch, the model queries their API to retrieve parliamentary data
- **Swiss Statistics (BFS):** Federal Statistical Office data via PxWeb API

**Rate Limit:** 50 requests per day per user (shared across both datasets) to keep the service affordable and accessible.

Powered by [Llama-3.1-8B-Instruct](https://huggingface.co/meta-llama/Llama-3.1-8B-Instruct) via HF Inference Providers and [Model Context Protocol (MCP)](https://modelcontextprotocol.io/)
"""
)

# Launch the app
if __name__ == "__main__":
    demo.launch()