nodejs进程与集群cluster

我们启动一个服务、运行一个实例,就是开一个服务进程,Node.js 里通过 node app.js 开启一个服务进程,多进程就是进程的复制(fork),fork 出来的每个进程都拥有自己的独立空间地址、数据栈,一个进程无法访问另外一个进程里定义的变量、数据结构,只有建立了 IPC 通信,进程之间才可数据共享。

child_process

node.js中可以通过下面四种方式创建子进程:

  • child_process.spawn(command[, args][, options])
  • child_process.exec(command[, options][, callback])
  • child_process.execFile(file[, args][, options][, callback])
  • child_process.fork(modulePath[, args][, options])

spawn

const {spawn} = require("child_process");
// 创建 文件
spawn("touch",["index.js"]);

spawn()会返回child-process子进程实例:

const {spawn} = require("child_process");
// cwd 指定子进程的工作目录,默认当前目录
const child = spawn("ls",["-l"],{cwd:__dirname});
// 输出进程信息
child.stdout.pipe(process.stdout);
console.log(process.pid,child.pid);

子进程同样基于事件机制(EventEmitter API),提供了一些事件:

  • exit:子进程退出时触发,可以得知进程退出状态(code和signal)
  • disconnect:父进程调用child.disconnect()时触发
  • error:子进程创建失败,或被kill时触发
  • close:子进程的stdio流(标准输入输出流)关闭时触发
  • message:子进程通过process.send()发送消息时触发,父子进程消息通信

close与exit的区别主要体现在多进程共享同一stdio流的场景,某个进程退出了并不意味着stdio流被关闭了

子进程具有可读流的特性,利用可读流实现find . -type f | wc -l,递归统计当前目录文件数量:

const { spawn } = require('child_process');

const find = spawn('find', ['.', '-type', 'f']);
const wc = spawn('wc', ['-l']);

find.stdout.pipe(wc.stdin);

wc.stdout.on('data', (data) => {
  console.log(`Number of files ${data}`);
});

exec

spawn()exec()方法的区别在于,exec()不是基于stream的,exec()会将传入命令的执行结果暂存到buffer中,再整个传递给回调函数。

spawn()默认不会创建shell去执行命令(性能上会稍好),而exec()方法执行是会先创建shell,所以可以在exec()方法中传入任意shell脚本。

const {exec} = require("child_process");

exec("node -v",(error,stdout,stderr)=>{
    if (error) console.log(error);
    console.log(stdout)
})

exec()方法因为可以传入任意shell脚本所以存在安全风险。

spawn()方法默认不会创建shell去执行传入的命令(所以性能上稍微好一点),不过可以通过参数实现:

const { spawn } = require('child_process');
const child = spawn('node -v', {
  shell: true
});
child.stdout.pipe(process.stdout);

这种做法的好处是,既能支持shell语法,也能通过stream IO进行标准输入输出。

execFile

const {execFile} = require("child_process");

execFile("node",["-v"],(error,stdout,stderr)=>{
    console.log({ error, stdout, stderr })
    console.log(stdout)
})

通过可执行文件路径执行:

const {execFile} = require("child_process");

execFile("/Users/.nvm/versions/node/v12.1.0/bin/node",["-v"],(error,stdout,stderr)=>{
    console.log({ error, stdout, stderr })
    console.log(stdout)
})

fork

fork()方法可以用来创建Node进程,并且父子进程可以互相通信

//master.js
const {fork} = require("child_process");
const worker = fork("worker.js");

worker.on("message",(msg)=>{
    console.log(`from worder:${msg}`)
});
worker.send("this is master");

// worker.js
process.on("message",(msg)=>{
    console.log("worker",msg)
});
process.send("this is worker");

利用fork()可以用来处理计算量大,耗时长的任务:

const longComputation = () => {
  let sum = 0;
  for (let i = 0; i < 1e10; i++) {
    sum += i;
  };
  return sum;
};

longComputation方法拆分到子进程中,这样主进程的事件循环不会被耗时计算阻塞:

const http = require('http');
const { fork } = require('child_process');

const server = http.createServer();

server.on('request', (req, res) => {
  if (req.url === '/compute') {
    // 将计算量大的任务,拆分到子进程中处理
    const compute = fork('compute.js');
    compute.send('start');
    compute.on('message', sum => {
        // 收到子进程任务后,返回
      res.end(`Sum is ${sum}`);
    });
  } else {
    res.end('Ok')
  }
});

server.listen(3000);

进程间通信IPC

每个进程各自有不同的用户地址空间,任何一个进程的全局变量在另一个进程中都看不到,所以进程之间要交换数据必须通过内核,在内核中开辟一块缓冲区,进程1把数据从用户空间拷到内核缓冲区,进程2再从内核缓冲区把数据读走,内核提供的这种机制称为进程间通信(IPC,InterProcess Communication)

进程之间可以借助内置的IPC机制通信

父进程:

  • 接收事件process.on('message')
  • 发送信息给子进程master.send()

子进程:

  • 接收事件process.on('message')
  • 发送信息给父进程process.send()

fork 多进程

nodejs中的多进程是 多进程 + 单线程 的模式

// master.js. 
process.title = 'node-master'
const net = require("net");
const {fork} = require("child_process");

const handle = net._createServerHandle("127.0.0.1",3000);

for(let i=0;i<4;i++){
    fork("./worker.js").send({},handle);
}

// worker.js
process.title = 'worker-master';

const net = require("net");

process.on("message",(msg,handle)=>start(handle));

const buf = "hello nodejs";
const res= ["HTTP/1.1 200 ok","content-length:"+buf.length].join("\r\n")+"\r\n\r\n"+buf;

function start(server){
    server.listen();
    let num=0;
    server.onconnection = function(err,handle){
        num++;
        console.log(`worker ${process.pid} num ${num}`);
        let socket = new net.Socket({handle});
        socket.readable = socket.writable = true
        socket.end(res);
    }
}

运行node master.js,这里可以使用测试工具 Siege

siege -c 20 -r 10 http://localhost:3000

-c 并发量,并发数为20人 -r 是重复次数, 重复10次

这种创建进程的特点是:

  • 在一个服务上同时启动多个进程
  • 每个进程运行同样的代码(start方法)
  • 多个进程可以同时监听一个端口(3000)

不过每次请求过来交给哪个worker处理,master并不清楚,我们更希望master能够掌控全局,将请求指定给worker,我们做下面的改造:

//master.js
process.title = 'node-master'
const net =require("net");
const {fork} = require("child_process");

// 定义workers变量,保存子进程worker
let workers = [];
for(let i=0;i<4;i++){
    workers.push(fork("./worker.js"));
}
const handle = net._createServerHandle("0.0.0.0", 3000)
handle.listen();
// master控制请求
handle.onconnection = function(err,handle){
    let worker = workers.pop();
    // 将请求传递给子进程
    worker.send({},handle);
    workers.unshift(worker);
}

// worker.js
process.title = 'worker-master';
const net = require("net")
process.on("message", (msg, handle) => start(handle))

const buf = "hello nodejs"
const res = ["HTTP/1.1 200 ok", "content-length:" + buf.length].join("\r\n") + "\r\n\r\n" + buf

function start(handle) {
  console.log(`get a connection on worker,pid = %d`, process.pid)
  let socket = new net.Socket({ handle })
  socket.readable = socket.writable = true
  socket.end(res)
}

Cluster 多进程

Node.js 官方提供的 Cluster 模块不仅充分利用机器 CPU 内核开箱即用的解决方案,还有助于 Node 进程增加可用性的能力,Cluster模块是对多进程服务能力的封装。

// master.js
const cluster = require("cluster");
const numCPUS = require("os").cpus().length;

if(cluster.isMaster){
    console.log(`master start...`)
    for(let i=0;i<numCPUS;i++){
        cluster.fork();
    };

    cluster.on("listening",(worker,address)=>{
        console.log(`master listing worker pid ${worker.process.pid} address port:${address.port}`)
    })

}else if(cluster.isWorker){
    require("./wroker.js")
}
//wroker.js
const http = require("http");
http.createServer((req,res)=>res.end(`hello`)).listen(3000)

进程重启和守护

进程重启

为了增加服务器的可用性,我们希望实例在出现崩溃或者异常退出时,能够自动重启。

//master.js
const cluster = require("cluster")
const numCPUS = require("os").cpus().length

if (cluster.isMaster) {
  console.log("master start..")
  for (let i = 0; i < numCPUS; i++) {
      cluster.fork()
    }
  cluster.on("listening", (worker, address) => {
    console.log("listening worker pid " + worker.process.pid)
  })
  cluster.on("exit", (worker, code, signal) => {
      // 子进程出现异常或者奔溃退出
    if (code !== 0 && !worker.exitedAfterDisconnect) {
      console.log(`工作进程 ${worker.id} 崩溃了,正在开始一个新的工作进程`)
      // 重新开启子进程
      cluster.fork()
    }
  })
} else if (cluster.isWorker) {
  require("./server")
}
const http = require("http")
const server = http.createServer((req, res) => {
    // 随机触发错误
  if (Math.random() > 0.5) {
      throw new Error(`worker error pid=${process.pid}`)
  }
  res.end(`worker pid:${process.pid} num:${num}`)
}).listen(3000)

如果请求抛出异常而结束子进程,主进程能够监听到结束事件,重启开启子进程。

上面的重启只是简单处理,真正项目中要考虑到的就很多了,这里可以参考egg的多进程模型和进程间通讯

下面是来自文章Node.js进阶之进程与线程更全面的例子:

// master.js
const {fork} = require("child_process");
const numCPUS = require("os").cpus().length;

const server = require("net").createServer();
server.listen(3000);
process.title="node-master";

const workers = {};
const createWorker = ()=>{
    const worker = fork("worker.js");
    worker.on("message",message=>{
        if(message.act==="suicide"){
            createWorker();
        }
    })

    worker.on("exit",(code,signal)=>{
        console.log('worker process exited,code %s signal:%s',code,signal);
        delete workers[worker.pid];
    });

    worker.send("server",server);
    workers[worker.pid] = worker;
    console.log("worker process created,pid %s ppid:%s", worker.pid, process.ppid)
}

for (let i = 0; i < numCPUS; i++) {
  createWorker()
}

process.once("SIGINT",close.bind(this,"SIGINT")); // kill(2) Ctrl+C
process.once("SIGQUIT", close.bind(this, "SIGQUIT")) // kill(3) Ctrl+l
process.once("SIGTERM", close.bind(this, "SIGTERM")) // kill(15) default
process.once("exit", close.bind(this))

function close(code){
    console.log('process exit',code);
    if(code!=0){
        for(let pid in workers){
            console.log('master process exit,kill worker pid:',pid);
            workers[pid].kill("SIGINT");
        }
    };
    process.exit(0);
}
//worker.js
const http=require("http");
const server = http.createServer((req,res)=>{
    res.writeHead(200,{"Content-Type":"text/plain"});
    res.end(`worker pid:${process.pid},ppid:${process.ppid}`)
    throw new Error("worker process exception!");
});

let worker;
process.title = "node-worker";
process.on("message",(message,handle)=>{
    if(message==="server"){
        worker = handle;
        worker.on("connection",socket=>{
            server.emit("connection",socket)
        })
    }
})
process.on("uncaughtException",(error)=>{
    console.log('some error')
    process.send({act:"suicide"});
    worker.close(()=>{
        console.log(process.pid+" close")
        process.exit(1);
    })
})

这个例子考虑更加周到,通过uncaughtException捕获子进程异常后,发送信息给主进程重启,并在链接关闭后退出。

进程守护

pm2可以使服务在后台运行不受终端的影响,这里主要通过两步处理:

  • options.detached:为true时运行子进程在父进程退出后继续运行
  • unref() 方法可以断绝跟父进程的关系,使父进程退出后子进程不会跟着退出
const { spawn } = require("child_process")

function startDaemon() {
  const daemon = spawn("node", ["daemon.js"], {
    // 当前工作目录
    cwd: __dirname,
    // 作为独立进程存在
    detached: true,
    // 忽视输入输出流
    stdio: "ignore",
  })
  console.log(`守护进程 ppid:%s pid:%s`, process.pid, daemon.pid)
  // 断绝父子进程关系
  daemon.unref()
}

startDaemon()
// daemon.js
const fs = require("fs")
const {Console} = require("console");
// 输出日志
const logger = new Console(fs.createWriteStream("./stdout.log"),fs.createWriteStream("./stderr.log"));
// 保持进程一直在后台运行
setInterval(()=>{
    logger.log("daemon pid:",process.pid,"ppid:",process.ppid)
},1000*10);

// 生成关闭文件
fs.writeFileSync("./stop.js", `process.kill(${process.pid}, "SIGTERM")`)

参考链接