"""
MCP Client - JSONRPC communication with MCP tool servers
"""

import asyncio
import importlib
import json
import logging
from typing import List, Dict, Any, Optional

try:
    import aiohttp
except ImportError:
    # The HTTP transport is optional: every registered tool also has a local
    # implementation, so the client stays usable without aiohttp installed.
    aiohttp = None

logger = logging.getLogger(__name__)


class MCPError(Exception):
    """Raised when a tool server answers with an HTTP or JSONRPC error."""


class MCPClient:
    """Client for communicating with MCP tool servers via JSONRPC.

    Every tool in ``tool_registry`` names the server that hosts it; the
    server's base URL is looked up in ``servers``. When the server cannot be
    reached, the tool is executed in-process through the ``tools.*`` modules.
    """

    # Seconds allowed for one complete JSONRPC round trip.
    REQUEST_TIMEOUT = 30

    # Server names in port order: the n-th name listens on ``base_port + n``.
    _SERVER_NAMES = (
        'ocr',
        'pdf',
        'form_filler',
        'calendar',
        'email',
        'file_manager',
        'rag',
    )

    # Local fallbacks: tool name -> (module, function, positional parameters).
    # Each parameter is an ``(argument key, default)`` pair, listed in the
    # order the function expects its positional arguments.
    _LOCAL_TOOLS = {
        'ocr_extract_text': (
            'tools.ocr_server', 'extract_text_ocr',
            (('file_path', None), ('language', 'en')),
        ),
        'pdf_summarize': (
            'tools.pdf_server', 'summarize_pdf',
            (('file_path', None), ('max_length', 500)),
        ),
        'pdf_extract_metadata': (
            'tools.pdf_server', 'extract_pdf_metadata',
            (('file_path', None),),
        ),
        'form_fill': (
            'tools.form_filler_server', 'fill_form',
            (('template_path', None), ('data', None)),
        ),
        'calendar_create_event': (
            'tools.calendar_server', 'create_calendar_event',
            (('title', None), ('start', None), ('end', None),
             ('description', '')),
        ),
        'email_draft': (
            'tools.email_server', 'draft_email',
            (('recipient', None), ('subject', None), ('context', None),
             ('tone', 'professional')),
        ),
        'file_organize': (
            'tools.file_manager_server', 'organize_files',
            (('directory', None), ('strategy', 'by_type')),
        ),
        'file_rename': (
            'tools.file_manager_server', 'rename_file',
            (('file_path', None), ('new_name', None)),
        ),
        'rag_search': (
            'tools.rag_server', 'search_documents',
            (('query', None), ('k', 5)),
        ),
    }

    def __init__(self, base_port: int = 8100):
        """
        Set up the server table and the tool registry.

        Args:
            base_port: Port of the first server ('ocr'); each following
                server in the table uses the next consecutive port.
        """
        self.base_port = base_port
        self.servers = {
            name: f'http://localhost:{base_port + offset}'
            for offset, name in enumerate(self._SERVER_NAMES)
        }
        self.request_id = 0

        # Tool registry (in production, this would be discovered from servers)
        self.tool_registry = {
            'ocr_extract_text': {
                'server': 'ocr',
                'description': 'Extract text from images or scanned documents using OCR',
                'params': {'file_path': 'string', 'language': 'string'},
            },
            'pdf_summarize': {
                'server': 'pdf',
                'description': 'Summarize PDF documents',
                'params': {'file_path': 'string', 'max_length': 'int'},
            },
            'pdf_extract_metadata': {
                'server': 'pdf',
                'description': 'Extract metadata from PDF (dates, amounts, etc.)',
                'params': {'file_path': 'string'},
            },
            'form_fill': {
                'server': 'form_filler',
                'description': 'Auto-fill form with extracted data',
                'params': {'template_path': 'string', 'data': 'object'},
            },
            'calendar_create_event': {
                'server': 'calendar',
                'description': 'Create ICS calendar event',
                'params': {'title': 'string', 'start': 'string', 'end': 'string', 'description': 'string'},
            },
            'email_draft': {
                'server': 'email',
                'description': 'Draft an email based on context',
                'params': {'recipient': 'string', 'subject': 'string', 'context': 'string', 'tone': 'string'},
            },
            'file_organize': {
                'server': 'file_manager',
                'description': 'Organize files by category',
                'params': {'directory': 'string', 'strategy': 'string'},
            },
            'file_rename': {
                'server': 'file_manager',
                'description': 'Rename file with semantic name',
                'params': {'file_path': 'string', 'new_name': 'string'},
            },
            'rag_search': {
                'server': 'rag',
                'description': 'Search documents semantically',
                'params': {'query': 'string', 'k': 'int'},
            },
        }

    def _get_next_id(self) -> int:
        """Get next request ID"""
        self.request_id += 1
        return self.request_id

    async def call_tool(self, tool_name: str, args: Dict[str, Any]) -> Any:
        """
        Call an MCP tool

        The call is sent to the tool's server as a JSONRPC ``tools/call``
        request. If the tool's server has no URL, aiohttp is not installed,
        or the request fails at the transport level (connection error,
        timeout, undecodable content type), the tool is executed locally
        instead and a warning is logged.

        Args:
            tool_name: Name of the tool to call
            args: Arguments for the tool

        Returns:
            Tool result

        Raises:
            ValueError: If the tool is not in the registry.
            MCPError: If the server replies with a non-200 status or a
                JSONRPC error.
        """
        if tool_name not in self.tool_registry:
            raise ValueError(f"Unknown tool: {tool_name}")

        server_name = self.tool_registry[tool_name]['server']
        server_url = self.servers.get(server_name)

        if not server_url:
            # Fallback to local execution if server not available
            return await self._execute_tool_locally(tool_name, args)

        if aiohttp is None:
            logger.warning(
                "aiohttp is not installed; executing tool %s locally",
                tool_name,
            )
            return await self._execute_tool_locally(tool_name, args)

        # Create JSONRPC request
        request = {
            'jsonrpc': '2.0',
            'id': self._get_next_id(),
            'method': 'tools/call',
            'params': {
                'name': tool_name,
                'arguments': args,
            },
        }

        try:
            payload = await self._post_rpc(server_url, request)
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            # Transport-level failure only; MCPError is not caught here and
            # propagates to the caller.
            logger.warning(
                "MCP server %s unavailable for tool %s (%r); executing locally",
                server_url, tool_name, exc,
            )
        else:
            return self._extract_result(payload)

        # Run the fallback outside the ``except`` block so that an error in
        # the local tool is not chained to the transport failure.
        return await self._execute_tool_locally(tool_name, args)

    async def _post_rpc(self, server_url: str, request: Dict[str, Any]) -> Any:
        """POST one JSONRPC request to ``<server_url>/rpc`` and return the decoded JSON body.

        Raises:
            MCPError: If the HTTP status is not 200.
        """
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
        async with aiohttp.ClientSession() as session:
            async with session.post(
                server_url + '/rpc',
                json=request,
                timeout=timeout,
            ) as response:
                if response.status != 200:
                    raise MCPError(f"Server returned {response.status}")
                return await response.json()

    @staticmethod
    def _extract_result(payload: Any) -> Any:
        """Return the ``result`` member of a decoded JSONRPC response.

        A missing ``result`` yields ``{}``. An ``error`` member that is
        absent or ``None`` is treated as success.

        Raises:
            MCPError: If the payload is not a JSON object, or if it carries
                an error. The message is the error's ``message`` field when
                the error is an object, otherwise the error's string form.
        """
        if not isinstance(payload, dict):
            raise MCPError(f"Malformed JSONRPC response: {payload!r}")

        error = payload.get('error')
        if error is not None:
            if isinstance(error, dict):
                message = error.get('message', 'Unknown error')
            else:
                message = str(error)
            raise MCPError(message)

        return payload.get('result', {})

    async def _execute_tool_locally(
        self, tool_name: str, args: Optional[Dict[str, Any]]
    ) -> Any:
        """Execute tool locally when server is not available

        The implementing module is imported only when the tool is actually
        invoked. Arguments missing from ``args`` take the defaults listed in
        ``_LOCAL_TOOLS``; ``args`` of ``None`` is treated as no arguments.

        Raises:
            ValueError: If the tool has no local implementation.
            ImportError: If the implementing module or function is missing.
        """
        spec = self._LOCAL_TOOLS.get(tool_name)
        if spec is None:
            raise ValueError(f"Cannot execute tool locally: {tool_name}")

        module_name, func_name, params = spec
        module = importlib.import_module(module_name)
        try:
            handler = getattr(module, func_name)
        except AttributeError as exc:
            # Same exception type a ``from module import name`` would raise.
            raise ImportError(
                f"cannot import name {func_name!r} from {module_name!r}"
            ) from exc

        supplied = args or {}
        positional = [supplied.get(key, default) for key, default in params]
        return await handler(*positional)

    async def list_tools(self) -> List[Dict[str, Any]]:
        """List all available tools"""
        return [
            {
                'name': name,
                'description': info['description'],
                'params': info['params'],
            }
            for name, info in self.tool_registry.items()
        ]

    async def get_tool_schema(self, tool_name: str) -> Dict[str, Any]:
        """Get schema for a specific tool

        Raises:
            ValueError: If the tool is not in the registry.
        """
        if tool_name not in self.tool_registry:
            raise ValueError(f"Unknown tool: {tool_name}")

        return self.tool_registry[tool_name]