Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 6x 6x 6x 6x 1x 1x 6x 6x 6x 1x 1x 1x 12x 6x 6x 6x 1x 1x 1x 1x 1x 1x | import uuid from '../utils/uuid';
import { logger } from './logger';
/**
* 单任务队列
* 1. 被修饰的函数,同一时间只允许执行一个异步请求。
* 2. 在异步请求完成之前,其他调用请求会被进入队列。
* 3. 异步请求完成之后,队列中的请求会被释放,共用同一个结果。
*
* 使用场景:高并发秒杀场景、刷新登录态场景等
*/
export class ConcurrentMerger {
// 最大队列负载
public queueMaxLength = 100;
// 单队列名称
public name = 'unnamed';
// 异步状态
private locked = false;
// 异步队列
private queue: Array<any> = [];
constructor({
queueMaxLength,
name,
}: {
queueMaxLength?: typeof ConcurrentMerger.prototype.queueMaxLength;
name?: typeof ConcurrentMerger.prototype.name;
}) {
Iif (queueMaxLength) this.queueMaxLength = queueMaxLength;
Eif (name) this.name = name;
}
private addJob(job: Array<any>) {
const id = uuid();
Iif (this.queue.length >= this.queueMaxLength) {
throw new Error(`${this.name}-任务队列-超出容量:${this.queueMaxLength}`);
}
logger.info(
`${this.name}-任务队列-入列(${this.queue.length + 1}/${
this.queueMaxLength
}, id: ${id})`
);
this.queue.push([...job, id]);
}
private consumeAllAsSuccess(result) {
// eslint-disable-next-line functional/no-loop-statement
while (this.queue.length > 0) {
const [resolve, , id] = this.queue.shift();
logger.info(`${this.name}-任务队列-消费-resolve(id: ${id})`);
resolve(result);
}
}
private consumeAllAsFail(error) {
// eslint-disable-next-line functional/no-loop-statement
while (this.queue.length > 0) {
const [, reject, id] = this.queue.shift();
logger.info(`${this.name}-任务队列-消费-reject(id: ${id})`);
reject(error);
}
}
public proxy(originMethods: (...any) => Promise<any>) {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const that = this;
return function (this: any, ...args) {
// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve, reject) => {
// 先入队
that.addJob([resolve, reject]);
// 通道被 locked,等待队列被消费。
if (that.locked) return;
// 发起原方法异步调用,然后 lock
that.locked = true;
logger.info(`${that.name}-任务队列-执行目标方法`);
try {
const result = await originMethods.apply(this, args);
// 3. 结束之后,消费 queue
that.consumeAllAsSuccess(result);
} catch (error) {
that.consumeAllAsFail(error);
}
// 4. 解除 lock
that.locked = false;
});
};
}
}
|