如何控制nodejs的线程数

时间:2022-07-22
本文章向大家介绍如何控制nodejs的线程数,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

nodejs提供了线程能力,但是我们不能一味地开启线程,需要控制数据,本文分享如何去控制数量。

1 设计思路 设计的思路比较简单,就是在用户和work_threads模块之前加一层,如果用户直接调用work_threads模块,那就可以创建任意数量的线程,控制就无从说起。通过加一层,可以缓存用户提交的任务,等到有线程退出(有任务处理完)的时候,再开启新的线程去处理缓存的任务。

2 具体实现 2.1 配置的实现 定义一些配置,比如最多能创建的线程数。使用Proxy,劫持一下对象,可以做一些事情,不过目前没有。

// 线程池配置,使用Proxy,可以劫持属性的读写,做点事情
const CONFIG = new Proxy({
    MAX_THREAD: 3,
}, {
    get(obj, key) {
        return obj[key];
    },
    set(obj, key, value) {
        obj[key] = value;
        return true;
    }
})

2.2 控制逻辑的实现

2.2.1 构造函数 线程池记录当前的线程数和缓存的任务队列。

    constructor() {
        // 任务队列
        this._workQueue = [];
        // 当前线程数
        this._count = 0;
    }

2.2.2 新建一个线程

_newThread(...rest) {
        // todo 新建失败处理
        const worker = new Worker(...rest);
        // 线程数加一
        this._count++;
        // 退出后,如果有缓存的任务,则新建线程处理
        worker.on('exit', () => {
            this._count--;
            // 有名额了并且有任务在等待
            if (this._workQueue.length) {
               const {
                   resolve,
                   reject,
                   params,
               } = this._workQueue.shift();
               // 开启线程,并且通知用户
               resolve(this._newThread(...params));
            }
        });
        return worker;
    }

2.2.3 提交任务给线程池

  // 给线程池提交一个任务
    submit(...rest) {
        return new Promise((resolve, reject) => {
            // 还没有达到阈值,则新建线程,否则缓存起来
            if (this._count < CONFIG.MAX_THREAD) {
                resolve(this._newThread(...rest));
            } else {
                this._workQueue.push({resolve, reject, params: rest});
            }
        });
    }
}

思想很简单,根据配置,最多创建n个线程,如果任务太多的话,就缓存起来,等待有线程退出的时候,再新建一个线程处理缓存起来的任务。API使用Promise方式。这样可以实现等待操作。等到创建线程的时候可以通知用户。而且用户使用的时候,几乎是透明的,没有太多额外的成本,因为只是做了一些封装,几乎是透传nodejs的线程功能。最后提供多种方式调用,包括一个默认的控制器、创建多个控制器。

const defaultThreadGate = new ThreadGate();
module.exports = {
    ThreadGate,
    defaultThreadGate,
    submit: (...rest) => {
        return defaultThreadGate.submit(...rest);
    },
    CONFIG,
}