AQS提供了兩種鎖,獨占鎖和共享鎖。獨占鎖只有一把鎖,同一時間只允許一個線程獲得鎖;而共享鎖則有多把鎖,同一時間允許多個線程獲得鎖。我們本文主要講獨占鎖。
一. 獨占鎖的獲取
AQS中對獨占鎖的獲取一共有三個方法:
- acquire:不響應中斷獲取獨占鎖
- acquireInterruptibly:響應中斷獲取獨占鎖
- tryAcquireNanos:響應中斷+超時獲取獨占鎖
由于篇幅,我們主要著眼于acquire方法,當然,只要你理解了acquire,acquireInterruptibly和tryAcquireNanos自然不在話下了,因為這兩個方法只是在acquire的基礎上增加了一些判斷邏輯來處理中斷和超時情況而已。
我們上源碼
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其acquire方法中一共有四個方法,其邏輯也分為4步:
- tryAcquire :嘗試獲取鎖,成功即acquire方法結束,否則調用addWaiter
- addWaiter :獲取鎖失敗即調用此方法入隊,即將獲取鎖失敗的線程包裝成Node放入同步隊列的隊尾
- acquireQueued :入隊成功后即調用此方法,如果Node在隊首則再次搶鎖,否則掛起等待喚醒(喚醒后再去獲取鎖)
- selfInterrupt :如果是被中斷喚醒,則再次執行中斷
粗略介紹完后,我們現在一個一個方法看。
1.1 tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire是鉤子方法,是我們根據需要重寫的。其功能就是在獨占模式下去獲取鎖,獲取成功則返回true,acquire方法直接結束;如果獲取失敗返回false,則后續會調用后面要講的addWaiter方法將線程入隊。
因為AQS是模板類,不同的子類只需要重寫不同的鉤子方法,因此,tryAcquire不能設置成抽象方法,不然一些不需要此鉤子方法的子類也要實現這個方法。所以作者對tryAcquire的默認實現是拋了一個異常(當然我認為直接寫個return也是ok的)。
1.2 addWaiter
如果tryAcquire獲取鎖失敗后,我們就會調用addWaiter將線程包裝成Node入隊掛起。addWaiter的大致邏輯是:先將線程包裝成Node,然后入隊,如果隊列未初始化或者入隊失敗,則會調用子方法enq,enq來進行初始化隊列和自旋入隊,我們看下具體代碼:
private Node addWaiter(Node mode) {
// 將此線程包裝成Node
Node node = new Node(Thread.currentThread(), mode);
// 將pred指向尾結點
Node pred = tail;
// 如果pred 即尾結點不為null,說明同步隊列初始化完成了。
if (pred != null) {
// 尾插法
// 步驟一:將node的前驅指針指向當前尾結點
node.prev = pred;
// 步驟二:通過CAS將尾結點指向當前節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 走到這一步有兩個原因
// 1是隊列未初始化,2是尾結點插入失敗
enq(node);
return node;
}
下面是enq方法,當執行到這個方法時,說明線程獲取鎖已經失敗了,然后入隊過程又失敗了,入隊過程失敗有兩個原因:
- 同步隊列未初始化
- 入隊過程中CAS操作失敗
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 隊列為空, 初始化隊列操作,即將head和tail指向一個空節點
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else { // 隊列不為空
// 并發下,cas操作可能會失敗,所以通過for循環不斷進行入隊,直到成功為止
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
CAS節點入隊失敗的原因,我們看到enq源碼中執行完尾插法的步驟一,即將Node的前驅指針指向當前尾結點,如果是并發情況下,應該是如下圖所示(紫色節點代表我們關注的Node):
此時,可能有多個Node都準備入隊,所以此時可能有多個Node的前驅節點都指向尾結點,所以我們在執行步驟二將尾結點指向Node時,采用的是CAS,即只有一個Node能成功,假設我們關注的Node入隊成功了,如下圖:
則另外兩個CAS操作肯定會失敗,即它們將要進入enq方法重新自旋入隊。
1.3 acquireQueued
執行完addWaiter方法后,說明我們已經入隊成功了,此時我們需要將Node中的線程掛起,等待下次被喚醒。
但在掛起之前,我們需要再次檢查下我們此時的Node是否是在隊首,如果在隊首,我們又會再次去搶鎖。否則我們會通過shouldParkAfterFailedAcquire判斷是否要掛起(shouldParkAfterFailedAcquire不僅僅是判斷此線程是否可以被掛起,還會將同步隊列中屬性為CANCELLED的Node移除隊列),如果需要掛起,則調用parkAndCheckInterrupt將線程掛起。具體源碼如下:
final boolean acquireQueued(final Node node, int arg) {
// 獲取失敗標簽,默認ture,如果獲取到鎖了后則會置為false
boolean failed = true;
try {
// 中斷標簽,默認false
boolean interrupted = false;
for (;;) {
// 獲取此節點的前驅節點
final Node p = node.predecessor();
// 如果前驅節點是頭結點,則會再次調用tryAcquire搶鎖
// 如果搶鎖成功了,則進入if語句,然后return
if (p == head && tryAcquire(arg)) {
// 將此節點設置為頭結點
setHead(node);
p.next = null; // help GC
// 獲取失敗標志置為false,因為拿到鎖了
failed = false;
// 返回中斷標志
return interrupted;
}
// shouldParkAfterFailedAcquire判斷是否要掛起
// 如果要掛起,則調用parkAndCheckInterrupt將線程掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire源碼如下。其主要作用有2:
- 決定獲取鎖失敗后,是否將線程掛起
- 清除同步隊列中所有狀態為CANCELLED的節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果此節點的前驅節點為SIGNAL,則說明此節點需要掛起,返回true
if (ws == Node.SIGNAL)
return true;
// 如果此節點的前驅節點狀態大于0,即狀態為CANCELLED則移除前驅節點,然后再往前遍歷,直到清除完所有CANCELLED的節點
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 將前驅節點置為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
這是acquireQueued中的最后一步,即將線程掛起,然后靜靜的等待被喚醒。除非該線程被其他線程unpark或者被中斷,否則該線程的程序將一直停止在這。
private final boolean parkAndCheckInterrupt() {
// 通過LockSupport掛起線程
LockSupport.park(this);
// 返回線程的標志位,true表示此線程被中斷過
return Thread.interrupted();
}
1.4 selfInterrupt
通過我們前面的分析可以知道,當線程被中斷過,則會進入到此方法。
而interrupte這個方法也只是將當前線程的中斷標志置為true,至于會不會被中斷,這個是由系統決定的。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
二. 獨占鎖的的釋放
相比獨占鎖的獲取,獨占鎖的釋放邏輯就簡單多了。獨占鎖釋放只做了兩件事情:
- 釋放鎖
- 喚醒head結點后最近需要被喚醒的節點。
其釋放邏輯的實現是通過release方法,而做的兩件事分別對應了其子方法tryRelease和unparkSuccessor:
public final boolean release(int arg) {
// 如果釋放鎖成功,則進入if去喚醒同步隊列中的線程
if (tryRelease(arg)) {
Node h = head;
// head節點不為空(即同步隊列不為空) 且 狀態不為0(初始化隊列時,head結點waitStatus為0,此時等待隊列中是沒有節點的)
// 則喚醒head結點后繼節點
if (h != null && h.waitStatus != 0)
// 喚醒離head最近需要被喚醒的節點
unparkSuccessor(h);
return true;
}
return false;
}
2.1 tryRelease
這個方法和tryAcquire一樣,也是鉤子方法,是留給子類重寫的,作用是用來釋放鎖,如果釋放成功則返回true,失敗返回false,這個具體的實現我們也放在后續AQS的子類中講解,這里就不過多闡述了。
2.2 unparkSuccessor
此方法的作用是喚醒后繼Node,我們看代碼:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// waitStatus< 0,說明此時waitStatus為SIGNAL
if (ws < 0)
// 此時需要將waitStatus置為0,待會喚醒后繼節點
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 此Node的后繼節點如果是null或者狀態為CANCELLED,則此Node已經不存在或者取消
// 則我們需要從尾結點往前遍歷找到離head最近的需要被喚醒的Node
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒Node中的線程
LockSupport.unpark(s.thread);
}
這里需要注意的是,我們在找需要被喚醒的節點時,為什么是從后往前遍歷呢?
其實這和獲取鎖時的尾結點入隊有關,我們再看下入隊方法addWaiter中插入尾結點的相關代碼:
node.prev = pred; //step1
if (compareAndSetTail(pred, node)) // step2
pred.next = node; // step3
假設我們此時有個Node正在入隊,執行完step2,還未執行step3,unparkSuccessor中如果采用從head往后遍歷,是找不到這個新插入的Node的;但如果是采用從后往前遍歷,則不會出現這個問題。
三. 總結
對于獨占鎖的獲取與釋放,就分析完了,這里我再總結一下:
獲取獨占鎖是通過acquire來實現的,首先通過tryAcquire獲取鎖,如果獲取成功,則直接返回,如果失敗,則會調用addWaiter方法進行入隊,如果入隊過程中發現隊列未初始化,則會初始化隊列再進行入隊,入隊不成功則會一直自旋直到成功;入隊成功后就會掛起,直到被其他線程或者中斷喚醒;喚醒后會檢查線程的中斷標志位,如果被中斷過,會再次調用中斷方法,告訴系統自己需要被中斷。
釋放獨占鎖是通過release方法實現的,其首先通過tryRelease釋放鎖,如果失敗則直接返回false,如果成功則會調用unparkSuccessor喚醒后繼節點。
-
代碼
+關注
關注
30文章
4801瀏覽量
68735 -
線程
+關注
關注
0文章
505瀏覽量
19705
發布評論請先 登錄
相關推薦
評論