linux內(nèi)核工作隊列講解和源碼詳細(xì)注釋
1. 前言
本文引用地址:http://2s4d.com/article/201610/305909.htm工作隊列(workqueue)的Linux內(nèi)核中的定義的用來處理不是很緊急事件的回調(diào)方式處理方法。
以下代碼的linux內(nèi)核版本為2.6.19.2, 源代碼文件主要為kernel/workqueue.c.
2. 數(shù)據(jù)結(jié)構(gòu)
/* include/linux/workqueue.h */ // 工作節(jié)點(diǎn)結(jié)構(gòu)struct work_struct { // 等待時間unsigned long pending;// 鏈表節(jié)點(diǎn)struct list_head entry;// workqueue回調(diào)函數(shù)void (*func)(void *);// 回調(diào)函數(shù)func的數(shù)據(jù)void *data;// 指向CPU相關(guān)數(shù)據(jù), 一般指向struct cpu_workqueue_struct結(jié)構(gòu)void *wq_data;// 定時器struct timer_list timer;};
struct execute_work { struct work_struct work;};
/* kernel/workqueue.c */ /* * The per-CPU workqueue (if single thread, we always use the first * possible cpu)。
* * The sequence counters are for flush_scheduled_work()。 It wants to wait * until all currently-scheduled works are completed, but it doesn't * want to be livelocked by new, incoming ones. So it waits until * remove_sequence is >= the insert_sequence which pertained when * flush_scheduled_work() was called. */ // 這個結(jié)構(gòu)是針對每個CPU的struct cpu_workqueue_struct { // 結(jié)構(gòu)鎖spinlock_t lock;// 下一個要執(zhí)行的節(jié)點(diǎn)序號long remove_sequence; /* Least-recently added (next to run) */ // 下一個要插入節(jié)點(diǎn)的序號long insert_sequence; /* Next to add */ // 工作機(jī)構(gòu)鏈表節(jié)點(diǎn)struct list_head worklist;// 要進(jìn)行處理的等待隊列wait_queue_head_t more_work;// 處理完的等待隊列wait_queue_head_t work_done;// 工作隊列節(jié)點(diǎn)struct workqueue_struct *wq;// 進(jìn)程指針struct task_struct *thread;int run_depth; /* Detect run_workqueue() recursion depth */ } ____cacheline_aligned;/* * The externally visible workqueue abstraction is an array of * per-CPU workqueues:*/ // 工作隊列結(jié)構(gòu)struct workqueue_struct { struct cpu_workqueue_struct *cpu_wq;const char *name;struct list_head list; /* Empty if single thread */ };
kernel/workqueue.c中定義了一個工作隊列鏈表, 所有工作隊列可以掛接到這個鏈表中:static LIST_HEAD(workqueues);
3. 一些宏定義
/* include/linux/workqueue.h */ // 初始化工作隊列#define __WORK_INITIALIZER(n, f, d) { // 初始化list。entry = { (n)。entry, (n)。entry },// 回調(diào)函數(shù)。func = (f),// 回調(diào)函數(shù)參數(shù)。data = (d),// 初始化定時器。timer = TIMER_INITIALIZER(NULL, 0, 0),}
// 聲明工作隊列并初始化#define DECLARE_WORK(n, f, d)
struct work_struct n = __WORK_INITIALIZER(n, f, d)
/* * initialize a work-struct's func and data pointers:*/ // 重新定義工作結(jié)構(gòu)參數(shù)#define PREPARE_WORK(_work, _func, _data)
do {(_work)->func = _func;(_work)->data = _data;} while (0)
/* * initialize all of a work-struct:*/ // 初始化工作結(jié)構(gòu), 和__WORK_INITIALIZER功能相同,不過__WORK_INITIALIZER用在// 參數(shù)初始化定義, 而該宏用在程序之中對工作結(jié)構(gòu)賦值#define INIT_WORK(_work, _func, _data)
do { INIT_LIST_HEAD((_work)->entry);(_work)->pending = 0;PREPARE_WORK((_work), (_func), (_data));init_timer((_work)->timer);} while (0)
4. 操作函數(shù)
4.1 創(chuàng)建工作隊列
一般的創(chuàng)建函數(shù)是create_workqueue, 但這其實(shí)只是一個宏:/* include/linux/workqueue.h */ #define create_workqueue(name) __create_workqueue((name), 0)
在workqueue的初始化函數(shù)中, 定義了一個針對內(nèi)核中所有線程可用的事件工作隊列, 其他內(nèi)核線程建立的事件工作結(jié)構(gòu)就都掛接到該隊列:void init_workqueues(void)
{……
keventd_wq = create_workqueue(events);……
}
核心創(chuàng)建函數(shù)是__create_workqueue:
struct workqueue_struct *__create_workqueue(const char *name,int singlethread)
{ int cpu, destroy = 0;struct workqueue_struct *wq;struct task_struct *p;// 分配工作隊列結(jié)構(gòu)空間wq = kzalloc(sizeof(*wq), GFP_KERNEL);if (!wq)
return NULL;// 為每個CPU分配單獨(dú)的工作隊列空間wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);if (!wq->cpu_wq) { kfree(wq);return NULL;} wq->name = name;mutex_lock(workqueue_mutex);if (singlethread) { // 使用create_workqueue宏時該參數(shù)始終為0 // 如果是單一線程模式, 在單線程中調(diào)用各個工作隊列// 建立一個的工作隊列內(nèi)核線程INIT_LIST_HEAD(wq->list);// 建立工作隊列的線程p = create_workqueue_thread(wq, singlethread_cpu);if (!p)
destroy = 1;else // 喚醒該線程wake_up_process(p);} else { // 鏈表模式, 將工作隊列添加到工作隊列鏈表list_add(wq->list, workqueues);// 為每個CPU建立一個工作隊列線程for_each_online_cpu(cpu) { p = create_workqueue_thread(wq, cpu);if (p) { // 綁定CPU kthread_bind(p, cpu);// 喚醒線程wake_up_process(p);} else destroy = 1;} mutex_unlock(workqueue_mutex);/* * Was there any error during startup? If yes then clean up:*/ if (destroy) { // 建立線程失敗, 釋放工作隊列destroy_workqueue(wq);wq = NULL;} return wq;} EXPORT_SYMBOL_GPL(__create_workqueue);
// 創(chuàng)建工作隊列線程static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,int cpu)
{ // 每個CPU的工作隊列struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);struct task_struct *p;spin_lock_init(cwq->lock);// 初始化cwq->wq = wq;cwq->thread = NULL;cwq->insert_sequence = 0;cwq->remove_sequence = 0;INIT_LIST_HEAD(cwq->worklist);// 初始化等待隊列more_work, 該隊列處理要執(zhí)行的工作結(jié)構(gòu)init_waitqueue_head(cwq->more_work);// 初始化等待隊列work_done, 該隊列處理執(zhí)行完的工作結(jié)構(gòu)init_waitqueue_head(cwq->work_done);// 建立內(nèi)核線程work_thread if (is_single_threaded(wq))
評論