Spaces:
Sleeping
Sleeping
File size: 6,464 Bytes
24b4e8a 9a21c26 194d298 599628b 6d033fe 8f10289 9a21c26 8f10289 f3c82c1 194d298 8f10289 f3c82c1 24b4e8a 3a136c0 9a21c26 c1ef727 24b4e8a 194d298 24b4e8a 8f10289 194d298 9a21c26 194d298 9a21c26 fef2e57 9a21c26 fef2e57 9a21c26 0713a48 fef2e57 6d033fe 8f10289 fbb3b64 fef2e57 fbb3b64 fef2e57 fbb3b64 194d298 8dd92e6 9a21c26 0713a48 3a136c0 0713a48 3a136c0 0713a48 24483a3 0713a48 6d033fe 3a136c0 9a21c26 e16055b 3a136c0 e16055b 3a136c0 9a21c26 0713a48 3a136c0 9a21c26 0713a48 9a21c26 8dd92e6 8f10289 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
import asyncio
import json
import logging
import os
import uuid

import anyio
from fastapi import FastAPI, Request
from fastapi.responses import Response
from mcp.server.lowlevel import Server
from sse_starlette import EventSourceResponse
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Define the MCP server
server = Server(name="airtable-mcp")

# Maps a session ID (taken from the ``session_id`` query parameter of a POST)
# to the stream that replies for that session are written to.
write_streams = {}

# Airtable settings are read once, at import time.
token = os.getenv("AIRTABLE_API_TOKEN")
base_id = os.getenv("AIRTABLE_BASE_ID")

# The API token is a credential: never write its value to the log, only
# record whether one was supplied.
logger.info("Airtable API token configured: %s", "yes" if token else "no")
logger.info("Using Airtable base ID: %s", base_id)
@app.get("/airtable/mcp")
async def handle_sse(request: Request):
    """Open a Server-Sent-Events session for an MCP client.

    A fresh session ID is generated and the stream that carries replies for
    that session is registered in ``write_streams`` so the POST handler can
    find it. The first event sent to the client is ``endpoint``, whose data
    is the URL (including ``session_id``) the client must POST to. Every item
    subsequently written to the session's stream is relayed as an SSE event.

    Args:
        request: The incoming request (unused; required by the route).

    Returns:
        An ``EventSourceResponse`` that stays open until the client
        disconnects or the session's stream is closed.
    """
    session_id = uuid.uuid4().hex

    # inbound:  messages destined for the MCP server (its read side).
    # outbound: replies for this session (the MCP server's write side, and
    #           the stream the POST handler sends to).
    # sse:      what EventSourceResponse iterates to build the HTTP body.
    inbound_send, inbound_recv = anyio.create_memory_object_stream(0)
    outbound_send, outbound_recv = anyio.create_memory_object_stream(0)
    sse_send, sse_recv = anyio.create_memory_object_stream(0)

    write_streams[session_id] = outbound_send
    logger.info("Opened SSE session %s", session_id)

    async def sse_writer():
        """Announce the POST endpoint, then relay outbound items as events."""
        try:
            # inbound_send is held open for the whole session so the MCP
            # server's read stream does not hit end-of-stream early.
            async with sse_send, outbound_recv, inbound_send:
                async with anyio.create_task_group() as tg:
                    # server.run() is expected to return only once its read
                    # stream is exhausted, so it must run beside the relay
                    # loop; awaiting it before returning the response would
                    # keep the SSE response from ever starting.
                    logger.info("Starting MCP server with streams")
                    tg.start_soon(
                        server.run,
                        inbound_recv,
                        outbound_send,
                        server.create_initialization_options(),
                    )

                    await sse_send.send(
                        {
                            "event": "endpoint",
                            "data": f"/airtable/mcp?session_id={session_id}",
                        }
                    )

                    async for message in outbound_recv:
                        # Items may arrive as JSON text or as ready-made
                        # event dicts; normalise to a dict before relaying.
                        event = (
                            json.loads(message)
                            if isinstance(message, str)
                            else message
                        )
                        await sse_send.send(event)

                    # Outbound stream closed: stop the background server too.
                    tg.cancel_scope.cancel()
        finally:
            # Runs on normal completion and on client disconnect
            # (cancellation), so finished sessions do not accumulate.
            write_streams.pop(session_id, None)
            logger.info("Closed SSE session %s", session_id)

    # ping=15 has EventSourceResponse emit a keep-alive every 15 seconds,
    # replacing the hand-written ping loop that was only reached after the
    # stream had already ended.
    return EventSourceResponse(
        sse_recv,
        data_sender_callable=sse_writer,
        ping=15,
    )
def _build_result(method):
    """Return the JSON-RPC ``result`` for *method*, or ``None`` if unsupported."""
    if method == "initialize":
        return {
            "protocolVersion": "2025-03-26",
            "capabilities": {
                "tools": {"listChanged": True},
                "prompts": {"listChanged": False},
                "resources": {"subscribe": False, "listChanged": False},
                "logging": {},
                "experimental": {},
            },
            "serverInfo": {
                "name": "airtable-mcp",
                "version": "1.0.0",
            },
            "instructions": "Airtable MCP server for listing and creating records.",
        }
    if method == "tools/list":
        # Each inputSchema is a JSON Schema whose top-level type is "object",
        # with the tool's arguments under "properties". "nextCursor" is left
        # out (rather than sent as null) because there is no further page.
        return {
            "tools": [
                {
                    "name": "list_airtable_records",
                    "description": "Lists all records in the specified Airtable table",
                    "inputSchema": {"type": "object", "properties": {}},
                },
                {
                    "name": "create_airtable_record",
                    "description": "Creates a new record in the specified Airtable table",
                    "inputSchema": {
                        "type": "object",
                        "properties": {"record_data": {"type": "object"}},
                    },
                },
            ],
        }
    return None


@app.post("/airtable/mcp")
async def handle_post(request: Request):
    """Accept one JSON-RPC message from a client and reply over its SSE stream.

    The session is selected by the ``session_id`` query parameter. Replies
    are not returned in the HTTP body; they are written to the session's
    stream in ``write_streams`` as ``{"event": "message", "data": <json>}``.

    Supported methods are ``initialize`` and ``tools/list``. Any other
    request that carries an ``id`` is answered with a JSON-RPC
    "Method not found" error so the caller is not left waiting; messages
    without an ``id`` (notifications) are accepted and ignored.

    Args:
        request: The incoming request; its body must be a JSON object.

    Returns:
        202 when the message was accepted, 400 when ``session_id`` is missing
        or the body is not a JSON object, 404 when no open stream exists for
        the session.
    """
    session_id = request.query_params.get("session_id")
    if not session_id:
        # Without this check a stream could end up bound to the key None.
        logger.error("POST received without a session_id")
        return Response("session_id is required", status_code=400)

    body = await request.body()
    try:
        message = json.loads(body.decode())
    except ValueError as exc:  # covers UnicodeDecodeError and JSONDecodeError
        logger.error("Could not parse POST body for session %s: %s", session_id, exc)
        return Response("Could not parse message", status_code=400)
    if not isinstance(message, dict):
        logger.error("POST body for session %s is not a JSON object", session_id)
        return Response("Message must be a JSON object", status_code=400)

    logger.info(f"Received POST with session_id: {session_id}, message: {message}")

    write_stream = write_streams.get(session_id)
    if write_stream is None:
        # Entries keyed "placeholder_..." are streams that have not been bound
        # to a session ID yet; adopt the first one for this session.
        pending = next(
            (key for key in write_streams if key.startswith("placeholder_")),
            None,
        )
        if pending is not None:
            write_stream = write_streams.pop(pending)
            write_streams[session_id] = write_stream
            logger.info(f"Associated placeholder {pending} with session_id {session_id}")

    if write_stream is None:
        logger.error(f"No write_stream found for session_id: {session_id}")
        return Response("Could not find session", status_code=404)

    method = message.get("method")
    request_id = message.get("id")

    result = _build_result(method)
    if result is not None:
        reply = {"jsonrpc": "2.0", "id": request_id, "result": result}
    elif method is not None and request_id is not None:
        # A request (it has an id) for a method this server does not handle.
        reply = {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": -32601, "message": f"Method not found: {method}"},
        }
    else:
        # Notification or client-side response: nothing to send back.
        return Response(status_code=202)

    try:
        await write_stream.send({"event": "message", "data": json.dumps(reply)})
    except (anyio.BrokenResourceError, anyio.ClosedResourceError):
        # The SSE side of this session is gone; forget the stale stream.
        write_streams.pop(session_id, None)
        logger.error(
            "Failed to send response for session %s via write_stream: stream closed",
            session_id,
        )
        return Response("Could not find session", status_code=404)

    logger.info("Sent %s response for session %s via write_stream", method, session_id)
    return Response(status_code=202)
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app on all interfaces, port 7860.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)