添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

本文主要讲述了我们是如何制作开源工具 pm2-intercom-log4js 的,从中你可以收获到:

  • 一个 log4js 日志同步问题的处理工具—— pm2-intercom-log4js (GitHub 地址已开源)
  • 了解 log4js 的日志同步原理;
  • 简单了解 PM2 的进程管理方式;
  • 了解日志同步问题的工具的制作历程;
  • 了解一些进程通信的方式;
  • 近期我们开源了一个在 PM2 多进程模式下处理 log4js 日志同步的工具—— pm2-intercom-log4js(GitHub 地址已开源) 。我们只需要在开始正式打印日志之前简单调用一下 pm2-intercom-log4js 导出的默认函数,就能完美处理在 PM2 多进程模式下的日志同步问题,确保不发生日志丢失及多进程文件写入异常的问题。

    const pm2Intercom = require('@takin/pm2-intercom-log4js');
    async function init() {
      await pm2Intercom();
      log4js.configure({
        // Make sure logs4js has PM2 mode enabled.
        pm2: true,
      log4js.getLogger().info('Hello, Billion Bottle!');
    

    先简单介绍一下 log4js,它是一个在 Node 端使用的日志输出工具,支持控制台、文件、邮件等多种输出方式,我们使用它进行 Node 端的日志文件输出。我们为什么要做这样一个日志同步工具呢,因为我们的 Node 服务通过 PM2 进行多进程管理,这些进程同时将内容写入一个文件时无法确保其正常工作。正好 log4js 官方支持在 PM2 多进程模式下进行同步处理,但是要求我们运行 pm2 install pm2-intercom 进行插件安装。我尝试了多次都是安装失败,查询原因发现可能和该插件源码丢失有关,无奈,只好自己从 log4js 源码中寻找答案。

    log4js 的日志同步原理

    为何在开启了 pm2: true 模式后没有插件辅助会发生日志丢失?又为何条件满足后它能确保多进程同时写文件不发生异常?

    看答案 👇:

    const receiver = (worker, message) => {
      if (message && message.topic && message.topic === "log4js:message") {
        sendToListeners(logEvent);
    configuration.addListener(config => {
      if (isPM2Master()) {
        process.on("message", receiver);
    module.exports = {
      send: msg => {
        if (isMaster()) {
          sendToListeners(msg);
        } else {
          process.send({ topic: "log4js:message", data: msg.serialise() });
    

    以上代码来自 log4js 源码,我对它做了无关逻辑的剔除,我们重点看一下它是如何处理以上两个疑问的。

    首先它在我们调用完 log4js.configure() 的回调中进行判断,如果当前是 PM2 多进程中的主进程,则开启进程 message 事件的监听,在 message 事件的回调 receiver 中将日志内容派发给本进程内的观察者进行日志输出。

    在做完上述这些准备工作后,我们调用实际的日志打印函数时,导出的 send 函数将会被执行,如果当前是主进程进行日志输出则无需进行特殊处理,直接将日志内容派发给本进程内的观察者进行日志输出。而如果当前是子进程进行日志输出则进行了 process.send 消息发送,此时主进程将会在 receiver 回调收到这份消息并进行输出。

    如此便完美地解释了「它能确保多进程同时写文件不发生异常」的原因。我们继续探索「为何在开启了 pm2: true 模式后没有插件辅助会发生日志丢失」的原因。

    探索 process.sendprocess.on('message')

    既然 process.sendprocess.on('message') 可以进行进程间通信,那我们就直接开干。我们先尝试一下在一个 Node 进程中直接进行监听和发送:

    process.on('message', (message) => {
      console.log(message);
    process.send('Hello, Billion Bottle!');
    // TypeError: process.send is not a function
    

    发现报错了,意思是 process.send 不存在?怎么回事,我们去看一下 文档 ,发现文档中是这么描述的:

    如果使用 IPC 通道衍生 Node.js,则可以使用 process.send() 方法向父进程发送消息。 消息将作为父对象 ChildProcess 对象上的 'message' 事件接收。

    如果 Node.js 没有使用 IPC 通道衍生,则 process.send 将是 undefined。

    那好吧,那我们就继续尝试,直接创建一个子进程,写好简单的监听和发送:

    const cluster = require('cluster');
    if (cluster.isMaster) {
      cluster.fork();
      process.on('message', (message) => {
        console.log(message);
    } else if (cluster.isWorker) {
      process.send('Hello, Billion Bottle!');
    

    然后发现它什么都没输出?这又是为什么呢,我们继续阅读文档,我发现自己忽略了一个细节「消息将作为父对象 ChildProcess 对象上的 'message' 事件接收」,这个意思是我们不能单纯地使用 process.on('message') 进行监听,需要通过创建出的子进程对象中的 API 进行监听。

    记一次成功的通信

    通过文档的查阅和调试,我们终于成功了,喜悦之情溢于言表,父子进程通信通过操作子进程对象完美实现了。下面是我们尝试的两种双向通信示例:

  • 子进程发送,主进程接收:
  • if (cluster.isMaster) {
      const worker = cluster.fork();
      worker.on('message', (message) => {
        console.log(message);
    } else if (cluster.isWorker) {
      process.send('Hello, Billion Bottle!');
    
  • 主进程发送,子进程接收:
  • if (cluster.isMaster) {
      const worker = cluster.fork();
      worker.send('Hello, Billion Bottle!');
    } else if (cluster.isWorker) {
      process.on('message', (message) => {
        console.log(message);
    

    但是等等,既然主进程中需要操作子进程实例进行消息接收,单纯地靠 process.on('message') 行不通,那为什么 log4js 还是通过这种方式在主进程中进行消息接收呢,难道本质上 log4js 所认定的主进程其实是个子进程?

    PM2 的进程管理

    我们带着上述疑问继续探索一下在 PM2 中是如何进行进程管理的。

    log4js 中对主进程的认定方式:

    const isPM2Master = () => pm2 && process.env[pm2InstanceVar] === "0";
    const isMaster = () => disabled || (cluster && cluster.isMaster) || isPM2Master();
    

    我们可以看到 log4js 将下标为 0 号位的进程当成了主进程,并且 log4js 只是做了监听和发送消息的简单操作,并没有直接操作进程实例,说明我们确实需要从 PM2 的层面去理解这一过程。

    PM2 创建进程的方式(有删减):

    God.injectVariables = function injectVariables (env, cb) {
      var instanceKey = process.env.PM2_PROCESS_INSTANCE_VAR || env.instance_var;
      var instanceNumber = typeof instances[0] === 'undefined' ? 0 : instances[0] + 1;
      env[instanceKey] = instanceNumber;
      return cb(null, env);
    God.nodeApp = function nodeApp(env_copy, cb){
      try {
        clu = cluster.fork({pm2_env: JSON.stringify(env_copy), windowsHide: true});
      } catch(e) {}
      return cb(null, clu);
    God.nodeApp(env_copy, function nodeApp(err, clu) {
      God.clusters_db[clu.pm2_env.pm_id] = clu;
    

    在 PM2 中启动进程时,会拿上一个进程的下标加一当作新进程的下标(这解释了进程下标从 0 开始以及 log4js 将 0 号下标当作主进程的原因),但是创建进程时并不是以我们真实应用的主进程进行 fork 派生,而是通过 PM2 自身的主管理进程进行子进程的派生(cluster 派生子进程时允许我们指定子进程的脚本路径),并在创建之后将子进程实例保存至对应进程 ID 的映射表中。简单来说,PM2 中的应用进程都是子进程,默认情况下是相互隔离的,想要互相通信就必须有对方的实例或者通过父进程中间人进行。

    处理 log4js 日志同步的思路

    在了解了它们两个工具对进程管理及通信方式的处理逻辑之后,我们也对工具的逻辑有了大致的思路。日志同步工具能正常运行的大前提就是必须通过 PM2 的 API 进行,因为 log4js 并没有相关逻辑,而所有进程都由 PM2 全权管理,我们无法脱离 PM2 达成上述目的。下面就是工具所需实现的逻辑:

  • 监听所有非 0 子进程中的日志消息;
  • 在 0 号主进程中进行其它子进程日志消息的监听;
  • 在 0 号主进程中想办法拿到自己的实例,并使用实例 API 向自己发送日志消息使自己的 process.on('message') 回调能成功触发;
  • 处理好主子应用刚启动时的初始化顺序,确保主进程准备好了子进程再进行日志输出;
  • PM2 API 的尝试

    我们确定面向 PM2 进行工具开发后必须先寻找它能进行进程通信(收消息、发消息)的 API。经过文档查阅以及尝试,我们发现了 PM2 的 bus 可以接收到任何进程的消息,并且在消息回调中可以拿到触发消息的进程 ID,我们只需要合理处理消息内容中的标识符就可以做到区分主子进程。此外,我们还可以通过 sendDataToProcessId 向对应 ID 的进程发送消息使其 process.on('message') 回调能成功触发。下面的代码简单诠释了这个过程:进程监听 process.on('message') 以及 PM2 的 bus 事件,并在 PM2 的 bus 事件中向自身发送一条消息使 process.on('message') 回调能触发。

    const pm2 = require('pm2');
    const { promisify } = require('util');
    process.on('message', (raw) => {
      console.log('Cluster message received from worker:', JSON.stringify(raw));
    (async () => {
      const connect = promisify(pm2.connect.bind(pm2));
      const launchBus = promisify(pm2.launchBus.bind(pm2));
      await connect();
      const bus = await launchBus();
      bus.on('process:msg', (packet) => {
        const { raw, process: { pm_id: processId } } = packet;
        pm2.sendDataToProcessId(processId, raw, () => {});
      process.send({ topic: 'test', data: 'Hello, Billion Bottle!' });
    })();
    // Cluster message received from worker: {"data":"Hello, Billion Bottle!","topic":"test","id":0}
    

    注意:上述示例代码在直接运行时会报错,但是使用 pm2 start xxx.js 运行时可以在 PM2 日志中看到正确的日志输出。

    PM2 bussendDataToProcessId 原理

    有了上述的尝试之后,我们思考一下,为什么 PM2 的 bus 能监听所有进程的消息呢?经过寻找,我发现了 PM2 是这么处理的(有删减):

    God.nodeApp = function nodeApp(env_copy, cb){
      clu = cluster.fork({pm2_env: JSON.stringify(env_copy), windowsHide: true});
      clu.on('message', function cluMessage(msg) {
        return God.bus.emit('process:msg', {
          raw     : msg,
          process :  {
            pm_id      : clu.pm2_env.pm_id,
    

    PM2 在创建子进程之后会立刻监听它的消息触发,当子进程中调用了 process.send() 则会触发消息回调,并将消息内容进行包装后通过 bus.emit 再次触发。

    在我们明确了主进程 ID 以及消息内容之后,就可以调用 sendDataToProcessId 向主应用发送消息使其 process.on('message') 回调触发。sendDataToProcessId 原理比较简单,上文我们提到,创建子进程之后 PM2 会将进程实例保存在对应进程 ID 的映射表中,由此 sendDataToProcessId 可以根据进程 ID 从映射表中找到对应实例并调用其 send 方法发送消息。下面是它大致的实现代码(有删减):

    God.sendDataToProcessId = function (packet, cb) {
      var pm_id = packet.id;
      var proc = God.clusters_db[pm_id];
      proc.send(packet);
    

    千呼万唤始出来

    做完所有准备工作之后,我们终于可以着手设计我们的工具了。下面就是我们开源工具的大致处理流程,具体逻辑可以下载我们这个工具的 源代码 查看。

    export default async function intercom(pm2: PM2): Promise<void> {
      if (pm2InstanceId === mainInstanceId) {
        bus.on('process:msg', (packet: PM2Packet): void => {
          if (topic === mainTopic) {
            myProcessId = processId!;
            return;
          pm2.sendDataToProcessId(myProcessId, packet.raw, (err): void => {});
        process.send!({ topic: mainTopic });
      } else {
        await new Promise((resolve) => {
          bus.on('process:msg', (packet: PM2Packet): void => {
            const { topic } = packet.raw;
            if (topic === replyTopic) {
              setTimeout(resolve);
          process.send!({ topic: pingTopic });
    

    在主进程中:

  • 等待连接至 PM2 完成并且获得 bus 实例;
  • bus 消息回调中根据 topic 判断是否是自己并将进程 ID 保存下来;
  • bus 消息回调如果是日志消息则将消息内容发送至自身进程让 log4js 去处理;
  • 调用 process.send() 触发 bus 回调;
  • 在子进程中主要做了等待主进程初始化完毕的事,确保日志不会在主进程还未准备好时发送出去而造成丢失:

  • 等待连接至 PM2 完成并且获得 bus 实例;
  • bus 消息回调中根据 topic 判断主进程是否已启动完毕,启动完毕后结束自身等待继续之后的逻辑;
  • 调用 process.send() 触发主进程 bus 回调;
  • 其它进程通信的方式

    简单地延伸一下,除了 Node 原生 IPC 父子进程通信方式 之外,还有这些通信方式:

  • child_processstdin/stdout
  • Socket
  • Message Queue
  • Redis 的 Pub/Sub 机制(即发布订阅模式)
  • Message Queue 俗称 MQ,是目前常见的服务间通信解决方案之一。MQ 和 Redis 这两种方式这里不展开细说,大家有兴趣可以自己了解一下。下面我们就简单看一下前两种方式如何实现进程通信。

    child_processstdin/stdout

    先来简单看一下示例:

    main.js

    const childProcess = require('child_process');
    const worker = childProcess.spawn('node', ['./worker.js']);
    worker.stdout.setEncoding('utf8');
    worker.stdin.write('Hello, worker!');
    worker.stdout.on('data', function (message) {
      console.log(message);
    

    worker.js

    process.stdin.setEncoding('utf8');
    process.stdin.on('data', (message) => {
      console.log(message);
      process.stdout.write('Hello, main!');
    

    在示例中,我们使用 child_process 的流进行数据传递,可以通过序列化数据以及反序列化数据进行通信。

    Socket

    Socket 通信是一种比较万能的通信方式,它不仅可以处理父子进程间的通信,还可以处理同机器中无关联的进程通信,甚至还可以处理跨机器间的进程通信。

    下面的示例代码使用的是一个优秀的 Node 进程通信库 axon ,底层也是基于 Node 的 net 库进行 Socket 的处理。

    emitter.js

    const axon = require('axon');
    const sock = axon.socket('push');
    sock.bind(1901);
    setInterval(() => {
      sock.send('Hello, Billion Bottle!');
    }, 1000);
    

    receiver.js

    const axon = require('axon');
    const sock = axon.socket('pull');
    sock.connect(1901);
    sock.on('message', (message) => {
      console.log(message);
    

    小小的总结

    如果大家也遇到了 pm2 install pm2-intercom 执行失败而无法使用 log4js PM2 模式的问题,欢迎大家使用我们的工具,我们将根据需要持续迭代。

    当然,欢迎大家去 GitHub 阅读我们的源代码 ,给我们点个 Star 哦 💗!

  • pm2-intercom-log4jsgithub.com/BillionBott…
  • log4jslog4js-node.github.io/log4js-node…
  • PM2pm2.keymetrics.io/
  • axongithub.com/tj/axon
  • 《Node.js v16.14.2 文档》nodejs.cn/api/
  • 《PM2不2:源码分析》segmentfault.com/a/119000002…
  • 《Nodejs进程间通信》www.ayqy.net/blog/nodejs…
  • 更多精彩请关注我们的公众号「百瓶技术」,有不定期福利呦!

    分类:
    前端
    标签: