| | const { klona } = require('klona'); |
| | const { sleep } = require('@librechat/agents'); |
| | const { sendEvent } = require('@librechat/api'); |
| | const { logger } = require('@librechat/data-schemas'); |
| | const { |
| | StepTypes, |
| | RunStatus, |
| | StepStatus, |
| | ContentTypes, |
| | ToolCallTypes, |
| | imageGenTools, |
| | EModelEndpoint, |
| | defaultOrderQuery, |
| | } = require('librechat-data-provider'); |
| | const { retrieveAndProcessFile } = require('~/server/services/Files/process'); |
| | const { processRequiredActions } = require('~/server/services/ToolService'); |
| | const { RunManager, waitForRun } = require('~/server/services/Runs'); |
| | const { processMessages } = require('~/server/services/Threads'); |
| | const { createOnProgress } = require('~/server/utils'); |
| | const { TextStream } = require('~/app/clients'); |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Prepares the client for collecting the assistant's response.
 *
 * Resets `openai.responseMessage` and `openai.responseText`, then installs
 * `openai.addContentData`, which records each content part on the response
 * message and forwards non-text parts to the client via `sendEvent`.
 *
 * @param {object} params
 * @param {object} params.openai - Client object that receives the response state; `openai.res` is passed to `sendEvent`.
 * @param {string} params.conversationId - Conversation the response belongs to.
 * @param {string} params.userMessageId - Stored as the response's `parentMessageId`.
 * @param {string} params.messageId - ID assigned to the response message.
 * @param {string} params.thread_id - Thread ID included in every emitted event.
 * @returns {Promise<void>}
 */
async function createOnTextProgress({
  openai,
  conversationId,
  userMessageId,
  messageId,
  thread_id,
}) {
  const freshResponseMessage = {
    conversationId,
    parentMessageId: userMessageId,
    role: 'assistant',
    messageId,
    content: [],
  };

  const recordContent = (data) => {
    const { type, index } = data;
    const payload = data[type];

    // Always record the part at its index, whatever its type.
    openai.responseMessage.content[index] = { type, [type]: payload };

    if (type !== ContentTypes.TEXT) {
      const contentData = {
        index,
        type,
        [type]: payload,
        messageId,
        thread_id,
        conversationId,
      };

      logger.debug('Content data:', contentData);
      sendEvent(openai.res, contentData);
      return;
    }

    // Text parts are only accumulated here; no event is sent for them from this callback.
    openai.responseText += payload.value;
  };

  openai.responseMessage = freshResponseMessage;
  openai.responseText = '';
  openai.addContentData = recordContent;
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Waits for a run to settle and returns what the caller must handle next.
 *
 * - Completed run: the thread messages whose `run_id` matches this run.
 * - Run requiring action: one action descriptor per requested tool call
 *   (an empty array when `required_action` is absent).
 * - Any other status: throws with the serialized run attached to the message.
 *
 * @param {object} params
 * @param {object} params.openai - Client exposing `beta.threads.messages.list`.
 * @param {string} params.run_id - ID of the run to wait for.
 * @param {string} params.thread_id - ID of the thread the run belongs to.
 * @returns {Promise<object[]>} Messages of the run, or required tool actions.
 * @throws {Error} When the run ends in a status other than completed / requires-action.
 * @throws {SyntaxError} When a tool call's `arguments` string is not valid JSON.
 */
async function getResponse({ openai, run_id, thread_id }) {
  const run = await waitForRun({ openai, run_id, thread_id, pollIntervalMs: 2000 });
  const { status } = run;

  if (status === RunStatus.COMPLETED) {
    const listing = await openai.beta.threads.messages.list(thread_id, defaultOrderQuery);
    return listing.data.filter((message) => message.run_id === run_id);
  }

  if (status === RunStatus.REQUIRES_ACTION) {
    const requiredAction = run.required_action;
    if (requiredAction == null) {
      return [];
    }

    return requiredAction.submit_tool_outputs.tool_calls.map((toolCall) => {
      const { name, arguments: rawArguments } = toolCall.function;
      return {
        tool: name,
        toolInput: JSON.parse(rawArguments),
        toolCallId: toolCall.id,
        run_id,
        thread_id,
      };
    });
  }

  const runInfo = JSON.stringify(run, null, 2);
  throw new Error(`Unexpected run status ${status}.\nFull run info:\n\n${runInfo}`);
}
| |
|
| | |
| | |
| | |
| | |
| | |
/**
 * Deduplicates run steps by `id`, keeping the most recent snapshot of each.
 *
 * "Most recent" is decided by the largest of the step's lifecycle timestamps
 * (`created_at`, `expired_at`, `cancelled_at`, `failed_at`, `completed_at`).
 * A missing timestamp counts as 0, so a snapshot lacking `created_at` can
 * still be superseded by a later one instead of poisoning the comparison
 * with NaN. On a tie the earlier entry in `steps` is kept.
 *
 * Falsy entries are dropped for every input length. The result is always a
 * new array of shallow copies, ordered by each id's first appearance; the
 * input array and its step objects are not mutated.
 *
 * @param {Array<object|null|undefined>} [steps=[]] - Step snapshots, possibly containing repeats of the same step.
 * @returns {object[]} One shallow-copied step per distinct `id`.
 */
function filterSteps(steps = []) {
  const latestById = new Map();

  for (const step of steps) {
    if (!step) {
      continue;
    }

    const effectiveTimestamp = Math.max(
      step.created_at || 0,
      step.expired_at || 0,
      step.cancelled_at || 0,
      step.failed_at || 0,
      step.completed_at || 0,
    );

    const current = latestById.get(step.id);
    if (!current || effectiveTimestamp > current.effectiveTimestamp) {
      // Re-setting an existing key keeps its original insertion position in the Map.
      latestById.set(step.id, { step, effectiveTimestamp });
    }
  }

  return Array.from(latestById.values(), ({ step }) => ({ ...step }));
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
/**
 * Reports whether two tool-call snapshots differ, by comparing their JSON
 * serializations. The comparison is therefore sensitive to property order
 * and ignores anything `JSON.stringify` omits (e.g. `undefined` values).
 *
 * @param {object} previousCall - The previously seen tool call.
 * @param {object} currentCall - The tool call from the latest step snapshot.
 * @returns {boolean} `true` when the serialized forms are not identical.
 */
function hasToolCallChanged(previousCall, currentCall) {
  const serializedPrevious = JSON.stringify(previousCall);
  const serializedCurrent = JSON.stringify(currentCall);
  return serializedPrevious !== serializedCurrent;
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Creates the step handler used while a run is being polled.
 *
 * Initializes per-response tracking state on `openai` (`index`, `mappedOrder`,
 * `seenToolCalls`, `processedFileIds`, `completeToolCallSteps`,
 * `seenCompletedMessages`) and returns an async `in_progress({ step })`
 * callback that turns run steps into content parts via `openai.addContentData`.
 *
 * @param {object} openai - Client object; must already expose `addContentData`, and for
 *   message steps also `res`, `responseMessage` and `beta.threads.messages.retrieve`.
 * @param {string} thread_id - Thread the run belongs to.
 * @param {object[]} messages - Array that completed thread messages are appended to (mutated).
 * @returns {(args: { step: object }) => Promise<void>} The step handler.
 */
function createInProgressHandler(openai, thread_id, messages) {
  openai.index = 0;
  openai.mappedOrder = new Map();
  openai.seenToolCalls = new Map();
  openai.processedFileIds = new Set();
  openai.completeToolCallSteps = new Set();
  openai.seenCompletedMessages = new Set();

  /**
   * Handles one step snapshot: emits tool-call progress / generated images for
   * tool-call steps, and streams the text of completed message-creation steps.
   *
   * @param {object} args
   * @param {object} args.step - The run step snapshot to process.
   * @returns {Promise<void>}
   */
  async function in_progress({ step }) {
    if (step.type === StepTypes.TOOL_CALLS) {
      const { tool_calls } = step.step_details;

      for (const toolCall of tool_calls) {
        const previousCall = openai.seenToolCalls.get(toolCall.id);

        // Nothing new to report for this tool call.
        if (previousCall && !hasToolCallChanged(previousCall, toolCall)) {
          continue;
        }

        // First sighting of this tool call: reserve the next content index for it.
        let toolCallIndex = openai.mappedOrder.get(toolCall.id);
        if (toolCallIndex === undefined) {
          toolCallIndex = openai.index;
          openai.mappedOrder.set(toolCall.id, openai.index);
          openai.index++;
        }

        if (step.status === StepStatus.IN_PROGRESS) {
          // Progress is synthetic: starts at 0.01, grows by 0.2 per update, capped at 0.95.
          toolCall.progress =
            previousCall && previousCall.progress
              ? Math.min(previousCall.progress + 0.2, 0.95)
              : 0.01;
        } else {
          toolCall.progress = 1;
          openai.completeToolCallSteps.add(step.id);
        }

        if (
          toolCall.type === ToolCallTypes.CODE_INTERPRETER &&
          step.status === StepStatus.COMPLETED
        ) {
          // Tolerate a missing payload / outputs list instead of throwing mid-stream.
          const outputs = toolCall[toolCall.type]?.outputs ?? [];

          for (const output of outputs) {
            if (output.type !== 'image') {
              continue;
            }

            // Skip image outputs that carry no file reference, and files already emitted.
            const file_id = output.image?.file_id;
            if (!file_id || openai.processedFileIds.has(file_id)) {
              continue;
            }

            const file = await retrieveAndProcessFile({
              openai,
              client: openai,
              file_id,
              basename: `${file_id}.png`,
            });

            // NOTE(review): assumes the helper may yield nothing for an unavailable file — confirm.
            if (!file) {
              continue;
            }

            // Only emit the image when every field of the processed file is populated.
            const validImageFile = Object.keys(file).every((key) => file[key]);
            if (!validImageFile) {
              continue;
            }

            const image_file = {
              [ContentTypes.IMAGE_FILE]: file,
              type: ContentTypes.IMAGE_FILE,
              index: openai.index,
            };
            openai.addContentData(image_file);
            openai.processedFileIds.add(file_id);
            openai.index++;
          }
        } else if (
          toolCall.type === ToolCallTypes.FUNCTION &&
          step.status === StepStatus.COMPLETED &&
          imageGenTools.has(toolCall[toolCall.type].name)
        ) {
          // Completed image-generation function calls are recorded but not emitted here.
          openai.seenToolCalls.set(toolCall.id, toolCall);
          continue;
        }

        openai.addContentData({
          [ContentTypes.TOOL_CALL]: toolCall,
          index: toolCallIndex,
          type: ContentTypes.TOOL_CALL,
        });

        // Remember this snapshot so identical future snapshots can be skipped.
        openai.seenToolCalls.set(toolCall.id, toolCall);
      }
    } else if (step.type === StepTypes.MESSAGE_CREATION && step.status === StepStatus.COMPLETED) {
      const { message_id } = step.step_details.message_creation;
      if (openai.seenCompletedMessages.has(message_id)) {
        return;
      }

      // Mark before awaiting so a concurrent call for the same message bails out early.
      openai.seenCompletedMessages.add(message_id);

      const message = await openai.beta.threads.messages.retrieve(message_id, { thread_id });
      if (!message?.content?.length) {
        return;
      }
      messages.push(message);

      // Message content is indexed by the step that created it.
      let messageIndex = openai.mappedOrder.get(step.id);
      if (messageIndex === undefined) {
        messageIndex = openai.index;
        openai.mappedOrder.set(step.id, openai.index);
        openai.index++;
      }

      const result = await processMessages({ openai, client: openai, messages: [message] });
      openai.addContentData({
        [ContentTypes.TEXT]: { value: result.text },
        type: ContentTypes.TEXT,
        index: messageIndex,
      });

      const { onProgress: progressCallback } = createOnProgress({});

      const onProgress = progressCallback({
        res: openai.res,
        index: messageIndex,
        messageId: openai.responseMessage.messageId,
        conversationId: openai.responseMessage.conversationId,
        type: ContentTypes.TEXT,
        thread_id,
      });

      // Brief pause before the text is streamed out.
      await sleep(500);

      const stream = new TextStream(result.text, { delay: 9 });
      await stream.processTextStream(onProgress);
    }
  }

  return in_progress;
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Drives a run until it no longer requires action.
 *
 * Waits for the run (forwarding step updates through a `RunManager`), and when
 * the run asks for tool outputs, executes the requested actions, submits the
 * outputs and recurses on the resulting run, carrying the accumulated steps,
 * messages and step handler along.
 *
 * @param {object} params
 * @param {object} params.openai - Client object; reads `req.config`, `req.body`, `responseMessage`,
 *   `responseText`, `completeToolCallSteps`, and is assigned `in_progress`.
 * @param {string} params.run_id - ID of the run to wait for.
 * @param {string} params.thread_id - ID of the thread the run belongs to.
 * @param {object[]} [params.accumulatedSteps=[]] - Steps gathered by earlier iterations.
 * @param {object[]} [params.accumulatedMessages=[]] - Messages gathered by earlier iterations (sorted in place on return).
 * @param {Function} [params.in_progress] - Existing step handler to reuse; a new one is created when omitted.
 * @returns {Promise<{ run: object, steps: object[], messages: object[], finalMessage: object, text: string }>}
 */
async function runAssistant({
  openai,
  run_id,
  thread_id,
  accumulatedSteps = [],
  accumulatedMessages = [],
  in_progress: inProgress,
}) {
  const appConfig = openai.req.config;
  let steps = accumulatedSteps;
  const messages = accumulatedMessages;
  const in_progress = inProgress ?? createInProgressHandler(openai, thread_id, messages);
  openai.in_progress = in_progress;

  /** Invoked by the run manager with the final step; settles and stores all steps. */
  const handleFinalStep = async ({ step, runStatus, stepsByStatus }) => {
    logger.debug(`[runAssistant] Final step for ${run_id} with status ${runStatus}`, step);

    // Gather the pending step promises of every status bucket, in bucket order.
    const pending = [];
    for (const bucket of Object.values(stepsByStatus)) {
      pending.push(...bucket);
    }

    const resolved = await Promise.all(pending);
    const finalSteps = filterSteps(steps.concat(resolved));

    if (step.type === StepTypes.MESSAGE_CREATION) {
      // Flush tool-call steps never reported as complete before handling the message step.
      const isIncompleteToolCallStep = (candidate) =>
        candidate &&
        candidate.type === StepTypes.TOOL_CALLS &&
        !openai.completeToolCallSteps.has(candidate.id);

      const incompleteToolCallSteps = finalSteps.filter(isIncompleteToolCallStep);
      for (const toolCallStep of incompleteToolCallSteps) {
        await in_progress({ step: toolCallStep });
      }
    }
    await in_progress({ step });

    // NOTE(review): appended after `finalSteps` was computed, so this does not affect `steps`.
    resolved.push(step);

    steps = klona(finalSteps);
  };

  const runManager = new RunManager({
    in_progress,
    final: handleFinalStep,
  });

  const { endpoint = EModelEndpoint.azureAssistants } = openai.req.body;
  const { pollIntervalMs, timeoutMs } = appConfig.endpoints?.[endpoint] ?? {};

  const run = await waitForRun({
    openai,
    run_id,
    thread_id,
    runManager,
    pollIntervalMs,
    timeout: timeoutMs,
  });

  if (run.required_action) {
    const toAction = (toolCall) => {
      const { name, arguments: rawArguments } = toolCall.function;
      return {
        tool: name,
        toolInput: JSON.parse(rawArguments),
        toolCallId: toolCall.id,
        run_id,
        thread_id,
      };
    };

    const { submit_tool_outputs } = run.required_action;
    const actions = submit_tool_outputs.tool_calls.map(toAction);

    const tool_outputs = await processRequiredActions(openai, actions);
    const toolRun = await openai.beta.threads.runs.submitToolOutputs(run.id, {
      thread_id: run.thread_id,
      tool_outputs,
    });

    // Continue with the run produced by submitting the tool outputs.
    return await runAssistant({
      openai,
      run_id: toolRun.id,
      thread_id,
      accumulatedSteps: steps,
      accumulatedMessages: messages,
      in_progress,
    });
  }

  // No further action required: return everything gathered, messages in creation order.
  const sortedMessages = messages.sort((a, b) => a.created_at - b.created_at);
  return {
    run,
    steps,
    messages: sortedMessages,
    finalMessage: openai.responseMessage,
    text: openai.responseText,
  };
}
| |
|
| | module.exports = { |
| | getResponse, |
| | runAssistant, |
| | createOnTextProgress, |
| | }; |
| |
|