File size: 6,464 Bytes
24b4e8a
9a21c26
194d298
599628b
6d033fe
8f10289
9a21c26
8f10289
f3c82c1
 
194d298
 
 
 
8f10289
 
f3c82c1
 
24b4e8a
3a136c0
9a21c26
 
c1ef727
24b4e8a
 
194d298
 
24b4e8a
8f10289
 
194d298
9a21c26
194d298
9a21c26
fef2e57
9a21c26
 
fef2e57
 
 
 
9a21c26
 
 
 
 
0713a48
fef2e57
6d033fe
 
 
 
8f10289
fbb3b64
 
 
 
 
fef2e57
fbb3b64
fef2e57
fbb3b64
194d298
 
 
8dd92e6
9a21c26
 
 
 
 
 
0713a48
 
3a136c0
0713a48
3a136c0
0713a48
 
 
 
24483a3
0713a48
 
 
6d033fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3a136c0
9a21c26
 
 
 
 
 
 
 
 
 
 
 
e16055b
3a136c0
 
e16055b
3a136c0
 
9a21c26
0713a48
3a136c0
 
9a21c26
0713a48
9a21c26
 
 
 
 
8dd92e6
8f10289
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os
import json
import logging
import anyio
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import Response
from sse_starlette import EventSourceResponse
from mcp.server.lowlevel import Server

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Define the MCP server
server = Server(name="airtable-mcp")

# Store write streams for each session ID. The request handlers below register
# a stream here when an SSE connection is opened and look it up when a POST
# arrives for that session.
write_streams = {}

# Read the Airtable configuration from the environment (for logging purposes).
token = os.getenv("AIRTABLE_API_TOKEN")
base_id = os.getenv("AIRTABLE_BASE_ID")
# Never write the token itself to the log: it is a credential, and log output
# is commonly readable by far more people than the secret should be. Only
# report whether one was provided.
logger.info("Airtable token configured: %s", bool(token))
logger.info(f"Using Airtable base ID: {base_id}")

@app.get("/airtable/mcp")
async def handle_sse(request: Request):
    """Open the Server-Sent Events channel for a new MCP session.

    A fresh session ID is generated and the send end of an in-memory stream
    is registered in ``write_streams`` under that ID. The first event the
    client receives is ``endpoint``; its data is the URL (including the
    session ID) that the client should POST its messages to. After that,
    every item pushed into the registered stream is forwarded to the client
    as an SSE event. The session is removed from ``write_streams`` when the
    forwarding task ends (for example because the client disconnected).

    Args:
        request: The incoming GET request. It is not otherwise inspected.

    Returns:
        An ``EventSourceResponse`` that streams the session's events.

    Raises:
        Exception: Any error raised while setting the session up is logged
            and re-raised unchanged.
    """
    import uuid  # Local import: only this handler needs it.

    try:
        session_id = uuid.uuid4().hex

        # Data flow:
        #   POST handler -> write_stream_writer ~> write_stream_reader
        #                -> sse_writer -> sse_stream_writer ~> sse_stream_reader
        #                -> EventSourceResponse -> client
        write_stream_writer, write_stream_reader = anyio.create_memory_object_stream(0)
        sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream(0)
        write_streams[session_id] = write_stream_writer
        logger.info(f"Registered SSE session {session_id}")

        async def sse_writer():
            """Announce the POST endpoint, then relay queued events to the client."""
            try:
                async with sse_stream_writer, write_stream_reader:
                    endpoint_data = f"/airtable/mcp?session_id={session_id}"
                    await sse_stream_writer.send({"event": "endpoint", "data": endpoint_data})
                    async for message in write_stream_reader:
                        # Producers may queue either a ready-made event dict or
                        # its JSON-encoded form.
                        message_data = json.loads(message) if isinstance(message, str) else message
                        await sse_stream_writer.send(message_data)
            finally:
                # Drop the session so later POSTs do not try to write into a
                # stream that nobody reads any more.
                write_streams.pop(session_id, None)
                logger.info(f"Closed SSE session {session_id}")

        # NOTE(review): the previous code awaited ``server.run(...)`` here, which
        # blocks until the MCP server loop exits and therefore prevented this
        # response from ever being returned. Nothing visible in this file feeds
        # the low-level server's read stream (the POST handler answers requests
        # itself), so it is not started here - confirm this matches the
        # intended design before wiring it back in as a background task.
        #
        # No manual keep-alive loop is needed: EventSourceResponse emits its
        # own periodic pings while the connection is open.
        return EventSourceResponse(sse_stream_reader, data_sender_callable=sse_writer)
    except Exception as e:
        logger.error(f"Error in handle_sse: {str(e)}")
        raise

def _initialize_result() -> dict:
    """Return the result payload for the MCP ``initialize`` request."""
    return {
        "protocolVersion": "2025-03-26",
        "capabilities": {
            "tools": {"listChanged": True},
            "prompts": {"listChanged": False},
            "resources": {"subscribe": False, "listChanged": False},
            "logging": {},
            "experimental": {},
        },
        "serverInfo": {
            "name": "airtable-mcp",
            "version": "1.0.0",
        },
        "instructions": "Airtable MCP server for listing and creating records.",
    }


def _tools_list_result() -> dict:
    """Return the result payload for the MCP ``tools/list`` request.

    Each ``inputSchema`` is a JSON Schema object (``"type": "object"``), as the
    MCP tool definition requires. ``nextCursor`` is omitted rather than sent as
    ``null`` because there is no further page.
    """
    return {
        "tools": [
            {
                "name": "list_airtable_records",
                "description": "Lists all records in the specified Airtable table",
                "inputSchema": {"type": "object", "properties": {}},
            },
            {
                "name": "create_airtable_record",
                "description": "Creates a new record in the specified Airtable table",
                "inputSchema": {
                    "type": "object",
                    "properties": {"record_data": {"type": "object"}},
                },
            },
        ],
    }


def _claim_placeholder_stream(session_id: str):
    """Re-key the first pending ``placeholder_*`` stream to ``session_id``.

    Returns the claimed stream, or ``None`` when no placeholder is pending.
    """
    # NOTE(review): this binds whichever placeholder comes first to whichever
    # session posts first, so two concurrent connections could be cross-wired.
    # It is kept only for compatibility with code that registers streams under
    # placeholder keys.
    for sid in list(write_streams):
        if isinstance(sid, str) and sid.startswith("placeholder_"):
            stream = write_streams.pop(sid)
            write_streams[session_id] = stream
            logger.info(f"Associated placeholder {sid} with session_id {session_id}")
            return stream
    return None


@app.post("/airtable/mcp")
async def handle_post(request: Request):
    """Accept one JSON-RPC message from a client and answer it over SSE.

    The reply is not returned in the HTTP body; it is pushed into the stream
    registered in ``write_streams`` for the ``session_id`` query parameter as
    an SSE ``message`` event.

    Args:
        request: The incoming POST request. Its body must be a JSON object and
            its query string must carry ``session_id``.

    Returns:
        A ``Response`` with status
        * 202 when the message was accepted (a reply, if one is due, has been
          queued on the session's stream),
        * 400 when ``session_id`` is missing or the body is not a JSON object,
        * 404 when no open stream exists for the session.
    """
    session_id = request.query_params.get("session_id")
    if not session_id:
        logger.error("Received POST without a session_id")
        return Response(content="session_id is required", status_code=400)

    try:
        message = json.loads(await request.body())
    except ValueError as e:
        # Both malformed JSON and undecodable bytes surface as ValueError.
        logger.error(f"Could not parse POST body for session {session_id}: {str(e)}")
        return Response(content="Could not parse message", status_code=400)
    if not isinstance(message, dict):
        logger.error(f"POST body for session {session_id} is not a JSON object")
        return Response(content="Expected a JSON-RPC object", status_code=400)
    logger.info(f"Received POST with session_id: {session_id}, message: {message}")

    # Try to find the write_stream
    write_stream = write_streams.get(session_id)
    if write_stream is None:
        write_stream = _claim_placeholder_stream(session_id)
    if write_stream is None:
        logger.error(f"No write_stream found for session_id: {session_id}")
        return Response(content="Could not find session", status_code=404)

    method = message.get("method")
    if method is None or "id" not in message:
        # A response or a notification: JSON-RPC forbids replying to these.
        return Response(status_code=202)

    result_builders = {
        "initialize": _initialize_result,
        "tools/list": _tools_list_result,
        "ping": dict,  # MCP ping is answered with an empty result.
    }
    builder = result_builders.get(method)
    reply = {"jsonrpc": "2.0", "id": message["id"]}
    if builder is not None:
        reply["result"] = builder()
    else:
        # Every request must get an answer; otherwise the client waits until
        # its own timeout fires.
        reply["error"] = {"code": -32601, "message": f"Method not found: {method}"}

    try:
        await write_stream.send({"event": "message", "data": json.dumps(reply)})
    except (anyio.BrokenResourceError, anyio.ClosedResourceError) as e:
        # The SSE side of this session is gone; forget the stale stream.
        write_streams.pop(session_id, None)
        logger.error(f"Failed to send response for session {session_id} via write_stream: {str(e)}")
        return Response(content="Could not find session", status_code=404)

    logger.info(f"Sent {method} response for session {session_id} via write_stream")
    return Response(status_code=202)

if __name__ == "__main__":
    # Executed as a script: start a uvicorn server for the app, listening on
    # every network interface at port 7860.
    from uvicorn import run as serve

    serve(app, port=7860, host="0.0.0.0")