|
|
import { createLoggerWithContext } from "@midday/logger"; |
|
|
|
|
|
// Module-level logger shared by the batch helpers below.
// NOTE(review): createLoggerWithContext presumably scopes every entry under
// the "worker:batch" context tag — behavior defined in @midday/logger.
const logger = createLoggerWithContext("worker:batch");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
export async function processBatch<T, R>( |
|
|
items: T[], |
|
|
batchSize: number, |
|
|
processor: (batch: T[]) => Promise<R[]>, |
|
|
): Promise<R[]> { |
|
|
const results: R[] = []; |
|
|
|
|
|
for (let i = 0; i < items.length; i += batchSize) { |
|
|
const batch = items.slice(i, i + batchSize); |
|
|
try { |
|
|
const batchResults = await processor(batch); |
|
|
results.push(...batchResults); |
|
|
} catch (error) { |
|
|
|
|
|
|
|
|
logger.error(`Batch processing failed at index ${i}`, { |
|
|
error: error instanceof Error ? error.message : String(error), |
|
|
}); |
|
|
throw error; |
|
|
} |
|
|
} |
|
|
|
|
|
return results; |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
export async function processBatchWithErrorIsolation<T, R>( |
|
|
items: T[], |
|
|
batchSize: number, |
|
|
processor: (batch: T[]) => Promise<R[]>, |
|
|
): Promise<{ |
|
|
results: R[]; |
|
|
errors: Array<{ index: number; error: unknown }>; |
|
|
}> { |
|
|
const results: R[] = []; |
|
|
const errors: Array<{ index: number; error: unknown }> = []; |
|
|
|
|
|
for (let i = 0; i < items.length; i += batchSize) { |
|
|
const batch = items.slice(i, i + batchSize); |
|
|
const batchIndex = Math.floor(i / batchSize); |
|
|
|
|
|
try { |
|
|
const batchResults = await processor(batch); |
|
|
results.push(...batchResults); |
|
|
} catch (error) { |
|
|
errors.push({ index: batchIndex, error }); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
return { results, errors }; |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
export async function processBatchParallel<T, R>( |
|
|
items: T[], |
|
|
batchSize: number, |
|
|
concurrency: number, |
|
|
processor: (batch: T[]) => Promise<R[]>, |
|
|
): Promise<R[]> { |
|
|
const results: R[] = []; |
|
|
const batches: T[][] = []; |
|
|
|
|
|
|
|
|
for (let i = 0; i < items.length; i += batchSize) { |
|
|
batches.push(items.slice(i, i + batchSize)); |
|
|
} |
|
|
|
|
|
|
|
|
for (let i = 0; i < batches.length; i += concurrency) { |
|
|
const concurrentBatches = batches.slice(i, i + concurrency); |
|
|
const batchResults = await Promise.all( |
|
|
concurrentBatches.map((batch) => processor(batch)), |
|
|
); |
|
|
results.push(...batchResults.flat()); |
|
|
} |
|
|
|
|
|
return results; |
|
|
} |
|
|
|