一、用法 struct cpu_workqueue_struct { spinlock_t lock; long remove_sequence; /* Least-recently added (next to run) */ long insert_sequence; /* Next to add */ struct list_head worklist; wait_queue_head_t more_work; wait_queue_head_t work_done; struct workqueue_struct *wq; struct task_struct *thread; int run_depth; /* Detect run_workqueue() recursion depth */ } ____cacheline_aligned; /* * The externally visible workqueue abstraction is an array of * per-CPU workqueues: */ struct workqueue_struct { struct cpu_workqueue_struct *cpu_wq; const char *name; struct list_head list; /* Empty if single thread */ }; 工作隊(duì)列的使用很簡(jiǎn)單。 1.首先是建立一個(gè)工作隊(duì)列: struct workqueue_struct *keventd_wq; keventd_wq = create_workqueue("events"); 2.然后就是在這個(gè)隊(duì)列中insert你所要做的“工作”: DECLARE_WORK(work, func, data) queue_work(keventd_wq, work); struct work_struct { unsigned long pending; struct list_head entry; void (*func)(void *); void *data; void *wq_data; struct timer_list timer; }; 初始化有兩種方法。 一種為靜態(tài)方法: #define __WORK_INITIALIZER(n, f, d) { \ .entry = { &(n).entry, &(n).entry }, \ .func = (f), \ .data = (d), \ .timer = TIMER_INITIALIZER(NULL, 0, 0), \ } #define DECLARE_WORK(n, f, d) \ struct work_struct n = __WORK_INITIALIZER(n, f, d) 另一種為動(dòng)態(tài)方法: /* * initialize all of a work-struct: */ #define INIT_WORK(_work, _func, _data) \ do { \ INIT_LIST_HEAD(&(_work)->entry); \ (_work)->pending = 0; \ PREPARE_WORK((_work), (_func), (_data)); \ init_timer(&(_work)->timer); \ } while (0) 二、執(zhí)行過(guò)程 create_workqueue() -> __create_workqueue() struct workqueue_struct *__create_workqueue(const char *name, int singlethread) { int cpu, destroy = 0; struct workqueue_struct *wq; struct task_struct *p; wq = kzalloc(sizeof(*wq), GFP_KERNEL); //為每個(gè)CPU建立一個(gè)結(jié)構(gòu) wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); ... wq->name = name; mutex_lock(&workqueue_mutex); if (singlethread) { ... } else { list_add(&wq->list, &workqueues); for_each_online_cpu(cpu) { //為每個(gè)CPU創(chuàng)建一個(gè)線程 p = create_workqueue_thread(wq, cpu); if (p) { kthread_bind(p, cpu); //喚醒這個(gè)線程執(zhí)行工作 wake_up_process(p); } else destroy = 1; } } mutex_unlock(&workqueue_mutex); ... return wq; } create_workqueue() -> __create_workqueue() -> create_workqueue_thread() static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, int cpu) { struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); struct task_struct *p; spin_lock_init(&cwq->lock); cwq->wq = wq; cwq->thread = NULL; cwq->insert_sequence = 0; cwq->remove_sequence = 0; INIT_LIST_HEAD(&cwq->worklist); init_waitqueue_head(&cwq->more_work); init_waitqueue_head(&cwq->work_done); if (is_single_threaded(wq)) p = kthread_create(worker_thread, cwq, "%s", wq->name); else //創(chuàng)建一個(gè)線程,這個(gè)線程以cwq為數(shù)據(jù)執(zhí)行worker_thread這個(gè)函數(shù) p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu); if (IS_ERR(p)) return NULL; cwq->thread = p; // return p; } create_workqueue() -> __create_workqueue() -> create_workqueue_thread() -> worker_thread() //本函數(shù)在一個(gè)死循環(huán)等待工作的到來(lái),這一般在睡眠狀態(tài)中,等待被喚醒執(zhí)行工作 //當(dāng)有工作到來(lái)時(shí)queue_work()會(huì)將這個(gè)線程喚醒 static int worker_thread(void *__cwq) { struct cpu_workqueue_struct *cwq = __cwq; DECLARE_WAITQUEUE(wait, current); ... current->flags |= PF_NOFREEZE; //設(shè)置優(yōu)先級(jí) set_user_nice(current, -5); ... set_current_state(TASK_INTERRUPTIBLE); while (!kthread_should_stop()) { //將本線程加入睡眠隊(duì)列,用于睡眠后可以被喚醒 add_wait_queue(&cwq->more_work, &wait); //如果沒(méi)用被執(zhí)行的“工作”,則將自己切換出去,進(jìn)入睡眠狀態(tài) if (list_empty(&cwq->worklist)) schedule(); else //否則或是被喚醒 __set_current_state(TASK_RUNNING); remove_wait_queue(&cwq->more_work, &wait); //工作隊(duì)列非空,執(zhí)行工作 if (!list_empty(&cwq->worklist)) run_workqueue(cwq); set_current_state(TASK_INTERRUPTIBLE); } __set_current_state(TASK_RUNNING); return 0; } create_workqueue() -> __create_workqueue() -> create_workqueue_thread() -> worker_thread() -> run_workqueue() //該函數(shù)執(zhí)行真正的工作 static void run_workqueue(struct cpu_workqueue_struct *cwq) { unsigned long flags; spin_lock_irqsave(&cwq->lock, flags); ... //順次執(zhí)行隊(duì)列中的所有工作 while (!list_empty(&cwq->worklist)) { struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry); void (*f) (void *) = work->func; void *data = work->data; //從隊(duì)列中刪除待執(zhí)行的任務(wù) list_del_init(cwq->worklist.next); spin_unlock_irqrestore(&cwq->lock, flags); BUG_ON(work->wq_data != cwq); clear_bit(0, &work->pending); //執(zhí)行“工作” f(data); spin_lock_irqsave(&cwq->lock, flags); cwq->remove_sequence++; wake_up(&cwq->work_done); // } cwq->run_depth--; spin_unlock_irqrestore(&cwq->lock, flags); } 三、工作線程創(chuàng)建的詳細(xì)過(guò)程 create_workqueue() -> __create_workqueue() -> create_workqueue_thread() -> kthread_create() struct task_struct *kthread_create(int (*threadfn)(void *data), void *data, const char namefmt[], ...) { //初始化用于創(chuàng)建線程的輔助結(jié)構(gòu) struct kthread_create_info create; DECLARE_WORK(work, keventd_create_kthread, &create); create.threadfn = threadfn; create.data = data; init_completion(&create.started); init_completion(&create.done); if (!helper_wq) //首先創(chuàng)建輔助工作隊(duì)列 work.func(work.data); else { //注意,“創(chuàng)建一個(gè)工作隊(duì)列”這個(gè)工作本身又是屬于helper_wq工作隊(duì)列 //的一項(xiàng)工作,所以,將這個(gè)工作加入的輔助工作隊(duì)列中等待執(zhí)行。 queue_work(helper_wq, &work); wait_for_completion(&create.done); } ... return create.result; } create_workqueue() -> __create_workqueue() -> create_workqueue_thread() -> kthread_create()-> keventd_create_kthread() //最終會(huì)調(diào)用kernel_thread為每個(gè)工作隊(duì)列創(chuàng)建一個(gè)線程 //這樣,被創(chuàng)建的線程會(huì)以create為數(shù)據(jù)執(zhí)行kthread(如下),而kthread中則執(zhí)行create中的threadfn(data), //即為create_workqueue_thread中的worker_thread(cwq),即為我們工作隊(duì)列要執(zhí)行的函數(shù)了。 static void keventd_create_kthread(void *_create) { struct kthread_create_info *create = _create; int pid; /* We want our own signal handler (we take no signals by default). */ pid = kernel_thread(kthread, create, CLONE_FS | CLONE_FILES | SIGCHLD); if (pid < 0) { create->result = ERR_PTR(pid); } else { wait_for_completion(&create->started); read_lock(&tasklist_lock); create->result = find_task_by_pid(pid); read_unlock(&tasklist_lock); } complete(&create->done); } static int kthread(void *_create) { struct kthread_create_info *create = _create; int (*threadfn)(void *data); void *data; ... threadfn = create->threadfn; data = create->data; ... if (!kthread_should_stop()) ret = threadfn(data); ... return 0; } 四、插入“工作” /* Preempt must be disabled. */ static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work) { unsigned long flags; spin_lock_irqsave(&cwq->lock, flags); work->wq_data = cwq; //將當(dāng)前工作插入到工作隊(duì)列中待待執(zhí)行 list_add_tail(&work->entry, &cwq->worklist); cwq->insert_sequence++; wake_up(&cwq->more_work); //喚醒相應(yīng)線程 spin_unlock_irqrestore(&cwq->lock, flags); } int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) { int ret = 0, cpu = get_cpu(); //如里當(dāng)前工作未在隊(duì)列中才插入 if (!test_and_set_bit(0, &work->pending)) { ... __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); ret = 1; } put_cpu(); return ret; } |
聯(lián)系客服