All files / src/lib concurrent-merger.ts

82.69% Statements 43/52
62.5% Branches 5/8
88.89% Functions 8/9
85% Lines 34/40

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 961x   1x                   1x   1x   1x   1x   1x     1x 1x         1x 1x     1x 6x 6x       6x           6x     1x   1x 6x 6x 6x       1x                 1x   1x   12x   6x   6x     6x     1x 1x   1x   1x         1x       1x  
import uuid from '../utils/uuid';
 
import { logger } from './logger';
 
/**
 * 单任务队列
 * 1. 被修饰的函数,同一时间只允许执行一个异步请求。
 * 2. 在异步请求完成之前,其他调用请求会被进入队列。
 * 3. 异步请求完成之后,队列中的请求会被释放,共用同一个结果。
 *
 * 使用场景:高并发秒杀场景、刷新登录态场景等
 */
export class ConcurrentMerger {
  // 最大队列负载
  public queueMaxLength = 100;
  // 单队列名称
  public name = 'unnamed';
  // 异步状态
  private locked = false;
  // 异步队列
  private queue: Array<any> = [];
 
  constructor({
    queueMaxLength,
    name,
  }: {
    queueMaxLength?: typeof ConcurrentMerger.prototype.queueMaxLength;
    name?: typeof ConcurrentMerger.prototype.name;
  }) {
    Iif (queueMaxLength) this.queueMaxLength = queueMaxLength;
    Eif (name) this.name = name;
  }
 
  private addJob(job: Array<any>) {
    const id = uuid();
    Iif (this.queue.length >= this.queueMaxLength) {
      throw new Error(`${this.name}-任务队列-超出容量:${this.queueMaxLength}`);
    }
 
    logger.info(
      `${this.name}-任务队列-入列(${this.queue.length + 1}/${
        this.queueMaxLength
      }, id: ${id})`
    );
 
    this.queue.push([...job, id]);
  }
 
  private consumeAllAsSuccess(result) {
    // eslint-disable-next-line functional/no-loop-statement
    while (this.queue.length > 0) {
      const [resolve, , id] = this.queue.shift();
      logger.info(`${this.name}-任务队列-消费-resolve(id: ${id})`);
      resolve(result);
    }
  }
 
  private consumeAllAsFail(error) {
    // eslint-disable-next-line functional/no-loop-statement
    while (this.queue.length > 0) {
      const [, reject, id] = this.queue.shift();
      logger.info(`${this.name}-任务队列-消费-reject(id: ${id})`);
      reject(error);
    }
  }
 
  public proxy(originMethods: (...any) => Promise<any>) {
    // eslint-disable-next-line @typescript-eslint/no-this-alias
    const that = this;
 
    return function (this: any, ...args) {
      // eslint-disable-next-line no-async-promise-executor
      return new Promise(async (resolve, reject) => {
        // 先入队
        that.addJob([resolve, reject]);
 
        // 通道被 locked,等待队列被消费。
        if (that.locked) return;
 
        // 发起原方法异步调用,然后 lock
        that.locked = true;
        logger.info(`${that.name}-任务队列-执行目标方法`);
        try {
          const result = await originMethods.apply(this, args);
          // 3. 结束之后,消费 queue
          that.consumeAllAsSuccess(result);
        } catch (error) {
          that.consumeAllAsFail(error);
        }
        // 4. 解除 lock
        that.locked = false;
      });
    };
  }
}