Node.jsにworkerが入った

2018 / 06 / 26

Edit
🚨 This article hasn't been updated in over a year
💁‍♀️ This post was copied from Hatena Blog

Node@10.5.0で入った worker の話です。

この記事は、Roppongi.js #4の登壇資料です。 5min で話しきれないので記事にまとめました。

実は、自分が Node.js に関わって、最初から最後(今現在)までずっと追っている珍しいモジュールです。

worker_threads とは?


worker: initial implementation by addaleax · Pull Request #20876 · nodejs/node Hi everyone! 👋 This PR adds threading support for to Node.js. I realize that this is not exactly a ...

実装著者は Anna (この PR は io.js 時代に petkaantonov が実装したのをベースに現環境へ移した)

worker_threads は独立したスレッドで動作する環境を構築し、それらの間にメッセージチャンネルを構築をする手段を提供します。 注意点として、Node.js の非同期は worker よりも効率的なため、I/O には使用しないほうが良いです。

目的

Node.js において、大量に負荷の高い処理することは苦手です。 なので、CPU 負荷の高い作業を別のスレッドに委ねて、負荷を分散させることが目的です。

child_process や cluster と違う部分

worker_threads の場合、ArrayBuffer のインスタンス間の転送をしたり、 それらの間で SharedArrayBuffer のインスタンスを共有をすることによりメモリを効率的に共有することが可能です。 child_process.fork()cluster.fork() と比べるとマルチスレッドに特化したモジュールということです。

会話する方法

child_process の IPC と異なります。 ブラウザ同様に、postMessage を使用し、会話します。 また、シリアライズされたデータはプロセスを離れる必要がないため全体的に通信に伴うオーバーヘッドが少なくなります。

// child_process

// parent
const { fork } = require("child_process");

const child = fork("child.js");

child.on("message", (message) => {
  console.log("message", message);
  child.send("from parent");
});

// child
process.on("message", (message) => {
  console.log("message", message);
});

if (process.send) process.send("from child");
// worker_threads
const { MessageChannel } = require("worker_threads");

const { port1, port2 } = new MessageChannel();

port1.on("message", (message) => console.log(message));
port2.postMessage("hi");

port2.on("message", (message) => console.log(message));
port1.postMessage("bye");

共有範囲

各々の woker は自身のイベントループを持ちますが、いくつかのリソースに関しては worker 間で共有されます。 (e.g. libuv のスレッドプール、V8 Isolate、V8 Environment)

これは現時点では実装されてないだけで、今後変わる可能性はあります。

またグローバルはスレッド間で共有されません。

API 制限

禁止
  • process.chdir() 及びグループまたはユーザー ID を設定するプロセスメソッド
  • process.abort()
  • domain
  • 親プロセスからの IPC チャンネルのアクセス
変更
  • process.env は読み取り専用
  • process.titleは変更不可
  • process.exit()は単一スレッドのみが処理対象
  • process.stdin, process.stdout, process.stderr は null
  • シグナルは行われない(process.on)

歴史

実は、worker とは Ayo.js から来ており、 2017/10/23 の時点で初期の実装は完了していました。

AyoでWorkerの実装が進んでいる - 技術探し Chrome Dev Summitのため、サンフランシスコで書いています。 Ayo側 Node側 Workerとは? メソッド・変数 isMainThread postMessage(value[, ...

上記の記事と大幅な変化は無いため、ある程度説明は省きます。

また、当時はworker という名前で決定するつもりでしたが、ユーザーランドのライブラリ名と衝突してしまい、最終的(?)にworker_threadsという名前になりました。

npm module name / Node.js built-in `worker` module · Issue #1 · blake-regalia/worker.js Hi there! I’m currently working on built-in Worker support (as in, threaded workers rather than clus...

Node.jsのビルトインモジュールに名前空間が使われるかもしれない - 技術探し Node.jsのビルトインモジュールに対しての新しい提案として、名前空間で保護する案が出ている話

このように、長い期間や様々な問題がありながらもようやく入った珍しいモジュールです。

使い方

現時点では実験段階(stability:1 )なので、起動時に --experimental-worker が必要です。

const {
  threadId, isMainThread,
  Worker, workerData, parentPort
  MessageChannel, MessagePort
} = require('worker_threads');

長くなるので、変数、メソッド、クラス等の説明は以下を参照してください。

AyoでWorkerの実装が進んでいる - 技術探し Chrome Dev Summitのため、サンフランシスコで書いています。 Ayo側 Node側 Workerとは? メソッド・変数 isMainThread postMessage(value[, ...

複数の worker を動かす

const { Worker, isMainThread, workerData } = require("worker_threads");

let current = 0;

function counter(title, cnt) {
  console.log(`| ${title} |: ${cnt}`);
}

if (isMainThread) {
  console.log("Main Thread");

  // 4つ作成する
  for (let i = 0; i < 4; i++) {
    // 相対パスはダメ
    // この場合(__filename)はworkerも同じこのファイルを参照する
    // 第二引数はグローバルパラメーター
    new Worker(__filename, { workerData: i });
  }

  setInterval(
    (title) => {
      counter(title, ++current);
    },
    1000,
    "MainThread",
  );
} else {
  console.log(`worker: ${workerData}`);

  setInterval(
    (title) => {
      counter(title, ++current);
    },
    1000,
    `worker: ${workerData}`,
  );
}
# output
$ node --experimental-worker index.js
Main Thread
worker: 0
worker: 1
worker: 2
worker: 3
| MainThread |: 1
| worker: 0 |: 1
| worker: 1 |: 1
| worker: 2 |: 1
| worker: 3 |: 1
| MainThread |: 2
| worker: 0 |: 2
| worker: 1 |: 2
| worker: 2 |: 2
| worker: 3 |: 2
| MainThread |: 3
| worker: 0 |: 3
| worker: 1 |: 3
| worker: 2 |: 3
| worker: 3 |: 3

スレッド間でメッセージングを行う

// parent.js
// main thread

const { resolve } = require("path");
const { Worker, workerData } = require("worker_threads");

console.log("| Main Thread |");

function createWorker(path, cb) {
  const worker = new Worker(path, { workerData: null });

  worker.on("message", (msg) => {
    cb(null, msg);
  });
  worker.on("error", cb);
  worker.on("exit", (code) => {
    if (code !== 0) throw new Error("worker stopped");
    console.log("| Main Thread | worker stopped");
  });

  return worker;
}

const w = createWorker(resolve("child.js"), (err, res) => {
  // workerから結果を受け取る
  if (err) {
    console.error(err);
    process.exit(1);
  }

  console.log(`| Main Thread | execution time: ${res}ms from Worker Thread`);

  // workerを停止させるためにメッセージを送る
  w.postMessage("thx;)");
});
// child.js
// worker thread

const { PerformanceObserver, performance } = require("perf_hooks");
const { parentPort } = require("worker_threads");

console.log("| Worker |");

const obs = new PerformanceObserver((items) => {
  const time = items.getEntries()[0].duration;

  // main threadへ処理時間の結果を返す
  parentPort.postMessage(time);

  performance.clearMarks();
});
obs.observe({ entryTypes: ["measure"] });

const len = 64 * 1024 * 1024;
const b = Buffer.allocUnsafe(len);
let s = "";

// 重い処理なためこの箇所の実行時間を計測する
performance.mark("A");
for (let i = 0; i < 256; ++i) s += String.fromCharCode(i);
for (let i = 0; i < 64 * 1024 * 1024; i += 256) b.write(s, i, 256, "ascii");
for (let i = 0; i < 32; ++i) b.toString("base64");
performance.mark("B");
performance.measure("A to B", "A", "B");

// main threadから返答が来たらこのworkerを終了する
parentPort.on("message", (msg) => {
  console.log(`| Worker | ${msg} from Main Thread`);
  process.exit();
});
# output
$ node --experimental-worker parent.js
| Main Thread |
| Worker |
| Main Thread | execution time: 3128.782205ms from Worker Thread
| Worker | thx;) from Main Thread
| Main Thread | worker stopped

パフォーマンス

    $ ./node benchmark/cluster/echo.js
    cluster/echo.js n=100000 sendsPerBroadcast=1 payload="string" workers=1: 33,647.30473442063
    cluster/echo.js n=100000 sendsPerBroadcast=10 payload="string" workers=1: 12,927.907405288383
    cluster/echo.js n=100000 sendsPerBroadcast=1 payload="object" workers=1: 28,496.37373941151
    cluster/echo.js n=100000 sendsPerBroadcast=10 payload="object" workers=1: 8,975.53747186485
    $ ./node --experimental-worker benchmark/worker/echo.js
    worker/echo.js n=100000 sendsPerBroadcast=1 payload="string" workers=1: 88,044.32902365089
    worker/echo.js n=100000 sendsPerBroadcast=10 payload="string" workers=1: 39,873.33697018837
    worker/echo.js n=100000 sendsPerBroadcast=1 payload="object" workers=1: 64,451.29132425621
    worker/echo.js n=100000 sendsPerBroadcast=10 payload="object" workers=1: 22,325.635443739284

まだオーバーヘッドが大きく、パフォーマンスには改善の余地があります。

まだできないこと

  • ネイティブアドオンを worker からロードする
  • inspector のサポート
  • handle(ポインタを示す)はネットワークソケット同様に転送ができない
  • File クラスや Blob クラスや File API などのサポート(対応するかは未定)
  • …等々

また具体的な計画が出てないため、今後に期待。

さいごに

まだ入ったばかりでこれからどんどんおもしろくなるモジュールです! パフォーマンスのチューニングには欠かせないモジュールなので、stability2 に入ると今後使う機会が増えると思います。