Atomics.wait
和 Atomics.notify
是低階同步原語,對於實作互斥鎖和其他同步方式很有用。然而,由於 Atomics.wait
會封鎖,因此無法在主執行緒上呼叫它(嘗試這麼做會擲回 TypeError
)。
從版本 8.7 開始,V8 支援非封鎖版本 Atomics.waitAsync
,它也可以在主執行緒上使用。
在本文中,我們將說明如何使用這些低階 API 來實作一個互斥鎖,它可以在同步(用於工作執行緒)和非同步(用於工作執行緒或主執行緒)的情況下運作。
Atomics.wait
和 Atomics.waitAsync
使用下列參數
buffer
:由SharedArrayBuffer
支援的Int32Array
或BigInt64Array
index
:陣列中的一個有效索引expectedValue
:我們預期會出現在(buffer, index)
所描述的記憶體位置中的值timeout
:以毫秒為單位的逾時時間(選用,預設為Infinity
)
Atomics.wait
的回傳值是一個字串。如果記憶體位置不包含預期值,Atomics.wait
會立即回傳值 'not-equal'
。否則,執行緒會被封鎖,直到另一個執行緒使用相同的記憶體位置呼叫 Atomics.notify
或達到逾時時間。在前一種情況下,Atomics.wait
會回傳值 'ok'
,在後一種情況下,Atomics.wait
會回傳值 'timed-out'
。
Atomics.notify
使用下列參數
- 由
SharedArrayBuffer
支援的Int32Array
或BigInt64Array
- 一個索引(在陣列中有效)
- 要通知的等待者數量(選用,預設為
Infinity
)
它會以先進先出順序通知指定數量的等待者,這些等待者正在等待 (buffer, index)
所描述的記憶體位置。如果有多個待處理的 Atomics.wait
呼叫或 Atomics.waitAsync
呼叫與同一個位置相關,它們都會在同一個先進先出佇列中。
與 Atomics.wait
相反,Atomics.waitAsync
始終會立即回傳。回傳值為下列其中之一
{ async: false, value: 'not-equal' }
(如果記憶體位置不包含預期值){ async: false, value: 'timed-out' }
(僅適用於立即逾時 0){ async: true, value: promise }
稍後可能會以字串值 'ok'
(如果 Atomics.notify
以相同的記憶體位置呼叫)或 'timed-out'
(如果已逾時)來解決承諾。承諾絕不會被拒絕。
下列範例說明 Atomics.waitAsync
的基本用法
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ timeout (opt)
// | ^ expected value
// ^ index
if (result.value === 'not-equal') {
// The value in the SharedArrayBuffer was not the expected one.
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* notified */ }
else { /* value is 'timed-out' */ }
});
}
// In this thread, or in another thread:
Atomics.notify(i32a, 0);
接下來,我們將展示如何實作互斥鎖,它可用於同步和非同步。實作互斥鎖的同步版本已在先前討論過,例如 這篇部落格文章。
在範例中,我們不會在 Atomics.wait
和 Atomics.waitAsync
中使用逾時參數。參數可用於實作具有逾時的條件變數。
我們的互斥鎖類別 AsyncLock
在 SharedArrayBuffer
上運作,並實作下列方法
lock
— 鎖定執行緒,直到我們可以鎖定互斥鎖(僅可在工作執行緒上使用)unlock
— 解鎖互斥鎖(lock
的對應項)executeLocked(callback)
— 非封鎖鎖定,主執行緒可以使用;排程callback
,以便在我們設法取得鎖定後執行
讓我們看看如何實作這些方法。類別定義包括常數和建構函式,它將 SharedArrayBuffer
作為參數。
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}
lock() {
/* … */
}
unlock() {
/* … */
}
executeLocked(f) {
/* … */
}
}
這裡的 i32a[0]
包含值 LOCKED
或 UNLOCKED
。它也是 Atomics.wait
和 Atomics.waitAsync
的等待位置。AsyncLock
類別確保下列不變式
- 如果
i32a[0] == LOCKED
,而且執行緒開始在i32a[0]
上等待(透過Atomics.wait
或Atomics.waitAsync
),最終會收到通知。 - 收到通知後,執行緒會嘗試取得鎖定。如果它取得鎖定,它會在釋放鎖定時再次通知。
同步鎖定和解鎖 #
接下來,我們將展示封鎖 lock
方法,它只能從工作執行緒呼叫
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< expected value at start
}
}
當執行緒呼叫 lock()
時,它會先嘗試使用 Atomics.compareExchange
取得鎖定,以將鎖定狀態從 UNLOCKED
變更為 LOCKED
。Atomics.compareExchange
嘗試以原子方式變更狀態,並傳回記憶體位置的原始值。如果原始值是 UNLOCKED
,我們知道狀態變更已成功,而且執行緒已取得鎖定。不需要再做任何事。
如果 Atomics.compareExchange
無法變更鎖定狀態,另一個執行緒一定正在持有鎖定。因此,這個執行緒會嘗試 Atomics.wait
以等待另一個執行緒釋放鎖定。如果記憶體位置仍持有預期的值(在本例中為 AsyncLock.LOCKED
),呼叫 Atomics.wait
會封鎖執行緒,而且 Atomics.wait
呼叫只會在另一個執行緒呼叫 Atomics.notify
時傳回。
unlock
方法將鎖定設定為 UNLOCKED
狀態,並呼叫 Atomics.notify
以喚醒正在等待鎖定的其中一個等待者。狀態變更預期總是會成功,因為這個執行緒正在持有鎖定,而且同時沒有其他人應該呼叫 unlock()
。
unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.LOCKED,
/* new value >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
簡單的案例如下:鎖是自由的,而執行緒 T1 透過使用 Atomics.compareExchange
變更鎖狀態來取得鎖。執行緒 T2 嘗試透過呼叫 Atomics.compareExchange
來取得鎖,但無法成功變更鎖狀態。然後 T2 會呼叫 Atomics.wait
,這會封鎖執行緒。在某個時間點,T1 會釋放鎖並呼叫 Atomics.notify
。這會讓 T2 中的 Atomics.wait
呼叫傳回 'ok'
,喚醒 T2。然後 T2 會再次嘗試取得鎖,而這次會成功。
還有 2 個可能的臨界狀況,這些狀況說明了 Atomics.wait
和 Atomics.waitAsync
在索引處檢查特定值的原因
- T1 持有鎖,而 T2 嘗試取得鎖。首先,T2 嘗試使用
Atomics.compareExchange
變更鎖狀態,但無法成功。不過,在 T2 設法呼叫Atomics.wait
之前,T1 會釋放鎖。當 T2 呼叫Atomics.wait
時,它會立即傳回值'not-equal'
。在這種情況下,T2 會繼續下一個迴圈反覆運算,再次嘗試取得鎖。 - T1 持有鎖,而 T2 使用
Atomics.wait
等待鎖。T1 釋放鎖,T2 會喚醒(Atomics.wait
呼叫傳回),並嘗試執行Atomics.compareExchange
來取得鎖,但另一個執行緒 T3 動作更快,已經取得鎖。因此,呼叫Atomics.compareExchange
無法取得鎖,而 T2 會再次呼叫Atomics.wait
,封鎖直到 T3 釋放鎖。
由於後者的臨界狀況,互斥鎖並非「公平」。T2 可能一直在等待鎖釋放,但 T3 卻突然出現並立即取得鎖。更實際的鎖實作可能會使用多個狀態來區分「已鎖定」和「競爭中已鎖定」。
非同步鎖 #
與封鎖的 lock
方法不同,非封鎖的 executeLocked
方法可從主執行緒呼叫。它取得一個回呼函式作為其唯一參數,並排程回呼函式,以便在成功取得鎖後執行。
executeLocked(f) {
const self = this;
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ expected value at start
await result.value;
}
}
tryGetLock();
}
內部函式 tryGetLock
會先嘗試使用 Atomics.compareExchange
取得鎖,如同先前一樣。如果成功變更鎖狀態,它就能執行回呼函式、解除鎖定,並傳回。
如果 Atomics.compareExchange
無法取得鎖,我們需要在鎖可能自由時再次嘗試。我們無法封鎖並等待鎖自由,而是使用 Atomics.waitAsync
和它傳回的 Promise 排程新的嘗試。
如果我們成功啟動了 Atomics.waitAsync
,則當鎖定執行緒執行 Atomics.notify
時,傳回的 Promise 會解析。然後,等待鎖定的執行緒會嘗試再次取得鎖定,就像之前一樣。
在非同步版本中,也可能發生相同的臨界狀況(在 Atomics.compareExchange
呼叫與 Atomics.waitAsync
呼叫之間釋放鎖定,以及在 Promise 解析與 Atomics.compareExchange
呼叫之間再次取得鎖定),因此程式碼必須以穩健的方式處理這些狀況。
結論 #
在這篇文章中,我們展示了如何使用同步原語 Atomics.wait
、Atomics.waitAsync
和 Atomics.notify
,來實作一個互斥鎖,它可以在主執行緒和工作執行緒中使用。
功能支援 #
Atomics.wait
和 Atomics.notify
#
- Chrome: 自版本 68 開始支援
- Firefox: 自版本 78 開始支援
- Safari: 不支援
- Node.js: 自版本 8.10.0 開始支援
- Babel: 不支援
Atomics.waitAsync
#
- Chrome: 自版本 87 開始支援
- Firefox: 不支援
- Safari: 不支援
- Node.js: 自版本 16 開始支援
- Babel: 不支援