Spaces:
Sleeping
Sleeping
| """ | |
| MCP Integration for OpenParlData | |
| Provides a wrapper for connecting to the OpenParlData MCP server | |
| and executing tools from the Gradio app. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import asyncio | |
| from typing import Optional, Dict, Any, List | |
| from pathlib import Path | |
| # Add mcp directory to path | |
| mcp_dir = Path(__file__).parent / "mcp" | |
| sys.path.insert(0, str(mcp_dir)) | |
| from mcp.client.session import ClientSession | |
| from mcp.client.stdio import stdio_client, StdioServerParameters | |
| class OpenParlDataClient: | |
| """Client for interacting with OpenParlData MCP server.""" | |
| def __init__(self): | |
| self.session: Optional[ClientSession] = None | |
| self.available_tools: List[Dict[str, Any]] = [] | |
| async def connect(self): | |
| """Connect to the MCP server.""" | |
| # Get the path to the MCP server script | |
| server_script = Path(__file__).parent / "mcp" / "openparldata_mcp.py" | |
| if not server_script.exists(): | |
| raise FileNotFoundError(f"MCP server script not found at {server_script}") | |
| # Server parameters for stdio connection | |
| server_params = StdioServerParameters( | |
| command=sys.executable, # Python interpreter | |
| args=[str(server_script)], | |
| env=None | |
| ) | |
| # Create stdio client context | |
| self.stdio_context = stdio_client(server_params) | |
| read, write = await self.stdio_context.__aenter__() | |
| # Create session | |
| self.session = ClientSession(read, write) | |
| await self.session.__aenter__() | |
| # Initialize and get available tools | |
| await self.session.initialize() | |
| # List available tools | |
| tools_result = await self.session.list_tools() | |
| self.available_tools = [ | |
| { | |
| "name": tool.name, | |
| "description": tool.description, | |
| "input_schema": tool.inputSchema | |
| } | |
| for tool in tools_result.tools | |
| ] | |
| return self.available_tools | |
| async def disconnect(self): | |
| """Disconnect from the MCP server.""" | |
| if self.session: | |
| await self.session.__aexit__(None, None, None) | |
| if hasattr(self, 'stdio_context'): | |
| await self.stdio_context.__aexit__(None, None, None) | |
| async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: | |
| """ | |
| Call an MCP tool with given arguments. | |
| Args: | |
| tool_name: Name of the tool to call | |
| arguments: Dictionary of arguments for the tool | |
| Returns: | |
| Tool response as string | |
| """ | |
| if not self.session: | |
| raise RuntimeError("Not connected to MCP server. Call connect() first.") | |
| # Wrap arguments in 'params' key as expected by MCP server | |
| tool_arguments = {"params": arguments} | |
| # Call the tool | |
| result = await self.session.call_tool(tool_name, arguments=tool_arguments) | |
| # Extract text content from result | |
| if result.content: | |
| # MCP returns list of content blocks | |
| text_parts = [] | |
| for content in result.content: | |
| if hasattr(content, 'text'): | |
| text_parts.append(content.text) | |
| elif isinstance(content, dict) and 'text' in content: | |
| text_parts.append(content['text']) | |
| return "\n".join(text_parts) | |
| return "No response from tool" | |
| def get_tool_info(self) -> List[Dict[str, Any]]: | |
| """Get information about available tools.""" | |
| return self.available_tools | |
| # Convenience functions for common operations | |
| async def search_parliamentarians( | |
| query: Optional[str] = None, | |
| canton: Optional[str] = None, | |
| party: Optional[str] = None, | |
| language: str = "en", | |
| limit: int = 20, | |
| show_debug: bool = False | |
| ) -> tuple[str, Optional[str]]: | |
| """ | |
| Search for parliamentarians. | |
| Returns: | |
| Tuple of (response_text, debug_info) | |
| """ | |
| client = OpenParlDataClient() | |
| try: | |
| await client.connect() | |
| arguments = { | |
| "language": language, | |
| "limit": limit, | |
| "response_format": "markdown" | |
| } | |
| if query: | |
| arguments["query"] = query | |
| if canton: | |
| arguments["canton"] = canton | |
| if party: | |
| arguments["party"] = party | |
| debug_info = None | |
| if show_debug: | |
| debug_info = f"**Tool:** openparldata_search_parliamentarians\n**Arguments:** ```json\n{json.dumps(arguments, indent=2)}\n```" | |
| response = await client.call_tool("openparldata_search_parliamentarians", arguments) | |
| return response, debug_info | |
| finally: | |
| await client.disconnect() | |
| async def search_votes( | |
| query: Optional[str] = None, | |
| date_from: Optional[str] = None, | |
| date_to: Optional[str] = None, | |
| language: str = "en", | |
| limit: int = 20, | |
| show_debug: bool = False | |
| ) -> tuple[str, Optional[str]]: | |
| """ | |
| Search for parliamentary votes. | |
| Returns: | |
| Tuple of (response_text, debug_info) | |
| """ | |
| client = OpenParlDataClient() | |
| try: | |
| await client.connect() | |
| arguments = { | |
| "language": language, | |
| "limit": limit, | |
| "response_format": "markdown" | |
| } | |
| if query: | |
| arguments["query"] = query | |
| if date_from: | |
| arguments["date_from"] = date_from | |
| if date_to: | |
| arguments["date_to"] = date_to | |
| debug_info = None | |
| if show_debug: | |
| debug_info = f"**Tool:** openparldata_search_votes\n**Arguments:** ```json\n{json.dumps(arguments, indent=2)}\n```" | |
| response = await client.call_tool("openparldata_search_votes", arguments) | |
| return response, debug_info | |
| finally: | |
| await client.disconnect() | |
| async def search_motions( | |
| query: Optional[str] = None, | |
| status: Optional[str] = None, | |
| language: str = "en", | |
| limit: int = 20, | |
| show_debug: bool = False | |
| ) -> tuple[str, Optional[str]]: | |
| """ | |
| Search for motions and proposals. | |
| Returns: | |
| Tuple of (response_text, debug_info) | |
| """ | |
| client = OpenParlDataClient() | |
| try: | |
| await client.connect() | |
| arguments = { | |
| "language": language, | |
| "limit": limit, | |
| "response_format": "markdown" | |
| } | |
| if query: | |
| arguments["query"] = query | |
| if status: | |
| arguments["status"] = status | |
| debug_info = None | |
| if show_debug: | |
| debug_info = f"**Tool:** openparldata_search_motions\n**Arguments:** ```json\n{json.dumps(arguments, indent=2)}\n```" | |
| response = await client.call_tool("openparldata_search_motions", arguments) | |
| return response, debug_info | |
| finally: | |
| await client.disconnect() | |
| async def execute_mcp_query( | |
| user_query: str, | |
| tool_name: str, | |
| arguments: Dict[str, Any], | |
| show_debug: bool = False | |
| ) -> tuple[str, Optional[str]]: | |
| """ | |
| Execute any MCP tool query. | |
| Args: | |
| user_query: The original user question (for context) | |
| tool_name: Name of the MCP tool to call | |
| arguments: Arguments for the tool | |
| show_debug: Whether to return debug information | |
| Returns: | |
| Tuple of (response_text, debug_info) | |
| """ | |
| client = OpenParlDataClient() | |
| try: | |
| await client.connect() | |
| debug_info = None | |
| if show_debug: | |
| debug_info = f"**User Query:** {user_query}\n\n**Tool:** {tool_name}\n**Arguments:** ```json\n{json.dumps(arguments, indent=2)}\n```" | |
| response = await client.call_tool(tool_name, arguments) | |
| return response, debug_info | |
| finally: | |
| await client.disconnect() | |