關于CAS等原子操作
在開始說無鎖隊列之前,我們需要知道一個很重要的技術就是CAS操作——Compare & Set,或是 Compare & Swap,現在幾乎所有的CPU指令都支持CAS的原子操作,X86下對應的是 CMPXCHG 匯編指令。有了這個原子操作,我們就可以用其來實現各種無鎖(lock free)的數據結構。
這個操作用C語言來描述就是下面這個樣子:意思就是說,看一看內存*reg里的值是不是oldval,如果是的話,則對其賦值newval。
{
int old_reg_val = *reg;
if (old_reg_val == oldval) {
*reg = newval;
}
return old_reg_val;
}
我們可以看到,old_reg_val 總是返回,于是,我們可以在 compare_and_swap 操作之后對其進行測試,以查看它是否與 oldval相匹配,因為它可能有所不同,這意味著另一個并發線程已成功地競爭到 compare_and_swap 并成功將 reg 值從 oldval 更改為別的值了。
這個操作可以變種為返回bool值的形式(返回 bool值的好處在于,可以調用者知道有沒有更新成功):
{
if ( *addr != oldval ) {
return false;
}
*addr = newval;
return true;
}
與CAS相似的還有下面的原子操作:
- Fetch And Add,一般用來對變量做 +1 的原子操作
- Test-and-set,寫值到某個內存位置并傳回其舊值。匯編指令BST
- Test and Test-and-set,用來低低Test-and-Set的資源爭奪情況
注:在實際的C/C++程序中,CAS的各種實現版本如下:
1)GCC的CAS
GCC4.1+版本中支持CAS的原子操作
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
2)Windows的CAS
在Windows下,你可以使用下面的Windows API來完成CAS:
__in LONG Exchange,
__in LONG Comperand);
3) C++11中的CAS
C++11中的STL中的atomic類的函數可以讓你跨平臺。
bool atomic_compare_exchange_weak( std::atomic* obj,
T* expected, T desired );
template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,
T* expected, T desired );
無鎖隊列的鏈表實現
初始化一個隊列的代碼很簡,初始化一個dummy結點(注:在鏈表操作中,使用一個dummy結點,可以少掉很多邊界條件的判斷),如下所示:
{
node = new node()
node->next = NULL;
Q->head = Q->tail = node;
}
我們先來看一下進隊列用CAS實現的方式,基本上來說就是鏈表的兩步操作:
第一步,把tail指針的next指向要加入的結點。tail->next = p;
第二步,把tail指針移到隊尾。tail = p;
{
//準備新加入的結點數據
n = new node();
n->value = data;
n->next = NULL;
do {
p = Q->tail; //取鏈表尾指針的快照
} while( CAS(p->next, NULL, n) != TRUE);
//while條件注釋:如果沒有把結點鏈在尾指針上,再試
CAS(Q->tail, p, n); //置尾結點 tail = n;
}
我們可以看到,程序中的那個 do-while 的 Retry-Loop 中的 CAS 操作:如果 p->next 是 NULL,那么,把新結點 n 加到隊尾。如果不成功,則重新再來一次!
就是說,很有可能我在準備在隊列尾加入結點時,別的線程已經加成功了,于是tail指針就變了,于是我的CAS返回了false,于是程序再試,直到試成功為止。這個很像我們的搶電話熱線的不停重播的情況。
但是你會看到,為什么我們的“置尾結點”的操作不判斷是否成功,因為:
- 如果有一個線程T1,它的while中的CAS如果成功的話,那么其它所有的 隨后線程的CAS都會失敗,然后就會再循環,
- 此時,如果T1 線程還沒有更新tail指針,其它的線程繼續失敗,因為tail->next不是NULL了。
- 直到T1線程更新完 tail 指針,于是其它的線程中的某個線程就可以得到新的 tail 指針,繼續往下走了。
- 所以,只要線程能從 while 循環中退出來,意味著,它已經“獨占”了,tail 指針必然可以被更新。
- 這里有一個潛在的問題——如果T1線程在用CAS更新tail指針的之前,線程停掉或是掛掉了,那么其它線程就進入死循環了。下面是改良版的EnQueue()
{
n = new node();
n->value = data;
n->next = NULL;
p = Q->tail;
oldp = p
do {
while (p->next != NULL)
p = p->next;
} while( CAS(p.next, NULL, n) != TRUE); //如果沒有把結點鏈在尾上,再試
CAS(Q->tail, oldp, n); //置尾結點
}
我們讓每個線程,自己fetch 指針 p 到鏈表尾。但是這樣的fetch會很影響性能。而且,如果一個線程不斷的EnQueue,會導致所有的其它線程都去 fetch 他們的 p 指針到隊尾,能不能不要所有的線程都干同一個事?這樣可以節省整體的時間?
比如:直接 fetch Q->tail 到隊尾?因為,所有的線程都共享著 Q->tail,所以,一旦有人動了它后,相當于其它的線程也跟著動了,于是,我們的代碼可以改進成如下的實現:
{
n = new node();
n->value = data;
n->next = NULL;
while(TRUE) {
//先取一下尾指針和尾指針的next
tail = Q->tail;
next = tail->next;
//如果尾指針已經被移動了,則重新開始
if ( tail != Q->tail ) continue;
//如果尾指針的 next 不為NULL,則 fetch 全局尾指針到next
if ( next != NULL ) {
CAS(Q->tail, tail, next);
continue;
}
//如果加入結點成功,則退出
if ( CAS(tail->next, next, n) == TRUE ) break;
}
CAS(Q->tail, tail, n); //置尾結點
}
上述的代碼還是很清楚的,相信你一定能看懂,而且,這也是 Java 中的 ConcurrentLinkedQueue 的實現邏輯,當然,我上面的這個版本比 Java 的好一點,因為沒有 if 嵌套,嘿嘿。
好了,我們解決了EnQueue,我們再來看看DeQueue的代碼:(很簡單,我就不解釋了)
{
do{
p = Q->head;
if (p->next == NULL){
return ERR_EMPTY_QUEUE;
}
while( CAS(Q->head, p, p->next) != TRUE );
return p->next->value;
}
我們可以看到,DeQueue的代碼操作的是 head->next,而不是 head 本身。這樣考慮是因為一個邊界條件,我們需要一個dummy的頭指針來解決鏈表中如果只有一個元素,head 和 tail 都指向同一個結點的問題,這樣 EnQueue 和 DeQueue 要互相排斥了。
但是,如果 head 和 tail 都指向同一個結點,這意味著隊列為空,應該返回 ERR_EMPTY_QUEUE,但是,在判斷 p->next == NULL 時,另外一個EnQueue操作做了一半,此時的 p->next 不為 NULL了,但是 tail 指針還差最后一步,沒有更新到新加的結點,這個時候就會出現,在 EnQueue 并沒有完成的時候, DeQueue 已經把新增加的結點給取走了,此時,隊列為空,但是,head 與 tail 并沒有指向同一個結點。如下所示:
雖然,EnQueue的函數會把 tail 指針置對,但是,這種情況可能還是會導致一些并發問題,所以,嚴謹來說,我們需要避免這種情況。于是,我們需要加入更多的判斷條件,還確保這個問題。下面是相關的改進代碼:
{
while(TRUE) {
//取出頭指針,尾指針,和第一個元素的指針
head = Q->head;
tail = Q->tail;
next = head->next;
// Q->head 指針已移動,重新取 head指針
if ( head != Q->head ) continue;
// 如果是空隊列
if ( head == tail && next == NULL ) {
return ERR_EMPTY_QUEUE;
}
//如果 tail 指針落后了
if ( head == tail && next == NULL ) {
CAS(Q->tail, tail, next);
continue;
}
//移動 head 指針成功后,取出數據
if ( CAS( Q->head, head, next) == TRUE){
value = next->value;
break;
}
}
free(head); //釋放老的dummy結點
return value;
}
CAS的ABA問題
所謂ABA,問題基本是這個樣子:
- 進程P1在共享變量中讀到值為A
- P1被搶占了,進程P2執行
- P2把共享變量里的值從A改成了B,再改回到A,此時被P1搶占。
- P1回來看到共享變量里的值沒有被改變,于是繼續執行。
雖然P1以為變量值沒有改變,繼續執行了,但是這個會引發一些潛在的問題。ABA問題最容易發生在lock free 的算法中的,CAS首當其沖,因為CAS判斷的是指針的值。很明顯,值是很容易又變成原樣的。
比如上述的DeQueue()函數,因為我們要讓head和tail分開,所以我們引入了一個dummy指針給head,當我們做CAS的之前,如果head的那塊內存被回收并被重用了,而重用的內存又被EnQueue()進來了,這會有很大的問題。(內存管理中重用內存基本上是一種很常見的行為)
這個例子你可能沒有看懂,一個活生生的例子——
你拿著一個裝滿錢的手提箱在飛機場,此時過來了一個火辣性感的美女,然后她很暖昧地挑逗著你,并趁你不注意的時候,把用一個一模一樣的手提箱和你那裝滿錢的箱子調了個包,然后就離開了,你看到你的手提箱還在那,于是就提著手提箱去趕飛機去了。
這就是ABA的問題。
解決ABA的問題
維基百科上給了一個解——使用double-CAS(雙保險的CAS),例如,在32位系統上,我們要檢查64位的內容
- 一次用CAS檢查雙倍長度的值,前半部是值,后半部分是一個計數器。
- 只有這兩個都一樣,才算通過檢查,要吧賦新的值。并把計數器累加1。
這樣一來,ABA發生時,雖然值一樣,但是計數器就不一樣(但是在32位的系統上,這個計數器會溢出回來又從1開始的,這還是會有ABA的問題)
當然,我們這個隊列的問題就是不想讓那個內存重用,這樣明確的業務問題比較好解決。
{
loop:
p = q->next;
if (p == NULL){
return p;
}
Fetch&Add(p->refcnt, 1);
if (p == q->next){
return p;
}else{
Release(p);
}
goto loop;
}
其中的 Fetch&Add和Release分是是加引用計數和減引用計數,都是原子操作,這樣就可以阻止內存被回收了。
用數組實現無鎖隊列
使用數組來實現隊列是很常見的方法,因為沒有內存的分部和釋放,一切都會變得簡單,實現的思路如下:
- 數組隊列應該是一個ring buffer形式的數組(環形數組)
- 數組的元素應該有三個可能的值:HEAD,TAIL,EMPTY(當然,還有實際的數據)
- 數組一開始全部初始化成EMPTY,有兩個相鄰的元素要初始化成HEAD和TAIL,這代表空隊列。
- EnQueue操作。假設數據x要入隊列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),則說明隊列滿了。
- DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同樣需要注意,如果x是TAIL,則說明隊列為空。
算法的一個關鍵是——如何定位HEAD或TAIL?
- 我們可以聲明兩個計數器,一個用來計數EnQueue的次數,一個用來計數DeQueue的次數。
- 這兩個計算器使用使用Fetch&ADD來進行原子累加,在EnQueue或DeQueue完成的時候累加就好了。
- 累加后求個模什么的就可以知道TAIL和HEAD的位置了。
如下圖所示:
小結
以上基本上就是所有的無鎖隊列的技術細節,這些技術都可以用在其它的無鎖數據結構上。
- 無鎖隊列主要是通過CAS、FAA這些原子操作,和Retry-Loop實現。
- 對于Retry-Loop,我個人感覺其實和鎖什么什么兩樣。只是這種“鎖”的粒度變小了,主要是“鎖”HEAD和TAIL這兩個關鍵資源。而不是整個數據結構
-
內存
+關注
關注
8文章
3028瀏覽量
74100 -
數據結構
+關注
關注
3文章
573瀏覽量
40146 -
CAS
+關注
關注
0文章
34瀏覽量
15213 -
線程
+關注
關注
0文章
505瀏覽量
19700
發布評論請先 登錄
相關推薦
評論