本文主要讲述了我们是如何制作开源工具
pm2-intercom-log4js
的,从中你可以收获到:
一个 log4js 日志同步问题的处理工具——
pm2-intercom-log4js
(GitHub 地址已开源)
;
了解 log4js 的日志同步原理;
简单了解 PM2 的进程管理方式;
了解日志同步问题的工具的制作历程;
了解一些进程通信的方式;
近期我们开源了一个在
PM2
多进程模式下处理
log4js
日志同步的工具——
pm2-intercom-log4js(GitHub 地址已开源)
。我们只需要在开始正式打印日志之前简单调用一下
pm2-intercom-log4js
导出的默认函数,就能完美处理在 PM2 多进程模式下的日志同步问题,确保不发生日志丢失及多进程文件写入异常的问题。
const pm2Intercom = require('@takin/pm2-intercom-log4js');
async function init() {
await pm2Intercom();
log4js.configure({
pm2: true,
log4js.getLogger().info('Hello, Billion Bottle!');
先简单介绍一下 log4js,它是一个在 Node 端使用的日志输出工具,支持控制台、文件、邮件等多种输出方式,我们使用它进行 Node 端的日志文件输出。我们为什么要做这样一个日志同步工具呢,因为我们的 Node 服务通过 PM2 进行多进程管理,这些进程同时将内容写入一个文件时无法确保其正常工作。正好 log4js 官方支持在 PM2 多进程模式下进行同步处理,但是要求我们运行 pm2 install pm2-intercom
进行插件安装。我尝试了多次都是安装失败,查询原因发现可能和该插件源码丢失有关,无奈,只好自己从 log4js 源码中寻找答案。
log4js 的日志同步原理
为何在开启了 pm2: true
模式后没有插件辅助会发生日志丢失?又为何条件满足后它能确保多进程同时写文件不发生异常?
看答案 👇:
const receiver = (worker, message) => {
if (message && message.topic && message.topic === "log4js:message") {
sendToListeners(logEvent);
configuration.addListener(config => {
if (isPM2Master()) {
process.on("message", receiver);
module.exports = {
send: msg => {
if (isMaster()) {
sendToListeners(msg);
} else {
process.send({ topic: "log4js:message", data: msg.serialise() });
以上代码来自 log4js 源码,我对它做了无关逻辑的剔除,我们重点看一下它是如何处理以上两个疑问的。
首先它在我们调用完 log4js.configure()
的回调中进行判断,如果当前是 PM2 多进程中的主进程,则开启进程 message
事件的监听,在 message
事件的回调 receiver
中将日志内容派发给本进程内的观察者进行日志输出。
在做完上述这些准备工作后,我们调用实际的日志打印函数时,导出的 send
函数将会被执行,如果当前是主进程进行日志输出则无需进行特殊处理,直接将日志内容派发给本进程内的观察者进行日志输出。而如果当前是子进程进行日志输出则进行了 process.send
消息发送,此时主进程将会在 receiver
回调收到这份消息并进行输出。
如此便完美地解释了「它能确保多进程同时写文件不发生异常」的原因。我们继续探索「为何在开启了 pm2: true
模式后没有插件辅助会发生日志丢失」的原因。
探索 process.send
和 process.on('message')
既然 process.send
和 process.on('message')
可以进行进程间通信,那我们就直接开干。我们先尝试一下在一个 Node 进程中直接进行监听和发送:
process.on('message', (message) => {
console.log(message);
process.send('Hello, Billion Bottle!');
发现报错了,意思是 process.send
不存在?怎么回事,我们去看一下 文档 ,发现文档中是这么描述的:
如果使用 IPC 通道衍生 Node.js,则可以使用 process.send() 方法向父进程发送消息。 消息将作为父对象 ChildProcess 对象上的 'message' 事件接收。
如果 Node.js 没有使用 IPC 通道衍生,则 process.send 将是 undefined。
那好吧,那我们就继续尝试,直接创建一个子进程,写好简单的监听和发送:
const cluster = require('cluster');
if (cluster.isMaster) {
cluster.fork();
process.on('message', (message) => {
console.log(message);
} else if (cluster.isWorker) {
process.send('Hello, Billion Bottle!');
然后发现它什么都没输出?这又是为什么呢,我们继续阅读文档,我发现自己忽略了一个细节「消息将作为父对象 ChildProcess 对象上的 'message' 事件接收」,这个意思是我们不能单纯地使用 process.on('message')
进行监听,需要通过创建出的子进程对象中的 API 进行监听。
记一次成功的通信
通过文档的查阅和调试,我们终于成功了,喜悦之情溢于言表,父子进程通信通过操作子进程对象完美实现了。下面是我们尝试的两种双向通信示例:
子进程发送,主进程接收:
if (cluster.isMaster) {
const worker = cluster.fork();
worker.on('message', (message) => {
console.log(message);
} else if (cluster.isWorker) {
process.send('Hello, Billion Bottle!');
主进程发送,子进程接收:
if (cluster.isMaster) {
const worker = cluster.fork();
worker.send('Hello, Billion Bottle!');
} else if (cluster.isWorker) {
process.on('message', (message) => {
console.log(message);
但是等等,既然主进程中需要操作子进程实例进行消息接收,单纯地靠 process.on('message')
行不通,那为什么 log4js 还是通过这种方式在主进程中进行消息接收呢,难道本质上 log4js 所认定的主进程其实是个子进程?
PM2 的进程管理
我们带着上述疑问继续探索一下在 PM2 中是如何进行进程管理的。
log4js 中对主进程的认定方式:
const isPM2Master = () => pm2 && process.env[pm2InstanceVar] === "0";
const isMaster = () => disabled || (cluster && cluster.isMaster) || isPM2Master();
我们可以看到 log4js 将下标为 0 号位的进程当成了主进程,并且 log4js 只是做了监听和发送消息的简单操作,并没有直接操作进程实例,说明我们确实需要从 PM2 的层面去理解这一过程。
PM2 创建进程的方式(有删减):
God.injectVariables = function injectVariables (env, cb) {
var instanceKey = process.env.PM2_PROCESS_INSTANCE_VAR || env.instance_var;
var instanceNumber = typeof instances[0] === 'undefined' ? 0 : instances[0] + 1;
env[instanceKey] = instanceNumber;
return cb(null, env);
God.nodeApp = function nodeApp(env_copy, cb){
try {
clu = cluster.fork({pm2_env: JSON.stringify(env_copy), windowsHide: true});
} catch(e) {}
return cb(null, clu);
God.nodeApp(env_copy, function nodeApp(err, clu) {
God.clusters_db[clu.pm2_env.pm_id] = clu;
在 PM2 中启动进程时,会拿上一个进程的下标加一当作新进程的下标(这解释了进程下标从 0 开始以及 log4js 将 0 号下标当作主进程的原因),但是创建进程时并不是以我们真实应用的主进程进行 fork
派生,而是通过 PM2 自身的主管理进程进行子进程的派生(cluster
派生子进程时允许我们指定子进程的脚本路径),并在创建之后将子进程实例保存至对应进程 ID 的映射表中。简单来说,PM2 中的应用进程都是子进程,默认情况下是相互隔离的,想要互相通信就必须有对方的实例或者通过父进程中间人进行。
处理 log4js 日志同步的思路
在了解了它们两个工具对进程管理及通信方式的处理逻辑之后,我们也对工具的逻辑有了大致的思路。日志同步工具能正常运行的大前提就是必须通过 PM2 的 API 进行,因为 log4js 并没有相关逻辑,而所有进程都由 PM2 全权管理,我们无法脱离 PM2 达成上述目的。下面就是工具所需实现的逻辑:
监听所有非 0 子进程中的日志消息;
在 0 号主进程中进行其它子进程日志消息的监听;
在 0 号主进程中想办法拿到自己的实例,并使用实例 API 向自己发送日志消息使自己的 process.on('message')
回调能成功触发;
处理好主子应用刚启动时的初始化顺序,确保主进程准备好了子进程再进行日志输出;
PM2 API 的尝试
我们确定面向 PM2 进行工具开发后必须先寻找它能进行进程通信(收消息、发消息)的 API。经过文档查阅以及尝试,我们发现了 PM2 的 bus
可以接收到任何进程的消息,并且在消息回调中可以拿到触发消息的进程 ID,我们只需要合理处理消息内容中的标识符就可以做到区分主子进程。此外,我们还可以通过 sendDataToProcessId
向对应 ID 的进程发送消息使其 process.on('message')
回调能成功触发。下面的代码简单诠释了这个过程:进程监听 process.on('message')
以及 PM2 的 bus 事件,并在 PM2 的 bus 事件中向自身发送一条消息使 process.on('message')
回调能触发。
const pm2 = require('pm2');
const { promisify } = require('util');
process.on('message', (raw) => {
console.log('Cluster message received from worker:', JSON.stringify(raw));
(async () => {
const connect = promisify(pm2.connect.bind(pm2));
const launchBus = promisify(pm2.launchBus.bind(pm2));
await connect();
const bus = await launchBus();
bus.on('process:msg', (packet) => {
const { raw, process: { pm_id: processId } } = packet;
pm2.sendDataToProcessId(processId, raw, () => {});
process.send({ topic: 'test', data: 'Hello, Billion Bottle!' });
})();
注意:上述示例代码在直接运行时会报错,但是使用 pm2 start xxx.js
运行时可以在 PM2 日志中看到正确的日志输出。
PM2 bus
及 sendDataToProcessId
原理
有了上述的尝试之后,我们思考一下,为什么 PM2 的 bus 能监听所有进程的消息呢?经过寻找,我发现了 PM2 是这么处理的(有删减):
God.nodeApp = function nodeApp(env_copy, cb){
clu = cluster.fork({pm2_env: JSON.stringify(env_copy), windowsHide: true});
clu.on('message', function cluMessage(msg) {
return God.bus.emit('process:msg', {
raw : msg,
process : {
pm_id : clu.pm2_env.pm_id,
PM2 在创建子进程之后会立刻监听它的消息触发,当子进程中调用了 process.send()
则会触发消息回调,并将消息内容进行包装后通过 bus.emit
再次触发。
在我们明确了主进程 ID 以及消息内容之后,就可以调用 sendDataToProcessId
向主应用发送消息使其 process.on('message')
回调触发。sendDataToProcessId
原理比较简单,上文我们提到,创建子进程之后 PM2 会将进程实例保存在对应进程 ID 的映射表中,由此 sendDataToProcessId
可以根据进程 ID 从映射表中找到对应实例并调用其 send
方法发送消息。下面是它大致的实现代码(有删减):
God.sendDataToProcessId = function (packet, cb) {
var pm_id = packet.id;
var proc = God.clusters_db[pm_id];
proc.send(packet);
千呼万唤始出来
做完所有准备工作之后,我们终于可以着手设计我们的工具了。下面就是我们开源工具的大致处理流程,具体逻辑可以下载我们这个工具的 源代码 查看。
export default async function intercom(pm2: PM2): Promise<void> {
if (pm2InstanceId === mainInstanceId) {
bus.on('process:msg', (packet: PM2Packet): void => {
if (topic === mainTopic) {
myProcessId = processId!;
return;
pm2.sendDataToProcessId(myProcessId, packet.raw, (err): void => {});
process.send!({ topic: mainTopic });
} else {
await new Promise((resolve) => {
bus.on('process:msg', (packet: PM2Packet): void => {
const { topic } = packet.raw;
if (topic === replyTopic) {
setTimeout(resolve);
process.send!({ topic: pingTopic });
在主进程中:
等待连接至 PM2 完成并且获得 bus
实例;
在 bus
消息回调中根据 topic
判断是否是自己并将进程 ID 保存下来;
bus
消息回调如果是日志消息则将消息内容发送至自身进程让 log4js 去处理;
调用 process.send()
触发 bus
回调;
在子进程中主要做了等待主进程初始化完毕的事,确保日志不会在主进程还未准备好时发送出去而造成丢失:
等待连接至 PM2 完成并且获得 bus
实例;
在 bus
消息回调中根据 topic
判断主进程是否已启动完毕,启动完毕后结束自身等待继续之后的逻辑;
调用 process.send()
触发主进程 bus
回调;
其它进程通信的方式
简单地延伸一下,除了 Node 原生 IPC 父子进程通信方式 之外,还有这些通信方式:
child_process
的 stdin
/stdout
Socket
Message Queue
Redis 的 Pub/Sub 机制(即发布订阅模式)
Message Queue 俗称 MQ,是目前常见的服务间通信解决方案之一。MQ 和 Redis 这两种方式这里不展开细说,大家有兴趣可以自己了解一下。下面我们就简单看一下前两种方式如何实现进程通信。
child_process
的 stdin
/stdout
先来简单看一下示例:
main.js
:
const childProcess = require('child_process');
const worker = childProcess.spawn('node', ['./worker.js']);
worker.stdout.setEncoding('utf8');
worker.stdin.write('Hello, worker!');
worker.stdout.on('data', function (message) {
console.log(message);
worker.js
:
process.stdin.setEncoding('utf8');
process.stdin.on('data', (message) => {
console.log(message);
process.stdout.write('Hello, main!');
在示例中,我们使用 child_process
的流进行数据传递,可以通过序列化数据以及反序列化数据进行通信。
Socket
Socket 通信是一种比较万能的通信方式,它不仅可以处理父子进程间的通信,还可以处理同机器中无关联的进程通信,甚至还可以处理跨机器间的进程通信。
下面的示例代码使用的是一个优秀的 Node 进程通信库 axon ,底层也是基于 Node 的 net
库进行 Socket 的处理。
emitter.js
:
const axon = require('axon');
const sock = axon.socket('push');
sock.bind(1901);
setInterval(() => {
sock.send('Hello, Billion Bottle!');
}, 1000);
receiver.js
:
const axon = require('axon');
const sock = axon.socket('pull');
sock.connect(1901);
sock.on('message', (message) => {
console.log(message);
小小的总结
如果大家也遇到了 pm2 install pm2-intercom
执行失败而无法使用 log4js PM2 模式的问题,欢迎大家使用我们的工具,我们将根据需要持续迭代。
当然,欢迎大家去 GitHub 阅读我们的源代码 ,给我们点个 Star 哦 💗!
pm2-intercom-log4js :github.com/BillionBott…
log4js :log4js-node.github.io/log4js-node…
PM2 :pm2.keymetrics.io/
axon :github.com/tj/axon
《Node.js v16.14.2 文档》 :nodejs.cn/api/
《PM2不2:源码分析》 :segmentfault.com/a/119000002…
《Nodejs进程间通信》 :www.ayqy.net/blog/nodejs…
更多精彩请关注我们的公众号「百瓶技术」,有不定期福利呦!