最近面试被问到过的问题,查了资料后,自己总结一下记录下来。
问题的提出 众所周知。Node.js 是单线程的,单线程特性使得能够以较低的系统资源消耗快速处理许多请求。 有异步 I/O,所以 Node.js 是可以处理 IO 密集型任务,但是 Node.js 碰到 CPU 密集型,或者说计算密集型任务(例如机器学习,求斐波那契数列,求素数等),node 的单线程优势就往往变成劣势。
有人说,那可以用他的异步队列来解决。那让我们看看 Promise
的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 const express = require ("express" );const app = express();app.get("/isprime" , async (req, res) => { const startTime = new Date (); const result = await isPrime(parseInt (req.query.number)); const endTime = new Date (); res.json({ number: parseInt (req.query.number), isprime: result, time: endTime.getTime() - startTime.getTime() + "ms" , }); }); app.get("/testrequest" , (req, res ) => { res.send("I am unblocked now" ); }); const isPrime = (number ) => { return new Promise ((resolve ) => { let isPrime = true ; for (let i = 3 ; i < number; i++) { if (number % i === 0 ) { isPrime = false ; break ; } } resolve(isPrime); }); }; app.listen(3000 , () => console .log("listening on port 3000" ));
可以看出 Promise
是异步的部分在 .then()
代码里,但是他自身的阻塞是无法避免的,例如上面的代码,我们把for (let i = 3; i < number; i++)
改成 for (let i = 3; i < 1000000000; i++)
,那么整个任务就会卡死。从而阻塞 '/testrequest'
api 的请求。
Promise 之所以在 JavaScript 社区中被推崇为“异步非阻塞操作”的一种方式,是因为 Promise 擅长完成需要更多时间而不是更多 CPU 的工作 。 这里所说的“需要更多时间的工作”通常包括:数据库通信、跨服务器通信等,这也是 Web 服务器所做的 99% 的工作。
JavaScript Promise 通过将任务推送到特殊队列并侦听事件(例如:数据库返回)发生并在该事件发生时执行一个回调函数,这就是著名的事件循环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import express from "express" ;import fetch from "node-fetch" ;const app = express();app.get("/calltoslowserver" , async (req, res) => { const result = await fetch("http://localhost:5000/slowrequest" ); const resJson = await result.json(); res.json(resJson); }); app.get("/slowrequest" , (req, res ) => { setTimeout (() => res.json({ message : "sry i was late" }), 20000 ); }); app.get("/testrequest" , (req, res ) => { res.send("I am unblocked now" ); }); app.listen(4000 , () => console .log("listening on port 4000" ));
在上面的示例中可以看到,即使对 /slowrequest
或者 /calltoslowserver
的调用花费了很长时间,所有其他请求,比如:/testrequest
,都没有被阻止。 这是因为 node-fetch
的 fetch
函数返回一个 Promise
,而 这种单线程、非阻塞、异步的处理方式是 Node 中默认的。
解决方案 Node js 提供了三种解决方案来解决上面问题。
child_process
模块child_process
模块提供了生成拥有自己内存的新进程的能力, 这些进程之间的通信是通过 OS 提供的 IPC(Inter-process Communication)建立。
该模块内部主要有 3 个 api:
child_process.exec()
child_process.spawn()
child_process.fork()
exec()
该 api 允许你执行系统命令。虽然它不能直接解决 CPU 密集型任务,但你可以使用它来调用其他语言编写的程序或脚本来处理 CPU 密集型任务,如计算斐波那契数列。
首先我们写一个 fibonacci.py
文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import sysdef fibonacci (n ): if n <= 0 : return 0 elif n == 1 : return 1 else : return fibonacci(n-1 ) + fibonacci(n-2 ) if __name__ == "__main__" : n = int (sys.argv[1 ]) print (fibonacci(n))
接下来,在 Express 中使用 child_process.exec() 调用这个 Python 脚本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 const express = require ("express" );const bodyParser = require ("body-parser" );const { execSync, exec } = require ("child_process" );const app = express();const port = 3000 ;app.use(bodyParser.json()); app.post("/testblock" , (req, res ) => { const number = req.body.number || 10 ; const result = execSync(`python fibonacci.py ${number} ` ).toString().trim(); res.json({ fibonacci : result }); }); app.post("/fibonaccipy" , (req, res ) => { const number = req.body.number || 10 ; exec(`python fibonacci.py ${number} ` , (error, stdout, stderr ) => { if (error) { return res.status(500 ).json({ error : `执行的错误: ${error.message} ` }); } if (stderr) { return res.status(500 ).json({ error : `标准错误输出: ${stderr} ` }); } res.json({ fibonacci : stdout.trim() }); }); }); app.listen(port, () => { console .log(`Server is running on port ${port} ` ); });
在这个示例中,我们创建了两个 POST 接口。/testblock
接口使用 execSync 来同步执行 Python 脚本,这会阻塞 Node.js 的事件循环,直到 Python 脚本执行完成。而/fibonaccipy
接口则使用 exec 来异步执行 Python 脚本,不会阻塞 Node.js 的事件循环。
spawn()
该方法用于异步生成子进程,该子进程可以是允许终端运行的任何命令。用法与上一个 exec()
方法类似,但 spawn()
方法返回一个 ChildProcess
对象,该对象具有 stdout
和 stderr
属性,用于读取子进程的输出。 他可以和子进程间进行通信,获取其输出,这点是 exec()
方法无法做到的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 const express = require ("express" );const app = express();const { spawn } = require ("child_process" );app.get("/ls" , (req, res ) => { const ls = spawn("ls" , ["-lash" , req.query.directory]); ls.stdout.on("data" , (data ) => { res.write(data.toString()); }); ls.on("close" , (code ) => { console .log(`child process exited with code ${code} ` ); res.end(); }); }); app.listen(7000 , () => console .log("listening on port 7000" ));
fork()
重点关注child_process.fork() 专门用于生成新的 Nodejs 进程。 与 spawn 一样,返回的 childProcess 对象将具有内置的 IPC 通信通道,允许消息在父进程和子进程之间来回传递。
例如以下主程序代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 const express = require ("express" );const app = express();const { fork } = require ("child_process" );app.get("/isprime" , (req, res ) => { const childProcess = fork("./forkedchild.js" ); childProcess.send({ number : parseInt (req.query.number) }); const startTime = new Date (); childProcess.on("message" , (message ) => { const endTime = new Date (); res.json({ ...message, time: endTime.getTime() - startTime.getTime() + "ms" , }); }); }); app.get("/testrequest" , (req, res ) => { res.send("I am unblocked now" ); }); app.listen(3636 , () => console .log("listening on port 3636" ));
其中,forkedchild.js
文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 process.on("message" , (message ) => { const result = isPrime(message.number); process.send(result); process.exit(); }); function isPrime (number ) { let isPrime = true ; for (let i = 3 ; i < number; i++) { if (number % i === 0 ) { isPrime = false ; break ; } } return { number: number, isPrime: isPrime, }; }
此时当访问 /isprime?number=29355126551
时候虽然浏览器也会一直处于 loading 等待直到返回,但是此时不会阻塞后续的其他请求,比如:/testrequest
api 的请求。
总结:
如果你需要执行一个简短的命令并获取其完整输出,可以使用 exec()。
如果你需要处理大量数据或需要与子进程进行实时通信,应该使用 spawn()。
如果你正在创建多个 Node.js 工作进程,并希望通过 IPC 进行通信,那么 fork() 是最佳选择。
cluster
模块cluster 主要用于垂直(为现有机器添加更多功能)扩展 Nodejs Web 服务器,构建在 child_process
模块之上。 在 Http 服务器中,cluster 模块使用 child_process.fork()
自动 fork 进程并建立主从架构,其中父进程以循环方式将传入请求分发给子进程。 理想情况下,fork 的进程数应等于计算机具有的 CPU 核数。
下面示例使用 cluster 模块构建一个 Express 服务器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 const cluster = require ("cluster" );const http = require ("http" );const cpuCount = require ("os" ).cpus().length;if (cluster.isMaster) { masterProcess(); } else { childProcess(); } function masterProcess ( ) { console .log(`Master process ${process.pid} is running` ); for (let i = 0 ; i < cpuCount; i++) { console .log(`Forking process number ${i} ...` ); cluster.fork(); } cluster.on("exit" , (worker, code, signal ) => { console .log(`worker ${worker.process.pid} died` ); cluster.fork(); }); } function childProcess ( ) { const express = require ("express" ); const app = express(); app.get("/" , (req, res ) => { res.send(`hello from server ${process.pid} ` ); }); app.listen(5555 , () => console .log(`server ${process.pid} listening on port 5555` ) ); }
当运行上面的代码时,cluster.isMaster 第一次为 true 并且 masterProcess() 函数被执行。masterProcess()函数 fork()了 4 个 NodeJS 进程(依赖于设备 CPU 的核数),每当 fork()另一个进程时,都会再次运行相同的文件,但 cluster.isMaster 将返回 false,因为该进程现在是一个子进程,因为是 fork()的,因此控制转到 else 条件。
最终 childProcess() 函数执行了 4 次,并创建了 4 个 Express 服务器实例,后续请求以循环方式分发到四台服务器,从而充分利用机器的 CPU。 Node js 文档还指出,有一些内置的智能功能可以避免工作进程过载。
Cluster 模块是垂直扩展简单 Nodejs 服务器的最简单、最快的方法。 但是,为了实现更高级和弹性的扩展,可以使用 docker 容器和 Kubernetes 等工具。
worker threads 本质上,工作线程和子进程之间的区别就像线程和进程之间的区别一样。理想情况下,创建的线程数应等于 cpu 核数。
下面演示 worker_threads 的使用。先创建一个单进程的服务器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 const express = require ("express" );const app = express();function sumOfPrimes (n: number ): number { var sum = 0 ; for (var i = 2 ; i <= n; i++) { for (var j = 2 ; j <= i / 2 ; j++) { if (i % j == 0 ) { break ; } } if (j > i / 2 ) { sum += i; } } return sum; } app.get( "/sumofprimes" , ( { query : { number } }: any , res: { json: (arg0: { number : any ; sum: number ; timeTaken: string } ) => void ; } ) => { const startTime = new Date ().getTime(); const sum = sumOfPrimes(number as number ); const endTime = new Date ().getTime(); res.json({ number : number , sum: sum, timeTaken: (endTime - startTime) / 1000 + " seconds" , }); } ); app.listen(6868 , () => console .log("listening on port 6868" ));
当我们请求 100 万的质数之和时,我们看到以下结果:我用的电脑是 MacBook’s M1,也要足足阻塞 12 秒
我们把他改造为工作线程模式:主程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 import express from "express" ;import { Worker } from "worker_threads" ;const app = express();function runWorker (workerData ) { return new Promise ((resolve, reject ) => { const worker = new Worker("./cpu密集型演示_worker_threads_质数加总.js" , { workerData, }); worker.on("message" , resolve); worker.on("error" , reject); worker.on("exit" , (code ) => { if (code !== 0 ) { reject(new Error (`Worker stopped with exit code ${code} ` )); } }); }); } function divideWorkAndGetSum ( ) { const start1 = 2 ; const end1 = 150000 ; const start2 = 150001 ; const end2 = 300000 ; const start3 = 300001 ; const end3 = 450000 ; const start4 = 450001 ; const end4 = 600000 ; const start5 = 600001 ; const end5 = 750000 ; const start6 = 750001 ; const end6 = 900000 ; const start7 = 900001 ; const end7 = 1000000 ; const worker1 = runWorker({ start : start1, end : end1 }); const worker2 = runWorker({ start : start2, end : end2 }); const worker3 = runWorker({ start : start3, end : end3 }); const worker4 = runWorker({ start : start4, end : end4 }); const worker5 = runWorker({ start : start5, end : end5 }); const worker6 = runWorker({ start : start6, end : end6 }); const worker7 = runWorker({ start : start7, end : end7 }); return Promise .all([ worker1, worker2, worker3, worker4, worker5, worker6, worker7, ]); } app.get("/sumofprimeswiththreads" , async (req, res) => { const startTime = new Date ().getTime(); const sum = await divideWorkAndGetSum() .then( ( values ) => values.reduce((accumulator, part ) => accumulator + part.result, 0 ) ) .then((finalAnswer ) => finalAnswer); const endTime = new Date ().getTime(); res.json({ number: 1000000 , sum: sum, timeTaken: (endTime - startTime) / 1000 + " seconds" , }); }); app.listen(7777 , () => console .log("listening on port 7777" ));
文件 cpu密集型演示_worker_threads_质数加总.js
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import { workerData, parentPort } from "worker_threads" ;const start = workerData.start;const end = workerData.end;var sum = 0 ;for (var i = start; i <= end; i++) { for (var j = 2 ; j <= i / 2 ; j++) { if (i % j == 0 ) { break ; } } if (j > i / 2 ) { sum += i; } } parentPort.postMessage({ start: start, end: end, result: sum, });
看一下运行时间,只有 3 秒多