携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第1天, 点击查看活动详情
大家好,我是码农小余。很久不见,怪是想念。
今天给老表们分享 npm-run-all 源码实现, npm-run-all 是一个用来并行或者串行运行多个 npm 脚本的 CLI 工具。阅读完本文,你能收获到:
直入主题,整个 npm-run-all 的整体执行流程如下:
当我们在终端敲入命令,实际上会去调用 bin/xx/index.js 函数,然后调用 bootstrap 去分发不同命令不同参数的逻辑。help 和 version 比较简单,本文不做分析。任务控制方面,会先调用 npmRunAll 做参数解析,然后执行 runTasks 执行任务组中任务,全部任务执行后返回结果,结束整个流程。
npm-run-all
包支持三条命令,我们看到源码根目录的 package.json 文件:
bin 下面定义的命令脚本:
直接看到
bin/npm-run-all/index.js
:
require("../common/bootstrap")("npm-run-all")
上述代码中,如果是执行 run-p 这条命令,则函数传入的参数是 run-p
,run-s
同理。bootstrap 通过参数的不同,将任务分发到 bin 下不同目录中:
├── common
│ ├── bootstrap.js
│ ├── parse-cli-args.js
│ └── version.js
├── npm-run-all
│ ├── help.js
│ ├── index.js
│ └── main.js
├── run-p
│ ├── help.js
│ ├── index.js
│ └── main.js
└── run-s
├── help.js
├── index.js
└── main.js
照着上述代码结构,结合 ../common/bootstrap
代码:
"use strict"
module.exports = function bootstrap(name) {
const argv = process.argv.slice(2)
switch (argv[0]) {
case undefined:
case "-h":
case "--help":
return require(`../${name}/help`)(process.stdout)
case "-v":
case "--version":
return require("./version")(process.stdout)
default:
// https://github.com/mysticatea/npm-run-all/issues/105
// Avoid MaxListenersExceededWarnings.
process.stdout.setMaxListeners(0)
process.stderr.setMaxListeners(0)
process.stdin.setMaxListeners(0)
// Main
return require(`../${name}/main`)(
argv,
process.stdout,
process.stderr
).then(
() => {
// I'm not sure why, but maybe the process never exits
// on Git Bash (MINGW64)
process.exit(0)
() => {
process.exit(1)
bootstrap 函数依据调用的命令调用不同目录下的 help、version 或者调用 main 函数,达到了差异消除的效果。
然后再把目光放在 process.stdout.setMaxListeners(0)
这是啥玩意?打开 issue 链接,通过报错信息和翻阅官方文档:
By default EventEmitter
s will print a warning if more than 10
listeners are added for a particular event. This is a useful default that helps finding memory leaks. The emitter.setMaxListeners()
method allows the limit to be modified for this specific EventEmitter
instance. The value can be set to Infinity
(or 0
) to indicate an unlimited number of listeners.
默认情况下,如果为特定事件添加了超过 10 个侦听器,EventEmitters 将发出警告。这是一个有用的默认值,有助于发现内存泄漏。 emitter.setMaxListeners() 方法允许为这个特定的 EventEmitter 实例修改限制。该值可以设置为 Infinity(或 0)以指示无限数量的侦听器。
为什么要处理这个情况呢?因为用户可能这么使用:
$ run-p a1 a2 a3 a4 a5 a6 a7 a8 a9 a10 a11
你永远想象不到用户会怎么使用你的工具!
分析完不同命令的控制逻辑,我们进入核心的 npmRunAll 函数,参数解析部分逻辑如下:
module.exports = function npmRunAll(args, stdout, stderr) {
try {
const stdin = process.stdin
const argv = parseCLIArgs(args)
} catch () {
// ...
解析处理所有标准输入流参数,最终生成并返回 ArgumentSet 实例 set。parseCLIArgsCore 只看控制任务流执行的参数:
function addGroup(groups, initialValues) {
groups.push(Object.assign(
{ parallel: false, patterns: [] },
initialValues || {}
function parseCLIArgsCore(set, args) {
LOOP:
for (let i = 0; i < args.length; ++i) {
const arg = args[i]
switch (arg) {
// ...
case "-s":
case "--sequential":
case "--serial":
if (set.singleMode && arg === "-s") {
set.silent = true
break
if (set.singleMode) {
throw new Error(`Invalid Option: ${arg}`)
addGroup(set.groups)
break
case "-p":
case "--parallel":
if (set.singleMode) {
throw new Error(`Invalid Option: ${arg}`)
addGroup(set.groups, { parallel: true })
break
default: {
// ...
break
// ...
return set
将任务都装到 groups
数组中,如果是并行任务(传了 -p
、--parallel
参数),就给任务加上 { parallel: true }
标记。默认是 { parallel: false }
,即串行任务。
执行任务组
在进入这一小节之前,我们就 npm-run-all 源码在 scripts 下加一条 debug 命令:
$ "node ./bin/npm-run-all/index.js lint test"
解析完参数生成的 argv.groups
如下:
paralles: false,
patterns: ['lint', 'test']
有了这个数组结果,我们再看任务执行流程会更加明朗。
// bin/npm-run-all/main.js
module.exports = function npmRunAll(args, stdout, stderr) {
try {
// 省略解析参数
// 执行任务
const promise = argv.groups.reduce(
(prev, group) => {
// 分组中没有任务,直接返回 null
if (group.patterns.length === 0) {
return prev
return prev.then(() => runAll(
group.patterns, // ['lint', 'test']
// ……
// 是否并行执行
parallel: group.parallel,
// 并行的最大数量
maxParallel: group.parallel ? argv.maxParallel : 1,
// 一个任务失败后继续执行其他任务
// ……
arguments: argv.rest,
// 这个📌用于当任务以0码退出时,终止全部任务
race: group.parallel && argv.race,
Promise.resolve(null)
// ...
return promise
catch (err) {
//eslint-disable-next-line no-console
console.error("ERROR:", err.message)
return Promise.reject(err)
上述代码解析完命令行中的参数之后,通过 reduce 拼接所有任务组的结果。任务组就是 npm-run-all 支持同时配置并行和串行的任务,并生成多个任务组。例如:
$ npm-run-all -p a b -s c d e
上述命令会生成两个任务组,并行任务组是 ['a', 'b'],串行任务组是 ['c', 'd', 'e']。就会执行两次 runAll
。接下来就看看单个任务组的执行逻辑:
// lib/index.js
module.exports = function npmRunAll(patternOrPatterns, options) { //eslint-disable-line complexity
// 省略一系列参数格式化和默认值处理……
try {
const patterns = parsePatterns(patternOrPatterns, args)
// 省略非法参数报错和参数之间的校验……
return Promise.resolve()
.then(() => {
if (taskList != null) {
return { taskList, packageInfo: null }
return readPackageJson()
.then(x => {
// 从 package.json 中匹配任务
// 🌰中 patterns 是 ['lint', 'tests'],所以 lint 和 test 这两个任务一定要从 package.json 的 scripts 中能查看到
const tasks = matchTasks(x.taskList, patterns)
return runTasks(tasks, {
// 省略上面格式化和校验后的参数……
catch (err) {
return Promise.reject(new Error(err.message))
上述代码将参数的处理和校验全部省略掉了,看到核心的逻辑 matchTasks
和 runTasks
。matchTasks
通过读取 package.json
下 scripts
中的命令,然后判断任务组 patterns
中的任务是否都存在于 taskList
中。
然后就来到了本小节的核心逻辑——调用 runTasks
依次执行每个任务组中的任务:
module.exports = function runTasks(tasks, options) {
return new Promise((resolve, reject) => {
// 任务组中不存在任务,直接返回空数组
if (tasks.length === 0) {
resolve([])
return
// 结果数组
const results = tasks.map(task => ({ name: task, code: undefined }))
// 任务队列
const queue = tasks.map((task, index) => ({ name: task, index }))
// 用于判断并行的时候任务是否全部完成
const promises = []
let error = null
let aborted = false
* Done.
* @returns {void}
function done() {
// ……
* Aborts all tasks.
* @returns {void}
function abort() {
// ……
* Runs a next task.
* @returns {void}
function next() {
// 任务被终止了,则不需要再往下执行了
if (aborted) {
return
// 并行时,只有满足 queue 和 promises 的长度都为 0,才可以判断任务组完成
if (queue.length === 0) {
if (promises.length === 0) {
done()
return
const task = queue.shift()
const promise = runTask(task.name, optionsClone)
promises.push(promise)
promise.then(
(result) => {
// 完成一个任务,就将其从 promises 中删除
remove(promises, promise)
if (aborted) {
return
if (result.code) {
error = new NpmRunAllError(result, results)
// 失败后不继续执行后续的任务,则直接终止整个任务组
if (!options.continueOnError) {
abort()
return
// Aborts all tasks if options.race is true.
if (options.race && !result.code) {
abort()
return
// Call the next task.
next()
(thisError) => {
remove(promises, promise)
if (!options.continueOnError || options.race) {
error = thisError
abort()
return
next()
// 最大并发数
const max = options.maxParallel
// 对比任务数量、配置的并发数、取较小者。这是为了防止配置的 maxParallel 比实际执行的任务数量还大的情况
const end = (typeof max === "number" && max > 0)
? Math.min(tasks.length, max)
: tasks.length
for (let i = 0; i < end; ++i) {
next()
通过队列这个数据结构,依次执行组中的每一条任务,任务成功后将结果存入 result,然后调用 next 执行下一个任务;可以通过 abort 终止全部任务;通过 done 完成整个队列的状态更新,并将结果返回。
接下来,通过一张图和本小节的示例,来更好地理解串行机制:
从左到右得流程,讲解一下:
初始化时,根据任务组的 patterns,生成任务队列;
计算完任务数量 end 之后,执行 next 函数。此时会从任务队列中取出 lint 任务,调用 runTask 去执行该任务(图2所示)。(runTask 的细节放到下一小节分析。)执行完成后,会执行以下子任务:
如果配置了 aggregateOutput
参数,会将任务的输出流写入到内存流;
更新 result.code,如果配置了失败不继续执行(!continueOnError) 或者 race 参数,就直接调用 abort 终止整个任务队列,返回结果;
如果成功或配置失败了继续执行其他任务( continueOnError),就去任务队列中取出下一个任务(图3所示),会去执行 test 任务,重复上一步骤的逻辑。最终任务队列中没有其他任务了,此时也会执行 done 函数,结束整个任务组,并将 results 返回。
并行机制的不同就在于初始的时候会调用多次 next 函数,并且会判断当前是否还有正在执行的任务。
上图是我们执行以下命令的流程图:
$ node ./bin/npm-run-all/index.js -p lint test --max-parallel 2
命令的意思是并行执行 lint 和 test 任务,并且最大并发数是 2。
回到上面的流程图:
初始时还是会创建一个任务队列,并将 lint 和 test 两个任务添加到队列中;
然后在首次执行时,因为我们是并发执行,所以会调用两次 next 函数,promises 数组会保存两个 promise 实例;
当 lint 任务先完成(此时 test 任务还在执行,即 test promise 还未结束),此时会再调用 next 函数。此时会判断任务队列和正在进行的任务队列是否为空,如果是的话就调用 done 返回结果,否则什么都不做,等待其他任务执行完成。
当 test 任务也完成时(假设此时 lint 任务已经完成),同样也会再次执行 next。但此时 queue 和 promises 两个数组的长度都是 0,就执行 done 逻辑,输出任务组的结果。
本节我们学习了任务组中的任务不管是串行机制还是并行机制,都通过任务队列依次执行。不同的是,串行是首次只执行一次 next,并行根据参数执行多次 next。当满足队列为空并且所有任务都完成,就结束当前任务组,并将缓存在 results 中的结果返回。
单个任务如何执行
了解完任务组的串行和并行机制,这一小节就来了解单个任务是如何被执行的。
module.exports = function runTask(task, options) {
let cp = null
const promise = new Promise((resolve, reject) => {
// 包装输出、输入、错误信息流……
// 在输出流中写入任务名称
if (options.printName && stdout != null) {
stdout.write(createHeader(
task,
options.packageInfo,
options.stdout.isTTY
// 执行命令的 npm 路径,npm-cli 的路径
const npmPath = options.npmPath || process.env.npm_execpath
const npmPathIsJs = typeof npmPath === "string" && /\.m?js/.test(path.extname(npmPath))
// 执行路径,一般是全局的 bin/node 路径
const execPath = (npmPathIsJs ? process.execPath : npmPath || "npm")
// 判断是不是 yarn
const isYarn = path.basename(npmPath || "npm").startsWith("yarn")
const spawnArgs = ["run"]
if (npmPathIsJs) {
spawnArgs.unshift(npmPath)
if (!isYarn) {
Array.prototype.push.apply(spawnArgs, options.prefixOptions)
else if (options.prefixOptions.indexOf("--silent") !== -1) {
spawnArgs.push("--silent")
Array.prototype.push.apply(spawnArgs, parseArgs(task).map(cleanTaskArg))
// 执行命令
cp = spawn(execPath, spawnArgs, spawnOptions)
// 省略输出流格式化……
// Register
cp.on("error", (err) => {
cp = null
reject(err)
cp.on("close", (code, signal) => {
cp = null
// 成功后返回任务名称、状态
resolve({ task, code, signal })
// 给当前的promise挂上静态方法,用于结束当前子进程任务
promise.abort = function abort() {
if (cp != null) {
cp.kill()
cp = null
return promise
runTask 做了四件事:
格式化标准输入、输出流,添加一些任务名称头部信息之类的;
获取任务的执行器,获取 npm-cli、node 等路径信息,然后拼接整个任务的执行命令;
调用封装后的 spawn 执行命令,并监听 error 和 close 事件用于返回执行结果;因为系统的不一致,所以 spawn 通过 cross-spawn 做了一层封装。
给当前任务挂上了 abort 的静态方法,用于结束当前进程;当在任务组执行 abort 方法时,实际会调用这个静态方法。
有人会问为什么要去看一个 6 年前写的源码?语法老旧、甚至还能看到标记语法 。但我想表达的是,npm-run-all
这个包的核心逻辑——通过任务队列去实现串行和并行的任务流模型是非常经典的,类似 continueOnError、race 这样的逻辑控制节点也随处可见。例如当前非常时髦的构建系统 Turborepo ,它对 pipeline 的控制逻辑也能复用这个任务流模型。除此之外,通过 npm-run-all 的目录结构,我们可以从中总结出做一个开源项目所需的基本要求。
关注小余,下一篇文章我们结合 npm-run-all 聊聊开源项目所需的基本要素。