File size: 3,261 Bytes
f0743f4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
const { sendEvent } = require('@librechat/api');
const { logger } = require('@librechat/data-schemas');
const { CacheKeys, RunStatus, isUUID } = require('librechat-data-provider');
const { initializeClient } = require('~/server/services/Endpoints/assistants');
const { checkMessageGaps, recordUsage } = require('~/server/services/Threads');
const { deleteMessages } = require('~/models/Message');
const { getConvo } = require('~/models/Conversation');
const getLogStores = require('~/cache/getLogStores');

const three_minutes = 1000 * 60 * 3;

/**
 * Aborts an in-progress assistants run for a conversation.
 *
 * Expects `req.body.abortKey` in the form `"<conversationId>:<latestMessageId>"`.
 * Looks up the pending run in the abort-keys cache, cancels it via the OpenAI
 * API, records token usage, deletes unfinished intermediate messages, and
 * responds with a final event containing the reconciled run messages.
 *
 * @param {Express.Request} req - Authenticated request; reads `user.id` and `body.abortKey` / `body.endpoint`.
 * @param {Express.Response} res - Response; receives JSON or, if streaming already began, a final SSE event.
 * @returns {Promise<void>}
 */
async function abortRun(req, res) {
  res.setHeader('Content-Type', 'application/json');
  const { abortKey, endpoint } = req.body;
  // Guard against a missing abortKey: `?? ''` lets the UUID check below
  // reject it with a 400 instead of `.split` throwing an uncaught TypeError.
  const [conversationId, latestMessageId] = (abortKey ?? '').split(':');

  // Validate before any database work so malformed requests are rejected cheaply.
  if (!isUUID.safeParse(conversationId).success) {
    logger.error('[abortRun] Invalid conversationId', { conversationId });
    return res.status(400).send({ message: 'Invalid conversationId' });
  }

  const conversation = await getConvo(req.user.id, conversationId);

  if (conversation?.model) {
    req.body.model = conversation.model;
  }

  const cacheKey = `${req.user.id}:${conversationId}`;
  const cache = getLogStores(CacheKeys.ABORT_KEYS);
  const runValues = await cache.get(cacheKey);
  if (!runValues) {
    logger.warn('[abortRun] Run not found in cache', { cacheKey });
    // NOTE: 204 responses carry no body; the message object is effectively dropped.
    return res.status(204).send({ message: 'Run not found' });
  }
  const [thread_id, run_id] = runValues.split(':');

  if (!run_id) {
    logger.warn("[abortRun] Couldn't find run for cancel request", { thread_id });
    return res.status(204).send({ message: 'Run not found' });
  } else if (run_id === 'cancelled') {
    logger.warn('[abortRun] Run already cancelled', { thread_id });
    return res.status(204).send({ message: 'Run already cancelled' });
  }

  let runMessages = [];
  /** @type {{ openai: OpenAI }} */
  const { openai } = await initializeClient({ req, res });

  try {
    // Mark as cancelled in the cache first so concurrent abort requests
    // short-circuit on the 'cancelled' sentinel above.
    await cache.set(cacheKey, 'cancelled', three_minutes);
    const cancelledRun = await openai.beta.threads.runs.cancel(run_id, { thread_id });
    logger.debug('[abortRun] Cancelled run:', cancelledRun);
  } catch (error) {
    logger.error('[abortRun] Error cancelling run', error);
    // If the run is already cancelled/cancelling, there is nothing left to do.
    if (
      error?.message?.includes(RunStatus.CANCELLED) ||
      error?.message?.includes(RunStatus.CANCELLING)
    ) {
      return res.end();
    }
  }

  try {
    // Fetch the (now cancelled) run to record whatever token usage accrued.
    const run = await openai.beta.threads.runs.retrieve(run_id, { thread_id });
    await recordUsage({
      ...run.usage,
      model: run.model,
      user: req.user.id,
      conversationId,
    });
  } catch (error) {
    // Best-effort: usage bookkeeping failures should not block the abort response.
    logger.error('[abortRun] Error fetching or processing run', error);
  }

  /* TODO: a reconciling strategy between the existing intermediate message would be more optimal than deleting it */
  await deleteMessages({
    user: req.user.id,
    unfinished: true,
    conversationId,
  });
  runMessages = await checkMessageGaps({
    openai,
    run_id,
    endpoint,
    thread_id,
    conversationId,
    latestMessageId,
  });

  const finalEvent = {
    final: true,
    conversation,
    runMessages,
  };

  // If streaming already started, deliver the result as a final SSE event;
  // otherwise respond with plain JSON.
  if (res.headersSent) {
    return sendEvent(res, finalEvent);
  }

  res.json(finalEvent);
}

// Public API of this module.
module.exports = { abortRun };