最近面试被问到过的问题,查了资料后,自己总结一下记录下来。

问题的提出

众所周知。Node.js 是单线程的,单线程特性使得能够以较低的系统资源消耗快速处理许多请求。 有异步 I/O,所以 Node.js 是可以处理 IO 密集型任务,但是 Node.js 碰到 CPU 密集型,或者说计算密集型任务(例如机器学习,求斐波那契数列,求素数等),node 的单线程优势就往往变成劣势。

有人说,那可以用他的异步队列来解决。那让我们看看 Promise 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// serverWithPromises.js
const express = require("express");
const app = express();

app.get("/isprime", async (req, res) => {
const startTime = new Date();
const result = await isPrime(parseInt(req.query.number));
const endTime = new Date();
res.json({
number: parseInt(req.query.number),
isprime: result,
time: endTime.getTime() - startTime.getTime() + "ms",
});
});

app.get("/testrequest", (req, res) => {
res.send("I am unblocked now");
});

const isPrime = (number) => {
return new Promise((resolve) => {
let isPrime = true;
for (let i = 3; i < number; i++) {
if (number % i === 0) {
isPrime = false;
break;
}
}
resolve(isPrime);
});
};

app.listen(3000, () => console.log("listening on port 3000"));

可以看出 Promise 是异步的部分在 .then()代码里,但是他自身的阻塞是无法避免的,例如上面的代码,我们把
for (let i = 3; i < number; i++) 改成 for (let i = 3; i < 1000000000; i++),那么整个任务就会卡死。从而阻塞 '/testrequest' api 的请求。

Promise 之所以在 JavaScript 社区中被推崇为“异步非阻塞操作”的一种方式,是因为 Promise 擅长完成需要更多时间而不是更多 CPU 的工作。 这里所说的“需要更多时间的工作”通常包括:数据库通信、跨服务器通信等,这也是 Web 服务器所做的 99% 的工作。

JavaScript Promise 通过将任务推送到特殊队列并侦听事件(例如:数据库返回)发生并在该事件发生时执行一个回调函数,这就是著名的事件循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import express from "express";
import fetch from "node-fetch";
const app = express();

// node-fetch 用于在Nodejs中发送情况

app.get("/calltoslowserver", async (req, res) => {
const result = await fetch("http://localhost:5000/slowrequest");
// 返回一个Promise
const resJson = await result.json();
res.json(resJson);
});

app.get("/slowrequest", (req, res) => {
setTimeout(() => res.json({ message: "sry i was late" }), 20000);
// setTimeout 模拟10s的时间花销,即使花销很长时间也不会阻塞其他访问
});

app.get("/testrequest", (req, res) => {
res.send("I am unblocked now");
});

app.listen(4000, () => console.log("listening on port 4000"));

在上面的示例中可以看到,即使对 /slowrequest 或者 /calltoslowserver 的调用花费了很长时间,所有其他请求,比如:/testrequest,都没有被阻止。 这是因为 node-fetchfetch 函数返回一个 Promise,而 这种单线程、非阻塞、异步的处理方式是 Node 中默认的。

解决方案

Node js 提供了三种解决方案来解决上面问题。

child_process 模块

child_process 模块提供了生成拥有自己内存的新进程的能力, 这些进程之间的通信是通过 OS 提供的 IPC(Inter-process Communication)建立。

该模块内部主要有 3 个 api:

  • child_process.exec()
  • child_process.spawn()
  • child_process.fork()

exec()

该 api 允许你执行系统命令。虽然它不能直接解决 CPU 密集型任务,但你可以使用它来调用其他语言编写的程序或脚本来处理 CPU 密集型任务,如计算斐波那契数列。

首先我们写一个 fibonacci.py 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# fibonacci.py
import sys

def fibonacci(n):
if n <= 0:
return 0
elif n == 1:
return 1
else:
return fibonacci(n-1) + fibonacci(n-2)

if __name__ == "__main__":
n = int(sys.argv[1])
print(fibonacci(n))

接下来,在 Express 中使用 child_process.exec() 调用这个 Python 脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
const express = require("express");
const bodyParser = require("body-parser");
// npm i express body-parser
const { execSync, exec } = require("child_process");
const app = express();
const port = 3000;

app.use(bodyParser.json());

// 阻塞接口的示例
app.post("/testblock", (req, res) => {
const number = req.body.number || 10;
// 使用 execSync 会阻塞Node.js事件循环
const result = execSync(`python fibonacci.py ${number}`).toString().trim();
res.json({ fibonacci: result });
});

// 非阻塞接口的示例
app.post("/fibonaccipy", (req, res) => {
const number = req.body.number || 10;
exec(`python fibonacci.py ${number}`, (error, stdout, stderr) => {
if (error) {
return res.status(500).json({ error: `执行的错误: ${error.message}` });
}
if (stderr) {
return res.status(500).json({ error: `标准错误输出: ${stderr}` });
}
res.json({ fibonacci: stdout.trim() });
});
});

app.listen(port, () => {
console.log(`Server is running on port ${port}`);
});

在这个示例中,我们创建了两个 POST 接口。/testblock接口使用 execSync 来同步执行 Python 脚本,这会阻塞 Node.js 的事件循环,直到 Python 脚本执行完成。而/fibonaccipy 接口则使用 exec 来异步执行 Python 脚本,不会阻塞 Node.js 的事件循环。

spawn()

该方法用于异步生成子进程,该子进程可以是允许终端运行的任何命令。用法与上一个 exec() 方法类似,但 spawn() 方法返回一个 ChildProcess 对象,该对象具有 stdoutstderr 属性,用于读取子进程的输出。
他可以和子进程间进行通信,获取其输出,这点是 exec() 方法无法做到的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const express = require("express");
const app = express();
const { spawn } = require("child_process");
// 导入child_process子进程
app.get("/ls", (req, res) => {
const ls = spawn("ls", ["-lash", req.query.directory]);
ls.stdout.on("data", (data) => {
// stdin,stdout,stderr 管道(connection)是通过父级parent建立的
// Node.js 进程和生成的子进程,可以在标准输出上监听数据事件
res.write(data.toString());
// 日期将以流(数据块)的形式出现
// 由于 res 是一个可写流,支持写入
});
ls.on("close", (code) => {
console.log(`child process exited with code ${code}`);
res.end();
// 最后,当子进程退出时,所有写入的流都会被发送回来
});
});

app.listen(7000, () => console.log("listening on port 7000"));

fork() 重点关注

child_process.fork() 专门用于生成新的 Nodejs 进程。 与 spawn 一样,返回的 childProcess 对象将具有内置的 IPC 通信通道,允许消息在父进程和子进程之间来回传递。

例如以下主程序代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const express = require("express");
const app = express();
const { fork } = require("child_process");

app.get("/isprime", (req, res) => {
const childProcess = fork("./forkedchild.js");
// fork() 的第一个参数是子进程要运行的 js 文件的名称
childProcess.send({ number: parseInt(req.query.number) });
// send方法用于通过IPC向子进程发送消息
const startTime = new Date();
childProcess.on("message", (message) => {
// on("message")方法用于监听子进程发送的消息
const endTime = new Date();
res.json({
...message,
time: endTime.getTime() - startTime.getTime() + "ms",
});
});
});
app.get("/testrequest", (req, res) => {
res.send("I am unblocked now");
});
app.listen(3636, () => console.log("listening on port 3636"));

其中,forkedchild.js 文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
process.on("message", (message) => {
//子进程正在监听父进程的消息
const result = isPrime(message.number);
process.send(result);
// 使用后要exit() 防止进程孤立
process.exit();
});

function isPrime(number) {
let isPrime = true;

for (let i = 3; i < number; i++) {
if (number % i === 0) {
isPrime = false;
break;
}
}

return {
number: number,
isPrime: isPrime,
};
}

此时当访问 /isprime?number=29355126551 时候虽然浏览器也会一直处于 loading 等待直到返回,但是此时不会阻塞后续的其他请求,比如:/testrequest api 的请求。

总结:

  • 如果你需要执行一个简短的命令并获取其完整输出,可以使用 exec()。
  • 如果你需要处理大量数据或需要与子进程进行实时通信,应该使用 spawn()。
  • 如果你正在创建多个 Node.js 工作进程,并希望通过 IPC 进行通信,那么 fork() 是最佳选择。

cluster 模块

cluster 主要用于垂直(为现有机器添加更多功能)扩展 Nodejs Web 服务器,构建在 child_process 模块之上。 在 Http 服务器中,cluster 模块使用 child_process.fork() 自动 fork 进程并建立主从架构,其中父进程以循环方式将传入请求分发给子进程。 理想情况下,fork 的进程数应等于计算机具有的 CPU 核数。

下面示例使用 cluster 模块构建一个 Express 服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
const cluster = require("cluster");
const http = require("http");
const cpuCount = require("os").cpus().length;
// 返回cpu的核数
if (cluster.isMaster) {
masterProcess();
} else {
childProcess();
}

// 父进程
function masterProcess() {
console.log(`Master process ${process.pid} is running`);
// fork()更多 workers
for (let i = 0; i < cpuCount; i++) {
console.log(`Forking process number ${i}...`);
cluster.fork();
//creates new node js processes
}
cluster.on("exit", (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
cluster.fork(); //forks a new process if any process dies
});
}
// 子进程
function childProcess() {
const express = require("express");
const app = express();
// workers 可以共享 TCP connection
app.get("/", (req, res) => {
res.send(`hello from server ${process.pid}`);
});
app.listen(5555, () =>
console.log(`server ${process.pid} listening on port 5555`)
);
}

当运行上面的代码时,cluster.isMaster 第一次为 true 并且 masterProcess() 函数被执行。masterProcess()函数 fork()了 4 个 NodeJS 进程(依赖于设备 CPU 的核数),每当 fork()另一个进程时,都会再次运行相同的文件,但 cluster.isMaster 将返回 false,因为该进程现在是一个子进程,因为是 fork()的,因此控制转到 else 条件。

最终 childProcess() 函数执行了 4 次,并创建了 4 个 Express 服务器实例,后续请求以循环方式分发到四台服务器,从而充分利用机器的 CPU。 Node js 文档还指出,有一些内置的智能功能可以避免工作进程过载。

Cluster 模块是垂直扩展简单 Nodejs 服务器的最简单、最快的方法。 但是,为了实现更高级和弹性的扩展,可以使用 docker 容器和 Kubernetes 等工具。

worker threads

本质上,工作线程和子进程之间的区别就像线程和进程之间的区别一样。理想情况下,创建的线程数应等于 cpu 核数。

下面演示 worker_threads 的使用。先创建一个单进程的服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
const express = require("express");
const app = express();
// 求和
function sumOfPrimes(n: number): number {
var sum = 0;
for (var i = 2; i <= n; i++) {
for (var j = 2; j <= i / 2; j++) {
if (i % j == 0) {
break;
}
}
if (j > i / 2) {
sum += i;
}
}
return sum;
}

app.get(
"/sumofprimes",
(
{ query: { number } }: any,
res: {
json: (arg0: { number: any; sum: number; timeTaken: string }) => void;
}
) => {
const startTime = new Date().getTime();
const sum = sumOfPrimes(number as number);
const endTime = new Date().getTime();
res.json({
number: number,
sum: sum,
timeTaken: (endTime - startTime) / 1000 + " seconds",
});
}
);

app.listen(6868, () => console.log("listening on port 6868"));

当我们请求 100 万的质数之和时,我们看到以下结果:我用的电脑是 MacBook’s M1,也要足足阻塞 12 秒

花费12秒

我们把他改造为工作线程模式:主程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import express from "express";
import { Worker } from "worker_threads";
const app = express();

function runWorker(workerData) {
return new Promise((resolve, reject) => {
// 第一个参数是worker的文件名
const worker = new Worker("./cpu密集型演示_worker_threads_质数加总.js", {
workerData,
});
worker.on("message", resolve);
// 当数据从worker线程返回的时候改Promise为resolve
worker.on("error", reject);
worker.on("exit", (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
}

function divideWorkAndGetSum() {
// 为了简单起见,对值 600000 进行硬编码分成4等份
const start1 = 2;
const end1 = 150000;
const start2 = 150001;
const end2 = 300000;
const start3 = 300001;
const end3 = 450000;
const start4 = 450001;
const end4 = 600000;
const start5 = 600001;
const end5 = 750000;
const start6 = 750001;
const end6 = 900000;
const start7 = 900001;
const end7 = 1000000;
// 为每一个worker单独分配内容
const worker1 = runWorker({ start: start1, end: end1 });
const worker2 = runWorker({ start: start2, end: end2 });
const worker3 = runWorker({ start: start3, end: end3 });
const worker4 = runWorker({ start: start4, end: end4 });
const worker5 = runWorker({ start: start5, end: end5 });
const worker6 = runWorker({ start: start6, end: end6 });
const worker7 = runWorker({ start: start7, end: end7 });
// 要求所有都resolve
return Promise.all([
worker1,
worker2,
worker3,
worker4,
worker5,
worker6,
worker7,
]);
}

app.get("/sumofprimeswiththreads", async (req, res) => {
const startTime = new Date().getTime();
const sum = await divideWorkAndGetSum()
.then(
(
values
//values is an array containing all the resolved values
) => values.reduce((accumulator, part) => accumulator + part.result, 0)
//reduce is used to sum all the results from the workers
)
.then((finalAnswer) => finalAnswer);

const endTime = new Date().getTime();
res.json({
number: 1000000,
sum: sum,
timeTaken: (endTime - startTime) / 1000 + " seconds",
});
});

app.listen(7777, () => console.log("listening on port 7777"));

文件 cpu密集型演示_worker_threads_质数加总.js :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import { workerData, parentPort } from "worker_threads";
// workerData 将是 multiThreadServer.js 中 Worker 构造函数的第二个参数

const start = workerData.start;
const end = workerData.end;

var sum = 0;
for (var i = start; i <= end; i++) {
for (var j = 2; j <= i / 2; j++) {
if (i % j == 0) {
break;
}
}
if (j > i / 2) {
sum += i;
}
}

parentPort.postMessage({
// 将结果消息发送回父进程
start: start,
end: end,
result: sum,
});

看一下运行时间,只有 3 秒多

花费3秒多