File size: 8,778 Bytes
f0743f4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 | import { logger } from '@librechat/data-schemas';
import { ErrorCode, McpError } from '@modelcontextprotocol/sdk/types.js';
import { MCPConnectionFactory } from '~/mcp/MCPConnectionFactory';
import { mcpServersRegistry as serversRegistry } from '~/mcp/registry/MCPServersRegistry';
import { MCPConnection } from './connection';
import type * as t from './types';
import { ConnectionsRepository } from '~/mcp/ConnectionsRepository';
import { mcpConfig } from './mcpConfig';
/**
* Abstract base class for managing user-specific MCP connections with lifecycle management.
* Only meant to be extended by MCPManager.
* Much of the logic was move here from the old MCPManager to make it more manageable.
* User connections will soon be ephemeral and not cached anymore:
* https://github.com/danny-avila/LibreChat/discussions/8790
*/
export abstract class UserConnectionManager {
// Connections shared by all users.
public appConnections: ConnectionsRepository | null = null;
// Connections per userId -> serverName -> connection
protected userConnections: Map<string, Map<string, MCPConnection>> = new Map();
/** Last activity timestamp for users (not per server) */
protected userLastActivity: Map<string, number> = new Map();
/** Updates the last activity timestamp for a user */
protected updateUserLastActivity(userId: string): void {
const now = Date.now();
this.userLastActivity.set(userId, now);
logger.debug(
`[MCP][User: ${userId}] Updated last activity timestamp: ${new Date(now).toISOString()}`,
);
}
/** Gets or creates a connection for a specific user */
public async getUserConnection({
serverName,
forceNew,
user,
flowManager,
customUserVars,
requestBody,
tokenMethods,
oauthStart,
oauthEnd,
signal,
returnOnOAuth = false,
connectionTimeout,
}: {
serverName: string;
forceNew?: boolean;
} & Omit<t.OAuthConnectionOptions, 'useOAuth'>): Promise<MCPConnection> {
const userId = user.id;
if (!userId) {
throw new McpError(ErrorCode.InvalidRequest, `[MCP] User object missing id property`);
}
if (this.appConnections!.has(serverName)) {
throw new McpError(
ErrorCode.InvalidRequest,
`[MCP][User: ${userId}] Trying to create user-specific connection for app-level server "${serverName}"`,
);
}
const userServerMap = this.userConnections.get(userId);
let connection = forceNew ? undefined : userServerMap?.get(serverName);
const now = Date.now();
// Check if user is idle
const lastActivity = this.userLastActivity.get(userId);
if (lastActivity && now - lastActivity > mcpConfig.USER_CONNECTION_IDLE_TIMEOUT) {
logger.info(`[MCP][User: ${userId}] User idle for too long. Disconnecting all connections.`);
// Disconnect all user connections
try {
await this.disconnectUserConnections(userId);
} catch (err) {
logger.error(`[MCP][User: ${userId}] Error disconnecting idle connections:`, err);
}
connection = undefined; // Force creation of a new connection
} else if (connection) {
if (await connection.isConnected()) {
logger.debug(`[MCP][User: ${userId}][${serverName}] Reusing active connection`);
this.updateUserLastActivity(userId);
return connection;
} else {
// Connection exists but is not connected, attempt to remove potentially stale entry
logger.warn(
`[MCP][User: ${userId}][${serverName}] Found existing but disconnected connection object. Cleaning up.`,
);
this.removeUserConnection(userId, serverName); // Clean up maps
connection = undefined;
}
}
// If no valid connection exists, create a new one
if (!connection) {
logger.info(`[MCP][User: ${userId}][${serverName}] Establishing new connection`);
}
const config = await serversRegistry.getServerConfig(serverName, userId);
if (!config) {
throw new McpError(
ErrorCode.InvalidRequest,
`[MCP][User: ${userId}] Configuration for server "${serverName}" not found.`,
);
}
try {
connection = await MCPConnectionFactory.create(
{
serverName: serverName,
serverConfig: config,
},
{
useOAuth: true,
user: user,
customUserVars: customUserVars,
flowManager: flowManager,
tokenMethods: tokenMethods,
signal: signal,
oauthStart: oauthStart,
oauthEnd: oauthEnd,
returnOnOAuth: returnOnOAuth,
requestBody: requestBody,
connectionTimeout: connectionTimeout,
},
);
if (!(await connection?.isConnected())) {
throw new Error('Failed to establish connection after initialization attempt.');
}
if (!this.userConnections.has(userId)) {
this.userConnections.set(userId, new Map());
}
this.userConnections.get(userId)?.set(serverName, connection);
logger.info(`[MCP][User: ${userId}][${serverName}] Connection successfully established`);
// Update timestamp on creation
this.updateUserLastActivity(userId);
return connection;
} catch (error) {
logger.error(`[MCP][User: ${userId}][${serverName}] Failed to establish connection`, error);
// Ensure partial connection state is cleaned up if initialization fails
await connection?.disconnect().catch((disconnectError) => {
logger.error(
`[MCP][User: ${userId}][${serverName}] Error during cleanup after failed connection`,
disconnectError,
);
});
// Ensure cleanup even if connection attempt fails
this.removeUserConnection(userId, serverName);
throw error; // Re-throw the error to the caller
}
}
/** Returns all connections for a specific user */
public getUserConnections(userId: string) {
return this.userConnections.get(userId);
}
/** Removes a specific user connection entry */
protected removeUserConnection(userId: string, serverName: string): void {
const userMap = this.userConnections.get(userId);
if (userMap) {
userMap.delete(serverName);
if (userMap.size === 0) {
this.userConnections.delete(userId);
// Only remove user activity timestamp if all connections are gone
this.userLastActivity.delete(userId);
}
}
logger.debug(`[MCP][User: ${userId}][${serverName}] Removed connection entry.`);
}
/** Disconnects and removes a specific user connection */
public async disconnectUserConnection(userId: string, serverName: string): Promise<void> {
const userMap = this.userConnections.get(userId);
const connection = userMap?.get(serverName);
if (connection) {
logger.info(`[MCP][User: ${userId}][${serverName}] Disconnecting...`);
await connection.disconnect();
this.removeUserConnection(userId, serverName);
}
}
/** Disconnects and removes all connections for a specific user */
public async disconnectUserConnections(userId: string): Promise<void> {
const userMap = this.userConnections.get(userId);
const disconnectPromises: Promise<void>[] = [];
if (userMap) {
logger.info(`[MCP][User: ${userId}] Disconnecting all servers...`);
const userServers = Array.from(userMap.keys());
for (const serverName of userServers) {
disconnectPromises.push(
this.disconnectUserConnection(userId, serverName).catch((error) => {
logger.error(
`[MCP][User: ${userId}][${serverName}] Error during disconnection:`,
error,
);
}),
);
}
await Promise.allSettled(disconnectPromises);
// Ensure user activity timestamp is removed
this.userLastActivity.delete(userId);
logger.info(`[MCP][User: ${userId}] All connections processed for disconnection.`);
}
}
/** Check for and disconnect idle connections */
protected checkIdleConnections(currentUserId?: string): void {
const now = Date.now();
// Iterate through all users to check for idle ones
for (const [userId, lastActivity] of this.userLastActivity.entries()) {
if (currentUserId && currentUserId === userId) {
continue;
}
if (now - lastActivity > mcpConfig.USER_CONNECTION_IDLE_TIMEOUT) {
logger.info(
`[MCP][User: ${userId}] User idle for too long. Disconnecting all connections...`,
);
// Disconnect all user connections asynchronously (fire and forget)
this.disconnectUserConnections(userId).catch((err) =>
logger.error(`[MCP][User: ${userId}] Error disconnecting idle connections:`, err),
);
}
}
}
}
|