airtable-node-mcp / server.py
aeonshift's picture
Update server.py
fbb3b64 verified
import os
import json
import logging
import anyio
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import Response
from sse_starlette import EventSourceResponse
from mcp.server.lowlevel import Server
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Define the MCP server
server = Server(name="airtable-mcp")

# Send streams used to push events to connected clients, keyed by session ID
# (or by a temporary "placeholder_..." key until a session ID is known).
write_streams = {}

# Read the Airtable settings from the environment (for logging purposes).
token = os.getenv("AIRTABLE_API_TOKEN")
base_id = os.getenv("AIRTABLE_BASE_ID")
# The token is a credential: report only whether it is present, never its
# value, so it cannot leak through log files or log aggregation.
logger.info("Airtable token configured: %s", "yes" if token else "no")
logger.info(f"Using Airtable base ID: {base_id}")
@app.get("/airtable/mcp")
async def handle_sse(request: Request):
    """Open the Server-Sent Events channel for one MCP client session.

    A fresh session ID is generated for every connection. The first event sent
    to the client is an ``endpoint`` event whose data is the URL (including
    that session ID) the client must POST its messages to. While the
    connection is open, the session's send stream is registered in
    ``write_streams`` so the POST handler can push events to this client;
    every item placed on that stream is forwarded to the SSE response.

    Args:
        request: The incoming GET request (unused; required by the route
            signature).

    Returns:
        An ``EventSourceResponse`` streaming the session's events.

    Raises:
        Exception: Any error raised while setting up the streams is logged
            and re-raised.
    """
    import uuid  # function-scope import: only this handler needs it

    try:
        session_id = uuid.uuid4().hex

        # Input side of the MCP server. Nothing in this handler writes to it;
        # the writer is only held open so the server does not see end-of-stream.
        read_stream_writer, read_stream_reader = anyio.create_memory_object_stream(0)
        # Outgoing events: written by the POST handler (and the MCP server),
        # drained by sse_writer below.
        write_stream_writer, write_stream_reader = anyio.create_memory_object_stream(0)
        # Events handed to the HTTP response for transmission.
        sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream(0)

        async def sse_writer():
            """Announce the endpoint, then relay outgoing events to the client."""
            # Register before the endpoint event goes out so a POST that
            # arrives right after it can always find the stream.
            write_streams[session_id] = write_stream_writer
            try:
                async with sse_stream_writer, write_stream_reader, read_stream_writer:
                    endpoint_data = f"/airtable/mcp?session_id={session_id}"
                    await sse_stream_writer.send({"event": "endpoint", "data": endpoint_data})
                    async with anyio.create_task_group() as tg:
                        # Run the MCP server in the background: awaiting it
                        # directly would block until its input stream closes
                        # and the SSE response would never be returned.
                        logger.info("Starting MCP server with streams")
                        tg.start_soon(
                            server.run,
                            read_stream_reader,
                            write_stream_writer,
                            server.create_initialization_options(),
                        )
                        async for message in write_stream_reader:
                            message_data = json.loads(message) if isinstance(message, str) else message
                            await sse_stream_writer.send(message_data)
                        # Outgoing stream finished: stop the background server.
                        tg.cancel_scope.cancel()
            finally:
                # Drop the registry entry so closed sessions do not accumulate.
                write_streams.pop(session_id, None)
                logger.info(f"Closed SSE session {session_id}")

        # ``ping=15`` makes the response emit a keep-alive every 15 seconds,
        # replacing a manual ping loop that could only start after the
        # message stream had already ended.
        return EventSourceResponse(
            sse_stream_reader,
            data_sender_callable=sse_writer,
            ping=15,
        )
    except Exception as e:
        logger.error(f"Error in handle_sse: {str(e)}")
        raise
def _initialize_result():
    """Return the static ``result`` payload answering an ``initialize`` request."""
    return {
        "protocolVersion": "2025-03-26",
        "capabilities": {
            "tools": {"listChanged": True},
            "prompts": {"listChanged": False},
            "resources": {"subscribe": False, "listChanged": False},
            "logging": {},
            "experimental": {},
        },
        "serverInfo": {
            "name": "airtable-mcp",
            "version": "1.0.0",
        },
        "instructions": "Airtable MCP server for listing and creating records.",
    }


def _tools_list_result():
    """Return the static ``result`` payload answering a ``tools/list`` request."""
    return {
        "tools": [
            {
                "name": "list_airtable_records",
                "description": "Lists all records in the specified Airtable table",
                # A tool input schema must be a JSON Schema of type "object",
                # even when the tool takes no arguments.
                "inputSchema": {"type": "object", "properties": {}},
            },
            {
                "name": "create_airtable_record",
                "description": "Creates a new record in the specified Airtable table",
                "inputSchema": {
                    "type": "object",
                    "properties": {"record_data": {"type": "object"}},
                },
            },
        ],
        # "nextCursor" is intentionally omitted: there is no further page, and
        # an explicit null is rejected by clients that expect a string.
    }


# JSON-RPC methods answered directly by handle_post, mapped to result builders.
_RESULT_BUILDERS = {
    "initialize": _initialize_result,
    "tools/list": _tools_list_result,
}


@app.post("/airtable/mcp")
async def handle_post(request: Request):
    """Accept one JSON-RPC message from a client and answer it over SSE.

    The reply to ``initialize`` and ``tools/list`` is not returned in the HTTP
    response; it is pushed as a ``message`` event onto the send stream
    registered in ``write_streams`` for the ``session_id`` query parameter.
    Any other message is acknowledged without a reply.

    Args:
        request: The incoming POST request; its body is the JSON-RPC message
            and its ``session_id`` query parameter selects the session.

    Returns:
        ``202`` when the message was accepted, ``400`` when ``session_id`` is
        missing or the body is not a JSON object, and ``404`` when no open
        stream exists for the session.
    """
    session_id = request.query_params.get("session_id")
    if not session_id:
        # Without this guard a stream would be stored under the key None and
        # later placeholder scans would fail calling None.startswith().
        logger.error("Received POST without a session_id")
        return Response(status_code=400)

    body = await request.body()
    try:
        message = json.loads(body.decode())
    except ValueError as e:  # covers both UnicodeDecodeError and JSONDecodeError
        logger.error(f"Error handling POST message: {str(e)}")
        return Response(status_code=400)
    if not isinstance(message, dict):
        logger.error(f"Error handling POST message: expected a JSON object, got {type(message).__name__}")
        return Response(status_code=400)
    logger.info(f"Received POST with session_id: {session_id}, message: {message}")

    # Try to find the write_stream
    write_stream = write_streams.get(session_id)
    if write_stream is None:
        # Fall back to a stream still registered under a temporary placeholder
        # key and bind it to this session.
        # NOTE(review): this hands the first placeholder to whichever session
        # asks first; confirm that is acceptable when several clients connect.
        placeholder = next(
            (sid for sid in write_streams
             if isinstance(sid, str) and sid.startswith("placeholder_")),
            None,
        )
        if placeholder is not None:
            write_stream = write_streams.pop(placeholder)
            write_streams[session_id] = write_stream
            logger.info(f"Associated placeholder {placeholder} with session_id {session_id}")
    if write_stream is None:
        logger.error(f"No write_stream found for session_id: {session_id}")
        return Response(status_code=404)

    method = message.get("method")
    build_result = _RESULT_BUILDERS.get(method)
    if build_result is None:
        # Nothing to answer for this message; just acknowledge receipt.
        return Response(status_code=202)

    response = {
        "jsonrpc": "2.0",
        "id": message.get("id"),
        "result": build_result(),
    }
    try:
        await write_stream.send({"event": "message", "data": json.dumps(response)})
    except (anyio.BrokenResourceError, anyio.ClosedResourceError) as e:
        # The SSE side of this session has gone away: forget the stale stream
        # and tell the client that the session no longer exists.
        write_streams.pop(session_id, None)
        logger.error(f"Failed to send response for session {session_id} via write_stream: {str(e)}")
        return Response(status_code=404)
    logger.info(f"Sent {method} response for session {session_id} via write_stream")
    return Response(status_code=202)
if __name__ == "__main__":
    # uvicorn is imported only when the file is executed as a script, so
    # importing this module elsewhere does not pull it in.
    from uvicorn import run as serve

    # Serve the FastAPI app on all interfaces, port 7860.
    serve(app, host="0.0.0.0", port=7860)