| | const { isEnabled } = require('@librechat/api'); |
| | const { Time, CacheKeys, ViolationTypes } = require('librechat-data-provider'); |
| | const clearPendingReq = require('~/cache/clearPendingReq'); |
| | const { logViolation, getLogStores } = require('~/cache'); |
| | const denyRequest = require('./denyRequest'); |
| |
|
// Limiter settings read once, at module load, from the environment.
// - USE_REDIS: passed to `isEnabled` when building the cache key.
// - CONCURRENT_MESSAGE_MAX: max pending requests per user; defaults to 1 when unset.
//   When set it is a string (env values are strings), so consumers must coerce it.
// - CONCURRENT_VIOLATION_SCORE (aliased to `score`): forwarded to `logViolation`.
const {
  USE_REDIS,
  CONCURRENT_MESSAGE_MAX = 1,
  CONCURRENT_VIOLATION_SCORE: score,
} = process.env ?? {};
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Express middleware that limits how many requests a single user may have
 * pending at the same time.
 *
 * The pending count is read from the `CacheKeys.PENDING_REQ` log store. When the
 * count has reached the limit, a `ViolationTypes.CONCURRENT` violation is logged
 * and the request is handed to `denyRequest`. Otherwise the count is incremented
 * (with `Time.ONE_MINUTE` passed as the third argument to `cache.set`, presumably
 * a TTL), a one-shot cleanup is registered on the response's `finish` and `close`
 * events, and `next()` is called.
 *
 * The middleware passes straight through to `next()` when the store is
 * unavailable or when the request body contains nothing but an `abortKey`.
 *
 * @param {object} req - Request; `req.user.id` / `req.user._id` and `req.body` are read.
 * @param {object} res - Response; `finish` and `close` listeners are attached to it.
 * @param {Function} next - Next middleware in the chain.
 * @returns {Promise<*>} Resolves with whatever `next()` or `denyRequest()` returned.
 */
const concurrentLimiter = async (req, res, next) => {
  const namespace = CacheKeys.PENDING_REQ;
  const cache = getLogStores(namespace);
  if (!cache) {
    return next();
  }

  // A body whose only key is `abortKey` is not counted against the limit.
  if (Object.keys(req?.body ?? {}).length === 1 && req?.body?.abortKey) {
    return next();
  }

  const userId = req.user?.id ?? req.user?._id ?? '';
  const type = ViolationTypes.CONCURRENT;

  // The env value is a string. A non-numeric value coerces to NaN, and every
  // comparison against NaN is false, which would silently disable the limiter
  // and skip cleanup. Fall back to the default of 1 in that case.
  const configuredLimit = Number(CONCURRENT_MESSAGE_MAX);
  const limit = Number.isNaN(configuredLimit) ? 1 : Math.max(configuredLimit, 1);

  // The namespace prefix is only added when Redis is enabled.
  const key = `${isEnabled(USE_REDIS) ? namespace : ''}:${userId}`;
  const pendingRequests = +((await cache.get(key)) ?? 0);

  if (pendingRequests >= limit) {
    const errorMessage = {
      type,
      limit,
      pendingRequests,
    };

    await logViolation(req, res, type, errorMessage, score);
    return await denyRequest(req, res, errorMessage);
  }

  await cache.set(key, pendingRequests + 1, Time.ONE_MINUTE);

  // Both `finish` and `close` can fire for the same response; the flag makes
  // sure the pending entry is cleared only once.
  // NOTE(review): cleanUp runs as an event listener, so a rejection from
  // clearPendingReq would go unhandled; confirm it handles its own errors.
  let cleared = false;
  const cleanUp = async () => {
    if (cleared) {
      return;
    }
    cleared = true;
    await clearPendingReq({ userId, cache });
  };

  res.on('finish', cleanUp);
  res.on('close', cleanUp);

  next();
};
| |
|
// CommonJS export: the middleware function is the module's sole export.
module.exports = concurrentLimiter;
| |
|