File size: 7,954 Bytes
2cb3622
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""
MCP Client - JSONRPC communication with MCP tool servers
"""

import asyncio
import json
from typing import List, Dict, Any, Optional
import aiohttp


class MCPClient:
    """Client for communicating with MCP tool servers via JSONRPC"""
    
    def __init__(self, base_port: int = 8100):
        self.base_port = base_port
        self.servers = {
            'ocr': f'http://localhost:{base_port}',
            'pdf': f'http://localhost:{base_port + 1}',
            'form_filler': f'http://localhost:{base_port + 2}',
            'calendar': f'http://localhost:{base_port + 3}',
            'email': f'http://localhost:{base_port + 4}',
            'file_manager': f'http://localhost:{base_port + 5}',
            'rag': f'http://localhost:{base_port + 6}',
        }
        self.request_id = 0
        
        # Tool registry (in production, this would be discovered from servers)
        self.tool_registry = {
            'ocr_extract_text': {
                'server': 'ocr',
                'description': 'Extract text from images or scanned documents using OCR',
                'params': {'file_path': 'string', 'language': 'string'}
            },
            'pdf_summarize': {
                'server': 'pdf',
                'description': 'Summarize PDF documents',
                'params': {'file_path': 'string', 'max_length': 'int'}
            },
            'pdf_extract_metadata': {
                'server': 'pdf',
                'description': 'Extract metadata from PDF (dates, amounts, etc.)',
                'params': {'file_path': 'string'}
            },
            'form_fill': {
                'server': 'form_filler',
                'description': 'Auto-fill form with extracted data',
                'params': {'template_path': 'string', 'data': 'object'}
            },
            'calendar_create_event': {
                'server': 'calendar',
                'description': 'Create ICS calendar event',
                'params': {'title': 'string', 'start': 'string', 'end': 'string', 'description': 'string'}
            },
            'email_draft': {
                'server': 'email',
                'description': 'Draft an email based on context',
                'params': {'recipient': 'string', 'subject': 'string', 'context': 'string', 'tone': 'string'}
            },
            'file_organize': {
                'server': 'file_manager',
                'description': 'Organize files by category',
                'params': {'directory': 'string', 'strategy': 'string'}
            },
            'file_rename': {
                'server': 'file_manager',
                'description': 'Rename file with semantic name',
                'params': {'file_path': 'string', 'new_name': 'string'}
            },
            'rag_search': {
                'server': 'rag',
                'description': 'Search documents semantically',
                'params': {'query': 'string', 'k': 'int'}
            },
        }
    
    def _get_next_id(self) -> int:
        """Get next request ID"""
        self.request_id += 1
        return self.request_id
    
    async def call_tool(self, tool_name: str, args: Dict[str, Any]) -> Any:
        """
        Call an MCP tool
        
        Args:
            tool_name: Name of the tool to call
            args: Arguments for the tool
            
        Returns:
            Tool result
        """
        if tool_name not in self.tool_registry:
            raise ValueError(f"Unknown tool: {tool_name}")
        
        tool_info = self.tool_registry[tool_name]
        server_name = tool_info['server']
        server_url = self.servers.get(server_name)
        
        if not server_url:
            # Fallback to local execution if server not available
            return await self._execute_tool_locally(tool_name, args)
        
        # Create JSONRPC request
        request = {
            'jsonrpc': '2.0',
            'id': self._get_next_id(),
            'method': 'tools/call',
            'params': {
                'name': tool_name,
                'arguments': args
            }
        }
        
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    server_url + '/rpc',
                    json=request,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    if response.status != 200:
                        raise Exception(f"Server returned {response.status}")
                    
                    result = await response.json()
                    
                    if 'error' in result:
                        raise Exception(result['error'].get('message', 'Unknown error'))
                    
                    return result.get('result', {})
        
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # Server not available, execute locally
            return await self._execute_tool_locally(tool_name, args)
    
    async def _execute_tool_locally(self, tool_name: str, args: Dict[str, Any]) -> Any:
        """Execute tool locally when server is not available"""
        
        # Import tool implementations
        if tool_name == 'ocr_extract_text':
            from tools.ocr_server import extract_text_ocr
            return await extract_text_ocr(args.get('file_path'), args.get('language', 'en'))
        
        elif tool_name == 'pdf_summarize':
            from tools.pdf_server import summarize_pdf
            return await summarize_pdf(args.get('file_path'), args.get('max_length', 500))
        
        elif tool_name == 'pdf_extract_metadata':
            from tools.pdf_server import extract_pdf_metadata
            return await extract_pdf_metadata(args.get('file_path'))
        
        elif tool_name == 'form_fill':
            from tools.form_filler_server import fill_form
            return await fill_form(args.get('template_path'), args.get('data'))
        
        elif tool_name == 'calendar_create_event':
            from tools.calendar_server import create_calendar_event
            return await create_calendar_event(
                args.get('title'),
                args.get('start'),
                args.get('end'),
                args.get('description', '')
            )
        
        elif tool_name == 'email_draft':
            from tools.email_server import draft_email
            return await draft_email(
                args.get('recipient'),
                args.get('subject'),
                args.get('context'),
                args.get('tone', 'professional')
            )
        
        elif tool_name == 'file_organize':
            from tools.file_manager_server import organize_files
            return await organize_files(args.get('directory'), args.get('strategy', 'by_type'))
        
        elif tool_name == 'file_rename':
            from tools.file_manager_server import rename_file
            return await rename_file(args.get('file_path'), args.get('new_name'))
        
        elif tool_name == 'rag_search':
            from tools.rag_server import search_documents
            return await search_documents(args.get('query'), args.get('k', 5))
        
        else:
            raise ValueError(f"Cannot execute tool locally: {tool_name}")
    
    async def list_tools(self) -> List[Dict[str, Any]]:
        """List all available tools"""
        return [
            {
                'name': name,
                'description': info['description'],
                'params': info['params']
            }
            for name, info in self.tool_registry.items()
        ]
    
    async def get_tool_schema(self, tool_name: str) -> Dict[str, Any]:
        """Get schema for a specific tool"""
        if tool_name not in self.tool_registry:
            raise ValueError(f"Unknown tool: {tool_name}")
        return self.tool_registry[tool_name]