添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
鬼畜的香烟  ·  vue.js ...·  1 年前    · 
玩足球的炒粉  ·  python ...·  1 年前    · 

Node.js 由于其单线程模型的设计,导致一个Node进程(的主线程)只能利用一个CPU核心,然而现在的机器基本上都是多核的,这造成了严重的性能浪费。通常来说,想要利用到多个核心一般有以下的方法:

  • 编写Node的C++插件扩充线程池,并在JS代码中将CPU耗时任务委托给其它线程处理。
  • 使用 worker_threads 模块提供的多线程模型(尚在实验阶段)。
  • 使用 child_process 或者 cluster 模块提供的多进程模型,每个进程都是一个独立的Node.js进程。
  • 从易用、代码入侵性、稳定性的角度来说,多进程模型通常是首要的选择。

    Node.js cluster 多进程模型存在的问题

    在cluster模块提供的多进程模型中,每个Node进程都是一个独立且完整的应用进程,有自己的内存空间,其它进程无法访问。因此 虽然在项目启动时,所有Worker进程具有一致的状态和行为,但在之后的运行中无法保证其状态维持一致

    例如,项目启动时有两个Worker进程,进程A和进程B,两个进程都声明了变量a=1。但之后项目接收到一个请求,Master进程将其分派给进程A来处理,这个请求将a的值变更为了2,那么此时进程A的内存空间中a=2,但是进程B的内存空间中a依旧是1。此时如果有个请求读取a的值,Master进程将这个请求分派给进程A和进程B时读取到的结果是不一致的,这就出现了一致性问题。

    cluster模块在设计时并没有给出解决方案,而是要求Worker进程是无状态的,即程序员在写代码时不应该允许在处理请求时修改内存中的值,以此来保障所有Worker进程的一致性。然而在实践中总会有各种各样的情况需要写内存,比如记录用户的登录状态等,在许多企业的实践中, 通常会把这些状态数据记录在外部,例如数据库、redis、消息队列、文件系统等 ,每次处理有状态请求时会读写外部存储空间。

    这不失为一种有效的做法, 然而这需要额外引入一个外部存储空间,同时还要自行处理多进程并发访问下的一致性问题,自行维护数据的生命周期(因为Node进程和维护在外部的数据并不是同步创建和销毁的),以及在高并发访问情况下的IO性能瓶颈(如果是存储在数据库等非内存环境中) 。其实本质上来说,我们只是需要一个可供多个进程共享访问的空间罢了,并不需要持久化存储,这段空间的生命周期最好与Node进程强绑定,这样在使用时能省去不少麻烦。因此跨进程的共享内存就成了最适合在这种场景使用的方式。

    Node.js 的共享内存

    很遗憾Node本身并未提供共享内存的实现,因此我们可以看看npm仓库中第三方库的实现。这些库有些是通过C++插件扩充Node的函数实现的,有些是通过Node提供的IPC机制实现的,但很遗憾它们的实现都很简单,并未提供互斥访问、对象监听等功能,这使得使用者必须自己小心维护这段共享内存,否则就会导致时序问题。

    转了一圈下来没找到我想要的。。。那就算了,我自己写一个。

    共享内存的设计

    首先我们必须理清楚到底需要个什么样的共享内存,我是根据我自身的需求出发(为了在项目中用它来存储跨进程访问的状态数据),同时兼顾通用性,因此会首先考虑以下几点:

  • 以JS对象为基本单位进行读写访问。
  • 能够进程间互斥访问,一个进程访问时,其它进程被阻塞。
  • 能够监听共享内存中的对象,当对象发生变化的时候监听的进程能被通知到。
  • 在满足上述条件的前提下,实现方式尽可能简单。
  • 可以发现,其实我们并不需要操作系统层面的共享内存,只需要能够多个Node进程能访问同一个对象就行了,那么就可以在Node本身提供的机制上实现。 可以使用Master进程的一段内存空间作为共享内存空间,Worker进程通过IPC将读写请求委托给Master进程,由Master进程进行读写,然后再通过IPC将结果返回给Worker进程。

    为了让共享内存的使用方式在Master进程和Worker进程中一致,我们可以将对共享内存的操作抽离成一个接口,在Master进程和Worker进程中各自实现这个接口。类图如下图所示,用一个 SharedMemory 类作为抽象接口,在 server.js 入口文件中声明该对象。其在Master进程中实例化为 Manager 对象,在Worker进程中实例化为 Worker 对象。 Manager 对象来维护共享内存,并处理对共享内存的读写请求,而 Worker 对象则将读写请求发送到Master进程。

    可以使用 Manager 类中的一个属性作为共享内存对象,访问该对象的方式与访问普通JS对象的方式一致,然后再做一层封装,只暴露 get set remove 等基本操作,避免该属性直接被修改。

    由于Master进程会优先于所有Worker进程创建,因此,可以在Master进程中声明共享内存空间之后再创建Worker进程,以此来保证每个Worker进程创建后都可以立即访问共享内存。

    为了使用简单,我们可以将 SharedMemory 设计成单例,这样每个进程中就只有一个实例,并可以在 import SharedMemory 之后直接使用。

    读写控制与IPC通信

    首先实现对外接口 SharedMemory 类,这里没有使用让 Manager Worker 继承 SharedMemory 的方式,而是让 SharedMemory 在实例化的时候返回一个 Manager Worker 的实例,从而实现自动选择子类。

    在Node 16中 isPrimary 替代了 isMaster ,这里为了兼容使用了两种写法。

    // shared-memory.js
    class SharedMemory {
      constructor() {
        if (cluster.isMaster || cluster.isPrimary) {
          return new Manager();
        } else {
          return new Worker();
    

    Manager负责管理共享内存空间,我们直接在Manager对象中增加__sharedMemory__属性,由于其本身也是JS对象,会被纳入JS的垃圾回收管理中,因此我们不需要进行内存清理、数据迁移等操作,使得实现上非常简洁。之后在__sharedMemory__之中定义setgetremove等标准操作来提供访问方式。

    我们通过cluster.on('online', callback)来监听worker进程的创建事件,并在创建后立即用worker.on('message', callback)来监听来自worker进程的IPC通信,并把通信消息交给handle函数处理。

    handle函数的职责是区分worker进程是想进行哪种操作,并取出操作的参数委托给对应的setgetremove函数(注意不是__sharedMemory__中的setgetremove)进行处理,并将处理后的结果返还给worker进程。

    // manager.js
    const cluster = require('cluster');
    class Manager {
      constructor() {
        this.__sharedMemory__ = {
          set(key, value) {
            this.memory[key] = value;
          get(key) {
            return this.memory[key];
          remove(key) {
            delete this.memory[key];
          memory: {},
        // Listen the messages from worker processes.
        cluster.on('online', (worker) => {
          worker.on('message', (data) => {
            this.handle(data, worker);
            return false;
      handle(data, target) {
        const args = data.value ? [data.key, data.value] : [data.key];
        this[data.method](...args).then((value) => {
          const msg = {
            id: data.id, // workerId
            uuid: data.uuid, // communicationID
            value,
          target.send(msg);
      set(key, value) {
        return new Promise((resolve) => {
          this.__sharedMemory__.set(key, value);
          resolve('OK');
      get(key) {
        return new Promise((resolve) => {
          resolve(this.__sharedMemory__.get(key));
      remove(key) {
        return new Promise((resolve) => {
          this.__sharedMemory__.remove(key);
          resolve('OK');
    

    Worker自对象创建开始就使用process.on监听来自Master进程的返回消息(毕竟不能等消息发送出去以后再监听吧,那就来不及了)。至于__getCallbacks__对象的作用一会儿再说。此时Worker对象便创建完成。

    之后项目运行到某个地方的时候,如果要访问共享内存,就会调用Workersetgetremove函数,它们又会调用handle函数将消息通过process.send发送到master进程,同时,将得到返回结果时要进行的操作记录在__getCallbacks__中。当结果返回时,会被之前在process.on中的函数监听到,并从__getCallbacks__中取出对应的回调函数,并执行。

    因为访问共享内存的过程中会经过IPC,所以必定是异步操作,所以需要记录回调函数,不能实现成同步的方式,不然会阻塞原本的任务。

    // worker.js
    const cluster = require('cluster');
    const { v4: uuid4 } = require('uuid');
    class Worker {
      constructor() {
        this.__getCallbacks__ = {};
        process.on('message', (data) => {
          const callback = this.__getCallbacks__[data.uuid];
          if (callback && typeof callback === 'function') {
            callback(data.value);
          delete this.__getCallbacks__[data.uuid];
      set(key, value) {
        return new Promise((resolve) => {
          this.handle('set', key, value, () => {
            resolve();
      get(key) {
        return new Promise((resolve) => {
          this.handle('get', key, null, (value) => {
            resolve(value);
      remove(key) {
        return new Promise((resolve) => {
          this.handle('remove', key, null, () => {
            resolve();
      handle(method, key, value, callback) {
        const uuid = uuid4(); // 每次通信的uuid
        process.send({
          id: cluster.worker.id,
          method,
          uuid,
          value,
        this.__getCallbacks__[uuid] = callback;
    

    一次共享内存访问的完整流程是:调用Workerset/get/remove函数 -> 调用Workerhandle函数,向master进程通信并将回调函数记录在__getCallbacks__ -> master进程监听到来自worker进程的消息 -> 调用Managerhandle函数 -> 调用Managerset/get/remove函数 -> 调用__sharedMemory__set/get/remove函数 -> 操作完成返回Managerset/get/remove函数 -> 操作完成返回handle函数 -> 向worker进程发送通信消息 -> worker进程监听到来自master进程的消息 -> 从__getCallbacks__中取出回调函数并执行。

    到目前为止,我们已经实现了读写共享内存,但还没有结束,目前的共享内存是存在严重安全问题的。因为这个共享内存是可以所有进程同时访问的,然而我们并没有考虑并发访问时的时序问题。我们来看下面这个例子:

    时间进程A进程B共享内存中变量x的值
    t00
    t1读取x(x=0)0
    t2x1=x+1(x1=1)读取x(x=0)0
    t3将x1的值写回xx2=x+1(x2=1)1
    t4将x2的值写回x1

    进程A和进程B的目的都是将x的值加1,理想情况下最后x的值应该是2,可是最后的结果却是1。这是因为进程B在t3时刻给x的值加1的时候,使用的是t2时刻读取出来的x的值,但此时从全局角度来看,这个值已经过期了,因为t3时刻x最新的值已经被进程A写为了1,可是进程B无法知道进程外部的变化,所以导致了t4时刻最后写回的值又覆盖掉了进程A写回的值,等于是进程A的行为被覆盖掉了。

    在多线程、多进程和分布式中并发情况下的数据一致性问题是老大难问题了,这里不再展开讨论。

    为了解决上述问题,我们必须实现进程间互斥访问某个对象,来避免同时操作一个对象,从而使进程可以进行原子操作,所谓原子操作就是不可被打断的一小段连续操作,为此需要引入锁的概念。由于读写均以对象为基本单位,因此锁的粒度设置为对象级别。在某一个进程(的某一任务)获取了某个对象的锁之后,其它要获取锁的进程(的任务)会被阻塞,直到锁被归还。而要进行写操作,则必须要先获取对象的锁。这样在获取到锁直到锁被释放的这段时间里,该对象在共享内存中的值不会被其它进程修改,从而导致错误。

    Manager__sharedMemory__中加入locks属性,用来记录哪个对象的锁被拿走了,lockRequestQueues属性用来记录被阻塞的任务(正在等待锁的任务)。并增加getLock函数和releaseLock函数,用来申请和归还锁,以及handleLockRequest函数,用来使被阻塞的任务获得锁。在申请锁时,会先将回调函数记录到lockRequestQueues队尾(因为此时该对象的锁可能已被拿走),然后再调用handleLockRequest检查当前锁是否被拿走,若锁还在,则让队首的任务获得锁。归还锁时,先将__sharedMemory__.locks中对应的记录删掉,然后再调用handleLockRequest让队首的任务获得锁。

    // manager.js
    const { v4: uuid4 } = require('uuid');
    class Manager {
      constructor() {
        this.__sharedMemory__ = {
          locks: {},
          lockRequestQueues: {},
      getLock(key) {
        return new Promise((resolve) => {
          this.__sharedMemory__.lockRequestQueues[key] =
            this.__sharedMemory__.lockRequestQueues[key] ?? [];
          this.__sharedMemory__.lockRequestQueues[key].push(resolve);
          this.handleLockRequest(key);
      releaseLock(key, lockId) {
        return new Promise((resolve) => {
          if (lockId === this.__sharedMemory__.locks[key]) {
            delete this.__sharedMemory__.locks[key];
            this.handleLockRequest(key);
          resolve('OK');
      handleLockRequest(key) {
        return new Promise((resolve) => {
            !this.__sharedMemory__.locks[key] &&
            this.__sharedMemory__.lockRequestQueues[key]?.length > 0
            const callback = this.__sharedMemory__.lockRequestQueues[key].shift();
            const lockId = uuid4();
            this.__sharedMemory__.locks[key] = lockId;
            callback(lockId);
          resolve();
    

    Worker中,则是增加getLockreleaseLock两个函数,行为与getset类似,都是调用handle函数。

    // worker.js
    class Worker {
      getLock(key) {
        return new Promise((resolve) => {
          this.handle('getLock', key, null, (value) => {
            resolve(value);
      releaseLock(key, lockId) {
        return new Promise((resolve) => {
          this.handle('releaseLock', key, lockId, (value) => {
            resolve(value);
    

    有时候我们需要监听某个对象值的变化,在单进程Node应用中这很容易做到,只需要重写对象的set属性就可以了,然而在多进程共享内存中,对象和监听者都不在一个进程中,这只能依赖Manager的实现。这里,我们选择了经典的观察者模式来实现监听共享内存中的对象。

    为此,我们先在__sharedMemory__中加入listeners属性,用来记录在对象值发生变化时监听者注册的回调函数。然后增加listen函数,其将监听回调函数记录到__sharedMemory__.listeners中,这个监听回调函数会将变化的值发送给对应的worker进程。最后,在setremove函数返回前调用notifyListener,将所有记录在__sharedMemory__.listeners中监听该对象的所有函数取出并调用。

    // manager.js
    class Manager {
      constructor() {
        this.__sharedMemory__ = {
          listeners: {},
      handle(data, target) {
        if (data.method === 'listen') {
          this.listen(data.key, (value) => {
            const msg = {
              isNotified: true,
              id: data.id,
              uuid: data.uuid,
              value,
            target.send(msg);
        } else {
      notifyListener(key) {
        const listeners = this.__sharedMemory__.listeners[key];
        if (listeners?.length > 0) {
          Promise.all(
            listeners.map(
              (callback) =>
                new Promise((resolve) => {
                  callback(this.__sharedMemory__.get(key));
                  resolve();
      set(key, value) {
        return new Promise((resolve) => {
          this.__sharedMemory__.set(key, value);
          this.notifyListener(key);
          resolve('OK');
      remove(key) {
        return new Promise((resolve) => {
          this.__sharedMemory__.remove(key);
          this.notifyListener(key);
          resolve('OK');
      listen(key, callback) {
        if (typeof callback === 'function') {
          this.__sharedMemory__.listeners[key] =
            this.__sharedMemory__.listeners[key] ?? [];
          this.__sharedMemory__.listeners[key].push(callback);
        } else {
          throw new Error('a listener must have a callback.');
    

    Worker中由于监听操作与其它操作不一样,它是一次注册监听回调函数之后对象的值每次变化都会被通知,因此需要在增加一个__getListenerCallbacks__属性用来记录监听操作的回调函数,与__getCallbacks__不同,它里面的函数在收到master的回信之后不会删除。

    // worker.js
    class Worker {
      constructor() {
        this.__getListenerCallbacks__ = {};
        process.on('message', (data) => {
          if (data.isNotified) {
            const callback = this.__getListenerCallbacks__[data.uuid];
            if (callback && typeof callback === 'function') {
              callback(data.value);
          } else {
      handle(method, key, value, callback) {
        if (method === 'listen') {
          this.__getListenerCallbacks__[uuid] = callback;
        } else {
          this.__getCallbacks__[uuid] = callback;
      listen(key, callback) {
        if (typeof callback === 'function') {
          this.handle('listen', key, null, callback);
        } else {
          throw new Error('a listener must have a callback.');
    

    LRU缓存

    有时候我们需要用用内存作为缓存,但多进程中各进程的内存空间独立,不能共享,因此也需要用到共享内存。但是如果用共享内存中的一个对象作为缓存的话,由于每次IPC都需要传输整个缓存对象,会导致缓存对象不能太大(否则序列化和反序列化耗时太长),而且由于写缓存对象的操作需要加锁,进一步影响了性能,而原本我们使用缓存就是为了加快访问速度。其实在使用缓存的时候通常不会做复杂操作,大多数时候也不需要保障一致性,因此我们可以在Manager再增加一个共享内存__sharedLRUMemory__,其为一个lru-cache实例,并增加getLRUsetLRUremoveLRU函数,与setgetremove函数类似。

    // manager.js
    const LRU = require('lru-cache');
    class Manager {
      constructor() {
        this.defaultLRUOptions = { max: 10000, maxAge: 1000 * 60 * 5 };
        this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions);
      getLRU(key) {
        return new Promise((resolve) => {
          resolve(this.__sharedLRUMemory__.get(key));
      setLRU(key, value) {
        return new Promise((resolve) => {
          this.__sharedLRUMemory__.set(key, value);
          resolve('OK');
      removeLRU(key) {
        return new Promise((resolve) => {
          this.__sharedLRUMemory__.del(key);
          resolve('OK');
    

    Worker中也增加getLRUsetLRUremoveLRU函数。

    // worker.js
    class Worker {
      getLRU(key) {
        return new Promise((resolve) => {
          this.handle('getLRU', key, null, (value) => {
            resolve(value);
      setLRU(key, value) {
        return new Promise((resolve) => {
          this.handle('setLRU', key, value, () => {
            resolve();
      removeLRU(key) {
        return new Promise((resolve) => {
          this.handle('removeLRU', key, null, () => {
            resolve();
    

    共享内存的使用方式

    目前共享内存的实现已发到npm仓库(文档和源代码在Github仓库欢迎pull request和报bug),可以直接通过npm安装:

    npm i cluster-shared-memory
    

    下面的示例包含了基本使用方法:

    const cluster = require('cluster');
    // 引入模块时会根据当前进程 master 进程还是 worker 进程自动创建对应的 SharedMemory 对象
    require('cluster-shared-memory');
    if (cluster.isMaster) {
      // 在 master 进程中 fork 子进程
      for (let i = 0; i < 2; i++) {
        cluster.fork();
    } else {
      const sharedMemoryController = require('./src/shared-memory');
      const obj = {
        name: 'Tom',
        age: 10,
      // 写对象
      await sharedMemoryController.set('myObj', obj);
      // 读对象
      const myObj = await sharedMemoryController.get('myObj');
      // 互斥访问对象,首先获得对象的锁
      const lockId = await sharedMemoryController.getLock('myObj');
      const newObj = await sharedMemoryController.get('myObj');
      newObj.age = newObj.age + 1;
      await sharedMemoryController.set('myObj', newObj);
      // 操作完之后释放锁
      await sharedMemoryController.releaseLock('requestTimes', lockId);
      // 或者使用 mutex 函数自动获取和释放锁
      await sharedMemoryController.mutex('myObj', async () => {
        const newObjM = await sharedMemoryController.get('myObj');
        newObjM.age = newObjM.age + 1;
        await sharedMemoryController.set('myObj', newObjM);
      // 监听对象
      sharedMemoryController.listen('myObj', (value) => {
        console.log(`myObj: ${value}`);
      //写LRU缓存
      await sharedMemoryController.setLRU('cacheItem', {user: 'Tom'});
      // 读对象
      const cacheItem = await sharedMemoryController.getLRU('cacheItem');
    

    这种实现目前尚有几个缺点:

  • 不能使用PM2的自动创建worker进程的功能。
  • 由于PM2会使用自己的cluster模块的master进程的实现,而我们的共享内存模块需要在master进程维护一个内存空间,则不能使用PM2的实现,因此不能使用PM2的自动创建worker进程的功能。

  • 传输的对象必须可序列化,且不能太大。
  •