# Captured from a Hugging Face Spaces page; Space status at capture time: Sleeping.
import asyncio
import json
import logging
import os
import uuid

import anyio
from fastapi import FastAPI, Request
from fastapi.responses import Response
from mcp.server.lowlevel import Server
from sse_starlette import EventSourceResponse
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Define the MCP server
server = Server(name="airtable-mcp")

# Store write streams for each session ID: maps a session key to the send side
# of the stream whose messages are delivered to that session's SSE client.
write_streams = {}

# Airtable settings are read from the environment once at import time.
token = os.getenv("AIRTABLE_API_TOKEN")
base_id = os.getenv("AIRTABLE_BASE_ID")

# Never write the token itself to the log: log output is routinely visible to
# people and systems that must not hold the credential. Report only whether a
# value is present.
logger.info("Airtable API token configured: %s", bool(token))
logger.info(f"Using Airtable base ID: {base_id}")
# NOTE(review): no route registration for this handler is visible in this
# file, so the app would answer every request with 404. The GET path below
# mirrors the POST endpoint advertised to clients in the "endpoint" event --
# confirm it matches the URL clients are configured with.
@app.get("/airtable/mcp")
async def handle_sse(request: Request):
    """Open the Server-Sent Events stream for one MCP client session.

    A fresh session id is generated for every connection and the send side of
    the session's outgoing stream is stored in ``write_streams`` under that
    id, which is where ``handle_post`` looks it up. The first event on the
    stream is ``endpoint``; its data is the URL (including the session id) the
    client must POST its messages to. Every later event is whatever gets
    written to the session's write stream.

    The MCP ``server`` is run in the background for the lifetime of the
    connection instead of being awaited before the response is returned;
    awaiting it would block until the streams close, so the client would
    never receive a response.

    Args:
        request: The incoming request (not inspected).

    Returns:
        An ``EventSourceResponse`` that streams the session's events and
        pings the client every 15 seconds to keep the connection alive.

    Raises:
        Exception: Any error raised while setting up the streams is logged
            and re-raised.
    """
    # Generated up front so the advertised endpoint carries a real id.
    session_id = uuid.uuid4().hex
    endpoint_data = f"/airtable/mcp?session_id={session_id}"

    try:
        # Client -> server messages, consumed by the MCP server.
        read_stream_writer, read_stream_reader = anyio.create_memory_object_stream(0)
        # Server -> client messages; written by handle_post and the MCP server.
        write_stream_writer, write_stream_reader = anyio.create_memory_object_stream(0)
        # Events that EventSourceResponse serializes onto the connection.
        sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream(0)

        async def run_server():
            # A server failure is logged rather than propagated so that it
            # does not tear down the SSE connection with it.
            try:
                logger.info("Starting MCP server with streams")
                await server.run(
                    read_stream_reader,
                    write_stream_writer,
                    server.create_initialization_options(),
                )
                logger.info(f"MCP server finished for session {session_id}")
            except Exception as e:
                logger.error(f"MCP server failed for session {session_id}: {str(e)}")

        async def sse_writer():
            try:
                async with anyio.create_task_group() as task_group:
                    task_group.start_soon(run_server)
                    async with sse_stream_writer, write_stream_reader, read_stream_writer:
                        await sse_stream_writer.send({"event": "endpoint", "data": endpoint_data})
                        async for message in write_stream_reader:
                            # String messages are expected to be JSON-encoded
                            # event dicts; anything else is forwarded as-is.
                            message_data = json.loads(message) if isinstance(message, str) else message
                            await sse_stream_writer.send(message_data)
                    # The outgoing stream is finished: stop the server task.
                    task_group.cancel_scope.cancel()
            finally:
                # Runs on normal completion and on client disconnect
                # (cancellation), so closed sessions do not accumulate.
                write_streams.pop(session_id, None)
                logger.info(f"Closed SSE session {session_id}")

        write_streams[session_id] = write_stream_writer
        logger.info(f"Registered write stream for session_id {session_id}")

        # The response iterates the *receive* side of the SSE stream while
        # sse_writer feeds the send side. The built-in ping replaces a
        # hand-written keep-alive loop.
        return EventSourceResponse(
            sse_stream_reader,
            data_sender_callable=sse_writer,
            ping=15,
        )
    except Exception as e:
        write_streams.pop(session_id, None)
        logger.error(f"Error in handle_sse: {str(e)}")
        raise
def _initialize_result():
    """Build the ``result`` payload returned for an ``initialize`` request."""
    return {
        "protocolVersion": "2025-03-26",
        "capabilities": {
            "tools": {"listChanged": True},
            "prompts": {"listChanged": False},
            "resources": {"subscribe": False, "listChanged": False},
            "logging": {},
            "experimental": {},
        },
        "serverInfo": {
            "name": "airtable-mcp",
            "version": "1.0.0",
        },
        "instructions": "Airtable MCP server for listing and creating records.",
    }


def _tools_list_result():
    """Build the ``result`` payload returned for a ``tools/list`` request."""
    # Each inputSchema is a JSON Schema of type "object". ``nextCursor`` is
    # left out (rather than sent as null) because there is no further page.
    return {
        "tools": [
            {
                "name": "list_airtable_records",
                "description": "Lists all records in the specified Airtable table",
                "inputSchema": {"type": "object", "properties": {}},
            },
            {
                "name": "create_airtable_record",
                "description": "Creates a new record in the specified Airtable table",
                "inputSchema": {
                    "type": "object",
                    "properties": {"record_data": {"type": "object"}},
                },
            },
        ],
    }


def _resolve_write_stream(session_id):
    """Return the write stream for *session_id*, or ``None`` if there is none."""
    write_stream = write_streams.get(session_id)
    if write_stream is not None:
        return write_stream
    # Fallback: a stream still stored under a temporary "placeholder_*" key is
    # adopted by the first session id that asks for one.
    # NOTE(review): with several connections pending at once this can pair a
    # session with another client's stream.
    for sid in list(write_streams):
        if isinstance(sid, str) and sid.startswith("placeholder_"):
            write_stream = write_streams.pop(sid)
            write_streams[session_id] = write_stream
            logger.info(f"Associated placeholder {sid} with session_id {session_id}")
            return write_stream
    return None


# NOTE(review): no route registration for this handler is visible in this
# file; the path below is the one advertised to clients in the SSE "endpoint"
# event. Confirm no other module already mounts it.
@app.post("/airtable/mcp")
async def handle_post(request: Request):
    """Handle one JSON-RPC message POSTed by an MCP client.

    The reply is not returned in the HTTP response. It is pushed as a
    ``message`` event onto the write stream registered for the
    ``session_id`` query parameter.

    Handled methods are ``initialize`` and ``tools/list``. Any other message
    that carries an ``id`` gets a JSON-RPC "Method not found" error so the
    client is not left waiting. Messages without an ``id`` are accepted and
    ignored.

    Args:
        request: The incoming request; its body must be a JSON object and its
            query string must carry ``session_id``.

    Returns:
        ``Response`` with status 202 when the message was accepted, 400 when
        ``session_id`` is missing or the body is not a JSON object, and 404
        when no open stream exists for the session.
    """
    session_id = request.query_params.get("session_id")
    if not session_id:
        logger.error("Rejected POST without a session_id query parameter")
        return Response(status_code=400)

    body = await request.body()
    try:
        message = json.loads(body.decode())
    except ValueError as e:
        # Covers both undecodable bytes and malformed JSON.
        logger.error(f"Error handling POST message: {str(e)}")
        return Response(status_code=400)
    if not isinstance(message, dict):
        logger.error(f"Error handling POST message: expected a JSON object, got {type(message).__name__}")
        return Response(status_code=400)

    logger.info(f"Received POST with session_id: {session_id}, message: {message}")

    # Try to find the write_stream
    write_stream = _resolve_write_stream(session_id)
    if write_stream is None:
        logger.error(f"No write_stream found for session_id: {session_id}")
        return Response(status_code=404)

    method = message.get("method")
    if method == "initialize":
        reply = {"jsonrpc": "2.0", "id": message.get("id"), "result": _initialize_result()}
    elif method == "tools/list":
        reply = {"jsonrpc": "2.0", "id": message.get("id"), "result": _tools_list_result()}
    elif method is not None and "id" in message:
        reply = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {"code": -32601, "message": f"Method not found: {method}"},
        }
    else:
        # Notification or client response: nothing to send back.
        return Response(status_code=202)

    try:
        await write_stream.send({"event": "message", "data": json.dumps(reply)})
    except (anyio.BrokenResourceError, anyio.ClosedResourceError) as e:
        # The SSE side is gone; forget the stale stream.
        write_streams.pop(session_id, None)
        logger.error(f"Failed to send response for session {session_id} via write_stream: {str(e)}")
        return Response(status_code=404)

    logger.info(f"Sent {method} response for session {session_id} via write_stream")
    return Response(status_code=202)
if __name__ == "__main__":
    # Serve the FastAPI app with uvicorn when this file is run as a script.
    import uvicorn

    bind_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **bind_options)