我們先來看一下默認的events任務隊列,然后再看看創(chuàng)建新的工作者線程。
1.創(chuàng)建推后的工作
首先要做的是實際創(chuàng)建一些需要推后完成的工作??梢酝ㄟ^DECLARE_WORK在編譯時靜態(tài)地創(chuàng)建該結構體:
- 在<workqueue.h>中
- #define DECLARE_WORK(n, f)
- struct work_struct n = __WORK_INITIALIZER(n, f)
- #define __WORK_INITIALIZER(n, f) {
- .data = WORK_DATA_INIT(0), \
- .entry = { &(n).entry, &(n).entry }, \
- .func = (f), \
- }
也可以在運行時通過指針創(chuàng)建一個工作:
- #define INIT_WORK(_work, _func)
- do { \
- (_work)->data = (atomic_long_t) WORK_DATA_INIT(0); \
- INIT_LIST_HEAD(&(_work)->entry); \
- PREPARE_WORK((_work), (_func)); \
- } while (0)
- #define PREPARE_WORK(_work, _func)
- do { \
- (_work)->func = (_func); \
- } while (0)
2.工作隊列處理函數(shù)
工作隊列對立函數(shù)的原型是:
void work_handler(void *data)
這個函數(shù)會由一個工作者線程執(zhí)行,因此,函數(shù)會運行在進程上下文中。默認情況下,運行響應中斷,并且不持有任何鎖。如果需要,函數(shù)可以睡眠。需要注意的是,盡管操作處理函數(shù)運行在進程上下文中,但它不能訪問用戶空間,因為內(nèi)核線程在用戶空間沒有相關的內(nèi)存映射。通常在系統(tǒng)調用發(fā)生時,內(nèi)核會代表用戶空間的進程運行,此時它才能訪問用戶空間,也只有在此時它才會映射用戶空間的內(nèi)存。
3.對工作進行調度
要把給定工作的處理函數(shù)提交給默認的events工作線程,只須調用
schedule_work(&work);
- int fastcall schedule_work(struct work_struct *work)
- {
- return queue_work(keventd_wq, work);
- }
- int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
- {
- int ret = 0, cpu = get_cpu();
- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
- if (unlikely(is_single_threaded(wq)))
- cpu = singlethread_cpu;
- BUG_ON(!list_empty(&work->entry));
- __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
- ret = 1;
- }
- put_cpu();
- return ret;
- }
- static void __queue_work(struct cpu_workqueue_struct *cwq,
- struct work_struct *work)
- {
- unsigned long flags;
- spin_lock_irqsave(&cwq->lock, flags);
- set_wq_data(work, cwq);
- list_add_tail(&work->entry, &cwq->worklist);
- cwq->insert_sequence++;
- wake_up(&cwq->more_work);
- spin_unlock_irqrestore(&cwq->lock, flags);
- }
work馬上就會被調度,一旦其所在的處理器上的工作者線程被喚醒,它就會被執(zhí)行。如不希望工作馬上被執(zhí)行,延遲一段時間之后再執(zhí)行,可以調度它在指定的時間執(zhí)行:
schedule_delayed_work(&work,delay);
- int fastcall schedule_delayed_work(struct delayed_work *dwork,
- unsigned long delay)
- {
- timer_stats_timer_set_start_info(&dwork->timer);
- return queue_delayed_work(keventd_wq, dwork, delay);
- }
- int fastcall queue_delayed_work(struct workqueue_struct *wq,
- struct delayed_work *dwork, unsigned long delay)
- {
- int ret = 0;
- struct timer_list *timer = &dwork->timer;
- struct work_struct *work = &dwork->work;
- timer_stats_timer_set_start_info(timer);
- if (delay == 0)
- return queue_work(wq, work);
- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
- BUG_ON(timer_pending(timer));
- BUG_ON(!list_empty(&work->entry));
-
- set_wq_data(work, wq);
- timer->expires = jiffies + delay;
- timer->data = (unsigned long)dwork;
- timer->function = delayed_work_timer_fn;
- add_timer(timer);
- ret = 1;
- }
- return ret;
- }
- struct delayed_work {
- struct work_struct work;
- struct timer_list timer;
- };
4.刷新操作
排入隊列的工作會在工作者線程下一次被喚醒的時候執(zhí)行。有時,在繼續(xù)下一步工作之前,你必須保證一些操作已經(jīng)執(zhí)行完畢了。這一點對模塊來說很重要,在卸載之前,它就有可能需要調用下面的函數(shù);而在內(nèi)核的其他部分,為了防止競爭條件的出現(xiàn),也可能需要確保不在有待處理的工作。
出于以上目的,內(nèi)核準備了一個用于刷新指定工作隊列的函數(shù):
void flush_scheduled_work(void);
- void flush_scheduled_work(void)
- {
- flush_workqueue(keventd_wq);
- }
- void fastcall flush_workqueue(struct workqueue_struct *wq)
- {
- might_sleep();
- if (is_single_threaded(wq)) {
-
- flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
- } else {
- int cpu;
- mutex_lock(&workqueue_mutex);
- for_each_online_cpu(cpu)
- flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
- mutex_unlock(&workqueue_mutex);
- }
- }
函數(shù)會一直等待,直到隊列中所有對象都被執(zhí)行以后才返回。在等待所有待處理的工作執(zhí)行的時候,該函數(shù)會進入休眠狀態(tài),所以只能在進程上下文中使用它。
注意,該函數(shù)并不取消任何延遲執(zhí)行的工作。就是說,任何通過schedule_delayed_work()調度的工作,如果其延遲時間未結束,它并不會因為調用flush_scheduled_work()而被刷新掉。
取消延遲執(zhí)行的工作應該調用:
int cancle_delayed_work(sruct work_struc work);
這個函數(shù)可以取消任何與work_struct相關的掛起工作。
- 在<workqueue.h>中
- static inline int cancel_delayed_work(struct delayed_work *work)
- {
- int ret;
- ret = del_timer_sync(&work->timer);
- if (ret)
- work_release(&work->work);
- return ret;
- }
- #define work_release(work)
- clear_bit(WORK_STRUCT_PENDING, work_data_bits(work))
5.創(chuàng)建新的工作隊列
如果默認的隊列不能滿足需要,可以創(chuàng)建一個新的工作對列和與之相應的工作者線程。
創(chuàng)建一個新的任務隊列和與之相關的工作者線程,只須調用一個簡單的函數(shù):
struct workqueue_struct *create_workqueue(const char *name);
- 在<workqueue.h>中
- #define create_workqueue(name) __create_workqueue((name), 0, 0)
- 在<workqueue.c>中
- struct workqueue_struct *__create_workqueue(const char *name,
- int singlethread, int freezeable)
- {
- int cpu, destroy = 0;
- struct workqueue_struct *wq;
- struct task_struct *p;
- wq = kzalloc(sizeof(*wq), GFP_KERNEL);
- if (!wq)
- return NULL;
- wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
- if (!wq->cpu_wq) {
- kfree(wq);
- return NULL;
- }
- wq->name = name;
- mutex_lock(&workqueue_mutex);
- if (singlethread) {
- INIT_LIST_HEAD(&wq->list);
- p = create_workqueue_thread(wq, singlethread_cpu, freezeable);
- if (!p)
- destroy = 1;
- else
- wake_up_process(p);
- } else {
- list_add(&wq->list, &workqueues);
- for_each_online_cpu(cpu) {
- p = create_workqueue_thread(wq, cpu, freezeable);
- if (p) {
- kthread_bind(p, cpu);
- wake_up_process(p);
- } else
- destroy = 1;
- }
- }
- mutex_unlock(&workqueue_mutex);
-
- if (destroy) {
- destroy_workqueue(wq);
- wq = NULL;
- }
- return wq;
- }
這個函數(shù)會創(chuàng)建所有的工作者線程(系統(tǒng)中的每個處理器都有一個),并且做好所有開始處理工作之前的準備工作。
創(chuàng)建一個工作的時候無須考慮工作隊列的類型??梢允褂孟铝泻瘮?shù)對給定工作而不是默認的event隊列進行操作。
int queue_work(struct workqueue_struct *wq, struct work_struct*work);
int queue_delayed_work(struct workqueue_struct *wq, structwork_struct *work, unsigned long delay);
flush_workqueue(struct workqueue_struct *wq);
- int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
- {
- int ret = 0, cpu = get_cpu();
- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
- if (unlikely(is_single_threaded(wq)))
- cpu = singlethread_cpu;
- BUG_ON(!list_empty(&work->entry));
- __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
- ret = 1;
- }
- put_cpu();
- return ret;
- }
- int fastcall queue_delayed_work(struct workqueue_struct *wq,
- struct delayed_work *dwork, unsigned long delay)
- {
- int ret = 0;
- struct timer_list *timer = &dwork->timer;
- struct work_struct *work = &dwork->work;
- timer_stats_timer_set_start_info(timer);
- if (delay == 0)
- return queue_work(wq, work);
- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
- BUG_ON(timer_pending(timer));
- BUG_ON(!list_empty(&work->entry));
-
- set_wq_data(work, wq);
- timer->expires = jiffies + delay;
- timer->data = (unsigned long)dwork;
- timer->function = delayed_work_timer_fn;
- add_timer(timer);
- ret = 1;
- }
- return ret;
- }
- void fastcall flush_workqueue(struct workqueue_struct *wq)
- {
- might_sleep();
- if (is_single_threaded(wq)) {
-
- flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
- } else {
- int cpu;
- mutex_lock(&workqueue_mutex);
- for_each_online_cpu(cpu)
- flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
- mutex_unlock(&workqueue_mutex);
- }
- }
- static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
- {
- if (cwq->thread == current) {
-
- run_workqueue(cwq);
- } else {
- DEFINE_WAIT(wait);
- long sequence_needed;
- spin_lock_irq(&cwq->lock);
- sequence_needed = cwq->insert_sequence;
- while (sequence_needed - cwq->remove_sequence > 0) {
- prepare_to_wait(&cwq->work_done, &wait,
- TASK_UNINTERRUPTIBLE);
- spin_unlock_irq(&cwq->lock);
- schedule();
- spin_lock_irq(&cwq->lock);
- }
- finish_wait(&cwq->work_done, &wait);
- spin_unlock_irq(&cwq->lock);
- }
- }
- static void run_workqueue(struct cpu_workqueue_struct *cwq)
- {
- unsigned long flags;
-
- spin_lock_irqsave(&cwq->lock, flags);
- cwq->run_depth++;
- if (cwq->run_depth > 3) {
-
- printk("%s: recursion depth exceeded: %d\n",
- __FUNCTION__, cwq->run_depth);
- dump_stack();
- }
- while (!list_empty(&cwq->worklist)) {
- struct work_struct *work = list_entry(cwq->worklist.next,
- struct work_struct, entry);
- work_func_t f = work->func;
- list_del_init(cwq->worklist.next);
- spin_unlock_irqrestore(&cwq->lock, flags);
- BUG_ON(get_wq_data(work) != cwq);
- if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
- work_release(work);
- f(work);
- if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
- printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
- "%s/0x%08x/%d\n",
- current->comm, preempt_count(),
- current->pid);
- printk(KERN_ERR " last function: ");
- print_symbol("%s\n", (unsigned long)f);
- debug_show_held_locks(current);
- dump_stack();
- }
- spin_lock_irqsave(&cwq->lock, flags);
- cwq->remove_sequence++;
- wake_up(&cwq->work_done);
- }
- cwq->run_depth--;
- spin_unlock_irqrestore(&cwq->lock, flags);
- }