1. Tasklet機制分析
上面我們介紹了軟中斷機制,linux內(nèi)核為什么還要引入tasklet機制呢?主要原因是軟中斷的pending標志位也就32位,一般情況是不隨意增加軟中斷處理的。而且內(nèi)核也沒有提供通用的增加軟中斷的接口。其次內(nèi),軟中斷處理函數(shù)要求可重入,需要考慮到競爭條件比較多,要求比較高的編程技巧。所以內(nèi)核提供了tasklet這樣的一種通用的機制。
其實每次寫總結(jié)的文章,總是想把細節(jié)的東西說明白,所以越寫越多。這樣做的好處是能真正理解其中的機制。但是,內(nèi)容太多的一個壞處就是難道記憶,所以,在講清楚講詳細的同時,我還要把精髓總結(jié)出來。Tasklet的特點,也是tasklet的精髓就是:tasklet不能休眠,同一個tasklet不能在兩個CPU上同時運行,但是不同tasklet可能在不同CPU上同時運行,則需要注意共享數(shù)據(jù)的保護。
主要的數(shù)據(jù)結(jié)構(gòu)
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);
struct tasklet_struct{ struct tasklet_struct *next; unsigned long state; atomic_t count; void (*func)(unsigned long); unsigned long data;};
1
2
3
4
5
6
7
8
struct tasklet_struct
{
struct tasklet_struct *next;
unsigned long state;
atomic_t count;
void (*func)(unsigned long);
unsigned long data;
};
如何使用tasklet
使用tasklet比較簡單,只需要初始化一個tasklet_struct結(jié)構(gòu)體,然后調(diào)用tasklet_schedule,就能利用tasklet機制執(zhí)行初始化的func函數(shù)。
static inline void tasklet_schedule(struct tasklet_struct *t){ if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) __tasklet_schedule(t);}
1
2
3
4
5
static inline void tasklet_schedule(struct tasklet_struct *t)
{
if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
__tasklet_schedule(t);
}
tasklet_schedule處理過程也比較簡單,就是把tasklet_struct結(jié)構(gòu)體掛到tasklet_vec鏈表或者掛接到tasklet_hi_vec鏈表上,并調(diào)度軟中斷TASKLET_SOFTIRQ或者HI_SOFTIRQ
void __tasklet_schedule(struct tasklet_struct *t){ unsigned long flags;local_irq_save(flags); t->next = NULL; *__get_cpu_var(tasklet_vec).tail = t; __get_cpu_var(tasklet_vec).tail = &(t->next); raise_softirq_irqoff(TASKLET_SOFTIRQ); local_irq_restore(flags);}EXPORT_SYMBOL(__tasklet_schedule);void __tasklet_hi_schedule(struct tasklet_struct *t){ unsigned long flags; local_irq_save(flags); t->next = NULL; *__get_cpu_var(tasklet_hi_vec).tail = t; __get_cpu_var(tasklet_hi_vec).tail = &(t->next); raise_softirq_irqoff(HI_SOFTIRQ); local_irq_restore(flags);}EXPORT_SYMBOL(__tasklet_hi_schedule);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void __tasklet_schedule(struct tasklet_struct *t)
{
unsigned long flags;local_irq_save(flags);
t->next = NULL;
*__get_cpu_var(tasklet_vec).tail = t;
__get_cpu_var(tasklet_vec).tail = &(t->next);
raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_restore(flags);
}
EXPORT_SYMBOL(__tasklet_schedule);
void __tasklet_hi_schedule(struct tasklet_struct *t)
{
unsigned long flags;
local_irq_save(flags);
t->next = NULL;
*__get_cpu_var(tasklet_hi_vec).tail = t;
__get_cpu_var(tasklet_hi_vec).tail = &(t->next);
raise_softirq_irqoff(HI_SOFTIRQ);
local_irq_restore(flags);
}
EXPORT_SYMBOL(__tasklet_hi_schedule);
Tasklet執(zhí)行過程
Tasklet_action在軟中斷TASKLET_SOFTIRQ被調(diào)度到后會被執(zhí)行,它從tasklet_vec鏈表中把tasklet_struct結(jié)構(gòu)體都取下來,然后逐個執(zhí)行。如果t->count的值等于0,說明這個tasklet在調(diào)度之后,被disable掉了,所以會將tasklet結(jié)構(gòu)體重新放回到tasklet_vec鏈表,并重新調(diào)度TASKLET_SOFTIRQ軟中斷,在之后enable這個tasklet之后重新再執(zhí)行它。
static void tasklet_action(struct softirq_action *a){ struct tasklet_struct *list;local_irq_disable(); list = __get_cpu_var(tasklet_vec).head; __get_cpu_var(tasklet_vec).head = NULL; __get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head; local_irq_enable(); while (list) { struct tasklet_struct *t = list; list = list->next; if (tasklet_trylock(t)) { if (!atomic_read(&t->count)) { if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state)) BUG(); t->func(t->data); tasklet_unlock(t); continue; } tasklet_unlock(t); } local_irq_disable(); t->next = NULL; *__get_cpu_var(tasklet_vec).tail = t; __get_cpu_var(tasklet_vec).tail = &(t->next); __raise_softirq_irqoff(TASKLET_SOFTIRQ); local_irq_enable(); }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static void tasklet_action(struct softirq_action *a)
{
struct tasklet_struct *list;local_irq_disable();
list = __get_cpu_var(tasklet_vec).head;
__get_cpu_var(tasklet_vec).head = NULL;
__get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head;
local_irq_enable();
while (list)
{
struct tasklet_struct *t = list;
list = list->next;
if (tasklet_trylock(t))
{
if (!atomic_read(&t->count))
{
if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
BUG();
t->func(t->data);
tasklet_unlock(t);
continue;
}
tasklet_unlock(t);
}
local_irq_disable();
t->next = NULL;
*__get_cpu_var(tasklet_vec).tail = t;
__get_cpu_var(tasklet_vec).tail = &(t->next);
__raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_enable();
}
}
2. Linux工作隊列
前面已經(jīng)介紹了tasklet機制,有了tasklet機制為什么還要增加工作隊列機制呢?我的理解是由于tasklet機制的限制,變形tasklet中的回調(diào)函數(shù)有很多的限制,比如不能有休眠的操作等等。而是用工作隊列機制,需要處理的函數(shù)在進程上下文中調(diào)用,休眠操作都是允許的。但是工作隊列的實時性不如tasklet,采用工作隊列的例程可能不能在短時間內(nèi)被調(diào)用執(zhí)行。
數(shù)據(jù)結(jié)構(gòu)說明
首先需要說明的是workqueue_struct和cpu_workqueue_struct這兩個數(shù)據(jù)結(jié)構(gòu),創(chuàng)建一個工作隊列首先需要創(chuàng)建workqueue_struct,然后可以在每個CPU上創(chuàng)建一個cpu_workqueue_struct管理結(jié)構(gòu)體。
struct cpu_workqueue_struct{ spinlock_t lock; struct list_head worklist; wait_queue_head_t more_work; struct work_struct *current_work; struct workqueue_struct *wq; struct task_struct *thread; int run_depth; /* Detect run_workqueue() recursion depth */} ____cacheline_aligned;/* * The externally visible workqueue abstraction is an array of * per-CPU workqueues: */struct workqueue_struct{ struct cpu_workqueue_struct *cpu_wq; struct list_head list; const char *name; int singlethread; int freezeable; /* Freeze threads during suspend */ int rt;#ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map;#endif};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
struct cpu_workqueue_struct
{
spinlock_t lock;
struct list_head worklist;
wait_queue_head_t more_work;
struct work_struct *current_work;
struct workqueue_struct *wq;
struct task_struct *thread;
int run_depth;????????/* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;
/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
struct workqueue_struct
{
struct cpu_workqueue_struct *cpu_wq;
struct list_head list;
const char *name;
int singlethread;
int freezeable;????????/* Freeze threads during suspend */
int rt;
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
Work_struct表示將要提交的處理的工作。
struct work_struct{ atomic_long_t data;#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */#define WORK_STRUCT_FLAG_MASK (3UL)#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK) struct list_head entry; work_func_t func;#ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map;#endif};
1
2
3
4
5
6
7
8
9
10
11
12
struct work_struct
{
atomic_long_t data;
#define WORK_STRUCT_PENDING 0????????/* T if work item pending execution */
#define WORK_STRUCT_FLAG_MASK (3UL)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
struct list_head entry;
work_func_t func;
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
上面三個數(shù)據(jù)結(jié)構(gòu)的關(guān)系如下圖所示
介紹主要數(shù)據(jù)結(jié)構(gòu)的目的并不是想要把工作隊列具體的細節(jié)說明白,主要的目的是給大家一個總的架構(gòu)的輪廓。具體的分析在下面展開。從上面的該模塊主要數(shù)據(jù)結(jié)構(gòu)的關(guān)系來看,主要需要分析如下幾個問題:
1. Workqueque是怎樣創(chuàng)建的,包括event/0內(nèi)核進程的創(chuàng)建
2. Work_queue是如何提交到工作隊列的
3. Event/0內(nèi)核進程如何處理提交到隊列上的工作
Workqueque的創(chuàng)建
首先申請了workqueue_struct結(jié)構(gòu)體內(nèi)存,cpu_workqueue_struct結(jié)構(gòu)體的內(nèi)存。然后在init_cpu_workqueue函數(shù)中對cpu_workqueue_struct結(jié)構(gòu)體進行初始化。同時調(diào)用create_workqueue_thread函數(shù)創(chuàng)建處理工作隊列的內(nèi)核進程。
create_workqueue_thread中創(chuàng)建了如下的內(nèi)核進程
p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
最后調(diào)用start_workqueue_thread啟動新創(chuàng)建的進程。
struct workqueue_struct *__create_workqueue_key(const char *name, int singlethread, int freezeable, int rt, struct lock_class_key *key, const char *lock_name){ struct workqueue_struct *wq; struct cpu_workqueue_struct *cwq; int err = 0, cpu;wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return NULL; wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); if (!wq->cpu_wq) { kfree(wq); return NULL; } wq->name = name; lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); wq->singlethread = singlethread; wq->freezeable = freezeable; wq->rt = rt; INIT_LIST_HEAD(&wq->list); if (singlethread) { cwq = init_cpu_workqueue(wq, singlethread_cpu); err = create_workqueue_thread(cwq, singlethread_cpu); start_workqueue_thread(cwq, -1); } else { cpu_maps_update_begin(); /* * We must place this wq on list even if the code below fails. * cpu_down(cpu) can remove cpu from cpu_populated_map before * destroy_workqueue() takes the lock, in that case we leak * cwq[cpu]->thread. */ spin_lock(&workqueue_lock); list_add(&wq->list, &workqueues); spin_unlock(&workqueue_lock); /* * We must initialize cwqs for each possible cpu even if we * are going to call destroy_workqueue() finally. Otherwise * cpu_up() can hit the uninitialized cwq once we drop the * lock. */ for_each_possible_cpu(cpu) { cwq = init_cpu_workqueue(wq, cpu); if (err || !cpu_online(cpu)) continue; err = create_workqueue_thread(cwq, cpu); start_workqueue_thread(cwq, cpu); } cpu_maps_update_done(); } if (err) { destroy_workqueue(wq); wq = NULL; } return wq;}EXPORT_SYMBOL_GPL(__create_workqueue_key);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
struct workqueue_struct *__create_workqueue_key(const char *name,
int singlethread,
int freezeable,
int rt,
struct lock_class_key *key,
const char *lock_name)
{
struct workqueue_struct *wq;
struct cpu_workqueue_struct *cwq;
int err = 0, cpu;wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
return NULL;
wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
if (!wq->cpu_wq)
{
kfree(wq);
return NULL;
}
wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
wq->singlethread = singlethread;
wq->freezeable = freezeable;
wq->rt = rt;
INIT_LIST_HEAD(&wq->list);
if (singlethread)
{
cwq = init_cpu_workqueue(wq, singlethread_cpu);
err = create_workqueue_thread(cwq, singlethread_cpu);
start_workqueue_thread(cwq, -1);
}
else
{
cpu_maps_update_begin();
/*
* We must place this wq on list even if the code below fails.
* cpu_down(cpu) can remove cpu from cpu_populated_map before
* destroy_workqueue() takes the lock, in that case we leak
* cwq[cpu]->thread.
*/
spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
spin_unlock(&workqueue_lock);
/*
* We must initialize cwqs for each possible cpu even if we
* are going to call destroy_workqueue() finally. Otherwise
* cpu_up() can hit the uninitialized cwq once we drop the
* lock.
*/
for_each_possible_cpu(cpu)
{
cwq = init_cpu_workqueue(wq, cpu);
if (err || !cpu_online(cpu))
continue;
err = create_workqueue_thread(cwq, cpu);
start_workqueue_thread(cwq, cpu);
}
cpu_maps_update_done();
}
if (err)
{
destroy_workqueue(wq);
wq = NULL;
}
return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);
向工作隊列中添加工作
Shedule_work 函數(shù)向工作隊列中添加任務(wù)。這個接口比較簡單,無非是一些隊列操作,不再敘述。
/** * schedule_work - put work task in global workqueue * @work: job to be done * * This puts a job in the kernel-global workqueue. */int schedule_work(struct work_struct *work){ return queue_work(keventd_wq, work);}EXPORT_SYMBOL(schedule_work);
1
2
3
4
5
6
7
8
9
10
11
/**
* schedule_work - put work task in global workqueue
* @work: job to be done
*
* This puts a job in the kernel-global workqueue.
*/
int schedule_work(struct work_struct *work)
{
return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
工作隊列內(nèi)核進程的處理過程
在創(chuàng)建工作隊列的時候,我們創(chuàng)建了一個或者多個進程來處理掛到隊列上的工作。這個內(nèi)核進程的主要函數(shù)體為worker_thread,這個函數(shù)比較有意思的地方就是,自己降低的優(yōu)先級,說明worker_thread調(diào)度的優(yōu)先級比較低。在系統(tǒng)負載大大時候,采用工作隊列執(zhí)行的操作可能存在較大的延遲。
就函數(shù)的執(zhí)行流程來說是真心的簡單,只是從隊列中取出work,從隊列中刪除掉,清除掉pending標記,并執(zhí)行work設(shè)置的回調(diào)函數(shù)。
static int worker_thread(void *__cwq){ struct cpu_workqueue_struct *cwq = __cwq; DEFINE_WAIT(wait);if (cwq->wq->freezeable) set_freezable(); set_user_nice(current, -5); for (;;) { prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE); if (!freezing(current) && !kthread_should_stop() && list_empty(&cwq->worklist)) schedule(); finish_wait(&cwq->more_work, &wait); try_to_freeze(); if (kthread_should_stop()) break; run_workqueue(cwq); } return 0;}static void run_workqueue(struct cpu_workqueue_struct *cwq){ spin_lock_irq(&cwq->lock); cwq->run_depth++; if (cwq->run_depth > 3) { /* morton gets to eat his hat */ printk("%s: recursion depth exceeded: %dn", __func__, cwq->run_depth); dump_stack(); } while (!list_empty(&cwq->worklist)) { struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry); work_func_t f = work->func;#ifdef CONFIG_LOCKDEP /* * It is permissible to free the struct work_struct * from inside the function that is called from it, * this we need to take into account for lockdep too. * To avoid bogus "held lock freed" warnings as well * as problems when looking into work->lockdep_map, * make a copy and use that here. */ struct lockdep_map lockdep_map = work->lockdep_map;#endifcwq->current_work = work; list_del_init(cwq->worklist.next); spin_unlock_irq(&cwq->lock); BUG_ON(get_wq_data(work) != cwq); work_clear_pending(work); lock_map_acquire(&cwq->wq->lockdep_map); lock_map_acquire(&lockdep_map); f(work); lock_map_release(&lockdep_map); lock_map_release(&cwq->wq->lockdep_map); if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " "%s/0x%08x/%dn", current->comm, preempt_count(), task_pid_nr(current)); printk(KERN_ERR " last function: "); print_symbol("%sn", (unsigned long)f); debug_show_held_locks(current); dump_stack(); } spin_lock_irq(&cwq->lock); cwq->current_work = NULL; } cwq->run_depth--; spin_unlock_irq(&cwq->lock);}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DEFINE_WAIT(wait);if (cwq->wq->freezeable)
set_freezable();
set_user_nice(current, -5);
for (;;)
{
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
if (!freezing(current) &&
!kthread_should_stop() &&
list_empty(&cwq->worklist))
schedule();
finish_wait(&cwq->more_work, &wait);
try_to_freeze();
if (kthread_should_stop())
break;
run_workqueue(cwq);
}
return 0;
}
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
spin_lock_irq(&cwq->lock);
cwq->run_depth++;
if (cwq->run_depth > 3)
{
/* morton gets to eat his hat */
printk("%s: recursion depth exceeded: %dn",
__func__, cwq->run_depth);
dump_stack();
}
while (!list_empty(&cwq->worklist))
{
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
/*
* It is permissible to free the struct work_struct
* from inside the function that is called from it,
* this we need to take into account for lockdep too.
* To avoid bogus "held lock freed" warnings as well
* as problems when looking into work->lockdep_map,
* make a copy and use that here.
*/
struct lockdep_map lockdep_map = work->lockdep_map;
#endifcwq->current_work = work;
list_del_init(cwq->worklist.next);
spin_unlock_irq(&cwq->lock);
BUG_ON(get_wq_data(work) != cwq);
work_clear_pending(work);
lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
f(work);
lock_map_release(&lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);
if (unlikely(in_atomic() || lockdep_depth(current) > 0))
{
printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
"%s/0x%08x/%dn",
current->comm, preempt_count(),
task_pid_nr(current));
printk(KERN_ERR "????last function: ");
print_symbol("%sn", (unsigned long)f);
debug_show_held_locks(current);
dump_stack();
}
spin_lock_irq(&cwq->lock);
cwq->current_work = NULL;
}
cwq->run_depth--;
spin_unlock_irq(&cwq->lock);
}
?
評論
查看更多