Atomics.waitAtomics.notifyAtomics.waitAsync

發佈於 · 標籤為 ECMAScript ES2020

Atomics.waitAtomics.notify 是低階同步原語,對於實作互斥鎖和其他同步方式很有用。然而,由於 Atomics.wait 會封鎖,因此無法在主執行緒上呼叫它(嘗試這麼做會擲回 TypeError)。

從版本 8.7 開始,V8 支援非封鎖版本 Atomics.waitAsync,它也可以在主執行緒上使用。

在本文中,我們將說明如何使用這些低階 API 來實作一個互斥鎖,它可以在同步(用於工作執行緒)和非同步(用於工作執行緒或主執行緒)的情況下運作。

Atomics.waitAtomics.waitAsync 使用下列參數

  • buffer:由 SharedArrayBuffer 支援的 Int32ArrayBigInt64Array
  • index:陣列中的一個有效索引
  • expectedValue:我們預期會出現在 (buffer, index) 所描述的記憶體位置中的值
  • timeout:以毫秒為單位的逾時時間(選用,預設為 Infinity

Atomics.wait 的回傳值是一個字串。如果記憶體位置不包含預期值,Atomics.wait 會立即回傳值 'not-equal'。否則,執行緒會被封鎖,直到另一個執行緒使用相同的記憶體位置呼叫 Atomics.notify 或達到逾時時間。在前一種情況下,Atomics.wait 會回傳值 'ok',在後一種情況下,Atomics.wait 會回傳值 'timed-out'

Atomics.notify 使用下列參數

  • SharedArrayBuffer 支援的 Int32ArrayBigInt64Array
  • 一個索引(在陣列中有效)
  • 要通知的等待者數量(選用,預設為 Infinity

它會以先進先出順序通知指定數量的等待者,這些等待者正在等待 (buffer, index) 所描述的記憶體位置。如果有多個待處理的 Atomics.wait 呼叫或 Atomics.waitAsync 呼叫與同一個位置相關,它們都會在同一個先進先出佇列中。

Atomics.wait 相反,Atomics.waitAsync 始終會立即回傳。回傳值為下列其中之一

  • { async: false, value: 'not-equal' }(如果記憶體位置不包含預期值)
  • { async: false, value: 'timed-out' }(僅適用於立即逾時 0)
  • { async: true, value: promise }

稍後可能會以字串值 'ok'(如果 Atomics.notify 以相同的記憶體位置呼叫)或 'timed-out'(如果已逾時)來解決承諾。承諾絕不會被拒絕。

下列範例說明 Atomics.waitAsync 的基本用法

const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ timeout (opt)
// | ^ expected value
// ^ index

if (result.value === 'not-equal') {
// The value in the SharedArrayBuffer was not the expected one.
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* notified */ }
else { /* value is 'timed-out' */ }
});
}

// In this thread, or in another thread:
Atomics.notify(i32a, 0);

接下來,我們將展示如何實作互斥鎖,它可用於同步和非同步。實作互斥鎖的同步版本已在先前討論過,例如 這篇部落格文章

在範例中,我們不會在 Atomics.waitAtomics.waitAsync 中使用逾時參數。參數可用於實作具有逾時的條件變數。

我們的互斥鎖類別 AsyncLockSharedArrayBuffer 上運作,並實作下列方法

  • lock — 鎖定執行緒,直到我們可以鎖定互斥鎖(僅可在工作執行緒上使用)
  • unlock — 解鎖互斥鎖(lock 的對應項)
  • executeLocked(callback) — 非封鎖鎖定,主執行緒可以使用;排程 callback,以便在我們設法取得鎖定後執行

讓我們看看如何實作這些方法。類別定義包括常數和建構函式,它將 SharedArrayBuffer 作為參數。

class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;

constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}

lock() {
/* … */
}

unlock() {
/* … */
}

executeLocked(f) {
/* … */
}
}

這裡的 i32a[0] 包含值 LOCKEDUNLOCKED。它也是 Atomics.waitAtomics.waitAsync 的等待位置。AsyncLock 類別確保下列不變式

  1. 如果 i32a[0] == LOCKED,而且執行緒開始在 i32a[0] 上等待(透過 Atomics.waitAtomics.waitAsync),最終會收到通知。
  2. 收到通知後,執行緒會嘗試取得鎖定。如果它取得鎖定,它會在釋放鎖定時再次通知。

同步鎖定和解鎖 #

接下來,我們將展示封鎖 lock 方法,它只能從工作執行緒呼叫

lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< expected value at start
}
}

當執行緒呼叫 lock() 時,它會先嘗試使用 Atomics.compareExchange 取得鎖定,以將鎖定狀態從 UNLOCKED 變更為 LOCKEDAtomics.compareExchange 嘗試以原子方式變更狀態,並傳回記憶體位置的原始值。如果原始值是 UNLOCKED,我們知道狀態變更已成功,而且執行緒已取得鎖定。不需要再做任何事。

如果 Atomics.compareExchange 無法變更鎖定狀態,另一個執行緒一定正在持有鎖定。因此,這個執行緒會嘗試 Atomics.wait 以等待另一個執行緒釋放鎖定。如果記憶體位置仍持有預期的值(在本例中為 AsyncLock.LOCKED),呼叫 Atomics.wait 會封鎖執行緒,而且 Atomics.wait 呼叫只會在另一個執行緒呼叫 Atomics.notify 時傳回。

unlock 方法將鎖定設定為 UNLOCKED 狀態,並呼叫 Atomics.notify 以喚醒正在等待鎖定的其中一個等待者。狀態變更預期總是會成功,因為這個執行緒正在持有鎖定,而且同時沒有其他人應該呼叫 unlock()

unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.LOCKED,
/* new value >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}

簡單的案例如下:鎖是自由的,而執行緒 T1 透過使用 Atomics.compareExchange 變更鎖狀態來取得鎖。執行緒 T2 嘗試透過呼叫 Atomics.compareExchange 來取得鎖,但無法成功變更鎖狀態。然後 T2 會呼叫 Atomics.wait,這會封鎖執行緒。在某個時間點,T1 會釋放鎖並呼叫 Atomics.notify。這會讓 T2 中的 Atomics.wait 呼叫傳回 'ok',喚醒 T2。然後 T2 會再次嘗試取得鎖,而這次會成功。

還有 2 個可能的臨界狀況,這些狀況說明了 Atomics.waitAtomics.waitAsync 在索引處檢查特定值的原因

  • T1 持有鎖,而 T2 嘗試取得鎖。首先,T2 嘗試使用 Atomics.compareExchange 變更鎖狀態,但無法成功。不過,在 T2 設法呼叫 Atomics.wait 之前,T1 會釋放鎖。當 T2 呼叫 Atomics.wait 時,它會立即傳回值 'not-equal'。在這種情況下,T2 會繼續下一個迴圈反覆運算,再次嘗試取得鎖。
  • T1 持有鎖,而 T2 使用 Atomics.wait 等待鎖。T1 釋放鎖,T2 會喚醒(Atomics.wait 呼叫傳回),並嘗試執行 Atomics.compareExchange 來取得鎖,但另一個執行緒 T3 動作更快,已經取得鎖。因此,呼叫 Atomics.compareExchange 無法取得鎖,而 T2 會再次呼叫 Atomics.wait,封鎖直到 T3 釋放鎖。

由於後者的臨界狀況,互斥鎖並非「公平」。T2 可能一直在等待鎖釋放,但 T3 卻突然出現並立即取得鎖。更實際的鎖實作可能會使用多個狀態來區分「已鎖定」和「競爭中已鎖定」。

非同步鎖 #

與封鎖的 lock 方法不同,非封鎖的 executeLocked 方法可從主執行緒呼叫。它取得一個回呼函式作為其唯一參數,並排程回呼函式,以便在成功取得鎖後執行。

executeLocked(f) {
const self = this;

async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ expected value at start
await result.value;
}
}

tryGetLock();
}

內部函式 tryGetLock 會先嘗試使用 Atomics.compareExchange 取得鎖,如同先前一樣。如果成功變更鎖狀態,它就能執行回呼函式、解除鎖定,並傳回。

如果 Atomics.compareExchange 無法取得鎖,我們需要在鎖可能自由時再次嘗試。我們無法封鎖並等待鎖自由,而是使用 Atomics.waitAsync 和它傳回的 Promise 排程新的嘗試。

如果我們成功啟動了 Atomics.waitAsync,則當鎖定執行緒執行 Atomics.notify 時,傳回的 Promise 會解析。然後,等待鎖定的執行緒會嘗試再次取得鎖定,就像之前一樣。

在非同步版本中,也可能發生相同的臨界狀況(在 Atomics.compareExchange 呼叫與 Atomics.waitAsync 呼叫之間釋放鎖定,以及在 Promise 解析與 Atomics.compareExchange 呼叫之間再次取得鎖定),因此程式碼必須以穩健的方式處理這些狀況。

結論 #

在這篇文章中,我們展示了如何使用同步原語 Atomics.waitAtomics.waitAsyncAtomics.notify,來實作一個互斥鎖,它可以在主執行緒和工作執行緒中使用。

功能支援 #

Atomics.waitAtomics.notify #

  • Chrome: 自版本 68 開始支援
  • Firefox: 自版本 78 開始支援
  • Safari: 不支援
  • Node.js: 自版本 8.10.0 開始支援
  • Babel: 不支援

Atomics.waitAsync #

  • Chrome: 自版本 87 開始支援
  • Firefox: 不支援
  • Safari: 不支援
  • Node.js: 自版本 16 開始支援
  • Babel: 不支援