Spaces:
Running
Running
| """ | |
| MCP Client - JSONRPC communication with MCP tool servers | |
| """ | |
| import asyncio | |
| import json | |
| from typing import List, Dict, Any, Optional | |
| import aiohttp | |
| class MCPClient: | |
| """Client for communicating with MCP tool servers via JSONRPC""" | |
| def __init__(self, base_port: int = 8100): | |
| self.base_port = base_port | |
| self.servers = { | |
| 'ocr': f'http://localhost:{base_port}', | |
| 'pdf': f'http://localhost:{base_port + 1}', | |
| 'form_filler': f'http://localhost:{base_port + 2}', | |
| 'calendar': f'http://localhost:{base_port + 3}', | |
| 'email': f'http://localhost:{base_port + 4}', | |
| 'file_manager': f'http://localhost:{base_port + 5}', | |
| 'rag': f'http://localhost:{base_port + 6}', | |
| } | |
| self.request_id = 0 | |
| # Tool registry (in production, this would be discovered from servers) | |
| self.tool_registry = { | |
| 'ocr_extract_text': { | |
| 'server': 'ocr', | |
| 'description': 'Extract text from images or scanned documents using OCR', | |
| 'params': {'file_path': 'string', 'language': 'string'} | |
| }, | |
| 'pdf_summarize': { | |
| 'server': 'pdf', | |
| 'description': 'Summarize PDF documents', | |
| 'params': {'file_path': 'string', 'max_length': 'int'} | |
| }, | |
| 'pdf_extract_metadata': { | |
| 'server': 'pdf', | |
| 'description': 'Extract metadata from PDF (dates, amounts, etc.)', | |
| 'params': {'file_path': 'string'} | |
| }, | |
| 'form_fill': { | |
| 'server': 'form_filler', | |
| 'description': 'Auto-fill form with extracted data', | |
| 'params': {'template_path': 'string', 'data': 'object'} | |
| }, | |
| 'calendar_create_event': { | |
| 'server': 'calendar', | |
| 'description': 'Create ICS calendar event', | |
| 'params': {'title': 'string', 'start': 'string', 'end': 'string', 'description': 'string'} | |
| }, | |
| 'email_draft': { | |
| 'server': 'email', | |
| 'description': 'Draft an email based on context', | |
| 'params': {'recipient': 'string', 'subject': 'string', 'context': 'string', 'tone': 'string'} | |
| }, | |
| 'file_organize': { | |
| 'server': 'file_manager', | |
| 'description': 'Organize files by category', | |
| 'params': {'directory': 'string', 'strategy': 'string'} | |
| }, | |
| 'file_rename': { | |
| 'server': 'file_manager', | |
| 'description': 'Rename file with semantic name', | |
| 'params': {'file_path': 'string', 'new_name': 'string'} | |
| }, | |
| 'rag_search': { | |
| 'server': 'rag', | |
| 'description': 'Search documents semantically', | |
| 'params': {'query': 'string', 'k': 'int'} | |
| }, | |
| } | |
| def _get_next_id(self) -> int: | |
| """Get next request ID""" | |
| self.request_id += 1 | |
| return self.request_id | |
| async def call_tool(self, tool_name: str, args: Dict[str, Any]) -> Any: | |
| """ | |
| Call an MCP tool | |
| Args: | |
| tool_name: Name of the tool to call | |
| args: Arguments for the tool | |
| Returns: | |
| Tool result | |
| """ | |
| if tool_name not in self.tool_registry: | |
| raise ValueError(f"Unknown tool: {tool_name}") | |
| tool_info = self.tool_registry[tool_name] | |
| server_name = tool_info['server'] | |
| server_url = self.servers.get(server_name) | |
| if not server_url: | |
| # Fallback to local execution if server not available | |
| return await self._execute_tool_locally(tool_name, args) | |
| # Create JSONRPC request | |
| request = { | |
| 'jsonrpc': '2.0', | |
| 'id': self._get_next_id(), | |
| 'method': 'tools/call', | |
| 'params': { | |
| 'name': tool_name, | |
| 'arguments': args | |
| } | |
| } | |
| try: | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post( | |
| server_url + '/rpc', | |
| json=request, | |
| timeout=aiohttp.ClientTimeout(total=30) | |
| ) as response: | |
| if response.status != 200: | |
| raise Exception(f"Server returned {response.status}") | |
| result = await response.json() | |
| if 'error' in result: | |
| raise Exception(result['error'].get('message', 'Unknown error')) | |
| return result.get('result', {}) | |
| except (aiohttp.ClientError, asyncio.TimeoutError): | |
| # Server not available, execute locally | |
| return await self._execute_tool_locally(tool_name, args) | |
| async def _execute_tool_locally(self, tool_name: str, args: Dict[str, Any]) -> Any: | |
| """Execute tool locally when server is not available""" | |
| # Import tool implementations | |
| if tool_name == 'ocr_extract_text': | |
| from tools.ocr_server import extract_text_ocr | |
| return await extract_text_ocr(args.get('file_path'), args.get('language', 'en')) | |
| elif tool_name == 'pdf_summarize': | |
| from tools.pdf_server import summarize_pdf | |
| return await summarize_pdf(args.get('file_path'), args.get('max_length', 500)) | |
| elif tool_name == 'pdf_extract_metadata': | |
| from tools.pdf_server import extract_pdf_metadata | |
| return await extract_pdf_metadata(args.get('file_path')) | |
| elif tool_name == 'form_fill': | |
| from tools.form_filler_server import fill_form | |
| return await fill_form(args.get('template_path'), args.get('data')) | |
| elif tool_name == 'calendar_create_event': | |
| from tools.calendar_server import create_calendar_event | |
| return await create_calendar_event( | |
| args.get('title'), | |
| args.get('start'), | |
| args.get('end'), | |
| args.get('description', '') | |
| ) | |
| elif tool_name == 'email_draft': | |
| from tools.email_server import draft_email | |
| return await draft_email( | |
| args.get('recipient'), | |
| args.get('subject'), | |
| args.get('context'), | |
| args.get('tone', 'professional') | |
| ) | |
| elif tool_name == 'file_organize': | |
| from tools.file_manager_server import organize_files | |
| return await organize_files(args.get('directory'), args.get('strategy', 'by_type')) | |
| elif tool_name == 'file_rename': | |
| from tools.file_manager_server import rename_file | |
| return await rename_file(args.get('file_path'), args.get('new_name')) | |
| elif tool_name == 'rag_search': | |
| from tools.rag_server import search_documents | |
| return await search_documents(args.get('query'), args.get('k', 5)) | |
| else: | |
| raise ValueError(f"Cannot execute tool locally: {tool_name}") | |
| async def list_tools(self) -> List[Dict[str, Any]]: | |
| """List all available tools""" | |
| return [ | |
| { | |
| 'name': name, | |
| 'description': info['description'], | |
| 'params': info['params'] | |
| } | |
| for name, info in self.tool_registry.items() | |
| ] | |
| async def get_tool_schema(self, tool_name: str) -> Dict[str, Any]: | |
| """Get schema for a specific tool""" | |
| if tool_name not in self.tool_registry: | |
| raise ValueError(f"Unknown tool: {tool_name}") | |
| return self.tool_registry[tool_name] |