| | const { logger } = require('@librechat/data-schemas'); |
| | const { countTokens, isEnabled, sendEvent, sanitizeMessageForTransmit } = require('@librechat/api'); |
| | const { isAssistantsEndpoint, ErrorTypes, Constants } = require('librechat-data-provider'); |
| | const { truncateText, smartTruncateText } = require('~/app/clients/prompts'); |
| | const clearPendingReq = require('~/cache/clearPendingReq'); |
| | const { sendError } = require('~/server/middleware/error'); |
| | const { spendTokens } = require('~/models/spendTokens'); |
| | const abortControllers = require('./abortControllers'); |
| | const { saveMessage, getConvo } = require('~/models'); |
| | const { abortRun } = require('./abortRun'); |
| |
|
/**
 * Per-controller abort metadata (abort-data getter, user id, endpoint, icon URL, model),
 * keyed by the AbortController instance. Being a WeakMap, an entry does not keep its
 * controller alive once nothing else references it.
 */
const abortDataMap = new WeakMap();
| |
|
| | |
| | |
| | |
| | |
/**
 * Aborts and unregisters the abort controller stored under `abortKey`.
 *
 * Besides removing the registry entry, this drops the controller's entry in
 * `abortDataMap` and nulls the `getAbortData` / `abortCompletion` functions attached
 * to the controller so the closures they hold can be released.
 *
 * @param {string} abortKey - Key of the entry in `abortControllers`.
 * @returns {boolean} `false` when no entry exists for `abortKey`; `true` once the
 *   entry has been removed (even if it carried no controller).
 */
function cleanupAbortController(abortKey) {
  if (!abortControllers.has(abortKey)) {
    return false;
  }

  const { abortController } = abortControllers.get(abortKey);

  // An entry without a controller has nothing to abort; just drop it.
  if (!abortController) {
    abortControllers.delete(abortKey);
    return true;
  }

  // No listener bookkeeping is attempted: EventTarget offers no way to enumerate
  // registered listeners, so aborting and dropping our references is all that can
  // (and needs to) be done here.
  if (!abortController.signal.aborted) {
    abortController.abort();
  }

  abortControllers.delete(abortKey);

  // WeakMap#delete is a no-op for unknown keys, so no membership check is needed.
  abortDataMap.delete(abortController);

  // Break the references from the controller to the request-scoped closures.
  if (abortController.getAbortData) {
    abortController.getAbortData = null;
  }

  if (abortController.abortCompletion) {
    abortController.abortCompletion = null;
  }

  return true;
}
| |
|
| | |
| | |
| | |
| | |
/**
 * Builds a zero-argument callback that cleans up the abort controller registered
 * under `abortKey`. The callback is best-effort: a failing cleanup is logged instead
 * of being thrown, so it is safe to attach to response events.
 *
 * @param {string} abortKey - Key passed to `cleanupAbortController` when the callback runs.
 * @returns {() => void} Cleanup callback.
 */
function createCleanUpHandler(abortKey) {
  return function () {
    try {
      cleanupAbortController(abortKey);
    } catch (err) {
      // Cleanup must never break the response lifecycle, but a silent failure would
      // hide leaked controllers, so leave a trace in the logs.
      logger.warn(`[createCleanUpHandler] Cleanup failed for abort key "${abortKey}"`, err);
    }
  };
}
| |
|
/**
 * Aborts the in-flight request identified by `req.body.abortKey` and replies with the
 * final event produced by the controller's `abortCompletion`.
 *
 * Assistants endpoints are delegated to `abortRun`. When the exact key is not
 * registered, the part of the key before the first ':' (or the user id when no key
 * was supplied) is tried instead.
 *
 * @param {object} req - Request; reads `req.body.abortKey`, `req.body.endpoint`, `req.user`.
 * @param {object} res - Response; used either as a JSON response or, when headers were
 *   already sent, as the target of `sendEvent`.
 * @returns {Promise<*>} Whatever `abortRun`, the 204 response, or `sendEvent` returns;
 *   `undefined` after a plain JSON reply.
 */
async function abortMessage(req, res) {
  const { endpoint } = req.body;
  let { abortKey } = req.body;

  if (isAssistantsEndpoint(endpoint)) {
    return await abortRun(req, res);
  }

  const replyNotFound = () => res.status(204).send({ message: 'Request not found' });

  const [keyPrefix] = abortKey?.split(':') ?? [];
  const conversationId = keyPrefix ?? req.user.id;

  // Fall back to the conversation-level key when the exact key is unknown.
  const exactKeyKnown = abortControllers.has(abortKey);
  if (!exactKeyKnown && abortControllers.has(conversationId)) {
    abortKey = conversationId;
  }

  const keyKnown = abortControllers.has(abortKey);
  if (!keyKnown && !res.headersSent) {
    return replyNotFound();
  }

  const registered = abortControllers.get(abortKey);
  const abortController = registered?.abortController;
  if (!abortController) {
    return replyNotFound();
  }

  const finalEvent = await abortController.abortCompletion?.();
  const abortedRequest = JSON.stringify({ abortKey });
  logger.debug(
    `[abortMessage] ID: ${req.user.id} | ${req.user.email} | Aborted request: ${abortedRequest}`,
  );
  cleanupAbortController(abortKey);

  // A stream that already started can only receive the final event as an event.
  const canStreamFinalEvent = res.headersSent && finalEvent;
  if (canStreamFinalEvent) {
    return sendEvent(res, finalEvent);
  }

  res.setHeader('Content-Type', 'application/json');
  res.send(JSON.stringify(finalEvent));
}
| |
|
/**
 * Factory for the abort route handler.
 *
 * The returned handler clears the user's pending request when
 * `LIMIT_CONCURRENT_MESSAGES` is enabled, then delegates to `abortMessage`.
 * Any error is logged and swallowed, in which case the handler resolves to `undefined`.
 *
 * @returns {(req: object, res: object) => Promise<*>} Async request handler.
 */
const handleAbort = () =>
  async function abortHandler(req, res) {
    try {
      const limitsConcurrentMessages = isEnabled(process.env.LIMIT_CONCURRENT_MESSAGES);
      if (limitsConcurrentMessages) {
        await clearPendingReq({ userId: req.user.id });
      }
      const outcome = await abortMessage(req, res);
      return outcome;
    } catch (err) {
      logger.error('[abortMessage] handleAbort error', err);
    }
  };
| |
|
/**
 * Creates the AbortController for a request and the `onStart` callback that registers it.
 *
 * The controller is decorated with two functions:
 * - `getAbortData()` returns a shallow copy of the current abort data;
 * - `abortCompletion()` aborts the signal, records the partial response as an
 *   "incomplete" message (token spend + saved message) and returns the final event.
 *
 * @param {object} req - Request; reads `req.user.id`, `req.body.endpointOption` and,
 *   inside `onStart`, `req.body.overrideUserMessageId`.
 * @param {object} res - Response; receives the "created" event and has a `finish`
 *   listener attached for cleanup.
 * @param {Function} getAbortData - Returns the current abort data; `abortCompletion`
 *   reads `conversationId`, `userMessage`, `userMessagePromise`, `promptTokens` and
 *   treats the remaining fields as the response message.
 * @param {Function} getReqData - Receives request-scoped values (`{ abortKey }`, `{ userMessage }`).
 * @returns {{ abortController: AbortController, onStart: Function }}
 */
const createAbortController = (req, res, getAbortData, getReqData) => {
  const abortController = new AbortController();
  const { endpointOption } = req.body;

  // Snapshot of the only endpoint fields kept alongside a controller. Evaluated at
  // each call site so it reflects `endpointOption` at that moment.
  const pickMinimalOptions = () => ({
    endpoint: endpointOption.endpoint,
    iconURL: endpointOption.iconURL,
    model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model,
  });

  abortDataMap.set(abortController, {
    getAbortDataFn: getAbortData,
    userId: req.user.id,
    ...pickMinimalOptions(),
  });

  // Must stay a `function`: it looks itself up in `abortDataMap` via `this`.
  abortController.getAbortData = function () {
    const data = abortDataMap.get(this);
    if (!data || typeof data.getAbortDataFn !== 'function') {
      return {};
    }

    try {
      const cleanResult = { ...data.getAbortDataFn() };

      // Hand out a promise that resolves to a shallow copy, so callers cannot mutate
      // the object the original promise resolves to.
      const pendingUserMessage = cleanResult.userMessagePromise;
      if (pendingUserMessage && typeof pendingUserMessage.then === 'function') {
        cleanResult.userMessagePromise = Promise.resolve(pendingUserMessage).then((resolved) => ({
          ...resolved,
        }));
      }

      return cleanResult;
    } catch (err) {
      logger.error('[abortController.getAbortData] Error:', err);
      return {};
    }
  };

  /**
   * Announces the created user message and registers this controller.
   *
   * @param {object} userMessage - Message echoed to the client; its `conversationId`
   *   (or the user id when absent) forms the abort key.
   * @param {string} responseMessageId - Appended to the key when a previous request
   *   under the same key is being overridden.
   * @param {boolean} [isNewConvo] - Adds the new-conversation suffix to the key.
   */
  const onStart = (userMessage, responseMessageId, isNewConvo) => {
    sendEvent(res, { message: userMessage, created: true });

    const prelimAbortKey = userMessage?.conversationId ?? req.user.id;
    const abortKey = isNewConvo
      ? `${prelimAbortKey}${Constants.COMMON_DIVIDER}${Constants.NEW_CONVO}`
      : prelimAbortKey;
    getReqData({ abortKey });
    const prevRequest = abortControllers.get(abortKey);
    const { overrideUserMessageId } = req?.body ?? {};

    let registryKey = abortKey;
    if (overrideUserMessageId != null && prevRequest?.abortController) {
      // Reuse the user message of the request still registered under this key and
      // register the new controller under a response-specific key instead.
      const data = prevRequest.abortController.getAbortData();
      getReqData({ userMessage: data?.userMessage });
      registryKey = `${abortKey}:${responseMessageId}`;
    }

    abortControllers.set(registryKey, { abortController, ...pickMinimalOptions() });
    res.on('finish', createCleanUpHandler(registryKey));
  };

  // Must stay a `function`: it aborts and looks itself up via `this`.
  abortController.abortCompletion = async function () {
    this.abort();

    const ctrlData = abortDataMap.get(this);
    if (!ctrlData || !ctrlData.getAbortDataFn) {
      return { final: true, conversation: {}, title: 'New Chat' };
    }

    const { conversationId, userMessage, userMessagePromise, promptTokens, ...responseData } =
      ctrlData.getAbortDataFn();

    const completionTokens = await countTokens(responseData?.text ?? '');
    const user = ctrlData.userId;

    const responseMessage = {
      ...responseData,
      conversationId,
      finish_reason: 'incomplete',
      endpoint: ctrlData.endpoint,
      iconURL: ctrlData.iconURL,
      // `abortDataMap` stores the resolved model under `model`; it has no
      // `modelOptions` / `model_parameters` fields.
      model: ctrlData.model,
      unfinished: false,
      error: false,
      isCreatedByUser: false,
      tokenCount: completionTokens,
    };

    await spendTokens(
      { ...responseMessage, context: 'incomplete', user },
      { promptTokens, completionTokens },
    );

    await saveMessage(
      req,
      { ...responseMessage, user },
      { context: 'api/server/middleware/abortMiddleware.js' },
    );

    let conversation;
    if (userMessagePromise) {
      const resolved = await userMessagePromise;
      conversation = resolved?.conversation;
      // Drop the reference held by the resolved value; the promise may resolve to
      // nothing, in which case there is nothing to clear.
      if (resolved != null && typeof resolved === 'object') {
        resolved.conversation = null;
      }
    }

    if (!conversation) {
      conversation = await getConvo(user, conversationId);
    }

    return {
      title: conversation && !conversation.title ? null : conversation?.title || 'New Chat',
      final: true,
      conversation,
      requestMessage: sanitizeMessageForTransmit(userMessage),
      responseMessage: responseMessage,
    };
  };

  return { abortController, onStart };
};
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Handles an error raised while generating a response.
 *
 * When more than 5 characters of partial text exist, the request is aborted through
 * `abortMessage` so the partial response is kept; if that fails, or when there is no
 * usable partial text, an error message is sent through `sendError`.
 *
 * @param {object} res - Response object.
 * @param {object} req - Request; reads `req.user.id` and `req.body`
 *   (`endpointOption`, `model`, `agent_id`).
 * @param {Error|object|undefined} error - The failure; tolerated when nullish.
 * @param {object} data - Message context: `sender`, `conversationId`, `messageId`,
 *   `parentMessageId`, `userMessageId`, `partialText`.
 * @returns {Promise<*>} Result of `abortMessage` or of the error response.
 */
const handleAbortError = async (res, req, error, data) => {
  if (error?.message?.includes('base64')) {
    // base64 payloads can be huge; truncate before logging.
    logger.error('[handleAbortError] Error in base64 encoding', {
      ...error,
      stack: smartTruncateText(error?.stack, 1000),
      message: truncateText(error.message, 350),
    });
  } else {
    logger.error('[handleAbortError] AI response error; aborting request:', error);
  }
  const { sender, conversationId, messageId, parentMessageId, userMessageId, partialText } = data;

  // Guarded like every other access to `error`, so a nullish error cannot crash the
  // handler before a response is sent.
  if (error?.stack?.includes('google')) {
    logger.warn(
      `AI Response error for conversation ${conversationId} likely caused by Google censor/filter`,
    );
  }

  // Messages that already carry a `"type"` field are forwarded verbatim.
  let errorText = error?.message?.includes('"type"')
    ? error.message
    : 'An error occurred while processing your request. Please contact the Admin.';

  if (error?.type === ErrorTypes.INVALID_REQUEST) {
    errorText = `{"type":"${ErrorTypes.INVALID_REQUEST}"}`;
  }

  if (error?.message?.includes("does not support 'system'")) {
    errorText = `{"type":"${ErrorTypes.NO_SYSTEM_MESSAGES}"}`;
  }

  /**
   * Sends the error (or, when given, the partial text as an unfinished message).
   * @param {string} [textToKeep] - Partial response text to send instead of the error text.
   */
  const respondWithError = async (textToKeep) => {
    const endpointOption = req.body?.endpointOption;
    let options = {
      sender,
      messageId,
      conversationId,
      parentMessageId,
      text: errorText,
      user: req.user.id,
      spec: endpointOption?.spec,
      iconURL: endpointOption?.iconURL,
      modelLabel: endpointOption?.modelLabel,
      shouldSaveMessage: userMessageId != null,
      model: endpointOption?.modelOptions?.model || req.body?.model,
    };

    if (req.body?.agent_id) {
      options.agent_id = req.body.agent_id;
    }

    if (textToKeep) {
      options = {
        ...options,
        error: false,
        unfinished: true,
        text: textToKeep,
      };
    }

    const callback = createCleanUpHandler(conversationId);
    await sendError(req, res, options, callback);
  };

  if (partialText && partialText.length > 5) {
    try {
      return await abortMessage(req, res);
    } catch (err) {
      logger.error('[handleAbortError] error while trying to abort message', err);
      return respondWithError(partialText);
    }
  } else {
    return respondWithError();
  }
};
| |
|
// Public exports; `abortMessage` and `createCleanUpHandler` stay module-private.
module.exports = {
  handleAbort,
  handleAbortError,
  createAbortController,
  cleanupAbortController,
};
| |
|