| | import { logger } from '@librechat/data-schemas'; |
| | import { ErrorCode, McpError } from '@modelcontextprotocol/sdk/types.js'; |
| | import { MCPConnectionFactory } from '~/mcp/MCPConnectionFactory'; |
| | import { mcpServersRegistry as serversRegistry } from '~/mcp/registry/MCPServersRegistry'; |
| | import { MCPConnection } from './connection'; |
| | import type * as t from './types'; |
| | import { ConnectionsRepository } from '~/mcp/ConnectionsRepository'; |
| | import { mcpConfig } from './mcpConfig'; |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | export abstract class UserConnectionManager { |
| | |
| | public appConnections: ConnectionsRepository | null = null; |
| | |
| | protected userConnections: Map<string, Map<string, MCPConnection>> = new Map(); |
| | |
| | protected userLastActivity: Map<string, number> = new Map(); |
| |
|
| | |
| | protected updateUserLastActivity(userId: string): void { |
| | const now = Date.now(); |
| | this.userLastActivity.set(userId, now); |
| | logger.debug( |
| | `[MCP][User: ${userId}] Updated last activity timestamp: ${new Date(now).toISOString()}`, |
| | ); |
| | } |
| |
|
| | |
| | public async getUserConnection({ |
| | serverName, |
| | forceNew, |
| | user, |
| | flowManager, |
| | customUserVars, |
| | requestBody, |
| | tokenMethods, |
| | oauthStart, |
| | oauthEnd, |
| | signal, |
| | returnOnOAuth = false, |
| | connectionTimeout, |
| | }: { |
| | serverName: string; |
| | forceNew?: boolean; |
| | } & Omit<t.OAuthConnectionOptions, 'useOAuth'>): Promise<MCPConnection> { |
| | const userId = user.id; |
| | if (!userId) { |
| | throw new McpError(ErrorCode.InvalidRequest, `[MCP] User object missing id property`); |
| | } |
| |
|
| | if (this.appConnections!.has(serverName)) { |
| | throw new McpError( |
| | ErrorCode.InvalidRequest, |
| | `[MCP][User: ${userId}] Trying to create user-specific connection for app-level server "${serverName}"`, |
| | ); |
| | } |
| |
|
| | const userServerMap = this.userConnections.get(userId); |
| | let connection = forceNew ? undefined : userServerMap?.get(serverName); |
| | const now = Date.now(); |
| |
|
| | |
| | const lastActivity = this.userLastActivity.get(userId); |
| | if (lastActivity && now - lastActivity > mcpConfig.USER_CONNECTION_IDLE_TIMEOUT) { |
| | logger.info(`[MCP][User: ${userId}] User idle for too long. Disconnecting all connections.`); |
| | |
| | try { |
| | await this.disconnectUserConnections(userId); |
| | } catch (err) { |
| | logger.error(`[MCP][User: ${userId}] Error disconnecting idle connections:`, err); |
| | } |
| | connection = undefined; |
| | } else if (connection) { |
| | if (await connection.isConnected()) { |
| | logger.debug(`[MCP][User: ${userId}][${serverName}] Reusing active connection`); |
| | this.updateUserLastActivity(userId); |
| | return connection; |
| | } else { |
| | |
| | logger.warn( |
| | `[MCP][User: ${userId}][${serverName}] Found existing but disconnected connection object. Cleaning up.`, |
| | ); |
| | this.removeUserConnection(userId, serverName); |
| | connection = undefined; |
| | } |
| | } |
| |
|
| | |
| | if (!connection) { |
| | logger.info(`[MCP][User: ${userId}][${serverName}] Establishing new connection`); |
| | } |
| |
|
| | const config = await serversRegistry.getServerConfig(serverName, userId); |
| | if (!config) { |
| | throw new McpError( |
| | ErrorCode.InvalidRequest, |
| | `[MCP][User: ${userId}] Configuration for server "${serverName}" not found.`, |
| | ); |
| | } |
| |
|
| | try { |
| | connection = await MCPConnectionFactory.create( |
| | { |
| | serverName: serverName, |
| | serverConfig: config, |
| | }, |
| | { |
| | useOAuth: true, |
| | user: user, |
| | customUserVars: customUserVars, |
| | flowManager: flowManager, |
| | tokenMethods: tokenMethods, |
| | signal: signal, |
| | oauthStart: oauthStart, |
| | oauthEnd: oauthEnd, |
| | returnOnOAuth: returnOnOAuth, |
| | requestBody: requestBody, |
| | connectionTimeout: connectionTimeout, |
| | }, |
| | ); |
| |
|
| | if (!(await connection?.isConnected())) { |
| | throw new Error('Failed to establish connection after initialization attempt.'); |
| | } |
| |
|
| | if (!this.userConnections.has(userId)) { |
| | this.userConnections.set(userId, new Map()); |
| | } |
| | this.userConnections.get(userId)?.set(serverName, connection); |
| |
|
| | logger.info(`[MCP][User: ${userId}][${serverName}] Connection successfully established`); |
| | |
| | this.updateUserLastActivity(userId); |
| | return connection; |
| | } catch (error) { |
| | logger.error(`[MCP][User: ${userId}][${serverName}] Failed to establish connection`, error); |
| | |
| | await connection?.disconnect().catch((disconnectError) => { |
| | logger.error( |
| | `[MCP][User: ${userId}][${serverName}] Error during cleanup after failed connection`, |
| | disconnectError, |
| | ); |
| | }); |
| | |
| | this.removeUserConnection(userId, serverName); |
| | throw error; |
| | } |
| | } |
| |
|
| | |
| | public getUserConnections(userId: string) { |
| | return this.userConnections.get(userId); |
| | } |
| |
|
| | |
| | protected removeUserConnection(userId: string, serverName: string): void { |
| | const userMap = this.userConnections.get(userId); |
| | if (userMap) { |
| | userMap.delete(serverName); |
| | if (userMap.size === 0) { |
| | this.userConnections.delete(userId); |
| | |
| | this.userLastActivity.delete(userId); |
| | } |
| | } |
| |
|
| | logger.debug(`[MCP][User: ${userId}][${serverName}] Removed connection entry.`); |
| | } |
| |
|
| | |
| | public async disconnectUserConnection(userId: string, serverName: string): Promise<void> { |
| | const userMap = this.userConnections.get(userId); |
| | const connection = userMap?.get(serverName); |
| | if (connection) { |
| | logger.info(`[MCP][User: ${userId}][${serverName}] Disconnecting...`); |
| | await connection.disconnect(); |
| | this.removeUserConnection(userId, serverName); |
| | } |
| | } |
| |
|
| | |
| | public async disconnectUserConnections(userId: string): Promise<void> { |
| | const userMap = this.userConnections.get(userId); |
| | const disconnectPromises: Promise<void>[] = []; |
| | if (userMap) { |
| | logger.info(`[MCP][User: ${userId}] Disconnecting all servers...`); |
| | const userServers = Array.from(userMap.keys()); |
| | for (const serverName of userServers) { |
| | disconnectPromises.push( |
| | this.disconnectUserConnection(userId, serverName).catch((error) => { |
| | logger.error( |
| | `[MCP][User: ${userId}][${serverName}] Error during disconnection:`, |
| | error, |
| | ); |
| | }), |
| | ); |
| | } |
| | await Promise.allSettled(disconnectPromises); |
| | |
| | this.userLastActivity.delete(userId); |
| | logger.info(`[MCP][User: ${userId}] All connections processed for disconnection.`); |
| | } |
| | } |
| |
|
| | |
| | protected checkIdleConnections(currentUserId?: string): void { |
| | const now = Date.now(); |
| |
|
| | |
| | for (const [userId, lastActivity] of this.userLastActivity.entries()) { |
| | if (currentUserId && currentUserId === userId) { |
| | continue; |
| | } |
| | if (now - lastActivity > mcpConfig.USER_CONNECTION_IDLE_TIMEOUT) { |
| | logger.info( |
| | `[MCP][User: ${userId}] User idle for too long. Disconnecting all connections...`, |
| | ); |
| | |
| | this.disconnectUserConnections(userId).catch((err) => |
| | logger.error(`[MCP][User: ${userId}] Error disconnecting idle connections:`, err), |
| | ); |
| | } |
| | } |
| | } |
| | } |
| |
|