Node.jsにworkerが入った
2018 / 06 / 26
EditNode@10.5.0で入った worker の話です。
この記事は、Roppongi.js #4の登壇資料です。 5min で話しきれないので記事にまとめました。
実は、自分が Node.js に関わって、最初から最後(今現在)までずっと追っている珍しいモジュールです。
worker_threads とは?
実装著者は Anna (この PR は io.js 時代に petkaantonov が実装したのをベースに現環境へ移した)
worker_threads は独立したスレッドで動作する環境を構築し、それらの間にメッセージチャンネルを構築をする手段を提供します。 注意点として、Node.js の非同期は worker よりも効率的なため、I/O には使用しないほうが良いです。
目的
Node.js において、大量に負荷の高い処理することは苦手です。 なので、CPU 負荷の高い作業を別のスレッドに委ねて、負荷を分散させることが目的です。
child_process や cluster と違う部分
worker_threads の場合、ArrayBuffer のインスタンス間の転送をしたり、 それらの間で SharedArrayBuffer のインスタンスを共有をすることによりメモリを効率的に共有することが可能です。
child_process.fork()
や cluster.fork()
と比べるとマルチスレッドに特化したモジュールということです。
会話する方法
child_process の IPC と異なります。
ブラウザ同様に、postMessage
を使用し、会話します。
また、シリアライズされたデータはプロセスを離れる必要がないため全体的に通信に伴うオーバーヘッドが少なくなります。
// child_process
// parent
const { fork } = require("child_process");
const child = fork("child.js");
child.on("message", (message) => {
console.log("message", message);
child.send("from parent");
});
// child
process.on("message", (message) => {
console.log("message", message);
});
if (process.send) process.send("from child");
// worker_threads
const { MessageChannel } = require("worker_threads");
const { port1, port2 } = new MessageChannel();
port1.on("message", (message) => console.log(message));
port2.postMessage("hi");
port2.on("message", (message) => console.log(message));
port1.postMessage("bye");
共有範囲
各々の woker は自身のイベントループを持ちますが、いくつかのリソースに関しては worker 間で共有されます。 (e.g. libuv のスレッドプール、V8 Isolate、V8 Environment)
これは現時点では実装されてないだけで、今後変わる可能性はあります。
またグローバルはスレッド間で共有されません。
API 制限
禁止
process.chdir()
及びグループまたはユーザー ID を設定するプロセスメソッドprocess.abort()
- domain
- 親プロセスからの IPC チャンネルのアクセス
変更
process.env
は読み取り専用process.title
は変更不可process.exit()
は単一スレッドのみが処理対象process.stdin
,process.stdout
,process.stderr
は null- シグナルは行われない(
process.on
)
等
歴史
実は、worker とは Ayo.js から来ており、 2017/10/23 の時点で初期の実装は完了していました。
上記の記事と大幅な変化は無いため、ある程度説明は省きます。
また、当時はworker
という名前で決定するつもりでしたが、ユーザーランドのライブラリ名と衝突してしまい、最終的(?)にworker_threads
という名前になりました。
このように、長い期間や様々な問題がありながらもようやく入った珍しいモジュールです。
使い方
現時点では実験段階(stability:1 )なので、起動時に --experimental-worker
が必要です。
const {
threadId, isMainThread,
Worker, workerData, parentPort
MessageChannel, MessagePort
} = require('worker_threads');
長くなるので、変数、メソッド、クラス等の説明は以下を参照してください。
複数の worker を動かす
const { Worker, isMainThread, workerData } = require("worker_threads");
let current = 0;
function counter(title, cnt) {
console.log(`| ${title} |: ${cnt}`);
}
if (isMainThread) {
console.log("Main Thread");
// 4つ作成する
for (let i = 0; i < 4; i++) {
// 相対パスはダメ
// この場合(__filename)はworkerも同じこのファイルを参照する
// 第二引数はグローバルパラメーター
new Worker(__filename, { workerData: i });
}
setInterval(
(title) => {
counter(title, ++current);
},
1000,
"MainThread",
);
} else {
console.log(`worker: ${workerData}`);
setInterval(
(title) => {
counter(title, ++current);
},
1000,
`worker: ${workerData}`,
);
}
# output
$ node --experimental-worker index.js
Main Thread
worker: 0
worker: 1
worker: 2
worker: 3
| MainThread |: 1
| worker: 0 |: 1
| worker: 1 |: 1
| worker: 2 |: 1
| worker: 3 |: 1
| MainThread |: 2
| worker: 0 |: 2
| worker: 1 |: 2
| worker: 2 |: 2
| worker: 3 |: 2
| MainThread |: 3
| worker: 0 |: 3
| worker: 1 |: 3
| worker: 2 |: 3
| worker: 3 |: 3
スレッド間でメッセージングを行う
// parent.js
// main thread
const { resolve } = require("path");
const { Worker, workerData } = require("worker_threads");
console.log("| Main Thread |");
function createWorker(path, cb) {
const worker = new Worker(path, { workerData: null });
worker.on("message", (msg) => {
cb(null, msg);
});
worker.on("error", cb);
worker.on("exit", (code) => {
if (code !== 0) throw new Error("worker stopped");
console.log("| Main Thread | worker stopped");
});
return worker;
}
const w = createWorker(resolve("child.js"), (err, res) => {
// workerから結果を受け取る
if (err) {
console.error(err);
process.exit(1);
}
console.log(`| Main Thread | execution time: ${res}ms from Worker Thread`);
// workerを停止させるためにメッセージを送る
w.postMessage("thx;)");
});
// child.js
// worker thread
const { PerformanceObserver, performance } = require("perf_hooks");
const { parentPort } = require("worker_threads");
console.log("| Worker |");
const obs = new PerformanceObserver((items) => {
const time = items.getEntries()[0].duration;
// main threadへ処理時間の結果を返す
parentPort.postMessage(time);
performance.clearMarks();
});
obs.observe({ entryTypes: ["measure"] });
const len = 64 * 1024 * 1024;
const b = Buffer.allocUnsafe(len);
let s = "";
// 重い処理なためこの箇所の実行時間を計測する
performance.mark("A");
for (let i = 0; i < 256; ++i) s += String.fromCharCode(i);
for (let i = 0; i < 64 * 1024 * 1024; i += 256) b.write(s, i, 256, "ascii");
for (let i = 0; i < 32; ++i) b.toString("base64");
performance.mark("B");
performance.measure("A to B", "A", "B");
// main threadから返答が来たらこのworkerを終了する
parentPort.on("message", (msg) => {
console.log(`| Worker | ${msg} from Main Thread`);
process.exit();
});
# output
$ node --experimental-worker parent.js
| Main Thread |
| Worker |
| Main Thread | execution time: 3128.782205ms from Worker Thread
| Worker | thx;) from Main Thread
| Main Thread | worker stopped
パフォーマンス
$ ./node benchmark/cluster/echo.js
cluster/echo.js n=100000 sendsPerBroadcast=1 payload="string" workers=1: 33,647.30473442063
cluster/echo.js n=100000 sendsPerBroadcast=10 payload="string" workers=1: 12,927.907405288383
cluster/echo.js n=100000 sendsPerBroadcast=1 payload="object" workers=1: 28,496.37373941151
cluster/echo.js n=100000 sendsPerBroadcast=10 payload="object" workers=1: 8,975.53747186485
$ ./node --experimental-worker benchmark/worker/echo.js
worker/echo.js n=100000 sendsPerBroadcast=1 payload="string" workers=1: 88,044.32902365089
worker/echo.js n=100000 sendsPerBroadcast=10 payload="string" workers=1: 39,873.33697018837
worker/echo.js n=100000 sendsPerBroadcast=1 payload="object" workers=1: 64,451.29132425621
worker/echo.js n=100000 sendsPerBroadcast=10 payload="object" workers=1: 22,325.635443739284
まだオーバーヘッドが大きく、パフォーマンスには改善の余地があります。
まだできないこと
- ネイティブアドオンを worker からロードする
- inspector のサポート
- handle(ポインタを示す)はネットワークソケット同様に転送ができない
- File クラスや Blob クラスや File API などのサポート(対応するかは未定)
- …等々
また具体的な計画が出てないため、今後に期待。
さいごに
まだ入ったばかりでこれからどんどんおもしろくなるモジュールです! パフォーマンスのチューニングには欠かせないモジュールなので、stability2 に入ると今後使う機会が増えると思います。