File size: 1,593 Bytes
2b7aae2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import { EventEmitter } from 'events';

/** Options accepted by `destructPromise`. */
interface DSPromiseOption {
	/**
	 * Delay passed to `setTimeout`. When it is `>= 0`, `destructPromise` schedules
	 * a rejection with the reason `'timeout'` after that delay; when omitted (or
	 * negative) no timer is scheduled.
	 */
	timeout?: number;
}

/**
 * Creates a promise together with its externally callable `resolve` and
 * `reject` functions (a "deferred").
 *
 * If `options.timeout` is a number `>= 0`, the promise is rejected with the
 * string reason `'timeout'` after that many milliseconds unless it has been
 * settled first. The string reason is kept (rather than an `Error`) so that
 * existing callers comparing against `'timeout'` keep working.
 *
 * The pending timer is cancelled as soon as the returned `resolve` or `reject`
 * is called, so a settled promise no longer keeps the event loop alive until
 * the timeout elapses.
 *
 * @param options Optional settings; see `timeout` above.
 * @returns A tuple `[promise, resolve, reject]`.
 */
export function destructPromise<T>(
	options: DSPromiseOption = {}
): [promise: Promise<T>, resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: any) => void] {
	const { timeout } = options;
	let timer: ReturnType<typeof setTimeout> | undefined;
	// The Promise executor runs synchronously, so both are assigned before use.
	let rs!: (value: T | PromiseLike<T>) => void;
	let rj!: (reason?: any) => void;

	const promise = new Promise<T>((resolve, reject) => {
		rs = resolve;
		rj = reject;
	});

	/** Cancels the timeout timer if one is still pending. */
	const disarm = (): void => {
		if (timer !== undefined) {
			clearTimeout(timer);
			timer = undefined;
		}
	};

	// `typeof` guard: `undefined`/`NaN` never arm the timer, and `null` (which
	// would pass a bare `>= 0` comparison) is treated as "no timeout" as well.
	if (typeof timeout === 'number' && timeout >= 0) {
		timer = setTimeout(() => {
			timer = undefined;
			rj('timeout');
		}, timeout);
	}

	return [
		promise,
		(value: T | PromiseLike<T>): void => {
			// Once resolve() has been called the promise's fate is locked in and a
			// later timeout rejection would be ignored anyway, so drop the timer.
			disarm();
			rs(value);
		},
		(reason?: any): void => {
			disarm();
			rj(reason);
		},
	];
}

/**
 * One queued unit of work for `AsyncQueue`:
 * - `fn`: the task function, invoked with `payload`;
 * - `payload`: the argument handed to `fn`;
 * - `resolve` / `reject`: callbacks that receive `fn`'s outcome (in `addTask`
 *   they are the settle functions of the promise returned to the caller).
 */
type AsyncTask = [fn: (data: any) => Promise<any>, payload: any, resolve: (data: any) => void, reject: (reason: any) => void];

/**
 * A FIFO queue that runs asynchronous tasks strictly one at a time.
 *
 * Emits `'idle'` every time the queue has drained and nothing is running, and
 * once on the tick after construction if no task has been started by then.
 */
export class AsyncQueue extends EventEmitter {
	/** True while the drain loop is running a task. */
	private working = false;

	/** Tasks waiting for their turn, in insertion order. */
	tasks: AsyncTask[];

	constructor() {
		super();
		this.tasks = [];
		// Deferred so listeners attached right after construction still receive it.
		process.nextTick(() => {
			// A task may have been added synchronously after construction; do not
			// announce 'idle' while it is running - the drain loop will emit it.
			if (!this.working) this.emit('idle');
		});
	}

	/**
	 * Runs `item` and then every task queued behind it, one after another, and
	 * emits `'idle'` once the queue is empty.
	 *
	 * A task that throws synchronously, rejects, or returns a non-promise value
	 * only affects its own `resolve`/`reject`; the loop always moves on to the
	 * next task, so one bad task cannot leave the queue stuck in the working
	 * state.
	 */
	private async _digest(item: AsyncTask): Promise<void> {
		this.working = true;

		// Iterative drain: avoids a chain of nested pending async calls that
		// grows with the number of queued tasks.
		let current: AsyncTask | undefined = item;
		while (current !== undefined) {
			const [taskFn, payload, resolve, reject] = current;
			try {
				resolve(await taskFn(payload));
			} catch (error: unknown) {
				reject(error);
			}
			current = this.tasks.shift();
		}

		this.working = false;
		this.emit('idle');
	}

	/**
	 * Adds a task to the queue.
	 *
	 * The task starts immediately when the queue is idle; otherwise it waits
	 * until every previously added task has finished.
	 *
	 * Note: the timeout clock starts when the task is added, not when it begins
	 * running, and a timeout only rejects the returned promise - the task itself
	 * is neither cancelled nor removed from the queue.
	 *
	 * @param task Tuple `[fn, payload]`; `fn(payload)` is called when it is the task's turn.
	 * @param options `timeout`: milliseconds after which the returned promise is
	 *   rejected with `'timeout'` (default 600000).
	 * @returns A promise settled with the outcome of `fn(payload)`.
	 */
	addTask(task: [AsyncTask[0], AsyncTask[1]], { timeout = 600000 }: { timeout?: number } = {}): Promise<any> {
		const [promise, resolve, reject] = destructPromise<any>({ timeout });
		const item: AsyncTask = [task[0], task[1], resolve, reject];

		if (this.working) {
			this.tasks.push(item);
		} else {
			// Fire-and-forget: task failures are routed to `reject` inside the loop.
			void this._digest(item);
		}

		return promise;
	}
}