アトミックス(Atomics)

目次

アトミックス(Atomics)とは

スレッド間で共有メモリ(SharedArrayBuffer)を使用している場合、あるスレッドがメモリに書き込みを行っている最中に他のスレッドがメモリを読み込むと、書込み途中の不完全なデータを読み込んでしまうことがあります。Atomics を用いてメモリ操作を行うことで、操作のアトミック性(処理中は他のスレッドの影響を受けず、かつ、他のスレッドに影響を与えないこと)を保つことができます。

アトミックな読み書き

Atomics.load(typedArray, index)

型付き配列 typedArrayindex 番目の要素を読み込みます。

const buf = new SharedArrayBuffer(12);
const arr = new Int16Array(buf);
Atomics.store(arr, 3, 100);
console.log(Atomics.load(arr, 3));           // => 100

Atomics.store(typedArray, index, value)

型付き配列 typedArrayindex 番目の要素に値 value を格納します。戻り値は格納した値です。

const buf = new SharedArrayBuffer(12);
const arr = new Int16Array(buf);
console.log(Atomics.store(arr, 3, 100));     // => 100 (格納した値)
console.log(Atomics.load(arr, 3));           // => 100 (格納後の値)

Atomics.exchange(typedArray, index, value)

型付き配列 typedArrayindex 番目の要素に値 value を格納します。戻り値は格納前の値です。

const buf = new SharedArrayBuffer(12);
const arr = new Int16Array(buf);
console.log(Atomics.exchange(arr, 0, 100));  // => 0 (格納前の値)
console.log(Atomics.load(arr, 0));           // => 100 (格納後の値)

Atomics.compareExchange(typedArray, index, expectedValue, replacementValue)

型付き配列 typedArrayindex 番目の要素が expectedValue と等しければその値を replacementValue で書き換えます。等しくない場合は書き換えません。戻り値は置換前の値です。

const buf = new SharedArrayBuffer(12);
const arr = new Int16Array(buf);
Atomics.store(arr, 0, 3);
Atomics.compareExchange(arr, 0, 3, 5);       // => 3 (置換前の値)
console.log(Atomics.load(arr, 0));           // => 5 (置換後の値)
Atomics.compareExchange(arr, 0, 5, 8);       // => 5 (置換前の値)
console.log(Atomics.load(arr, 0));           // => 5 (置換は行われなかった)

アトミックな演算

Atomics.add(typedArray, index, value)

型付き配列 typedArrayindex 番目の要素に値 value を加算します。戻り値は演算前の値です。

const buf = new SharedArrayBuffer(12);
const arr = new Int16Array(buf);
console.log(Atomics.add(arr, 0, 25));        // => 0
console.log(Atomics.load(arr, 0));           // => 25

Atomics.sub(typedArray, index, value)

型付き配列 typedArrayindex 番目の要素に値 value を減算します。戻り値は演算前の値です。

const buf = new SharedArrayBuffer(12);
const arr = new Int16Array(buf);
Atomics.store(arr, 0, 30);
console.log(Atomics.sub(arr, 0, 20));        // => 30
console.log(Atomics.load(arr, 0));           // => 10

Atomics.and(typedArray, index, value)

型付き配列 typedArrayindex 番目の要素に対して値 value のビット単位の AND 演算を行います。戻り値は演算前の値です。

const buf = new SharedArrayBuffer(12);
const arr = new Uint32Array(buf);
Atomics.store(arr, 0, 0xfff0);
console.log(Atomics.and(arr, 0, 0x0fff));    // => 65520(0xfff0)
console.log(Atomics.load(arr, 0));           // => 4080(0x0ff0)

Atomics.or(typedArray, index, value)

型付き配列 typedArrayindex 番目の要素に対して値 value のビット単位の OR 演算を行います。戻り値は演算前の値です。

const buf = new SharedArrayBuffer(12);
const arr = new Uint32Array(buf);
Atomics.store(arr, 0, 0xfff0);
console.log(Atomics.or(arr, 0, 0x0fff));     // => 65520(0xfff0)
console.log(Atomics.load(arr, 0));           // => 65535(0xffff)

待ち合わせ

Atomics.notify(typedArray, index, count)

型付き配列 typedArrayindex 番目の要素に対して Atomics.wait()Atomics.waitAsync() で休眠状態にある Web Worker などのエージェントスレッドに通知を行い起こします。count は複数のエージェントが休眠している場合、起こすエージェントの数を指定します。省略時は無限大となります。起こしたエージェント数を返します。Int32Array または BigInt64Array でのみ使用できます。

Main Thread
const buf = new SharedArrayBuffer(4);     // 共有メモリを作成
const arr = new Int32Array(buf);          // Int32Arrayにマッピング
const worker = new Worker("worker.js");   // Web Workerを作成
worker.postMessage(buf);                  // Web Workerに共有メモリを渡す 
setTimeout(() => {                        // 2秒後に
  Atomics.store(arr, 0, 99);              // 値99を書き込んで
  Atomics.notify(arr, 0);                 // 通知を行う
  console.log("Main: Change data");
}, 2000);
Worker Thread
self.onmessage = async (event) => {       // 共有メモリを受け取る
  const arr = new Int32Array(event.data); // Int32Arrayにマッピング
  Atomics.wait(arr, 0, 0);                // 通知を待つ
  console.log(Atomics.load(arr, 0));      // => 99
}

Atomics.wait(typedArray, index, value, timeout)

型付き配列 typedArrayindex 番目の要素が値 value 以外の値になるのを待ちます。timeout は待ち時間をミリ秒で指定します。省略時は無限大と見なします。呼び出し時にすでに値が異なっていた場合は "not-equal"、値が変更された場合は "ok"、タイムアウトした場合は "timed-out" の文字列が返されます。Int32Array または BigInt64Array でのみ使用できます。

Main Thread
const buf = new SharedArrayBuffer(4);     // 共有メモリを作成
const arr = new Int32Array(buf);          // Int32Arrayにマッピング
const worker = new Worker("worker.js");   // Web Workerを作成
worker.postMessage(buf);                  // Web Workerに共有メモリを渡す 
setTimeout(() => {                        // 2秒後に
  Atomics.store(arr, 0, 99);              // 値99を書き込んで
  Atomics.notify(arr, 0);                 // 通知を行う
  console.log("Main: Change data");
}, 2000);
Worker Thread
self.onmessage = async (event) => {       // 共有メモリを受け取る
  const arr = new Int32Array(event.data); // Int32Arrayにマッピング
  console.log(Atomics.wait(arr, 0, 0));   // => "ok"
  console.log(Atomics.load(arr, 0));      // => 99
}

Atomics.waitAsync(typedArray, index, value, timeout)

Atomics.wait() の非同期版です。ES2024 で追加されました。typedArrayindex 番目の要素が value と合致するかを調べ、その結果により下記のオブジェクトを返します。

Promise が履行(fulfilled)状態になると下記の値を返します。

Main Thread
const buf = new SharedArrayBuffer(4);     // 共有メモリを作成
const arr = new Int32Array(buf);          // Int32Arrayにマッピング
const worker = new Worker("worker.js");   // Web Workerを作成
worker.postMessage(buf);                  // Web Workerに共有メモリを渡す 
setTimeout(() => {                        // 2秒後に
  Atomics.store(arr, 0, 99);              // 値99を書き込んで
  Atomics.notify(arr, 0);                 // 通知を行う
}, 2000);
Worker Thread
self.onmessage = async (event) => {
  const arr = new Int32Array(event.data);
  const result = Atomics.waitAsync(arr, 0, 0, 3000);
  console.log(`async: ${result.async}`);    // true or false
  if (result.async) {
    result.value.then((value) => {
      console.log(`value: ${value}`);       // "ok" or "timed-out"
    });
  } else {
    console.log(`value: ${result.value}`);  // "not-equal" or "timed-out"
  }
};

Worker Thread は ES2025 でサポートされた Promise.try() を用いると次のようにも書けます。

Worker Thread
self.onmessage = async (event) => {
  const arr = new Int32Array(event.data);
  Promise.try(() => Atomics.waitAsync(arr, 0, 0, 0).value)
    .then((value) => {
      console.log(`value: ${value}`);
    });
};

その他

Atomics.pause(durationHint)

スレッドが Atomics.wait()Atomics.waitAsync() ではなくループで監視を行っている場合、短時間にかなりの CPU を使用します。これをビジー待機やスピンロックと呼びます。Atomics.pause() はループ監視の最中に CPU を手放すヒントを与えます。実際に CPU が解放されるか否かは実装によります。durationHint は待機時間を決めるためのヒントを与えますが、この値の解釈も実装に依存します。メインスレッドではビジー待機やスピンロックや Atomics.pause() は使用すべきではありません。

let spin = 0;
do {
  if (Atomics.compareExchange(i32, 0, 0, 1) === 0) {
    break;
  }
  Atomics.pause();
  spin++;
} while (spin < 10);

Atomics.isLockFree(n)

n バイトの型付き配列操作がロックフリー(ロック操作が不要)である場合 true を、さもなくば false を返します。ロックフリーの場合ロック操作が不要なため速度低下が発生しません。ただし、ロックフリーであっても array[i] を直接操作すると、CPU キャッシュによって書き込んだ値が別スレッドから読み出せないタイミングがあるなどの問題があるため、ロックフリーであっても Atomics.store()Atomics.load() を使用した方がよさそうです。

console.log(Atomics.isLockFree(3));          // => false
console.log(Atomics.isLockFree(4));          // => true