| | import IoRedis from 'ioredis'; |
| | import type { Redis, Cluster } from 'ioredis'; |
| | import { logger } from '@librechat/data-schemas'; |
| | import { createClient, createCluster } from '@keyv/redis'; |
| | import type { RedisClientType, RedisClusterType } from '@redis/client'; |
| | import type { ScanCommandOptions } from '@redis/client/dist/lib/commands/SCAN'; |
| | import { cacheConfig } from './cacheConfig'; |
| |
|
| | const urls = cacheConfig.REDIS_URI?.split(',').map((uri) => new URL(uri)) || []; |
| | const username = urls?.[0]?.username || cacheConfig.REDIS_USERNAME; |
| | const password = urls?.[0]?.password || cacheConfig.REDIS_PASSWORD; |
| | const ca = cacheConfig.REDIS_CA; |
| |
|
| | let ioredisClient: Redis | Cluster | null = null; |
| | if (cacheConfig.USE_REDIS) { |
| | const redisOptions: Record<string, unknown> = { |
| | username: username, |
| | password: password, |
| | tls: ca ? { ca } : undefined, |
| | keyPrefix: `${cacheConfig.REDIS_KEY_PREFIX}${cacheConfig.GLOBAL_PREFIX_SEPARATOR}`, |
| | maxListeners: cacheConfig.REDIS_MAX_LISTENERS, |
| | retryStrategy: (times: number) => { |
| | if ( |
| | cacheConfig.REDIS_RETRY_MAX_ATTEMPTS > 0 && |
| | times > cacheConfig.REDIS_RETRY_MAX_ATTEMPTS |
| | ) { |
| | logger.error( |
| | `ioredis giving up after ${cacheConfig.REDIS_RETRY_MAX_ATTEMPTS} reconnection attempts`, |
| | ); |
| | return null; |
| | } |
| | const delay = Math.min(times * 50, cacheConfig.REDIS_RETRY_MAX_DELAY); |
| | logger.info(`ioredis reconnecting... attempt ${times}, delay ${delay}ms`); |
| | return delay; |
| | }, |
| | reconnectOnError: (err: Error) => { |
| | const targetError = 'READONLY'; |
| | if (err.message.includes(targetError)) { |
| | logger.warn('ioredis reconnecting due to READONLY error'); |
| | return 2; |
| | } |
| | return false; |
| | }, |
| | enableOfflineQueue: cacheConfig.REDIS_ENABLE_OFFLINE_QUEUE, |
| | connectTimeout: cacheConfig.REDIS_CONNECT_TIMEOUT, |
| | maxRetriesPerRequest: 3, |
| | }; |
| |
|
| | ioredisClient = |
| | urls.length === 1 && !cacheConfig.USE_REDIS_CLUSTER |
| | ? new IoRedis(cacheConfig.REDIS_URI!, redisOptions) |
| | : new IoRedis.Cluster( |
| | urls.map((url) => ({ host: url.hostname, port: parseInt(url.port, 10) || 6379 })), |
| | { |
| | ...(cacheConfig.REDIS_USE_ALTERNATIVE_DNS_LOOKUP |
| | ? { |
| | dnsLookup: ( |
| | address: string, |
| | callback: (err: Error | null, address: string) => void, |
| | ) => callback(null, address), |
| | } |
| | : {}), |
| | redisOptions, |
| | clusterRetryStrategy: (times: number) => { |
| | if ( |
| | cacheConfig.REDIS_RETRY_MAX_ATTEMPTS > 0 && |
| | times > cacheConfig.REDIS_RETRY_MAX_ATTEMPTS |
| | ) { |
| | logger.error( |
| | `ioredis cluster giving up after ${cacheConfig.REDIS_RETRY_MAX_ATTEMPTS} reconnection attempts`, |
| | ); |
| | return null; |
| | } |
| | const delay = Math.min(times * 100, cacheConfig.REDIS_RETRY_MAX_DELAY); |
| | logger.info(`ioredis cluster reconnecting... attempt ${times}, delay ${delay}ms`); |
| | return delay; |
| | }, |
| | enableOfflineQueue: cacheConfig.REDIS_ENABLE_OFFLINE_QUEUE, |
| | }, |
| | ); |
| |
|
| | ioredisClient.on('error', (err) => { |
| | logger.error('ioredis client error:', err); |
| | }); |
| |
|
| | ioredisClient.on('connect', () => { |
| | logger.info('ioredis client connected'); |
| | }); |
| |
|
| | ioredisClient.on('ready', () => { |
| | logger.info('ioredis client ready'); |
| | }); |
| |
|
| | ioredisClient.on('reconnecting', (delay: number) => { |
| | logger.info(`ioredis client reconnecting in ${delay}ms`); |
| | }); |
| |
|
| | ioredisClient.on('close', () => { |
| | logger.warn('ioredis client connection closed'); |
| | }); |
| |
|
| | |
| | let pingInterval: NodeJS.Timeout | null = null; |
| | const clearPingInterval = () => { |
| | if (pingInterval) { |
| | clearInterval(pingInterval); |
| | pingInterval = null; |
| | } |
| | }; |
| |
|
| | if (cacheConfig.REDIS_PING_INTERVAL > 0) { |
| | pingInterval = setInterval(() => { |
| | if (ioredisClient && ioredisClient.status === 'ready') { |
| | ioredisClient.ping().catch((err) => { |
| | logger.error('ioredis ping failed:', err); |
| | }); |
| | } |
| | }, cacheConfig.REDIS_PING_INTERVAL * 1000); |
| | ioredisClient.on('close', clearPingInterval); |
| | ioredisClient.on('end', clearPingInterval); |
| | } |
| | } |
| |
|
| | let keyvRedisClient: RedisClientType | RedisClusterType | null = null; |
| | let keyvRedisClientReady: |
| | | Promise<void> |
| | | Promise<RedisClientType<Record<string, never>, Record<string, never>, Record<string, never>>> |
| | | null = null; |
| |
|
| | if (cacheConfig.USE_REDIS) { |
| | |
| | |
| | |
| | |
| | const redisOptions: Record<string, unknown> = { |
| | username, |
| | password, |
| | socket: { |
| | tls: ca != null, |
| | ca, |
| | connectTimeout: cacheConfig.REDIS_CONNECT_TIMEOUT, |
| | reconnectStrategy: (retries: number) => { |
| | if ( |
| | cacheConfig.REDIS_RETRY_MAX_ATTEMPTS > 0 && |
| | retries > cacheConfig.REDIS_RETRY_MAX_ATTEMPTS |
| | ) { |
| | logger.error( |
| | `@keyv/redis client giving up after ${cacheConfig.REDIS_RETRY_MAX_ATTEMPTS} reconnection attempts`, |
| | ); |
| | return new Error('Max reconnection attempts reached'); |
| | } |
| | const delay = Math.min(retries * 100, cacheConfig.REDIS_RETRY_MAX_DELAY); |
| | logger.info(`@keyv/redis reconnecting... attempt ${retries}, delay ${delay}ms`); |
| | return delay; |
| | }, |
| | }, |
| | disableOfflineQueue: !cacheConfig.REDIS_ENABLE_OFFLINE_QUEUE, |
| | ...(cacheConfig.REDIS_PING_INTERVAL > 0 |
| | ? { pingInterval: cacheConfig.REDIS_PING_INTERVAL * 1000 } |
| | : {}), |
| | }; |
| |
|
| | keyvRedisClient = |
| | urls.length === 1 && !cacheConfig.USE_REDIS_CLUSTER |
| | ? createClient({ url: cacheConfig.REDIS_URI, ...redisOptions }) |
| | : createCluster({ |
| | rootNodes: urls.map((url) => ({ url: url.href })), |
| | defaults: redisOptions, |
| | }); |
| |
|
| | |
| | if (!('scanIterator' in keyvRedisClient)) { |
| | const clusterClient = keyvRedisClient as RedisClusterType; |
| | (keyvRedisClient as unknown as RedisClientType).scanIterator = async function* ( |
| | options?: ScanCommandOptions, |
| | ) { |
| | const masters = clusterClient.masters; |
| | for (const master of masters) { |
| | const nodeClient = await clusterClient.nodeClient(master); |
| | for await (const key of nodeClient.scanIterator(options)) { |
| | yield key; |
| | } |
| | } |
| | }; |
| | } |
| |
|
| | keyvRedisClient.setMaxListeners(cacheConfig.REDIS_MAX_LISTENERS); |
| |
|
| | keyvRedisClient.on('error', (err) => { |
| | logger.error('@keyv/redis client error:', err); |
| | }); |
| |
|
| | keyvRedisClient.on('connect', () => { |
| | logger.info('@keyv/redis client connected'); |
| | }); |
| |
|
| | keyvRedisClient.on('ready', () => { |
| | logger.info('@keyv/redis client ready'); |
| | }); |
| |
|
| | keyvRedisClient.on('reconnecting', () => { |
| | logger.info('@keyv/redis client reconnecting...'); |
| | }); |
| |
|
| | keyvRedisClient.on('disconnect', () => { |
| | logger.warn('@keyv/redis client disconnected'); |
| | }); |
| |
|
| | |
| | keyvRedisClientReady = keyvRedisClient.connect(); |
| |
|
| | keyvRedisClientReady.catch((err): void => { |
| | logger.error('@keyv/redis initial connection failed:', err); |
| | throw err; |
| | }); |
| | } |
| |
|
| | export { ioredisClient, keyvRedisClient, keyvRedisClientReady }; |
| |
|