我們在第一篇中說到AQS使用的是管程模型,而管程模型是使用條件變量來解決同步通信問題的。條件變量會有兩個方法,喚醒和等待。當條件滿足時,我們會通過喚醒方法將條件隊列中的線程放入第二篇所說的同步隊列中;如果不滿足條件,則會通過等待方法將線程阻塞放入條件隊列中。而AQS中通過ConditionObject類實現了條件變量,所以接下來我們就具體看看ConditionObject類吧。
一 屬性
我們先看下ConditionObject中的屬性
/** 鏈表頭節點 */
private transient Node firstWaiter;
/** 鏈表尾節點 */
private transient Node lastWaiter;
開頭說了,條件變量中會有一個條件隊列,ConditionObject中的條件隊列使用的是單向鏈表,firstWaiter和lastWaiter為頭尾節點,節點也是使用AQS的內部類Node,但同步隊列是個雙向鏈表,條件隊列是單向鏈表,所以條件隊列使用的是Node類中的nextWaiter屬性作為下一個節點的鏈接指針。
volatile Node prev;
volatile Node next;
Node nextWaiter;
我們可以注意到nextWaiter是沒用volatile修飾的,這是因為線程在調用await方法進入條件隊列時,是已經擁有了鎖的。還有一點需要注意是,條件隊列里面的Node只會存在CANCELLED和CONDITION的狀態,有別于同步隊列。
二 喚醒方法
2.1 signalAll
此方法是喚醒所有條件隊列中的節點,即將條件隊列中的所有節點都移動到我們第二篇所說的同步隊列中,然后再去競爭鎖,具體源碼如下:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
上面我們說了,要調用喚醒和等待方法,都需要此線程獲取鎖,首先我們會通過子類復寫的方法isHeldExclusively來看此時的線程是否已經獲得了鎖。如果獲得了鎖,我們會判斷條件隊列的頭節點是否為null,為null則說明條件隊列中沒有阻塞的Node;如果不為null,則會通過doSignalAll方法來將條件隊列中的所有Node移動到同步隊列中
2.1.1 doSignalAll
doSignalAll方法主要功能就是遍歷條件隊列里面的節點Node,然后通過transferForSignal方法將Node移動到同步隊列中,源碼如下:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
// 將next指向first的后繼Node
Node next = first.nextWaiter;
// 切斷first與后繼Node的聯系
first.nextWaiter = null;
// 將此node轉移到同步隊列中
transferForSignal(first);
// 將first指向first的后繼Node
first = next;
// 在判斷此時的first是否為null,不是則繼續循環
} while (first != null);
}
2.1.2 transferForSignal
transferForSignal主要功能就是將條件隊列中的節點Node轉移到同步隊列中,源碼如下:
final boolean transferForSignal(Node node) {
// 說明此節點狀態為CANCELLED,所以跳過該節點(GC會回收)
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 入隊方法(獨占鎖獲取中詳細闡述過)
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
首先通過CAS來將Node的狀態置為0,如果失敗,則說明此時Node狀態是CANCELLED,則直接返回false;如果Node狀態成功置為了0,我們就通過enq方法將此節點入隊到同步隊列中,enq方法已經在第二篇文章中講過,這里就不再復述了。enq方法執行完成后,說明node已經成功進入同步隊列了,然后其返回的是入隊的前驅節點,如果前驅節點是CANCELLED狀態,或者我們將前驅節點的狀態變為SIGNAL失敗,則我們就需要喚醒此節點去搶鎖。這個如果你看了第二篇文章,你肯定是能夠想到的。
2.2 signal
看名字也能大概猜到,因為signalAll是將條件隊列中所有的Node轉移到同步隊列中,所以signal肯定是轉移單個Node。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
我們可以對比上面的signalAll方法,其唯一不同點就是signalAll內部調用的是doSignalAll方法,而signal內部調用的是doSignal方法,我們接著來看doSignal:
private void doSignal(Node first) {
do {
// 將firstWaiter指向傳入的first的后繼節點,
// 然后判斷firstWaiter是否為null,
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
我們可以看到方法里面是個do-While的循環,我們首先將firstWaiter指向first的后繼節點并判斷是否為null,如果為空,則說明條件隊列中只有first這一個節點,所以我們將整個隊列清空。然后我們再將first的的nextWaiter指向null斷開連接,進入while條件語句中。while條件語句中,會先調transferForSignal來轉移Node,如果返回為false,即轉移失敗,我們會判斷此節點下一個節點是否為null,不為null則又進入循環。
三 等待方法
喚醒方法wait,就是將線程阻塞包裝成節點放入條件隊列中,等到其他線程喚醒(signal)或者自身中斷后再重新去獲取鎖。所以其又可以大致分為兩個階段,線程阻塞前和阻塞后。
3.1 await—阻塞前
我們先來看下await的源碼:
public final void await() throws InterruptedException {
// 如果此線程被中斷過,直接拋中斷異常
if (Thread.interrupted())
throw new InterruptedException();
// 將當前線程包裝成節點放入條件隊列
Node node = addConditionWaiter();
// 釋放當前線程持有的鎖
long savedState = fullyRelease(node);
// 初始化中斷模式參數
int interruptMode = 0;
// 檢查節點是否在同步隊列中
while (!isOnSyncQueue(node)) {
// 不在同步隊列中則阻塞此線程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被喚醒后再去獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 當線程是被中斷喚醒,node和后繼節點是沒有斷開的
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 根據異常標志位對異常進行處理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
3.1.1 addConditionWaiter
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
addConditionWaiter的大致邏輯為:lastWaiter不是null且它的等待狀態不是CONDITION,說明lastWaiter的狀態是CANCELLED,所以我們會通過unlinkCancelledWaiters方法來移除條件隊列中所有CANCELLED的節點。然后我們會將當前線程包裝成一個節點,我們再會判斷尾節點是否為null,為null說明條件隊列為空,所以我們就將firstWaiter指向新的節點;如果不為null,就將尾節點的后繼節點指向新節點,然后再重置lastWaiter。最后將新節點返回。
3.1.2 fullyRelease
此時入隊成功后,我們就會調用fullyRelease方法來釋放當前線程所持有的鎖了,我們具體看下源碼:
final long fullyRelease(Node node) {
boolean failed = true;
try {
long savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
其中釋放鎖成功調用的是release方法,這個方法在第二篇文章中詳述過。如果釋放鎖成功,則將failed狀態置為false,然后返回savedState狀態,否則我們就會拋出異常。其中savedState是重入鎖的數量,release方法會一起釋放掉。
再看下finally,如果釋放鎖失敗,我們此線程會拋異常終止,然后在finally將waitStatus置為CANCELLED,然后等待后面被移出條件隊列。
3.1.3 isOnSyncQueue
isOnSyncQueue方法是檢查此節點是否在同步隊列中,具體源碼如下:
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
先看第一個if語句,如果狀態是CONDITION或者prev參數是null,說明此節點是在條件隊列中,返回為false。再來看第二個if,我們知道,prev和next都是同步隊列中的節點連接是用的prev和next,所以如果兩個屬性不為null,說明此節點是在同步隊列中,所以node.next不為null則需要返回true。如果兩個if都不成立,說明這個節點狀態是0且prev不為null,即屬于我們中CAS進入同步隊列的情況,則我們會通過findNodeFromTail方法來確認是不是這種情況
3.1.3.1 findNodeFromTail
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
如果此時tail就是node的話,說明node在同步隊列中,如果不是就像前遍歷。我們再回到await方法:
// 省略
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 省略
如果不在同步隊列中,則此線程就被park方法阻塞了,只有當線程被喚醒才會在這里開始繼續執行下面代碼。
3.2 wait—喚醒后
我們再來看看await喚醒后的情形:
public final void await() throws InterruptedException {
// 省略。。。。
while (!isOnSyncQueue(node)) {
// 不在同步隊列中則阻塞此線程
LockSupport.park(this); // < ----- 被喚醒后從下面開始
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被喚醒后再去獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 當線程是被中斷喚醒時,node和后繼節點是沒有斷開的
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 根據異常標志位對異常進行處理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
我們需要注意的是,線程在這里被喚醒有兩種情況:
- 其他線程調用了doSignal或doSignalAll,
- 線程被中斷。
我們需要確定我們被喚醒的情況是哪種,這里是通過checkInterruptWhileWaiting方法來判斷。但在講這個方法前,我們需先了解這個interruptMode有幾種狀態:
/** wait方法退出時,會重新再中斷一次 */
private static final int REINTERRUPT = 1;
/** wait方法退出時,會拋出InterruptedException異常 */
private static final int THROW_IE = -1;
除了上面兩種,還有一種初始態0,它代表線程沒有被中斷過,不做任何處理。
3.2.1 checkInterruptWhileWaiting
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
我們看下代碼,首先我們會檢查中斷標志位,如果interrupted方法返回false,說明沒發生中斷,方法最終返回0;如果返回了true,則說明中斷了,則我們需要通過transferAfterCancelledWait方法進一步檢查其他線程是否執行了喚醒操作。
3.2.1.1 transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
我們先看第一個if條件,如果條件中的CAS操作成功,說明此時的節點肯定是在條件隊列中,則我們調動 enq 方法將此節點放入到同步隊列中,然后返回true。但是這里需要特別注意,這個節點的nextWaiter還沒置為null;如果CAS失敗,說明這個節點可能已經在同步隊列中或者在入隊的過程中,所以我們通過while循環等待此節點入隊后返回false。
我們再回到調用transferAfterCncelled 的 checkInterruptWhileWaiting方法中,根據transferAfterCancelledWait方法返回值我們最終會返回REINTERRUPT或THROW_IE。
然后我們返回到調用checkInterruptWhileWaiting方法的await方法中。
public final void await() throws InterruptedException {
// 代碼省略
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 我們現在在這里!!!
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
我們可以看到,如果返回值不為0,則直接break跳出循環,如果為0,則再次回到while條件檢查是否在同步隊列中。最后我們看最后剩下的三個if語句:
- 通過acquireQueued方法來獲取鎖,這個方法在第二篇中詳細講過,acquireQueued返回true(即獲取鎖的的過程中被中斷了),我們再將interruptMode為0置為REINTERRUPT。
- 如果node的nextWaiter不是null。我們會通過unlinkCancelledWaiters方法將條件隊列中所有不為CONDITION的節點移除。
- 最后一個if,線程拿到鎖了,且節點沒在同步隊列和條件隊列中,await方法其實算完成了,我們這時候只需要對中斷進行善后處理。如果interruptMode不為0,說明線程是被中斷過的,需要通過reportInterruptAfterWait對中斷進行處理。
3.2.1.2 reportInterruptAfterWait
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
如果是THROW_IE,就是拋異常,如果是REINTERRUPT,就再自我中斷一次。
四 總結
好了,AQS如何解決線程同步與通信問題,就分析完了,這里我再總結一下:
AQS通過ConditionObject類來實現條件變量,并通過其喚醒方法、阻塞方法來進行線程的通信。當線程獲取鎖之后,可以通過signal、signalAll等喚醒方法將條件隊列中被阻塞的線程節點轉移到同步隊列中,然后喚醒去競爭鎖;也可以通過wait方法將自己包裝成節點并放入條件隊列中,然后等待被其他線程喚醒或中斷。
-
通信
+關注
關注
18文章
6024瀏覽量
135950 -
模型
+關注
關注
1文章
3226瀏覽量
48809 -
線程
+關注
關注
0文章
504瀏覽量
19675
發布評論請先 登錄
相關推薦
評論