Spaces:
Running
Running
File size: 7,954 Bytes
2cb3622 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
"""
MCP Client - JSONRPC communication with MCP tool servers
"""
import asyncio
import json
from typing import List, Dict, Any, Optional
import aiohttp
class MCPClient:
"""Client for communicating with MCP tool servers via JSONRPC"""
def __init__(self, base_port: int = 8100):
self.base_port = base_port
self.servers = {
'ocr': f'http://localhost:{base_port}',
'pdf': f'http://localhost:{base_port + 1}',
'form_filler': f'http://localhost:{base_port + 2}',
'calendar': f'http://localhost:{base_port + 3}',
'email': f'http://localhost:{base_port + 4}',
'file_manager': f'http://localhost:{base_port + 5}',
'rag': f'http://localhost:{base_port + 6}',
}
self.request_id = 0
# Tool registry (in production, this would be discovered from servers)
self.tool_registry = {
'ocr_extract_text': {
'server': 'ocr',
'description': 'Extract text from images or scanned documents using OCR',
'params': {'file_path': 'string', 'language': 'string'}
},
'pdf_summarize': {
'server': 'pdf',
'description': 'Summarize PDF documents',
'params': {'file_path': 'string', 'max_length': 'int'}
},
'pdf_extract_metadata': {
'server': 'pdf',
'description': 'Extract metadata from PDF (dates, amounts, etc.)',
'params': {'file_path': 'string'}
},
'form_fill': {
'server': 'form_filler',
'description': 'Auto-fill form with extracted data',
'params': {'template_path': 'string', 'data': 'object'}
},
'calendar_create_event': {
'server': 'calendar',
'description': 'Create ICS calendar event',
'params': {'title': 'string', 'start': 'string', 'end': 'string', 'description': 'string'}
},
'email_draft': {
'server': 'email',
'description': 'Draft an email based on context',
'params': {'recipient': 'string', 'subject': 'string', 'context': 'string', 'tone': 'string'}
},
'file_organize': {
'server': 'file_manager',
'description': 'Organize files by category',
'params': {'directory': 'string', 'strategy': 'string'}
},
'file_rename': {
'server': 'file_manager',
'description': 'Rename file with semantic name',
'params': {'file_path': 'string', 'new_name': 'string'}
},
'rag_search': {
'server': 'rag',
'description': 'Search documents semantically',
'params': {'query': 'string', 'k': 'int'}
},
}
def _get_next_id(self) -> int:
"""Get next request ID"""
self.request_id += 1
return self.request_id
async def call_tool(self, tool_name: str, args: Dict[str, Any]) -> Any:
"""
Call an MCP tool
Args:
tool_name: Name of the tool to call
args: Arguments for the tool
Returns:
Tool result
"""
if tool_name not in self.tool_registry:
raise ValueError(f"Unknown tool: {tool_name}")
tool_info = self.tool_registry[tool_name]
server_name = tool_info['server']
server_url = self.servers.get(server_name)
if not server_url:
# Fallback to local execution if server not available
return await self._execute_tool_locally(tool_name, args)
# Create JSONRPC request
request = {
'jsonrpc': '2.0',
'id': self._get_next_id(),
'method': 'tools/call',
'params': {
'name': tool_name,
'arguments': args
}
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
server_url + '/rpc',
json=request,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status != 200:
raise Exception(f"Server returned {response.status}")
result = await response.json()
if 'error' in result:
raise Exception(result['error'].get('message', 'Unknown error'))
return result.get('result', {})
except (aiohttp.ClientError, asyncio.TimeoutError):
# Server not available, execute locally
return await self._execute_tool_locally(tool_name, args)
async def _execute_tool_locally(self, tool_name: str, args: Dict[str, Any]) -> Any:
"""Execute tool locally when server is not available"""
# Import tool implementations
if tool_name == 'ocr_extract_text':
from tools.ocr_server import extract_text_ocr
return await extract_text_ocr(args.get('file_path'), args.get('language', 'en'))
elif tool_name == 'pdf_summarize':
from tools.pdf_server import summarize_pdf
return await summarize_pdf(args.get('file_path'), args.get('max_length', 500))
elif tool_name == 'pdf_extract_metadata':
from tools.pdf_server import extract_pdf_metadata
return await extract_pdf_metadata(args.get('file_path'))
elif tool_name == 'form_fill':
from tools.form_filler_server import fill_form
return await fill_form(args.get('template_path'), args.get('data'))
elif tool_name == 'calendar_create_event':
from tools.calendar_server import create_calendar_event
return await create_calendar_event(
args.get('title'),
args.get('start'),
args.get('end'),
args.get('description', '')
)
elif tool_name == 'email_draft':
from tools.email_server import draft_email
return await draft_email(
args.get('recipient'),
args.get('subject'),
args.get('context'),
args.get('tone', 'professional')
)
elif tool_name == 'file_organize':
from tools.file_manager_server import organize_files
return await organize_files(args.get('directory'), args.get('strategy', 'by_type'))
elif tool_name == 'file_rename':
from tools.file_manager_server import rename_file
return await rename_file(args.get('file_path'), args.get('new_name'))
elif tool_name == 'rag_search':
from tools.rag_server import search_documents
return await search_documents(args.get('query'), args.get('k', 5))
else:
raise ValueError(f"Cannot execute tool locally: {tool_name}")
async def list_tools(self) -> List[Dict[str, Any]]:
"""List all available tools"""
return [
{
'name': name,
'description': info['description'],
'params': info['params']
}
for name, info in self.tool_registry.items()
]
async def get_tool_schema(self, tool_name: str) -> Dict[str, Any]:
"""Get schema for a specific tool"""
if tool_name not in self.tool_registry:
raise ValueError(f"Unknown tool: {tool_name}")
return self.tool_registry[tool_name] |