File size: 12,440 Bytes
f0743f4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 | const { logger } = require('@librechat/data-schemas');
const { getMultiplier, getCacheMultiplier } = require('./tx');
const { Transaction, Balance } = require('~/db/models');
// Factor applied to `tokenValue` and `rate` of 'completion' transactions whose context is 'incomplete'.
const cancelRate = 1.15;
/**
 * Updates a user's token balance based on a transaction using optimistic concurrency control
 * without schema changes. Compatible with DocumentDB.
 *
 * Each attempt reads the user's Balance document, computes the new `tokenCredits` (never below
 * zero) and writes it back only if `tokenCredits` still holds the value that was read. When no
 * document was found, an upsert filtered on the user is issued instead. Conflicts and errors are
 * retried up to 10 times with jittered exponential backoff (50 ms initial delay, doubled after
 * every failed attempt, capped at 2000 ms).
 *
 * @async
 * @function
 * @param {Object} params - The function parameters.
 * @param {string|mongoose.Types.ObjectId} params.user - The user ID.
 * @param {number} params.incrementValue - The value to increment the balance by (can be negative).
 *   Coerced with `Number()`; must be finite.
 * @param {import('mongoose').UpdateQuery<import('@librechat/data-schemas').IBalance>['$set']} [params.setValues] - Optional additional fields to set.
 * @returns {Promise<Object>} Returns the updated balance document (lean).
 * @throws {TypeError} Throws immediately, before any database call, if `incrementValue` is not a finite number.
 * @throws {Error} Throws the last encountered error if the update fails after all retries.
 */
const updateBalance = async ({ user, incrementValue, setValues }) => {
  // Validate once up front: `credits + '5'` would concatenate strings, and a NaN delta would be
  // carried into the `$set` payload on every retry below instead of failing right away.
  const delta = Number(incrementValue);
  if (!Number.isFinite(delta)) {
    throw new TypeError(
      `[updateBalance] incrementValue must be a finite number for user ${user}; received ${incrementValue}.`,
    );
  }

  const maxRetries = 10; // Number of attempts before giving up
  const maxDelay = 2000; // Upper bound for the backoff delay in ms
  let delay = 50; // Initial retry delay in ms
  let lastError = null;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      // 1. Read the current document state
      const currentBalanceDoc = await Balance.findOne({ user }).lean();
      const currentCredits = currentBalanceDoc ? currentBalanceDoc.tokenCredits : 0;

      // 2. Calculate the desired new state; the balance never goes below zero
      const newCredits = Math.max(0, currentCredits + delta);

      // 3. Prepare the update payload (spreading an undefined `setValues` adds nothing)
      const updatePayload = {
        $set: {
          tokenCredits: newCredits,
          ...setValues,
        },
      };

      if (currentBalanceDoc) {
        // 4a. Document exists: conditional update. The filter on the credits value we read is
        // the optimistic lock; a null result means the value changed in between.
        const updatedBalance = await Balance.findOneAndUpdate(
          { user, tokenCredits: currentCredits },
          updatePayload,
          { new: true },
        ).lean();
        if (updatedBalance) {
          return updatedBalance;
        }
        lastError = new Error(`Concurrency conflict for user ${user} on attempt ${attempt}.`);
      } else {
        // 4b. No document yet: upsert filtered on the user only, relying on the retry loop
        // for creation races.
        try {
          const updatedBalance = await Balance.findOneAndUpdate({ user }, updatePayload, {
            upsert: true,
            new: true,
          }).lean();
          if (updatedBalance) {
            return updatedBalance;
          }
          lastError = new Error(
            `Upsert race condition suspected for user ${user} on attempt ${attempt}.`,
          );
        } catch (error) {
          if (error.code !== 11000) {
            // Not a duplicate-key error: hand it to the outer catch, which logs it and retries.
            throw error;
          }
          // E11000 duplicate key: treated as a creation conflict; retry without logging.
          lastError = error;
        }
      }
    } catch (error) {
      // Errors from findOne or unexpected findOneAndUpdate errors are logged and retried.
      logger.error(`[updateBalance] Error during attempt ${attempt} for user ${user}:`, error);
      lastError = error;
    }

    // Reaching this point means the attempt failed (conflict or error): wait, then retry.
    if (attempt < maxRetries) {
      const jitter = Math.random() * delay * 0.5;
      await new Promise((resolve) => setTimeout(resolve, delay + jitter));
      delay = Math.min(delay * 2, maxDelay);
    }
  }

  // Every attempt failed: surface the last encountered error, or a generic one.
  logger.error(
    `[updateBalance] Failed to update balance for user ${user} after ${maxRetries} attempts.`,
  );
  throw (
    lastError ||
    new Error(
      `Failed to update balance for user ${user} after maximum retries due to persistent conflicts.`,
    )
  );
};
/**
 * Computes and stores `rate` and `tokenValue` on a transaction.
 *
 * `rate` is the absolute value of what `getMultiplier` returns for the transaction's
 * `valueKey`, `tokenType`, `model` and `endpointTokenConfig`; `tokenValue` is `rawAmount`
 * times that rate. For a 'completion' transaction whose context is 'incomplete', the token
 * value is additionally multiplied by `cancelRate` and rounded up, and the rate is scaled by
 * `cancelRate` as well.
 *
 * @param {object} txn - The transaction to update in place.
 */
function calculateTokenValue(txn) {
  const lacksPricingKeys = !(txn.valueKey && txn.tokenType);
  if (lacksPricingKeys) {
    // NOTE(review): there is no early return here, so this assignment is always overwritten
    // below. Confirm whether a `return` was intended before changing it.
    txn.tokenValue = txn.rawAmount;
  }

  const rate = Math.abs(
    getMultiplier({
      valueKey: txn.valueKey,
      tokenType: txn.tokenType,
      model: txn.model,
      endpointTokenConfig: txn.endpointTokenConfig,
    }),
  );
  txn.rate = rate;
  txn.tokenValue = txn.rawAmount * rate;

  const isIncompleteCompletion =
    Boolean(txn.context) && txn.tokenType === 'completion' && txn.context === 'incomplete';
  if (!isIncompleteCompletion) {
    return;
  }

  txn.tokenValue = Math.ceil(txn.tokenValue * cancelRate);
  txn.rate = txn.rate * cancelRate;
}
/**
 * Creates and saves an auto-refill transaction, then credits the user's balance.
 *
 * The balance is incremented by `txData.rawAmount` (not by the computed `tokenValue`) and
 * `lastRefill` is set to the current date in the same `updateBalance` call.
 *
 * @param {object} txData - Transaction data.
 * @param {string} txData.user - The user ID.
 * @param {string} txData.tokenType - The type of token.
 * @param {string} txData.context - The context of the transaction.
 * @param {number} txData.rawAmount - The raw amount of tokens.
 * @returns {Promise<object|undefined>} `{ rate, user, balance, transaction }`, or `undefined`
 *   (nothing is saved) when `rawAmount` is non-null and `isNaN` reports it as not a number.
 */
async function createAutoRefillTransaction(txData) {
  const hasInvalidAmount = txData.rawAmount != null && isNaN(txData.rawAmount);
  if (hasInvalidAmount) {
    return;
  }

  const refillTx = new Transaction(txData);
  refillTx.endpointTokenConfig = txData.endpointTokenConfig;
  calculateTokenValue(refillTx);
  await refillTx.save();

  const updatedBalance = await updateBalance({
    user: refillTx.user,
    incrementValue: txData.rawAmount,
    setValues: { lastRefill: new Date() },
  });

  const summary = {
    rate: refillTx.rate,
    user: refillTx.user.toString(),
    balance: updatedBalance.tokenCredits,
  };
  logger.debug('[Balance.check] Auto-refill performed', summary);

  // The transaction is attached to the same object only after the debug call has been made.
  return Object.assign(summary, { transaction: refillTx });
}
/**
 * Creates and saves a transaction and, when balance tracking is enabled, applies the
 * transaction's token value to the user's balance.
 *
 * @param {txData} _txData - Transaction data. Its `balance` and `transactions` entries are
 *   split off and are not passed to the Transaction constructor.
 * @returns {Promise<object|undefined>} `{ rate, user, balance, [tokenType]: tokenValue }` after
 *   a balance update. `undefined` when `rawAmount` is non-null and `isNaN` reports it as not a
 *   number, when `transactions.enabled` is exactly `false` (nothing is saved in either case),
 *   or when `balance.enabled` is not truthy (the transaction is still saved).
 */
async function createTransaction(_txData) {
  const { balance: balanceConfig, transactions: transactionsConfig, ...txData } = _txData;

  const amountIsInvalid = txData.rawAmount != null && isNaN(txData.rawAmount);
  if (amountIsInvalid || transactionsConfig?.enabled === false) {
    return;
  }

  const transaction = new Transaction(txData);
  transaction.endpointTokenConfig = txData.endpointTokenConfig;
  calculateTokenValue(transaction);
  await transaction.save();

  const balanceEnabled = Boolean(balanceConfig?.enabled);
  if (!balanceEnabled) {
    return;
  }

  const incrementValue = transaction.tokenValue;
  const updatedBalance = await updateBalance({ user: transaction.user, incrementValue });

  return {
    rate: transaction.rate,
    user: transaction.user.toString(),
    balance: updatedBalance.tokenCredits,
    [transaction.tokenType]: incrementValue,
  };
}
/**
 * Creates and saves a structured transaction (priced by `calculateStructuredTokenValue`) and,
 * when balance tracking is enabled, applies its token value to the user's balance.
 *
 * @param {txData} _txData - Transaction data. Its `balance` and `transactions` entries are
 *   split off and are not passed to the Transaction constructor.
 * @returns {Promise<object|undefined>} `{ rate, user, balance, [tokenType]: tokenValue }` after
 *   a balance update. `undefined` when `transactions.enabled` is exactly `false` (nothing is
 *   saved) or when `balance.enabled` is not truthy (the transaction is still saved).
 */
async function createStructuredTransaction(_txData) {
  const { balance, transactions, ...txData } = _txData;
  if (transactions?.enabled === false) {
    return;
  }

  const transaction = new Transaction(txData);
  // Assigned on the instance, exactly as createTransaction and createAutoRefillTransaction do,
  // rather than only through the constructor. NOTE(review): presumably the model drops
  // constructor keys that are not schema paths, in which case the pricing helper below would
  // never see a custom token config. Confirm against the Transaction schema.
  transaction.endpointTokenConfig = txData.endpointTokenConfig;
  calculateStructuredTokenValue(transaction);
  await transaction.save();

  if (!balance?.enabled) {
    return;
  }

  const incrementValue = transaction.tokenValue;
  const balanceResponse = await updateBalance({
    user: transaction.user,
    incrementValue,
  });

  return {
    rate: transaction.rate,
    user: transaction.user.toString(),
    balance: balanceResponse.tokenCredits,
    [transaction.tokenType]: incrementValue,
  };
}
/**
 * Computes and stores pricing fields on a structured transaction.
 *
 * - No `tokenType`: `tokenValue` is set to `rawAmount` and nothing else is touched.
 * - 'prompt': looks up the input multiplier and the cache write/read multipliers (each cache
 *   multiplier falls back to the input multiplier when nullish), stores them in `rateDetail`,
 *   sets `rate` to the cost-weighted average over input/write/read token counts (or the
 *   absolute input multiplier when all counts are zero), and sets `tokenValue` and `rawAmount`
 *   to the negated total cost and negated total token count.
 * - 'completion': sets `rate` to the absolute multiplier and negates `tokenValue` and
 *   `rawAmount`. When the context is 'incomplete', `tokenValue` is multiplied by `cancelRate`
 *   and rounded up, and `rate` (plus every `rateDetail` entry, if present) is scaled by
 *   `cancelRate`.
 * - Any other `tokenType`: the transaction is left unchanged.
 *
 * @param {object} txn - The transaction to update in place.
 */
function calculateStructuredTokenValue(txn) {
  if (!txn.tokenType) {
    txn.tokenValue = txn.rawAmount;
    return;
  }

  const { model, endpointTokenConfig } = txn;

  if (txn.tokenType === 'prompt') {
    const inputRate = getMultiplier({ tokenType: 'prompt', model, endpointTokenConfig });
    const cacheRateFor = (cacheType) =>
      getCacheMultiplier({ cacheType, model, endpointTokenConfig }) ?? inputRate;
    const writeRate = cacheRateFor('write');
    const readRate = cacheRateFor('read');
    txn.rateDetail = { input: inputRate, write: writeRate, read: readRate };

    const inputCount = txn.inputTokens || 0;
    const writeCount = txn.writeTokens || 0;
    const readCount = txn.readTokens || 0;
    const totalPromptTokens = Math.abs(inputCount) + Math.abs(writeCount) + Math.abs(readCount);

    if (totalPromptTokens > 0) {
      const absoluteCost =
        Math.abs(inputRate * inputCount) +
        Math.abs(writeRate * writeCount) +
        Math.abs(readRate * readCount);
      txn.rate = absoluteCost / totalPromptTokens;
    } else {
      // No tokens at all: fall back to the input rate.
      txn.rate = Math.abs(inputRate);
    }

    const signedCost =
      Math.abs(inputCount) * inputRate +
      Math.abs(writeCount) * writeRate +
      Math.abs(readCount) * readRate;
    txn.tokenValue = -signedCost;
    txn.rawAmount = -totalPromptTokens;
    return;
  }

  if (txn.tokenType !== 'completion') {
    return;
  }

  const completionRate = getMultiplier({ tokenType: txn.tokenType, model, endpointTokenConfig });
  const completionCount = Math.abs(txn.rawAmount);
  txn.rate = Math.abs(completionRate);
  txn.tokenValue = -completionCount * completionRate;
  txn.rawAmount = -completionCount;

  if (txn.context !== 'incomplete') {
    return;
  }

  // Surcharge for completions whose context is 'incomplete'.
  txn.tokenValue = Math.ceil(txn.tokenValue * cancelRate);
  txn.rate = txn.rate * cancelRate;
  if (txn.rateDetail) {
    const scaledEntries = Object.entries(txn.rateDetail).map(([key, value]) => [
      key,
      value * cancelRate,
    ]);
    txn.rateDetail = Object.fromEntries(scaledEntries);
  }
}
/**
 * Retrieves the transactions matching a filter as lean (plain) objects.
 * @async
 * @function getTransactions
 * @param {Object} filter - MongoDB filter object to apply when querying transactions.
 * @returns {Promise<Array>} A promise that resolves to an array of matched transactions.
 * @throws {Error} Rethrows, after logging it, any error raised while querying the database.
 */
async function getTransactions(filter) {
  let matches;
  try {
    matches = await Transaction.find(filter).lean();
  } catch (queryError) {
    logger.error('Error querying transactions:', queryError);
    throw queryError;
  }
  return matches;
}
// Public API of this module. `updateBalance`, `calculateTokenValue` and
// `calculateStructuredTokenValue` are internal helpers and are not exported here.
module.exports = {
getTransactions,
createTransaction,
createAutoRefillTransaction,
createStructuredTransaction,
};
|