深入理解RCU實(shí)現(xiàn)
——基于內(nèi)核2.6.21RCU實(shí)現(xiàn)
RCU(Read-Copy Update),顧名思義就是讀-拷貝修改,它是基于其原理命名的。對(duì)于被RCU保護(hù)的共享數(shù)據(jù)結(jié)構(gòu),讀者不需要獲得任何鎖就可以訪(fǎng)問(wèn)它,但寫(xiě)者在訪(fǎng)問(wèn)它時(shí)首先拷貝一個(gè)副本,然后對(duì)副本進(jìn)行修改,最后使用一個(gè)回調(diào)(callback)機(jī)制在適當(dāng)?shù)臅r(shí)機(jī)把指向原來(lái)數(shù)據(jù)的指針重新指向新的被修改的數(shù)據(jù)。那么這個(gè)“適當(dāng)?shù)臅r(shí)機(jī)”是怎么確定的呢?這是由內(nèi)核確定的,也是我們后面討論的重點(diǎn)。
RCU原理
RCU實(shí)際上是一種改進(jìn)的rwlock,讀者幾乎沒(méi)有什么同步開(kāi)銷(xiāo),它不需要鎖,不使用原子指令,而且在除alpha的所有架構(gòu)上也不需要內(nèi)存柵(Memory Barrier),因此不會(huì)導(dǎo)致鎖競(jìng)爭(zhēng),內(nèi)存延遲以及流水線(xiàn)停滯。不需要鎖也使得使用更容易,因?yàn)樗梨i問(wèn)題就不需要考慮了。寫(xiě)者的同步開(kāi)銷(xiāo)比較大,它需要延遲數(shù)據(jù)結(jié)構(gòu)的釋放,復(fù)制被修改的數(shù)據(jù)結(jié)構(gòu),它也必須使用某種鎖機(jī)制同步并行的其它寫(xiě)者的修改操作。
讀者必須提供一個(gè)信號(hào)給寫(xiě)者以便寫(xiě)者能夠確定數(shù)據(jù)可以被安全地釋放或修改的時(shí)機(jī)。有一個(gè)專(zhuān)門(mén)的垃圾收集器來(lái)探測(cè)讀者的信號(hào),一旦所有的讀者都已經(jīng)發(fā)送信號(hào)告知它們都不在使用被RCU保護(hù)的數(shù)據(jù)結(jié)構(gòu),垃圾收集器就調(diào)用回調(diào)函數(shù)完成最后的數(shù)據(jù)釋放或修改操作。
RCU與rwlock的不同之處是:它既允許多個(gè)讀者同時(shí)訪(fǎng)問(wèn)被保護(hù)的數(shù)據(jù),又允許多個(gè)讀者和多個(gè)寫(xiě)者同時(shí)訪(fǎng)問(wèn)被保護(hù)的數(shù)據(jù)(注意:是否可以有多個(gè)寫(xiě)者并行訪(fǎng)問(wèn)取決于寫(xiě)者之間使用的同步機(jī)制),讀者沒(méi)有任何同步開(kāi)銷(xiāo),而寫(xiě)者的同步開(kāi)銷(xiāo)則取決于使用的寫(xiě)者間同步機(jī)制。但RCU不能替代rwlock,因?yàn)槿绻麑?xiě)比較多時(shí),對(duì)讀者的性能提高不能彌補(bǔ)寫(xiě)者導(dǎo)致的損失。
讀者在訪(fǎng)問(wèn)被RCU保護(hù)的共享數(shù)據(jù)期間不能被阻塞,這是RCU機(jī)制得以實(shí)現(xiàn)的一個(gè)基本前提,也就說(shuō)當(dāng)讀者在引用被RCU保護(hù)的共享數(shù)據(jù)期間,讀者所在的CPU不能發(fā)生上下文切換,spinlock和rwlock都需要這樣的前提。寫(xiě)者在訪(fǎng)問(wèn)被RCU保護(hù)的共享數(shù)據(jù)時(shí)不需要和讀者競(jìng)爭(zhēng)任何鎖,只有在有多于一個(gè)寫(xiě)者的情況下需要獲得某種鎖以與其他寫(xiě)者同步。
寫(xiě)者修改數(shù)據(jù)前首先拷貝一個(gè)被修改元素的副本,然后在副本上進(jìn)行修改,修改完畢后它向垃圾回收器注冊(cè)一個(gè)回調(diào)函數(shù)以便在適當(dāng)?shù)臅r(shí)機(jī)執(zhí)行真正的修改操作。等待適當(dāng)時(shí)機(jī)的這一時(shí)期稱(chēng)為grace period,而CPU發(fā)生了上下文切換稱(chēng)為經(jīng)歷一個(gè)quiescent state,grace period就是所有CPU都經(jīng)歷一次quiescent state所需要的等待的時(shí)間。垃圾收集器就是在grace period之后調(diào)用寫(xiě)者注冊(cè)的回調(diào)函數(shù)來(lái)完成真正的數(shù)據(jù)修改或數(shù)據(jù)釋放操作的。
要想使用好RCU,就要知道RCU的實(shí)現(xiàn)原理。我們拿linux 2.6.21 kernel的實(shí)現(xiàn)開(kāi)始分析,為什么選擇這個(gè)版本的實(shí)現(xiàn)呢?因?yàn)檫@個(gè)版本的實(shí)現(xiàn)相對(duì)較為單純,也比較簡(jiǎn)單。當(dāng)然之后內(nèi)核做了不少改進(jìn),如搶占RCU、可睡眠RCU、分層RCU。但是基本思想都是類(lèi)似的。所以先從簡(jiǎn)單入手。
首先,上一節(jié)我們提到,寫(xiě)者在訪(fǎng)問(wèn)它時(shí)首先拷貝一個(gè)副本,然后對(duì)副本進(jìn)行修改,最后使用一個(gè)回調(diào)(callback)機(jī)制在適當(dāng)?shù)臅r(shí)機(jī)把指向原來(lái)數(shù)據(jù)的指針重新指向新的被修改的數(shù)據(jù)。而這個(gè)“適當(dāng)?shù)臅r(shí)機(jī)”就是所有CPU經(jīng)歷了一次進(jìn)程切換(也就是一個(gè)grace period)。為什么這么設(shè)計(jì)?因?yàn)镽CU讀者的實(shí)現(xiàn)就是關(guān)搶占執(zhí)行讀取,讀完了當(dāng)然就可以進(jìn)程切換了,也就等于是寫(xiě)者可以操作臨界區(qū)了。那么就自然可以想到,內(nèi)核會(huì)設(shè)計(jì)兩個(gè)元素,來(lái)分別表示寫(xiě)者被掛起的起始點(diǎn),以及每cpu變量,來(lái)表示該cpu是否經(jīng)過(guò)了一次進(jìn)程切換(quies state)。就是說(shuō),當(dāng)寫(xiě)者被掛起后,
1)重置每cpu變量,值為0。
2)當(dāng)某個(gè)cpu經(jīng)歷一次進(jìn)程切換后,就將自己的變量設(shè)為1。
3)當(dāng)所有的cpu變量都為1后,就可以喚醒寫(xiě)者了。
下面我們來(lái)分別看linux里是如何完成這三步的。
從一個(gè)例子開(kāi)始
我們從一個(gè)例子入手,這個(gè)例子來(lái)源于linux kernel文檔中的whatisRCU.txt。這個(gè)例子使用RCU的核心API來(lái)保護(hù)一個(gè)指向動(dòng)態(tài)分配內(nèi)存的全局指針。
struct foo {int a; char b; long c;};DEFINE_SPINLOCK(foo_mutex);struct foo *gbl_foo;void foo_update_a(int new_a){ struct foo *new_fp;struct foo *old_fp;new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);spin_lock(&foo_mutex);old_fp = gbl_foo;*new_fp = *old_fp;new_fp->a = new_a;rcu_assign_pointer(gbl_foo, new_fp);spin_unlock(&foo_mutex);synchronize_rcu();kfree(old_fp);}int foo_get_a(void){ int retval;rcu_read_lock();retval = rcu_dereference(gbl_foo)->a;rcu_read_unlock();return retval; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct foo {
int a;
char b;
long c;
};
DEFINE_SPINLOCK(foo_mutex);
struct foo *gbl_foo;
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
spin_lock(&foo_mutex);
old_fp = gbl_foo;
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
spin_unlock(&foo_mutex);
synchronize_rcu();
kfree(old_fp);
}
int foo_get_a(void)
{
int retval;
rcu_read_lock();
retval = rcu_dereference(gbl_foo)->a;
rcu_read_unlock();
return retval;
}
如上代碼所示,RCU被用來(lái)保護(hù)全局指針struct foo *gbl_foo。foo_get_a()用來(lái)從RCU保護(hù)的結(jié)構(gòu)中取得gbl_foo的值。而foo_update_a()用來(lái)更新被RCU保護(hù)的gbl_foo的值(更新其a成員)。
首先,我們思考一下,為什么要在foo_update_a()中使用自旋鎖foo_mutex呢?假設(shè)中間沒(méi)有使用自旋鎖.那foo_update_a()的代碼如下:
C++
void foo_update_a(int new_a) { struct foo *new_fp; struct foo *old_fp; new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL); old_fp = gbl_foo; 1:------------------------- *new_fp = *old_fp; new_fp->a = new_a; rcu_assign_pointer(gbl_foo, new_fp); synchronize_rcu(); kfree(old_fp); }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
old_fp = gbl_foo;
1:-------------------------
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
synchronize_rcu();
kfree(old_fp);
}
假設(shè)A進(jìn)程在上圖—-標(biāo)識(shí)處被B進(jìn)程搶點(diǎn).B進(jìn)程也執(zhí)行了goo_ipdate_a().等B執(zhí)行完后,再切換回A進(jìn)程.此時(shí),A進(jìn)程所持的old_fd實(shí)際上已經(jīng)被B進(jìn)程給釋放掉了.此后A進(jìn)程對(duì)old_fd的操作都是非法的。所以在此我們得到一個(gè)重要結(jié)論:RCU允許多個(gè)讀者同時(shí)訪(fǎng)問(wèn)被保護(hù)的數(shù)據(jù),也允許多個(gè)讀者在有寫(xiě)者時(shí)訪(fǎng)問(wèn)被保護(hù)的數(shù)據(jù)(但是注意:是否可以有多個(gè)寫(xiě)者并行訪(fǎng)問(wèn)取決于寫(xiě)者之間使用的同步機(jī)制)。
說(shuō)明:本文中說(shuō)的進(jìn)程不是用戶(hù)態(tài)的進(jìn)程,而是內(nèi)核的調(diào)用路徑,也可能是內(nèi)核線(xiàn)程或軟中斷等。
RCU的核心API
另外,我們?cè)谏厦嬉部吹搅藥讉€(gè)有關(guān)RCU的核心API。它們?yōu)閯e是:
C++
rcu_read_lock() rcu_read_unlock() synchronize_rcu() rcu_assign_pointer() rcu_dereference()
1
2
3
4
5
rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()
rcu_assign_pointer()
rcu_dereference()
其中,rcu_read_lock()和rcu_read_unlock()用來(lái)保持一個(gè)讀者的RCU臨界區(qū).在該臨界區(qū)內(nèi)不允許發(fā)生上下文切換。為什么不能發(fā)生切換呢?因?yàn)閮?nèi)核要根據(jù)“是否發(fā)生過(guò)切換”來(lái)判斷讀者是否已結(jié)束讀操作,我們后面再分析。
rcu_dereference():讀者調(diào)用它來(lái)獲得一個(gè)被RCU保護(hù)的指針。
rcu_assign_pointer():寫(xiě)者使用該函數(shù)來(lái)為被RCU保護(hù)的指針?lè)峙湟粋€(gè)新的值。
synchronize_rcu():這是RCU的核心所在,它掛起寫(xiě)者,等待讀者都退出后釋放老的數(shù)據(jù)。
RCU API實(shí)現(xiàn)分析
lrcu_read_lock()和rcu_read_unlock()
rcu_read_lock()和rcu_read_unlock()的實(shí)現(xiàn)如下:
C++
#define rcu_read_lock() __rcu_read_lock() #define rcu_read_unlock() __rcu_read_unlock() #define __rcu_read_lock() do { preempt_disable(); __acquire(RCU); rcu_read_acquire(); } while (0) #define __rcu_read_unlock() do { rcu_read_release(); __release(RCU); preempt_enable(); } while (0)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#define rcu_read_lock() __rcu_read_lock()
#define rcu_read_unlock() __rcu_read_unlock()
#define __rcu_read_lock()
do {
preempt_disable();
__acquire(RCU);
rcu_read_acquire();
} while (0)
#define __rcu_read_unlock()
do {
rcu_read_release();
__release(RCU);
preempt_enable();
} while (0)
其中__acquire(),rcu_read_acquire(),rcu_read_release(),__release()都是一些選擇編譯函數(shù),可以忽略不可看。因此可以得知:rcu_read_lock(),rcu_read_unlock()只是禁止和啟用搶占.因?yàn)樵谧x者臨界區(qū),不允許發(fā)生上下文切換。
lrcu_dereference()和rcu_assign_pointer()
rcu_dereference()和rcu_assign_pointer()的實(shí)現(xiàn)如下:
C++
rcu_dereference()和rcu_assign_pointer()的實(shí)現(xiàn)如下: #define rcu_dereference(p) ({ typeof(p) _________p1 = ACCESS_ONCE(p); smp_read_barrier_depends(); (_________p1); }) #define rcu_assign_pointer(p, v) ({ if (!__builtin_constant_p(v) || ((v) != NULL)) smp_wmb(); (p) = (v); })
1
2
3
4
5
6
7
8
9
10
11
12
rcu_dereference()和rcu_assign_pointer()的實(shí)現(xiàn)如下:
#define rcu_dereference(p) ({
typeof(p) _________p1 = ACCESS_ONCE(p);
smp_read_barrier_depends();
(_________p1);
})
#define rcu_assign_pointer(p, v)
({
if (!__builtin_constant_p(v) || ((v) != NULL))
smp_wmb();
(p) = (v);
})
它們的實(shí)現(xiàn)也很簡(jiǎn)單.因?yàn)樗鼈儽旧矶际窃硬僮?。只是為了cache一致性,插上了內(nèi)存屏障??梢宰屍渌淖x者/寫(xiě)者可以看到保護(hù)指針的最新值.
lsynchronize_rcu()
synchronize_rcu()在RCU中是一個(gè)最核心的函數(shù),它用來(lái)等待之前的讀者全部退出.我們后面的大部份分析也是圍繞著它而進(jìn)行.實(shí)現(xiàn)如下:
C++
void synchronize_rcu(void) { struct rcu_synchronize rcu; init_completion(&rcu.completion); /* Will wake me after RCU finished */ call_rcu(&rcu.head, wakeme_after_rcu); /* Wait for it */ wait_for_completion(&rcu.completion); }
1
2
3
4
5
6
7
8
9
10
void synchronize_rcu(void)
{
struct rcu_synchronize rcu;
init_completion(&rcu.completion);
/* Will wake me after RCU finished */
call_rcu(&rcu.head, wakeme_after_rcu);
/* Wait for it */
wait_for_completion(&rcu.completion);
}
我們可以看到,它初始化了一個(gè)本地變量,它的類(lèi)型為struct rcu_synchronize.調(diào)用call_rcu()之后.一直等待條件變量rcu.competion的滿(mǎn)足。
在這里看到了RCU的另一個(gè)核心API,它就是call_run()。它的定義如下:
C++
void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu)) { unsigned long flags; struct rcu_data *rdp; head->func = func; head->next = NULL; local_irq_save(flags); rdp = &__get_cpu_var(rcu_data); *rdp->nxttail = head; rdp->nxttail = &head->next; if (unlikely(++rdp->qlen > qhimark)) { rdp->blimit = INT_MAX; force_quiescent_state(rdp, &rcu_ctrlblk); } local_irq_restore(flags); }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void call_rcu(struct rcu_head *head,??void (*func)(struct rcu_head *rcu))
{
unsigned long flags;
struct rcu_data *rdp;
head->func = func;
head->next = NULL;
local_irq_save(flags);
rdp = &__get_cpu_var(rcu_data);
*rdp->nxttail = head;
rdp->nxttail = &head->next;
if (unlikely(++rdp->qlen > qhimark)) {
rdp->blimit = INT_MAX;
force_quiescent_state(rdp, &rcu_ctrlblk);
}
local_irq_restore(flags);
}
該函數(shù)也很簡(jiǎn)單,就是將參數(shù)傳入的回調(diào)函數(shù)fun賦值給一個(gè)struct rcu_head變量,再將這個(gè)struct rcu_head加在了per_cpu變量rcu_data的nxttail 鏈表上。
rcu_data定義如下,是個(gè)每cpu變量:
DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
接著我們看下call_rcu注冊(cè)的函數(shù),我們也可以看到,在synchronize_rcu()中,傳入call_rcu的函數(shù)為wakeme_after_rcu(),其實(shí)現(xiàn)如下:
C++
static void wakeme_after_rcu(struct rcu_head *head) { struct rcu_synchronize *rcu; rcu = container_of(head, struct rcu_synchronize, head); complete(&rcu->completion); }
1
2
3
4
5
6
7
static void wakeme_after_rcu(struct rcu_head *head)
{
struct rcu_synchronize *rcu;
rcu = container_of(head, struct rcu_synchronize, head);
complete(&rcu->completion);
}
我們可以看到,該函數(shù)將條件變量置真,然后喚醒了在條件變量上等待的進(jìn)程。
由此,我們可以得知,每一個(gè)CPU都有一個(gè)rcu_data.每個(gè)調(diào)用call_rcu()/synchronize_rcu()進(jìn)程的進(jìn)程都會(huì)將一個(gè)rcu_head都會(huì)掛到rcu_data的nxttail鏈表上(這個(gè)rcu_head其實(shí)就相當(dāng)于這個(gè)進(jìn)程在RCU機(jī)制中的體現(xiàn)),然后掛起。當(dāng)讀者都完成讀操作后(經(jīng)過(guò)一個(gè)grace period后)就會(huì)觸發(fā)這個(gè)rcu_head上的回調(diào)函數(shù)來(lái)喚醒寫(xiě)者。整個(gè)過(guò)程如下圖所示:
看到這里,也就到了問(wèn)題的關(guān)鍵,內(nèi)核是如何判斷當(dāng)前讀者都已經(jīng)完成讀操作了呢(經(jīng)過(guò)了一個(gè)grace period)?又是由誰(shuí)來(lái)觸發(fā)這個(gè)回調(diào)函數(shù)wakeme_after_rcu呢?下一小節(jié)再來(lái)分析。
從RCU的初始化說(shuō)起
那究竟怎么去判斷當(dāng)前的讀者已經(jīng)操作完了呢?我們?cè)谥翱吹?不是讀者在調(diào)用rcu_read_lock()的時(shí)候要禁止搶占么?因此,我們只需要判斷所有的CPU都進(jìn)過(guò)了一次上下文切換,就說(shuō)明所有讀者已經(jīng)退出了。要徹底弄清楚這個(gè)問(wèn)題,我們得從RCU的初始化說(shuō)起。
RCU的初始化開(kāi)始于start_kernel()–>rcu_init()。而其主要是對(duì)每個(gè)cpu調(diào)用了rcu_online_cpu函數(shù)。
lrcu_online_cpu
C++
static void __cpuinit rcu_online_cpu(int cpu) { struct rcu_data *rdp = &per_cpu(rcu_data, cpu); struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu); rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp); rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp); open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL); }
1
2
3
4
5
6
7
8
9
static void __cpuinit rcu_online_cpu(int cpu)
{
struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
}
這個(gè)函數(shù)主要完成兩個(gè)操作:初始化兩個(gè)per cpu變量;注冊(cè)RCU_SOFTIRQ軟中斷處理函數(shù)rcu_process_callbacks。我們從這里又看到了另一個(gè)percpu變量:rcu_bh_data.有關(guān)bh的部份之后再來(lái)分析.在這里略過(guò)這些部分。下面看下rcu_init_percpu_data()的實(shí)現(xiàn)。
lrcu_init_percpu_data
C++
static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp, struct rcu_data *rdp) { memset(rdp, 0, sizeof(*rdp)); rdp->curtail = &rdp->curlist; rdp->nxttail = &rdp->nxtlist; rdp->donetail = &rdp->donelist; rdp->quiescbatch = rcp->completed; rdp->qs_pending = 0; rdp->cpu = cpu; rdp->blimit = blimit; }
1
2
3
4
5
6
7
8
9
10
11
static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
memset(rdp, 0, sizeof(*rdp));
rdp->curtail = &rdp->curlist;
rdp->nxttail = &rdp->nxtlist;
rdp->donetail = &rdp->donelist;
rdp->quiescbatch = rcp->completed;
rdp->qs_pending = 0;
rdp->cpu = cpu;
rdp->blimit = blimit;
}
調(diào)用這個(gè)函數(shù)的第二個(gè)參數(shù)是一個(gè)全局變量rcu_ctlblk。在繼續(xù)向下分析之前我們先看下這些函數(shù)用到的一些重要數(shù)據(jù)結(jié)構(gòu)。
重要數(shù)據(jù)結(jié)構(gòu)
說(shuō)明:下列數(shù)據(jù)結(jié)構(gòu)只列出了主要成員。
C++
struct rcu_ctrlblk { long cur; long completed; cpumask_t cpumask; };
1
2
3
4
5
struct rcu_ctrlblk {??
long cur;??
long completed;??
cpumask_t??cpumask;??
};
structrcu_ctrlblk的主要作用就是定義上節(jié)提到的全局變量rcu_ctlblk,這個(gè)變量在系統(tǒng)中是唯一的。另外說(shuō)明一下,為了記錄方便,內(nèi)核將從啟動(dòng)開(kāi)始的每個(gè)grace period對(duì)應(yīng)一個(gè)數(shù)字表示。
這里的cpumask是為了標(biāo)識(shí)當(dāng)前系統(tǒng)中的所有cpu,以便標(biāo)記哪些cpu發(fā)生過(guò)上下文切換(經(jīng)歷過(guò)一個(gè)quiescent state)。而cur,completed,則用來(lái)同步。我們可以這樣理解,cur和completed是系統(tǒng)級(jí)別的記錄信息,也即系統(tǒng)實(shí)時(shí)經(jīng)歷的grace編號(hào),一般情況下,新開(kāi)一個(gè)graceperiod等待周期的話(huà),cur會(huì)加1,當(dāng)graceperiod結(jié)束后,會(huì)將completed置為cur,所以通常情況下,都是completed追著cur跑。
那么我們可能會(huì)猜測(cè),是不是如果complete= curr -1 的時(shí)候,就表示系統(tǒng)中g(shù)raceperiod還沒(méi)有結(jié)束?當(dāng)completed= curr的時(shí)候,就表示系統(tǒng)中不存在graceperiod等待周期了?我們姑且這么理解,實(shí)際上有些許差別,但設(shè)計(jì)思想都是一樣的。
C++
struct rcu_data { long quiescbatch; int passed_quiesc; long batch; struct rcu_head *nxtlist; struct rcu_head **nxttail; struct rcu_head *curlist; struct rcu_head **curtail; struct rcu_head *donelist; struct rcu_head **donetail; };
1
2
3
4
5
6
7
8
9
10
11
struct rcu_data {??
long quiescbatch;??????
int passed_quiesc;??
long????????????batch;??????????
struct rcu_head *nxtlist;??
struct rcu_head **nxttail;??
struct rcu_head *curlist;??
struct rcu_head **curtail;??
struct rcu_head *donelist;??
struct rcu_head **donetail;??
};
上面的結(jié)構(gòu),要達(dá)到的作用是,跟蹤單個(gè)CPU上的RCU事務(wù)。
(1)passed_quiesc:這是一個(gè)flag標(biāo)志,表示當(dāng)前cpu是否已經(jīng)發(fā)生過(guò)搶占了(經(jīng)歷過(guò)一個(gè)quiescent state),0表示為未發(fā)送過(guò)切換,1表示發(fā)生過(guò)切換;
說(shuō)明:我們一直強(qiáng)調(diào)發(fā)生過(guò)一次cpu搶占就是經(jīng)歷過(guò)一個(gè)quiescent state,其實(shí)這是不嚴(yán)格的說(shuō)法。為什么呢?因?yàn)樽韵到y(tǒng)啟動(dòng),各種進(jìn)程頻繁調(diào)度,肯定每個(gè)cpu都會(huì)發(fā)生過(guò)搶占。所以我們這里說(shuō)的“發(fā)生過(guò)搶占”是指從某個(gè)特殊的時(shí)間點(diǎn)開(kāi)始發(fā)生過(guò)搶占。那么這個(gè)特殊的時(shí)間點(diǎn)是什么時(shí)候呢?就是我們調(diào)用synchronize_rcu將rcu_head掛在每cpu變量上并掛起進(jìn)程時(shí),我們后面分析就會(huì)證實(shí)這一點(diǎn)。
(2) batch:表示一個(gè)graceperiodid,表示本次被阻塞的寫(xiě)者,需要在哪個(gè)graceperiod之后被激活;
(3) quiescbatch:表示一個(gè)graceperiodid,用來(lái)比較當(dāng)前cpu是否處于等待進(jìn)程切換完成。
下面我們介紹剩下的三組指針,但首先注意這三組鏈表上的元素都是structrcu_head類(lèi)型。這三個(gè)鏈表的作用還要結(jié)合整個(gè)RCU寫(xiě)者從阻塞到喚醒的過(guò)程來(lái)看。
rcu寫(xiě)者的整體流程,假設(shè)系統(tǒng)中出現(xiàn)rcu寫(xiě)者阻塞,那么流程如下:
(1) 調(diào)用synchronize_rcu,將代表寫(xiě)者的rcu_head添加到CPU[n]每cpu變量rcu_data的nxtlist,這個(gè)鏈表代表有需要提交給rcu處理的回調(diào)(但還沒(méi)有提交);
(2) CPU[n]每次時(shí)鐘中斷時(shí)檢測(cè)自己的nxtlist是否為null,若不為空,因此則喚醒rcu軟中斷;
(3) RCU的軟中斷處理函數(shù)rcu_process_callbacks會(huì)挨個(gè)檢查本CPU的三個(gè)鏈表。
下面分析RCU的核心處理函數(shù),也就是軟中斷處理函數(shù)rcu_process_callbacks。
RCU軟中斷
在“RCU API實(shí)現(xiàn)分析”一節(jié)我們知道調(diào)用synchronize_rcu,將代表寫(xiě)者的rcu_head添加到了CPU[n]每cpu變量rcu_data的nxtlist。
而另一方面,在每次時(shí)鐘中斷中,都會(huì)調(diào)用update_process_times函數(shù)。
lupdate_process_times
C++
void update_process_times(int user_tick) { //...... if (rcu_pending(cpu)) rcu_check_callbacks(cpu, user_tick); //......}
1
2
3
4
5
6
7
void update_process_times(int user_tick)
{
//......
if (rcu_pending(cpu))
rcu_check_callbacks(cpu, user_tick);
//......
}
在每一次時(shí)鐘中斷,都會(huì)檢查rcu_pending(cpu)是否為真,如果是,就會(huì)為其調(diào)用rcu_check_callbacks()。
lrcu_pending
rcu_pending()的代碼如下:
C++
int rcu_pending(int cpu) { return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) || __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu)); }
1
2
3
4
5
int rcu_pending(int cpu)
{
return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
__rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
}
同上面一樣,忽略bh的部份,我們看__rcu_pending。
l__rcu_pending
C++
static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp) { /* This cpu has pending rcu entries and the grace period * for them has completed. */ if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) return 1; /* This cpu has no pending entries, but there are new entries */ if (!rdp->curlist && rdp->nxtlist) return 1; /* This cpu has finished callbacks to invoke */ if (rdp->donelist) return 1; /* The rcu core waits for a quiescent state from the cpu */ if (rdp->quiescbatch != rcp->cur || rdp->qs_pending) return 1; /* nothing to do */ return 0; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
/* This cpu has pending rcu entries and the grace period
* for them has completed.
*/
if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
return 1;
/* This cpu has no pending entries, but there are new entries */
if (!rdp->curlist && rdp->nxtlist)
return 1;
/* This cpu has finished callbacks to invoke */
if (rdp->donelist)
return 1;
/* The rcu core waits for a quiescent state from the cpu */
if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
return 1;
/* nothing to do */
return 0;
}
可以上面有四種情況會(huì)返回1,分別對(duì)應(yīng):
A.該CPU上有等待處理的回調(diào)函數(shù),且已經(jīng)經(jīng)過(guò)了一個(gè)batch(grace period).rdp->datch表示rdp在等待的batch序號(hào);
B.上一個(gè)等待已經(jīng)處理完了,又有了新注冊(cè)的回調(diào)函數(shù);
C.等待已經(jīng)完成,但尚末調(diào)用該次等待的回調(diào)函數(shù);
D.在等待quiescent state.
如果rcu_pending返回1,就會(huì)進(jìn)入到rcu_check_callbacks(),rcu_check_callbacks()中主要工作就是調(diào)用raise_softirq(RCU_SOFTIRQ),觸發(fā)RCU軟中斷。而RCU軟中斷的處理函數(shù)為rcu_process_callbacks,其中分別針對(duì)每cpu變量rcu_bh_data和rcu_data調(diào)用__rcu_process_callbacks。我們主要分析針對(duì)rcu_data的調(diào)用。
l__rcu_process_callbacks
1) 先看nxtlist里有沒(méi)有待處理的回調(diào)(rcu_head),如果有的話(huà),說(shuō)明有寫(xiě)者待處理,那么還要分兩種情況:
1.1)如果系統(tǒng)是第一次出現(xiàn)寫(xiě)者阻塞,也即之前的寫(xiě)者都已經(jīng)處理完畢,那么此時(shí)curlist鏈表一定為空(curlist專(zhuān)門(mén)存放已被rcu檢測(cè)到的寫(xiě)者請(qǐng)求),于是就把nxtlist里的所有成員都移動(dòng)到curlist指向,并把當(dāng)前CPU需要等待的graceperiodid:rdp->batch設(shè)置為當(dāng)前系統(tǒng)處理的graceperiod的下一個(gè)grace周期,即rcp->cur+ 1。由于這算是一個(gè)新的graceperiod,即start_rcu_batch,所以還接著需要增加系統(tǒng)的graceperiod計(jì)數(shù),即rcp->cur++,同時(shí),將全局的cpusmask設(shè)置為全f,代表新的graceperiod開(kāi)始,需要檢測(cè)所有的cpu是否都經(jīng)過(guò)了一次進(jìn)程切換。代碼如下:
C++
void __rcu_process_callbacks(struct rcu_ctrlblk *rcp, struct rcu_data *rdp) { if (rdp->nxtlist && !rdp->curlist) { move_local_cpu_nxtlist_to_curlist(); rdp->batch = rcp->cur + 1; if (!rcp->next_pending) { rcp->next_pending = 1; rcp->cur++; cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,??
struct rcu_data *rdp)??
{??
if (rdp->nxtlist && !rdp->curlist) {??
move_local_cpu_nxtlist_to_curlist();??
rdp->batch = rcp->cur + 1;??
if (!rcp->next_pending) {??
rcp->next_pending = 1;??
rcp->cur++;??
cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);??
}??
}??
}
接著跳轉(zhuǎn)至1.2。
1.2) 如果系統(tǒng)之前已經(jīng)有寫(xiě)者在被rcu監(jiān)控著,但還沒(méi)來(lái)得及經(jīng)過(guò)一個(gè)graceperiod,這個(gè)時(shí)候curlist不為空,nxtlist也不為空,寫(xiě)者會(huì)被加入nxtlist中。由于curlist不為空,說(shuō)明上個(gè)rcu周期的寫(xiě)者還沒(méi)有處理完,于是不會(huì)將本次阻塞的寫(xiě)者加入curlist,一直到上次的curlist里的rcu_head被處理完(都移動(dòng)到了donelist),才會(huì)將后來(lái)的寫(xiě)者納入RCU考慮(移動(dòng)到curlist)(假如這個(gè)期間又來(lái)了多個(gè)寫(xiě)者,則多個(gè)寫(xiě)者的rcu_head共享下一個(gè)graceperiod,也就是下一個(gè)graceperiod結(jié)束后這多個(gè)寫(xiě)者都會(huì)被喚醒)。進(jìn)入下一步。
2) rcu_process_callbacks調(diào)用每CPU函數(shù)rcu_check_quiescent_state開(kāi)始監(jiān)控,檢測(cè)所有的CPU是否會(huì)經(jīng)歷一個(gè)進(jìn)程切換。這個(gè)函數(shù)是如何得知需要開(kāi)始監(jiān)控的? 答案在于quiescbatch與全局的rcp->cur比較。 初始化時(shí)rdp->quiescbatch =rcp->completed = rcp->cur。 由于1.1有新graceperiod開(kāi)啟,所以rcp->cur已經(jīng)加1了,于是rdp->quiescbatch和rcp->curr不等,進(jìn)而將此cpu的rdp->passed_quiesc設(shè)為0,表示這個(gè)周期開(kāi)始,我要等待這個(gè)cpu經(jīng)歷一個(gè)進(jìn)程切換,等待該CPU將passed_quiesc置為1。即與前面講到的passed_quiesc標(biāo)志置0的時(shí)機(jī)吻合。最后將rdp->quiescbatch置為 rcp->cur,以避免下次再進(jìn)入軟中斷里將passed_quiesc重復(fù)置0。
C++
void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp, struct rcu_data *rdp) { if (rdp->quiescbatch != rcp->cur) { /* start new grace period: */ rdp->qs_pending = 1; rdp->passed_quiesc = 0; rdp->quiescbatch = rcp->cur; return; } }
1
2
3
4
5
6
7
8
9
10
11
void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,??
struct rcu_data *rdp)??
{??
if (rdp->quiescbatch != rcp->cur) {??
/* start new grace period: */??
rdp->qs_pending = 1;??
rdp->passed_quiesc = 0;??
rdp->quiescbatch = rcp->cur;??
return;??
}??
}
3) 本次軟中斷結(jié)束,下次軟中斷到來(lái),再次進(jìn)入rcu_check_quiescent_state進(jìn)行檢測(cè),如果本CPU的rdp->passed_quiesc已經(jīng)置1,則需要cpu_quiet將本CPU標(biāo)志位從全局的rcp->cpumask中清除,如果cpumask為0了,則說(shuō)明自上次RCU寫(xiě)者被掛起以來(lái),所有CPU都已經(jīng)歷了一次進(jìn)程切換,于是本次rcu等待周期結(jié)束,將rcp->completed置為rcp->cur,重置cpumask為全f,并嘗試重新開(kāi)啟一個(gè)新的grace period。我們可以看到RCU用了如此多的同步標(biāo)志,卻少用spinlock鎖,是多么巧妙的設(shè)計(jì),不過(guò)這也提高了理解的難度。
C++
void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp, struct rcu_data *rdp) { if (rdp->quiescbatch != rcp->cur) { /* start new grace period: */ rdp->qs_pending = 1; rdp->passed_quiesc = 0; rdp->quiescbatch = rcp->cur; return; } if (!rdp->passed_quiesc) return; /*this cpu has passed a quies state*/ if (likely(rdp->quiescbatch == rcp->cur)) { cpu_clear(cpu, rcp->cpumask); if (cpus_empty(rcp->cpumask)) rcp->completed = rcp->cur; } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,??
struct rcu_data *rdp)??
{??
if (rdp->quiescbatch != rcp->cur) {??
/* start new grace period: */??
rdp->qs_pending = 1;??
rdp->passed_quiesc = 0;??
rdp->quiescbatch = rcp->cur;??
return;??
}??
if (!rdp->passed_quiesc)??
return;??
/*this cpu has passed a quies state*/??
if (likely(rdp->quiescbatch == rcp->cur)) {??
cpu_clear(cpu, rcp->cpumask);??
if (cpus_empty(rcp->cpumask))??
rcp->completed = rcp->cur;??
}??
}
4)下次再進(jìn)入rcu軟中斷__rcu_process_callbacks,發(fā)現(xiàn)rdp->batch已經(jīng)比rcp->completed小了(因?yàn)樯弦徊襟E中,后者增大了),則將rdp->curlist上的回調(diào)移動(dòng)到rdp->donelist里,接著還會(huì)再次進(jìn)入rcu_check_quiescent_state,但是由于當(dāng)前CPU的rdp->qs_pending已經(jīng)為1了,所以不再往下清除cpu掩碼。__rcu_process_callbacks
代碼變成了:
C++
void __rcu_process_callbacks(struct rcu_ctrlblk *rcp, struct rcu_data *rdp) { if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) { *rdp->donetail = rdp->curlist; rdp->donetail = rdp->curtail; rdp->curlist = NULL; rdp->curtail = &rdp->curlist; } if (rdp->nxtlist && !rdp->curlist) { move_local_cpu_nxtlist_to_curlist(); rdp->batch = rcp->cur + 1; if (!rcp->next_pending) { rcp->next_pending = 1; rcp->cur++; cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask); } } if (rdp->donelist) rcu_do_batch(rdp); }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,??
struct rcu_data *rdp)??
{??
if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {??
*rdp->donetail = rdp->curlist;??
rdp->donetail = rdp->curtail;??
rdp->curlist = NULL;??
rdp->curtail = &rdp->curlist;??
}??
if (rdp->nxtlist && !rdp->curlist) {??
move_local_cpu_nxtlist_to_curlist();??
rdp->batch = rcp->cur + 1;??
if (!rcp->next_pending) {??
rcp->next_pending = 1;??
rcp->cur++;??
cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);??
}??
}??
if (rdp->donelist)??
rcu_do_batch(rdp);??
}
5)經(jīng)過(guò)千山萬(wàn)水終于來(lái)到rcu_do_batch(如果rdp->donelist有的話(huà))在此函數(shù)里,執(zhí)行RCU寫(xiě)者掛載的回調(diào),即wakeme_after_rcu。
lrcu_do_batch
C++
static void rcu_do_batch(struct rcu_data *rdp) { struct rcu_head *next, *list; int count = 0; list = rdp->donelist; while (list) { next = list->next; prefetch(next); list->func(list); list = next; if (++count >= rdp->blimit) break; } rdp->donelist = list; local_irq_disable(); rdp->qlen -= count; local_irq_enable(); if (rdp->blimit == INT_MAX && rdp->qlen) blimit = blimit; if (!rdp->donelist) rdp->donetail = &rdp->donelist; else raise_rcu_softirq(); }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static void rcu_do_batch(struct rcu_data *rdp)
{
struct rcu_head *next, *list;
int count = 0;
list = rdp->donelist;
while (list) {
next = list->next;
prefetch(next);
list->func(list);
list = next;
if (++count >= rdp->blimit)
break;
}
rdp->donelist = list;
local_irq_disable();
rdp->qlen -= count;
local_irq_enable();
if (rdp->blimit == INT_MAX && rdp->qlen)
blimit = blimit;
if (!rdp->donelist)
rdp->donetail = &rdp->donelist;
else
raise_rcu_softirq();
}
它遍歷處理掛在鏈表上的回調(diào)函數(shù).在這里,注意每次調(diào)用的回調(diào)函數(shù)有最大值限制.這樣做主要是防止一次調(diào)用過(guò)多的回調(diào)函數(shù)而產(chǎn)生不必要系統(tǒng)負(fù)載.如果donelist中還有沒(méi)處理完的數(shù)據(jù),打開(kāi)RCU軟中斷,在下次軟中斷到來(lái)的時(shí)候接著處理.
注意:
僅當(dāng)系統(tǒng)檢測(cè)到一個(gè)grace period的所有CPU都經(jīng)歷了進(jìn)程切換后,才會(huì)給系統(tǒng)一個(gè)信息要求啟動(dòng)新batch,在此期間的所有寫(xiě)者請(qǐng)求,都暫存在本地CPU的nxtlist鏈表里。
進(jìn)程切換
在每一次進(jìn)程切換的時(shí)候,都會(huì)調(diào)用rcu_qsctr_inc().如下代碼片段如示:
C++
asmlinkage void __sched schedule(void) { //......rcu_qsctr_inc(cpu); //...... }
1
2
3
4
5
6
asmlinkage void __sched schedule(void)
{
//......
rcu_qsctr_inc(cpu);
//......
}
rcu_qsctr_inc()代碼如下:
C++
static inline void rcu_qsctr_inc(int cpu) { struct rcu_data *rdp = &per_cpu(rcu_data, cpu); rdp->passed_quiesc = 1; }
1
2
3
4
5
static inline void rcu_qsctr_inc(int cpu)
{
struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
rdp->passed_quiesc = 1;
}
該函數(shù)將對(duì)應(yīng)CPU上的rcu_data的passed_quiesc成員設(shè)為了1?;蛟S你已經(jīng)發(fā)現(xiàn)了,這個(gè)過(guò)程就標(biāo)識(shí)該CPU經(jīng)過(guò)了一次quiescent state,和之前在軟中斷的初始化為0相呼應(yīng)。
幾種RCU情況分析
1.如果CPU 1上有進(jìn)程調(diào)用rcu_read_lock進(jìn)入臨界區(qū),之后退出來(lái),發(fā)生了進(jìn)程切換,新進(jìn)程又通過(guò)rcu_read_lock進(jìn)入臨界區(qū).由于RCU軟中斷中只判斷一次上下文切換,因此,在調(diào)用回調(diào)函數(shù)的時(shí)候,仍然有進(jìn)程處于RCU的讀臨界區(qū),這樣會(huì)不會(huì)有問(wèn)題呢?
這樣是不會(huì)有問(wèn)題的.還是上面的例子:
C++
spin_lock(&foo_mutex); old_fp = gbl_foo; *new_fp = *old_fp; new_fp->a = new_a; rcu_assign_pointer(gbl_foo, new_fp); spin_unlock(&foo_mutex); synchronize_rcu(); kfree(old_fp);
1
2
3
4
5
6
7
8
spin_lock(&foo_mutex);
old_fp = gbl_foo;
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
spin_unlock(&foo_mutex);
synchronize_rcu();
kfree(old_fp);
使用synchronize_rcu ()只是為了等待持有old_fd(也就是調(diào)用rcu_assign_pointer ()更新之前的gbl_foo)的進(jìn)程退出.而不需要等待所有的讀者全部退出.這是因?yàn)?在rcu_assign_pointer ()之后的讀取取得的保護(hù)指針,已經(jīng)是更新好的新值了.
2. 如果一個(gè)CPU連續(xù)調(diào)用synchronize_rcu()或者call_rcu()它們會(huì)有什么影響呢?
如果當(dāng)前有請(qǐng)求在等待,就會(huì)新請(qǐng)?zhí)峤坏幕卣{(diào)函數(shù)掛到taillist上,一直到前一個(gè)等待完成,再將taillist的數(shù)據(jù)移到curlist,并開(kāi)啟一個(gè)新的等待,因此,也就是說(shuō),在前一個(gè)等待期間提交的請(qǐng)求,都會(huì)放到一起處理.也就是說(shuō),他們會(huì)共同等待所有CPU切換完成.
舉例說(shuō)明如下:
帶bh的RCU API
在上面的代碼分析的時(shí)候,經(jīng)??吹綆в衎h的RCU代碼.現(xiàn)在來(lái)看一下這些帶bh的RCU是什么樣的。
C++
#define rcu_read_lock_bh() __rcu_read_lock_bh() #define rcu_read_unlock_bh() __rcu_read_unlock_bh() #define __rcu_read_lock_bh() do { local_bh_disable(); __acquire(RCU_BH); rcu_read_acquire(); }while (0) #define __rcu_read_unlock_bh() do { rcu_read_release(); __release(RCU_BH); local_bh_enable(); } while (0)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#define rcu_read_lock_bh() __rcu_read_lock_bh()
#define rcu_read_unlock_bh() __rcu_read_unlock_bh()
#define __rcu_read_lock_bh()
do {
local_bh_disable();
__acquire(RCU_BH);
rcu_read_acquire();
}while (0)
#define __rcu_read_unlock_bh()
do {
rcu_read_release();
__release(RCU_BH);
local_bh_enable();
} while (0)
根據(jù)上面的分析:bh RCU跟普通的RCU相比不同的是,普通RCU是禁止內(nèi)核搶占,而bh RCU是禁止下半部.
其實(shí),帶bh的RCU一般在軟中斷使用,不過(guò)計(jì)算quiescent state并不是發(fā)生一次上下文切換。而是發(fā)生一次softirq.我們?cè)诤竺娴姆治鲋锌傻玫接∽C.
lcall_rcu_bh()
C++
void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu)) { unsigned long flags; struct rcu_data *rdp; head->func = func; head->next = NULL; local_irq_save(flags); rdp = &__get_cpu_var(rcu_bh_data); *rdp->nxttail = head; rdp->nxttail = &head->next; if (unlikely(++rdp->qlen > qhimark)) { rdp->blimit = INT_MAX; force_quiescent_state(rdp, &rcu_bh_ctrlblk); } local_irq_restore(flags); }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
{
unsigned long flags;
struct rcu_data *rdp;
head->func = func;
head->next = NULL;
local_irq_save(flags);
rdp = &__get_cpu_var(rcu_bh_data);
*rdp->nxttail = head;
rdp->nxttail = &head->next;
if (unlikely(++rdp->qlen > qhimark)) {
rdp->blimit = INT_MAX;
force_quiescent_state(rdp, &rcu_bh_ctrlblk);
}
local_irq_restore(flags);
}
它跟call_rcu()不相同的是,rcu是取per_cpu變量rcu__data和全局變量rcu_ctrlblk.而bh RCU是取rcu_bh_data,rcu_bh_ctrlblk.他們的類(lèi)型都是一樣的,這樣做只是為了區(qū)分BH和普通RCU的等待.
對(duì)于rcu_bh_qsctr_inc
C++
static inline void rcu_bh_qsctr_inc(int cpu) { struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu); rdp->passed_quiesc = 1; }
1
2
3
4
5
static inline void rcu_bh_qsctr_inc(int cpu)
{
struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
rdp->passed_quiesc = 1;
}
它跟rcu_qsctr_inc()機(jī)同,也是更改對(duì)應(yīng)成員.所不同的是,調(diào)用rcu_bh_qsctr_inc()的地方發(fā)生了變化.
C++
asmlinkage void __do_softirq(void) { //...... do { if (pending & 1) { h->action(h); rcu_bh_qsctr_inc(cpu); } h++; pending >>= 1; } while (pending); //...... }
1
2
3
4
5
6
7
8
9
10
11
12
13
asmlinkage void __do_softirq(void)
{
//......
do {
if (pending & 1) {
h->action(h);
rcu_bh_qsctr_inc(cpu);
}
h++;
pending >>= 1;
} while (pending);
//......
}
也就是說(shuō),在發(fā)生軟中斷的時(shí)候,才會(huì)認(rèn)為是經(jīng)過(guò)了一次quiescent state.
RCU鏈表操作
為了操作鏈表,在include/linux/rculist.h有一套專(zhuān)門(mén)的RCU API。如:list_entry_rcu、list_add_rcu、list_del_rcu、list_for_each_entry_rcu等。即對(duì)所有kernel 的list的操作都有一個(gè)對(duì)應(yīng)的RCU操作。那么這些操作和原始的list操作有哪些不同呢?我們先對(duì)比幾個(gè)看下。
llist_entry_rcu
C++
#define list_entry_rcu(ptr, type, member) \container_of(rcu_dereference(ptr), type, member)#define list_entry(ptr, type, member) \container_of(ptr, type, member)
1
2
3
4
#define list_entry_rcu(ptr, type, member) \
container_of(rcu_dereference(ptr), type, member)
#define list_entry(ptr, type, member) \
container_of(ptr, type, member)
l__list_for_each_rcu
C++
#define __list_for_each_rcu(pos, head) \for (pos = rcu_dereference((head)->next); \pos != (head); \pos = rcu_dereference(pos->next))#define __list_for_each(pos, head) \for (pos = (head)->next; pos != (head); pos = pos->next)
1
2
3
4
5
6
#define __list_for_each_rcu(pos, head) \
for (pos = rcu_dereference((head)->next); \
pos != (head); \
pos = rcu_dereference(pos->next))
#define __list_for_each(pos, head) \
for (pos = (head)->next; pos != (head); pos = pos->next)
從__list_for_each_rcu和list_entry_rcu的實(shí)現(xiàn)可以看出,其將指針的獲取替換為使用rcu_dereference。
llist_replace_rcu
C++
static inline void list_replace_rcu(struct list_head *old,struct list_head *new){new->next = old->next;new->prev = old->prev;rcu_assign_pointer(new->prev->next, new);new->next->prev = new;old->prev = LIST_POISON2;}static inline void list_replace(struct list_head *old,struct list_head *new){new->next = old->next;new->next->prev = new;new->prev = old->prev;new->prev->next = new;}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static inline void list_replace_rcu(struct list_head *old,
struct list_head *new)
{
new->next = old->next;
new->prev = old->prev;
rcu_assign_pointer(new->prev->next, new);
new->next->prev = new;
old->prev = LIST_POISON2;
}
static inline void list_replace(struct list_head *old,
struct list_head *new)
{
new->next = old->next;
new->next->prev = new;
new->prev = old->prev;
new->prev->next = new;
}
從list_replace_rcu的實(shí)現(xiàn)可以看出,RCU API的實(shí)現(xiàn)將指針的賦值替換為rcu_assign_pointer。
llist_del_rcu
C++
static inline void list_del_rcu(struct list_head *entry){__list_del(entry->prev, entry->next);entry->prev = LIST_POISON2;}static inline void list_del(struct list_head *entry){__list_del(entry->prev, entry->next);entry->next = LIST_POISON1;entry->prev = LIST_POISON2;}
1
2
3
4
5
6
7
8
9
10
11
static inline void list_del_rcu(struct list_head *entry)
{
__list_del(entry->prev, entry->next);
entry->prev = LIST_POISON2;
}
static inline void list_del(struct list_head *entry)
{
__list_del(entry->prev, entry->next);
entry->next = LIST_POISON1;
entry->prev = LIST_POISON2;
}
從list_del_rcu的實(shí)現(xiàn),可以看出RCU API的實(shí)現(xiàn)沒(méi)有將刪除項(xiàng)的next指針置為無(wú)效。這樣實(shí)現(xiàn)是為了防止刪除節(jié)點(diǎn)時(shí),讀者還在遍歷該節(jié)點(diǎn)。
RCU鏈表API使用
下面看下RCU list API的幾個(gè)應(yīng)用示例。
只有增加和刪除的鏈表操作
在這種應(yīng)用情況下,絕大部分是對(duì)鏈表的遍歷,即讀操作,而很少出現(xiàn)的寫(xiě)操作只有增加或刪除鏈表項(xiàng),并沒(méi)有對(duì)鏈表項(xiàng)的修改操作,這種情況使用RCU非常容易,從rwlock轉(zhuǎn)換成RCU非常自然。路由表的維護(hù)就是這種情況的典型應(yīng)用,對(duì)路由表的操作,絕大部分是路由表查詢(xún),而對(duì)路由表的寫(xiě)操作也僅僅是增加或刪除,因此使用RCU替換原來(lái)的rwlock順理成章。系統(tǒng)調(diào)用審計(jì)也是這樣的情況。
這是一段使用rwlock的系統(tǒng)調(diào)用審計(jì)部分的讀端代碼:
C++
static enum audit_state audit_filter_task(struct task_struct *tsk) { struct audit_entry *e; enum audit_state state; read_lock(&auditsc_lock); /* Note: audit_netlink_sem held by caller. */ list_for_each_entry(e, &audit_tsklist, list) { if (audit_filter_rules(tsk, &e->rule, NULL, &state)) { read_unlock(&auditsc_lock); return state; } } read_unlock(&auditsc_lock); return AUDIT_BUILD_CONTEXT; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static enum audit_state audit_filter_task(struct task_struct *tsk)
{
struct audit_entry *e;
enum audit_state?? state;
read_lock(&auditsc_lock);
/* Note: audit_netlink_sem held by caller. */
list_for_each_entry(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
read_unlock(&auditsc_lock);
return state;
}
}
read_unlock(&auditsc_lock);
return AUDIT_BUILD_CONTEXT;
}
使用RCU后將變成:
C++
static enum audit_state audit_filter_task(struct task_struct *tsk) { struct audit_entry *e; enum audit_state state; rcu_read_lock(); /* Note: audit_netlink_sem held by caller. */ list_for_each_entry_rcu(e, &audit_tsklist, list) { if (audit_filter_rules(tsk, &e->rule, NULL, &state)) { rcu_read_unlock(); return state; } } rcu_read_unlock(); return AUDIT_BUILD_CONTEXT; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static enum audit_state audit_filter_task(struct task_struct *tsk)
{
struct audit_entry *e;
enum audit_state?? state;
rcu_read_lock();
/* Note: audit_netlink_sem held by caller. */
list_for_each_entry_rcu(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
rcu_read_unlock();
return state;
}
}
rcu_read_unlock();
return AUDIT_BUILD_CONTEXT;
}
這種轉(zhuǎn)換非常直接,使用rcu_read_lock和rcu_read_unlock分別替換read_lock和read_unlock,鏈表遍歷函數(shù)使用_rcu版本替換就可以了。
使用rwlock的寫(xiě)端代碼:
C++
static inline int audit_del_rule(struct audit_rule *rule, struct list_head *list) { struct audit_entry *e; write_lock(&auditsc_lock); list_for_each_entry(e, list, list) { if (!audit_compare_rule(rule, &e->rule)) { list_del(&e->list); write_unlock(&auditsc_lock); return 0; } } write_unlock(&auditsc_lock); return -EFAULT; /* No matching rule */ } static inline int audit_add_rule(struct audit_entry *entry, struct list_head *list) { write_lock(&auditsc_lock); if (entry->rule.flags & AUDIT_PREPEND) { entry->rule.flags &= ~AUDIT_PREPEND; list_add(&entry->list, list); } else { list_add_tail(&entry->list, list); } write_unlock(&auditsc_lock); return 0; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry??*e;
write_lock(&auditsc_lock);
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
list_del(&e->list);
write_unlock(&auditsc_lock);
return 0;
}
}
write_unlock(&auditsc_lock);
return -EFAULT;???????? /* No matching rule */
}
static inline int audit_add_rule(struct audit_entry *entry,
struct list_head *list)
{
write_lock(&auditsc_lock);
if (entry->rule.flags & AUDIT_PREPEND) {
entry->rule.flags &= ~AUDIT_PREPEND;
list_add(&entry->list, list);
} else {
list_add_tail(&entry->list, list);
}
write_unlock(&auditsc_lock);
return 0;
}
使用RCU后寫(xiě)端代碼變成為:
C++
static inline int audit_del_rule(struct audit_rule *rule, struct list_head *list) { struct audit_entry *e; /* Do not use the _rcu iterator here, since this is the only * deletion routine. */ list_for_each_entry(e, list, list) { if (!audit_compare_rule(rule, &e->rule)) { list_del_rcu(&e->list); call_rcu(&e->rcu, audit_free_rule, e); return 0; } } return -EFAULT; /* No matching rule */ } static inline int audit_add_rule(struct audit_entry *entry, struct list_head *list) { if (entry->rule.flags & AUDIT_PREPEND) { entry->rule.flags &= ~AUDIT_PREPEND; list_add_rcu(&entry->list, list); } else { list_add_tail_rcu(&entry->list, list); } return 0; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry??*e;
/* Do not use the _rcu iterator here, since this is the only
* deletion routine. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
list_del_rcu(&e->list);
call_rcu(&e->rcu, audit_free_rule, e);
return 0;
}
}
return -EFAULT;???????? /* No matching rule */
}
static inline int audit_add_rule(struct audit_entry *entry,
struct list_head *list)
{
if (entry->rule.flags & AUDIT_PREPEND) {
entry->rule.flags &= ~AUDIT_PREPEND;
list_add_rcu(&entry->list, list);
} else {
list_add_tail_rcu(&entry->list, list);
}
return 0;
}
對(duì)于鏈表刪除操作,list_del替換為list_del_rcu和call_rcu,這是因?yàn)楸粍h除的鏈表項(xiàng)可能還在被別的讀者引用,所以不能立即刪除,必須等到所有讀者經(jīng)歷一個(gè)quiescent state才可以刪除。另外,list_for_each_entry并沒(méi)有被替換為list_for_each_entry_rcu,這是因?yàn)?,只有一個(gè)寫(xiě)者在做鏈表刪除操作,因此沒(méi)有必要使用_rcu版本。
通常情況下,write_lock和write_unlock應(yīng)當(dāng)分別替換成spin_lock和spin_unlock,但是對(duì)于只是對(duì)鏈表進(jìn)行增加和刪除操作而且只有一個(gè)寫(xiě)者的寫(xiě)端,在使用了_rcu版本的鏈表操作API后,rwlock可以完全消除,不需要spinlock來(lái)同步讀者的訪(fǎng)問(wèn)。對(duì)于上面的例子,由于已經(jīng)有audit_netlink_sem被調(diào)用者保持,所以spinlock就沒(méi)有必要了。
這種情況允許修改結(jié)果延后一定時(shí)間才可見(jiàn),而且寫(xiě)者對(duì)鏈表僅僅做增加和刪除操作,所以轉(zhuǎn)換成使用RCU非常容易。
寫(xiě)端需要對(duì)鏈表?xiàng)l目進(jìn)行修改操作
如果寫(xiě)者需要對(duì)鏈表?xiàng)l目進(jìn)行修改,那么就需要首先拷貝要修改的條目,然后修改條目的拷貝,等修改完畢后,再使用條目拷貝取代要修改的條目,要修改條目將被在經(jīng)歷一個(gè)grace period后安全刪除。
對(duì)于系統(tǒng)調(diào)用審計(jì)代碼,并沒(méi)有這種情況。這里假設(shè)有修改的情況,那么使用rwlock的修改代碼應(yīng)當(dāng)如下:
C++
static inline int audit_upd_rule(struct audit_rule *rule, struct list_head *list, __u32 newaction, __u32 newfield_count) { struct audit_entry *e; struct audit_newentry *ne; write_lock(&auditsc_lock); /* Note: audit_netlink_sem held by caller. */ list_for_each_entry(e, list, list) { if (!audit_compare_rule(rule, &e->rule)) { e->rule.action = newaction; e->rule.file_count = newfield_count; write_unlock(&auditsc_lock); return 0; } } write_unlock(&auditsc_lock); return -EFAULT; /* No matching rule */ }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static inline int audit_upd_rule(struct audit_rule *rule,
struct list_head *list,
__u32 newaction,
__u32 newfield_count)
{
struct audit_entry??*e;
struct audit_newentry *ne;
write_lock(&auditsc_lock);
/* Note: audit_netlink_sem held by caller. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
e->rule.action = newaction;
e->rule.file_count = newfield_count;
write_unlock(&auditsc_lock);
return 0;
}
}
write_unlock(&auditsc_lock);
return -EFAULT;???????? /* No matching rule */
}
如果使用RCU,修改代碼應(yīng)當(dāng)為;
C++
static inline int audit_upd_rule(struct audit_rule *rule, struct list_head *list, __u32 newaction, __u32 newfield_count) { struct audit_entry *e; struct audit_newentry *ne; list_for_each_entry(e, list, list) { if (!audit_compare_rule(rule, &e->rule)) { ne = kmalloc(sizeof(*entry), GFP_ATOMIC); if (ne == NULL) return -ENOMEM; audit_copy_rule(&ne->rule, &e->rule); ne->rule.action = newaction; ne->rule.file_count = newfield_count; list_replace_rcu(e, ne); call_rcu(&e->rcu, audit_free_rule, e); return 0; } } return -EFAULT; /* No matching rule */ }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static inline int audit_upd_rule(struct audit_rule *rule,
struct list_head *list,
__u32 newaction,
__u32 newfield_count)
{
struct audit_entry??*e;
struct audit_newentry *ne;
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
ne = kmalloc(sizeof(*entry), GFP_ATOMIC);
if (ne == NULL)
return -ENOMEM;
audit_copy_rule(&ne->rule, &e->rule);
ne->rule.action = newaction;
ne->rule.file_count = newfield_count;
list_replace_rcu(e, ne);
call_rcu(&e->rcu, audit_free_rule, e);
return 0;
}
}
return -EFAULT;???????? /* No matching rule */
}
修改操作立即可見(jiàn)
前面兩種情況,讀者能夠容忍修改可以在一段時(shí)間后看到,也就說(shuō)讀者在修改后某一時(shí)間段內(nèi),仍然看到的是原來(lái)的數(shù)據(jù)。在很多情況下,讀者不能容忍看到舊的數(shù)據(jù),這種情況下,需要使用一些新措施,如System V IPC,它在每一個(gè)鏈表?xiàng)l目中增加了一個(gè)deleted字段,標(biāo)記該字段是否刪除,如果刪除了,就設(shè)置為真,否則設(shè)置為假,當(dāng)代碼在遍歷鏈表時(shí),核對(duì)每一個(gè)條目的deleted字段,如果為真,就認(rèn)為它是不存在的。
還是以系統(tǒng)調(diào)用審計(jì)代碼為例,如果它不能容忍舊數(shù)據(jù),那么,讀端代碼應(yīng)該修改為:
C++
static enum audit_state audit_filter_task(struct task_struct *tsk) { struct audit_entry *e; enum audit_state state; rcu_read_lock(); list_for_each_entry_rcu(e, &audit_tsklist, list) { if (audit_filter_rules(tsk, &e->rule, NULL, &state)) { spin_lock(&e->lock); if (e->deleted) { spin_unlock(&e->lock); rcu_read_unlock(); return AUDIT_BUILD_CONTEXT; } rcu_read_unlock(); return state; } } rcu_read_unlock(); return AUDIT_BUILD_CONTEXT; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static enum audit_state audit_filter_task(struct task_struct *tsk)
{
struct audit_entry *e;
enum audit_state?? state;
rcu_read_lock();
list_for_each_entry_rcu(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
spin_lock(&e->lock);
if (e->deleted) {
spin_unlock(&e->lock);
rcu_read_unlock();
return AUDIT_BUILD_CONTEXT;
}
rcu_read_unlock();
return state;
}
}
rcu_read_unlock();
return AUDIT_BUILD_CONTEXT;
}
注意,對(duì)于這種情況,每一個(gè)鏈表?xiàng)l目都需要一個(gè)spinlock保護(hù),因?yàn)閯h除操作將修改條目的deleted標(biāo)志。此外,該函數(shù)如果搜索到條目,返回時(shí)應(yīng)當(dāng)保持該條目的鎖。
寫(xiě)端的刪除操作將變成:
C++
static inline int audit_del_rule(struct audit_rule *rule, struct list_head *list) { struct audit_entry *e; /* Do not use the _rcu iterator here, since this is the only * deletion routine. */ list_for_each_entry(e, list, list) { if (!audit_compare_rule(rule, &e->rule)) { spin_lock(&e->lock); list_del_rcu(&e->list); e->deleted = 1; spin_unlock(&e->lock); call_rcu(&e->rcu, audit_free_rule, e); return 0; } } return -EFAULT; /* No matching rule */ }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry??*e;
/* Do not use the _rcu iterator here, since this is the only
* deletion routine. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
spin_lock(&e->lock);
list_del_rcu(&e->list);
e->deleted = 1;
spin_unlock(&e->lock);
call_rcu(&e->rcu, audit_free_rule, e);
return 0;
}
}
return -EFAULT;???????? /* No matching rule */
}
刪除條目時(shí),需要標(biāo)記該條目為已刪除。這樣讀者就可以通過(guò)該標(biāo)志立即得知條目是否已經(jīng)刪除。
小結(jié)
RCU是2.6內(nèi)核引入的新的鎖機(jī)制,在絕大部分為讀而只有極少部分為寫(xiě)的情況下,它是非常高效的,因此在路由表維護(hù)、系統(tǒng)調(diào)用審計(jì)、SELinux的AVC、dcache和IPC等代碼部分中,使用它來(lái)取代rwlock來(lái)獲得更高的性能。但是,它也有缺點(diǎn),延后的刪除或釋放將占用一些內(nèi)存,尤其是對(duì)嵌入式系統(tǒng),這可能是非常昂貴的內(nèi)存開(kāi)銷(xiāo)。此外,寫(xiě)者的開(kāi)銷(xiāo)比較大,尤其是對(duì)于那些無(wú)法容忍舊數(shù)據(jù)的情況以及不只一個(gè)寫(xiě)者的情況,寫(xiě)者需要spinlock或其他的鎖機(jī)制來(lái)與其他寫(xiě)者同步。
?
評(píng)論
查看更多