LifeAdmin-AI / agent /mcp_client.py
Maheen001's picture
Create agent/mcp_client.py
2cb3622 verified
raw
history blame
7.95 kB
"""
MCP Client - JSONRPC communication with MCP tool servers
"""
import asyncio
import json
from typing import List, Dict, Any, Optional
import aiohttp
class MCPClient:
"""Client for communicating with MCP tool servers via JSONRPC"""
def __init__(self, base_port: int = 8100):
self.base_port = base_port
self.servers = {
'ocr': f'http://localhost:{base_port}',
'pdf': f'http://localhost:{base_port + 1}',
'form_filler': f'http://localhost:{base_port + 2}',
'calendar': f'http://localhost:{base_port + 3}',
'email': f'http://localhost:{base_port + 4}',
'file_manager': f'http://localhost:{base_port + 5}',
'rag': f'http://localhost:{base_port + 6}',
}
self.request_id = 0
# Tool registry (in production, this would be discovered from servers)
self.tool_registry = {
'ocr_extract_text': {
'server': 'ocr',
'description': 'Extract text from images or scanned documents using OCR',
'params': {'file_path': 'string', 'language': 'string'}
},
'pdf_summarize': {
'server': 'pdf',
'description': 'Summarize PDF documents',
'params': {'file_path': 'string', 'max_length': 'int'}
},
'pdf_extract_metadata': {
'server': 'pdf',
'description': 'Extract metadata from PDF (dates, amounts, etc.)',
'params': {'file_path': 'string'}
},
'form_fill': {
'server': 'form_filler',
'description': 'Auto-fill form with extracted data',
'params': {'template_path': 'string', 'data': 'object'}
},
'calendar_create_event': {
'server': 'calendar',
'description': 'Create ICS calendar event',
'params': {'title': 'string', 'start': 'string', 'end': 'string', 'description': 'string'}
},
'email_draft': {
'server': 'email',
'description': 'Draft an email based on context',
'params': {'recipient': 'string', 'subject': 'string', 'context': 'string', 'tone': 'string'}
},
'file_organize': {
'server': 'file_manager',
'description': 'Organize files by category',
'params': {'directory': 'string', 'strategy': 'string'}
},
'file_rename': {
'server': 'file_manager',
'description': 'Rename file with semantic name',
'params': {'file_path': 'string', 'new_name': 'string'}
},
'rag_search': {
'server': 'rag',
'description': 'Search documents semantically',
'params': {'query': 'string', 'k': 'int'}
},
}
def _get_next_id(self) -> int:
"""Get next request ID"""
self.request_id += 1
return self.request_id
async def call_tool(self, tool_name: str, args: Dict[str, Any]) -> Any:
"""
Call an MCP tool
Args:
tool_name: Name of the tool to call
args: Arguments for the tool
Returns:
Tool result
"""
if tool_name not in self.tool_registry:
raise ValueError(f"Unknown tool: {tool_name}")
tool_info = self.tool_registry[tool_name]
server_name = tool_info['server']
server_url = self.servers.get(server_name)
if not server_url:
# Fallback to local execution if server not available
return await self._execute_tool_locally(tool_name, args)
# Create JSONRPC request
request = {
'jsonrpc': '2.0',
'id': self._get_next_id(),
'method': 'tools/call',
'params': {
'name': tool_name,
'arguments': args
}
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
server_url + '/rpc',
json=request,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status != 200:
raise Exception(f"Server returned {response.status}")
result = await response.json()
if 'error' in result:
raise Exception(result['error'].get('message', 'Unknown error'))
return result.get('result', {})
except (aiohttp.ClientError, asyncio.TimeoutError):
# Server not available, execute locally
return await self._execute_tool_locally(tool_name, args)
async def _execute_tool_locally(self, tool_name: str, args: Dict[str, Any]) -> Any:
"""Execute tool locally when server is not available"""
# Import tool implementations
if tool_name == 'ocr_extract_text':
from tools.ocr_server import extract_text_ocr
return await extract_text_ocr(args.get('file_path'), args.get('language', 'en'))
elif tool_name == 'pdf_summarize':
from tools.pdf_server import summarize_pdf
return await summarize_pdf(args.get('file_path'), args.get('max_length', 500))
elif tool_name == 'pdf_extract_metadata':
from tools.pdf_server import extract_pdf_metadata
return await extract_pdf_metadata(args.get('file_path'))
elif tool_name == 'form_fill':
from tools.form_filler_server import fill_form
return await fill_form(args.get('template_path'), args.get('data'))
elif tool_name == 'calendar_create_event':
from tools.calendar_server import create_calendar_event
return await create_calendar_event(
args.get('title'),
args.get('start'),
args.get('end'),
args.get('description', '')
)
elif tool_name == 'email_draft':
from tools.email_server import draft_email
return await draft_email(
args.get('recipient'),
args.get('subject'),
args.get('context'),
args.get('tone', 'professional')
)
elif tool_name == 'file_organize':
from tools.file_manager_server import organize_files
return await organize_files(args.get('directory'), args.get('strategy', 'by_type'))
elif tool_name == 'file_rename':
from tools.file_manager_server import rename_file
return await rename_file(args.get('file_path'), args.get('new_name'))
elif tool_name == 'rag_search':
from tools.rag_server import search_documents
return await search_documents(args.get('query'), args.get('k', 5))
else:
raise ValueError(f"Cannot execute tool locally: {tool_name}")
async def list_tools(self) -> List[Dict[str, Any]]:
"""List all available tools"""
return [
{
'name': name,
'description': info['description'],
'params': info['params']
}
for name, info in self.tool_registry.items()
]
async def get_tool_schema(self, tool_name: str) -> Dict[str, Any]:
"""Get schema for a specific tool"""
if tool_name not in self.tool_registry:
raise ValueError(f"Unknown tool: {tool_name}")
return self.tool_registry[tool_name]