| | const path = require('path'); |
| | const { v4 } = require('uuid'); |
| | const { countTokens, escapeRegExp } = require('@librechat/api'); |
| | const { |
| | Constants, |
| | ContentTypes, |
| | AnnotationTypes, |
| | defaultOrderQuery, |
| | } = require('librechat-data-provider'); |
| | const { retrieveAndProcessFile } = require('~/server/services/Files/process'); |
| | const { recordMessage, getMessages } = require('~/models/Message'); |
| | const { spendTokens } = require('~/models/spendTokens'); |
| | const { saveConvo } = require('~/models/Conversation'); |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Starts a thread turn: either creates a brand-new thread from `body`, or appends
 * the first entry of `body.messages` to an existing thread.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client exposing `beta.threads`.
 * @param {Object} params.body - Thread creation payload; when `thread_id` is given only
 *   `body.messages[0]` is sent.
 * @param {string} [params.thread_id] - Existing thread to append to.
 * @returns {Promise<Object>} `{ messages, thread_id }`. For a new thread, the created
 *   thread's fields are spread on top and `messages` is empty. For an existing thread,
 *   `messages` holds the created message.
 */
async function initThread({ openai, body, thread_id: _thread_id }) {
  if (!_thread_id) {
    const createdThread = await openai.beta.threads.create(body);
    return { messages: [], thread_id: createdThread.id, ...createdThread };
  }

  const [firstMessage] = body.messages;
  const createdMessage = await openai.beta.threads.messages.create(_thread_id, firstMessage);
  return { messages: [createdMessage], thread_id: _thread_id };
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Persists a user message and upserts the owning conversation.
 *
 * The message's `model` is set to the assistant id, and its `tokenCount` comes from
 * `countTokens(params.text)`. When `params.files` is non-empty, the message stores
 * only the `file_id` of each file and the conversation stores `params.file_ids`.
 *
 * @param {Object} req - Request passed through to `saveConvo`.
 * @param {Object} params - Message/conversation fields (user, endpoint, messageId,
 *   conversationId, parentMessageId, assistant_id, thread_id, text, promptPrefix,
 *   instructions, model, files, file_ids).
 * @returns {Promise<Object>} Whatever `recordMessage` resolves with.
 */
async function saveUserMessage(req, params) {
  const tokenCount = await countTokens(params.text);

  const { endpoint, conversationId, assistant_id, files } = params;
  const hasFiles = Boolean(files?.length);

  const userMessage = {
    user: params.user,
    endpoint,
    messageId: params.messageId,
    conversationId,
    parentMessageId: params.parentMessageId ?? Constants.NO_PARENT,
    model: assistant_id,
    thread_id: params.thread_id,
    sender: 'User',
    text: params.text,
    isCreatedByUser: true,
    tokenCount,
    ...(hasFiles ? { files: files.map(({ file_id }) => ({ file_id })) } : {}),
  };

  const convo = {
    endpoint,
    conversationId,
    promptPrefix: params.promptPrefix,
    instructions: params.instructions,
    assistant_id,
    model: params.model,
    ...(hasFiles ? { file_ids: params.file_ids } : {}),
  };

  const savedMessage = await recordMessage(userMessage);
  await saveConvo(req, convo, {
    context: 'api/server/services/Threads/manage.js #saveUserMessage',
  });
  return savedMessage;
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Persists a finished assistant message and upserts the owning conversation.
 *
 * The message is recorded with `sender: 'Assistant'`, `unfinished: false` and its
 * `model` set to the assistant id. The conversation update carries the prompt
 * settings plus `iconURL`/`spec`.
 *
 * @param {Object} req - Request passed through to `saveConvo`.
 * @param {Object} params - Message/conversation fields (user, endpoint, messageId,
 *   conversationId, parentMessageId, thread_id, assistant_id, content, text,
 *   iconURL, spec, promptPrefix, instructions, model).
 * @returns {Promise<Object>} Whatever `recordMessage` resolves with.
 */
async function saveAssistantMessage(req, params) {
  const { endpoint, conversationId, assistant_id, iconURL, spec } = params;

  const assistantMessage = await recordMessage({
    user: params.user,
    endpoint,
    messageId: params.messageId,
    conversationId,
    parentMessageId: params.parentMessageId,
    thread_id: params.thread_id,
    model: assistant_id,
    content: params.content,
    sender: 'Assistant',
    isCreatedByUser: false,
    text: params.text,
    unfinished: false,
    iconURL,
    spec,
  });

  const convoUpdate = {
    endpoint,
    conversationId,
    promptPrefix: params.promptPrefix,
    instructions: params.instructions,
    assistant_id,
    model: params.model,
    iconURL,
    spec,
  };
  await saveConvo(req, convoUpdate, {
    context: 'api/server/services/Threads/manage.js #saveAssistantMessage',
  });

  return assistantMessage;
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Tags every given thread message with the same database `messageId` in its metadata.
 * The updates are issued in parallel.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client exposing `beta.threads.messages.update`.
 * @param {string} params.thread_id - Thread that owns the messages.
 * @param {string} params.messageId - Database message id to store in each message's metadata.
 * @param {Array<{ id: string }>} params.messages - Thread messages to update.
 * @returns {Promise<Array>} The update responses, in the same order as `messages`.
 */
async function addThreadMetadata({ openai, thread_id, messageId, messages }) {
  const updates = messages.map(({ id }) =>
    openai.beta.threads.messages.update(id, {
      thread_id,
      metadata: { messageId },
    }),
  );
  return Promise.all(updates);
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Mirrors OpenAI thread messages into the database and rebuilds the parent chain.
 *
 * `apiMessages` are walked in the given order, and each resulting record is parented
 * to the one before it:
 * - A message whose `metadata.messageId` matches a database record reuses that record
 *   (its `parentMessageId` is rewritten in place). A record is linked only once, even
 *   if several API messages carry its id.
 * - Consecutive untracked assistant messages are merged into a single new record
 *   (content, files and source ids concatenated).
 * - Any other untracked message becomes a new record with a fresh `v4()` id.
 *
 * New records are saved with `recordMessage`. Their API counterparts (only ids
 * containing `msg_`) are tagged with `metadata.messageId` so later syncs can match
 * them. Finally the conversation is saved with the file ids attached to user messages.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client; `openai.req` is the request and
 *   `openai.req.user.id` the owning user.
 * @param {string} params.endpoint
 * @param {string} params.thread_id
 * @param {Array<Object>} params.dbMessages - Existing records for the conversation.
 * @param {Array<Object>} params.apiMessages - Thread messages in thread order.
 * @param {string} [params.assistant_id] - Fallback `model` for new assistant records
 *   whose API message has no `assistant_id`.
 * @param {string} params.conversationId
 * @returns {Promise<Array<Object>>} The linked records, in thread order.
 */
async function syncMessages({
  openai,
  endpoint,
  thread_id,
  dbMessages,
  apiMessages,
  assistant_id,
  conversationId,
}) {
  const result = [];
  const dbMessageMap = new Map(dbMessages.map((msg) => [msg.messageId, msg]));
  // Database ids already placed in `result`. Merged assistant replies are all tagged with
  // the same `metadata.messageId` (see addThreadMetadata), so one record can be matched
  // several times in a row. Re-linking it would make it its own parent and duplicate it.
  const linkedMessageIds = new Set();

  const modifyPromises = [];
  const recordPromises = [];

  // Queues the database write for `dbMessage` and, for real thread messages (`msg_` ids),
  // the metadata update that ties the API message(s) back to that record.
  const processNewMessage = ({ dbMessage, apiMessage }) => {
    recordPromises.push(recordMessage({ ...dbMessage, user: openai.req.user.id }));

    if (!apiMessage.id.includes('msg_')) {
      return;
    }

    if (dbMessage.aggregateMessages?.length > 1) {
      modifyPromises.push(
        addThreadMetadata({
          openai,
          thread_id,
          messageId: dbMessage.messageId,
          messages: dbMessage.aggregateMessages,
        }),
      );
      return;
    }

    modifyPromises.push(
      openai.beta.threads.messages.update(apiMessage.id, {
        thread_id,
        metadata: {
          messageId: dbMessage.messageId,
        },
      }),
    );
  };

  let lastMessage = null;

  for (let i = 0; i < apiMessages.length; i++) {
    const apiMessage = apiMessages[i];
    const dbMessageId = apiMessage.metadata?.messageId;
    const dbMessage = dbMessageMap.get(dbMessageId);

    if (dbMessage) {
      if (!linkedMessageIds.has(dbMessageId)) {
        linkedMessageIds.add(dbMessageId);
        dbMessage.parentMessageId = lastMessage ? lastMessage.messageId : Constants.NO_PARENT;
        lastMessage = dbMessage;
        result.push(dbMessage);
      }
      continue;
    }

    if (apiMessage.role === 'assistant' && lastMessage?.role === 'assistant') {
      // Consecutive untracked assistant messages are folded into one record.
      lastMessage.content = [...lastMessage.content, ...apiMessage.content];
      lastMessage.files = [...(lastMessage.files ?? []), ...(apiMessage.files ?? [])];
      lastMessage.aggregateMessages.push({ id: apiMessage.id });
    } else {
      const newMessage = {
        thread_id,
        conversationId,
        messageId: v4(),
        endpoint,
        parentMessageId: lastMessage ? lastMessage.messageId : Constants.NO_PARENT,
        role: apiMessage.role,
        isCreatedByUser: apiMessage.role === 'user',
        content: apiMessage.content,
        aggregateMessages: [{ id: apiMessage.id }],
        model: apiMessage.role === 'user' ? null : apiMessage.assistant_id,
        user: openai.req.user.id,
        unfinished: false,
      };

      if (apiMessage.file_ids?.length) {
        newMessage.files = apiMessage.file_ids.map((file_id) => ({ file_id }));
      }

      if (assistant_id && apiMessage.role === 'assistant' && !newMessage.model) {
        newMessage.model = assistant_id;
      }

      result.push(newMessage);
      lastMessage = newMessage;

      if (apiMessage.role === 'user') {
        processNewMessage({ dbMessage: newMessage, apiMessage });
        continue;
      }
    }

    // An assistant record is saved only once its run of consecutive assistant messages ends.
    const nextMessage = apiMessages[i + 1];
    const endsAssistantRun = !nextMessage || nextMessage.role === 'user';

    if (apiMessage.role === 'assistant' && endsAssistantRun) {
      processNewMessage({ dbMessage: lastMessage, apiMessage });
    }
  }

  const attached_file_ids = apiMessages.flatMap((msg) =>
    msg.role === 'user' && msg.file_ids?.length ? msg.file_ids : [],
  );

  // Await both batches together: Promise.all observes every promise, so a failed metadata
  // update cannot leave a rejected record write unhandled.
  await Promise.all([...modifyPromises, ...recordPromises]);

  await saveConvo(
    openai.req,
    {
      conversationId,
      file_ids: attached_file_ids,
    },
    { context: 'api/server/services/Threads/manage.js #syncMessages' },
  );

  return result;
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Pairs run steps with the thread messages they created.
 *
 * `steps` is sorted by `created_at` ascending IN PLACE, so the caller's array is
 * reordered as well. A step whose `step_details.message_creation.message_id`
 * matches the `id` of one of `messages` is returned as `{ step, message }`. Every
 * other step is returned unchanged.
 *
 * @param {Array<Object>} steps - Run steps.
 * @param {Array<Object>} messages - Thread messages, looked up by `id`.
 * @returns {Array<Object>} Sorted steps, with message-creation steps wrapped as `{ step, message }`.
 */
function mapMessagesToSteps(steps, messages) {
  // A Map, unlike a `{}` lookup table, cannot match inherited keys such as "toString".
  const messageMap = new Map(messages.map((msg) => [msg.id, msg]));

  return steps
    .sort((a, b) => a.created_at - b.created_at)
    .map((step) => {
      const messageId = step.step_details?.message_creation?.message_id;
      const message = messageId ? messageMap.get(messageId) : undefined;
      return message ? { step, message } : step;
    });
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Reconciles the database with an OpenAI thread after a run, filling in any
 * assistant output that was never saved locally.
 *
 * Lists the thread's messages (with `defaultOrderQuery`) and the steps of `run_id`.
 * It then folds the run's message-creation and tool-call steps into a single assistant
 * message tagged with `metadata.messageId = latestMessageId`. If that folded message
 * adopted the id of a listed thread message, it replaces that message; otherwise it is
 * appended. Everything is then passed to `syncMessages`.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client exposing `beta.threads`.
 * @param {string} params.endpoint
 * @param {string} params.latestMessageId - Database id given to the folded run message.
 * @param {string} params.thread_id
 * @param {string} params.run_id
 * @param {string} params.conversationId
 * @returns {Promise<Array<Object>>} Database and synced messages, de-duplicated by
 *   `messageId`. A synced copy replaces the database copy but keeps its position.
 */
async function checkMessageGaps({
  openai,
  endpoint,
  latestMessageId,
  thread_id,
  run_id,
  conversationId,
}) {
  const [response, stepsResponse] = await Promise.all([
    openai.beta.threads.messages.list(thread_id, defaultOrderQuery),
    openai.beta.threads.runs.steps.list(run_id, { thread_id }),
  ]);

  const steps = mapMessagesToSteps(stepsResponse.data, response.data);

  const currentMessage = {
    id: v4(),
    content: [],
    assistant_id: null,
    created_at: Math.floor(new Date().getTime() / 1000),
    object: 'thread.message',
    role: 'assistant',
    run_id,
    thread_id,
    endpoint,
    metadata: {
      messageId: latestMessageId,
    },
  };

  for (const entry of steps) {
    // mapMessagesToSteps wraps message-creation steps as `{ step, message }`; unwrap so
    // their `assistant_id` is not missed.
    const step = entry.step ?? entry;
    if (!currentMessage.assistant_id && step.assistant_id) {
      currentMessage.assistant_id = step.assistant_id;
    }

    if (entry.message) {
      currentMessage.id = entry.message.id;
      currentMessage.created_at = entry.message.created_at;
      currentMessage.content = currentMessage.content.concat(entry.message.content);
    } else if (step.step_details?.type === 'tool_calls' && step.step_details.tool_calls?.length) {
      currentMessage.content = currentMessage.content.concat(
        step.step_details.tool_calls.map((toolCall) => ({
          // NOTE(review): `progress: 2` presumably marks the tool call as complete — confirm.
          [ContentTypes.TOOL_CALL]: {
            ...toolCall,
            progress: 2,
          },
          type: ContentTypes.TOOL_CALL,
        })),
      );
    }
  }

  let addedCurrentMessage = false;
  const apiMessages = response.data
    .map((msg) => {
      if (msg.id === currentMessage.id) {
        addedCurrentMessage = true;
        return currentMessage;
      }
      return msg;
    })
    .sort((a, b) => new Date(a.created_at) - new Date(b.created_at));

  if (!addedCurrentMessage) {
    apiMessages.push(currentMessage);
  }

  const dbMessages = await getMessages({ conversationId });
  const assistant_id = dbMessages?.[0]?.model;

  const syncedMessages = await syncMessages({
    openai,
    endpoint,
    thread_id,
    dbMessages,
    apiMessages,
    assistant_id,
    conversationId,
  });

  // Later entries win, but each messageId keeps the position of its first occurrence.
  const byMessageId = new Map();
  for (const message of [...dbMessages, ...syncedMessages]) {
    byMessageId.set(message.messageId, message);
  }
  return [...byMessageId.values()];
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Charges a run's token usage to a user via `spendTokens`.
 *
 * @param {Object} usage
 * @param {number} usage.prompt_tokens
 * @param {number} usage.completion_tokens
 * @param {string} usage.model
 * @param {string} usage.user
 * @param {string} usage.conversationId
 * @param {string} [usage.context='message'] - Transaction context label.
 * @returns {Promise<void>}
 */
async function recordUsage({
  prompt_tokens,
  completion_tokens,
  model,
  user,
  conversationId,
  context = 'message',
}) {
  const txData = { user, model, context, conversationId };
  const tokenUsage = { promptTokens: prompt_tokens, completionTokens: completion_tokens };
  await spendTokens(txData, tokenUsage);
}
| |
|
/**
 * Temporary delimiters wrapped around citation numbers while `processMessages`
 * rewrites file-citation annotations. Once duplicates are collapsed, both are
 * replaced with `^`.
 */
const [uniqueCitationStart, uniqueCitationEnd] = ['^====||===', '==|||||^'];
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Looks up the file referenced by a text annotation.
 *
 * A file already in `client.processedFileIds` is fetched with `unknownType: true`.
 * Otherwise, `file_path` annotations are fetched under the basename of the annotated
 * text, and `file_citation` annotations (with a file id) with `unknownType: true`.
 * Failures are logged and reported as `file: null`, so one bad annotation does not
 * abort the others.
 *
 * @returns {Promise<{ annotation: Object, file_id: (string|undefined), file: (Object|null) }>}
 */
async function resolveAnnotationFile({ openai, client, annotation }) {
  const type = annotation.type;
  const file_id = annotation[type]?.file_id;

  try {
    let file = null;
    if (client.processedFileIds.has(file_id)) {
      file = await retrieveAndProcessFile({ openai, client, file_id, unknownType: true });
    } else if (type === AnnotationTypes.FILE_PATH) {
      file = await retrieveAndProcessFile({
        openai,
        client,
        file_id,
        basename: path.basename(annotation.text),
      });
    } else if (type === AnnotationTypes.FILE_CITATION && file_id) {
      file = await retrieveAndProcessFile({ openai, client, file_id, unknownType: true });
    }
    return { annotation, file_id, file };
  } catch (error) {
    console.error(`Failed to process annotation: ${error.message}`);
    return { annotation, file_id, file: null };
  }
}

/**
 * Returns the text that should replace an annotation, or '' to leave it as-is.
 * `file_path` becomes the file's `filepath`. `file_citation` becomes a sentinel-wrapped
 * citation number, assigned per filename in `sources` on first use.
 */
function annotationReplacementText(annotation, file, sources) {
  if (!file) {
    return '';
  }
  if (annotation.type === AnnotationTypes.FILE_PATH) {
    return file.filepath ?? '';
  }
  if (annotation.type === AnnotationTypes.FILE_CITATION && file.filename) {
    if (!sources.has(file.filename)) {
      sources.set(file.filename, sources.size + 1);
    }
    return `${uniqueCitationStart}${sources.get(file.filename)}${uniqueCitationEnd}`;
  }
  return '';
}

/** Applies `{ start, end, text }` splices right-to-left so earlier indices stay valid. */
function applyTextReplacements(text, replacements) {
  return [...replacements]
    .sort((a, b) => b.start - a.start)
    .reduce(
      (acc, { start, end, text: replacement }) => acc.slice(0, start) + replacement + acc.slice(end),
      text,
    );
}

/**
 * Collapses any run of the same citation marker separated only by whitespace
 * (e.g. three identical markers in a row) into a single marker.
 */
function collapseRepeatedCitations(text) {
  const marker = `${escapeRegExp(uniqueCitationStart)}\\d+${escapeRegExp(uniqueCitationEnd)}`;
  return text.replace(new RegExp(`(${marker})(?:\\s*\\1)+`, 'g'), '$1');
}

/**
 * Flattens thread messages into a single text and attaches the files they reference.
 *
 * Messages are sorted by `created_at` IN PLACE (the returned `messages` is the same
 * array). Each message's `files` is reset to `[]` and then refilled:
 * - `image_file` parts not yet in `client.processedFileIds` are fetched as
 *   `<file_id>.png` and attached once retrieval finishes.
 * - `file_path` annotations are replaced with the retrieved file's `filepath`.
 * - `file_citation` annotations become numbered `^n^` markers, one number per filename,
 *   assigned in annotation order. Repeated markers separated only by whitespace are
 *   collapsed, and a `^n.^ filename` source list is appended.
 * Annotations whose file was already processed are rewritten too; the file is simply
 * not attached a second time.
 *
 * @param {Object} params
 * @param {Object} params.openai - Passed through to `retrieveAndProcessFile`.
 * @param {Object} params.client - Must expose `processedFileIds`, a Set-like (`has`/`add`)
 *   collection of file ids that were already attached.
 * @param {Array<Object>} [params.messages=[]] - Thread messages to process.
 * @returns {Promise<{ messages: Array<Object>, text: string, edited: boolean }>}
 *   `edited` is true when at least one annotation was rewritten.
 */
async function processMessages({ openai, client, messages = [] }) {
  const sorted = messages.sort((a, b) => a.created_at - b.created_at);

  let text = '';
  let edited = false;
  const sources = new Map();
  const fileRetrievalPromises = [];

  for (const message of sorted) {
    message.files = [];
    for (const content of message.content ?? []) {
      const type = content.type;
      const contentType = content[type];
      const currentFileId = contentType?.file_id;

      if (type === ContentTypes.IMAGE_FILE && !client.processedFileIds.has(currentFileId)) {
        fileRetrievalPromises.push(
          retrieveAndProcessFile({
            openai,
            client,
            file_id: currentFileId,
            basename: `${currentFileId}.png`,
          })
            .then((file) => {
              client.processedFileIds.add(currentFileId);
              message.files.push(file);
            })
            .catch((error) => {
              console.error(`Failed to retrieve file: ${error.message}`);
            }),
        );
        continue;
      }

      const currentText = contentType?.value ?? '';
      const annotations = contentType?.annotations;

      if (!annotations?.length) {
        text += currentText;
        continue;
      }

      // Retrieve concurrently, but number citations and attach files in annotation order,
      // so numbering is deterministic and each file is attached only once.
      const resolved = await Promise.all(
        annotations.map((annotation) => resolveAnnotationFile({ openai, client, annotation })),
      );

      const replacements = [];
      for (const { annotation, file_id, file } of resolved) {
        const replacementText = annotationReplacementText(annotation, file, sources);
        if (!replacementText) {
          continue;
        }
        replacements.push({
          start: annotation.start_index,
          end: annotation.end_index,
          text: replacementText,
        });
        edited = true;
        if (!client.processedFileIds.has(file_id)) {
          client.processedFileIds.add(file_id);
          message.files.push(file);
        }
      }

      text += applyTextReplacements(currentText, replacements);
    }
  }

  await Promise.all(fileRetrievalPromises);

  text = collapseRepeatedCitations(text)
    .replace(new RegExp(escapeRegExp(uniqueCitationStart), 'g'), '^')
    .replace(new RegExp(escapeRegExp(uniqueCitationEnd), 'g'), '^');

  if (sources.size) {
    const sourceList = Array.from(sources, ([source, index]) => `^${index}.^ ${source}`);
    text += `\n\n${sourceList.join('\n')}`;
  }

  return { messages: sorted, text, edited };
}
| |
|
/**
 * Public API of the Threads management service. `syncMessages` stays module-private;
 * it is reached through `checkMessageGaps`.
 */
const threadManagementApi = {
  initThread,
  recordUsage,
  processMessages,
  saveUserMessage,
  checkMessageGaps,
  addThreadMetadata,
  mapMessagesToSteps,
  saveAssistantMessage,
};

module.exports = threadManagementApi;
| |
|