【摘要】本文详解了中断服务下半部之工作队列实现机制。介绍了工作队列的特点、其与tasklet和softirq的区别以及其使用场合。接着分析了工作队列的三种数据结构的组织形式,在此基础之上分析了工作队列执行流程。最后介绍了工作队列相关的API,如何编写自己的工作队列处理程序及定义一个work对象并向内核提交等待调度运行。
【关键字】中断下半部,工作队列,workqueue_struct,work_struct,DECLARE_WORK,schedule_work,schedule_delayed_work ,flush_workqueue,create_workqueue,destroy_workqueue
1 工作队列概述
工作队列(work queue)是另外一种将工作推后执行的形式,它和我们前面讨论的所有其他形式都不相同。工作队列可以把工作推后,交由一个内核线程去执行—这个下半部分总是会在进程上下文执行,但由于是内核线程,其不能访问用户空间。最重要特点的就是工作队列允许重新调度甚至是睡眠。
通常,在工作队列和软中断/tasklet中作出选择非常容易。可使用以下规则:
² 如果推后执行的任务需要睡眠,那么只能选择工作队列;
² 如果推后执行的任务需要延时指定的时间再触发,那么使用工作队列,因为其可以利用timer延时;
² 如果推后执行的任务需要在一个tick之内处理,则使用软中断或tasklet,因为其可以抢占普通进程和内核线程;
² 如果推后执行的任务对延迟的时间没有任何要求,则使用工作队列,此时通常为无关紧要的任务。
另外如果你需要用一个可以重新调度的实体来执行你的下半部处理,你应该使用工作队列。它是惟一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在你需要获得大量的内存时、在你需要获取信号量时,在你需要执行阻塞式的I/O操作时,它都会非常有用。
实际上,工作队列的本质就是将工作交给内核线程处理,因此其可以用内核线程替换。但是内核线程的创建和销毁对编程者的要求较高,而工作队列实现了内核线程的封装,不易出错,所以我们也推荐使用工作队列。
2 工作队列的实现
2.1 工作者线程
工作队列子系统是一个用于创建内核线程的接口,通过它创建的进程负责执行由内核其他部分排到队列里的任务。它创建的这些内核线程被称作工作者线程(worker thread)。工作队列可以让你的驱动程序创建一个专门的工作者线程来处理需要推后的工作。不过,工作队列子系统提供了一个默认的工作者线程来处理这些工作。因此,工作队列最基本的表现形式就转变成了一个把需要推后执行的任务交给特定的通用线程这样一种接口。
默认的工作者线程叫做events/n,这里n是处理器的编号,每个处理器对应一个线程。比如,单处理器的系统只有events/0这样一个线程。而双处理器的系统就会多一个events/1线程。
默认的工作者线程会从多个地方得到被推后的工作。许多内核驱动程序都把它们的下半部交给默认的工作者线程去做。除非一个驱动程序或者子系统必须建立一个属于它自己的内核线程,否则最好使用默认线程。不过并不存在什么东西能够阻止代码创建属于自己的工作者线程。如果你需要在工作者线程中执行大量的处理操作,这样做或许会带来好处。处理器密集型和性能要求严格的任务会因为拥有自己的工作者线程而获得好处。
2.2 工作队列的组织结构
2.2.1 工作队列workqueue_struct
外部可见的工作队列抽象,用户接口,是由每个CPU的工作队列组成的链表
64struct workqueue_struct {
65 struct cpu_workqueue_struct *cpu_wq;
66 const char *name;
67 struct list_head list; /* Empty if single thread */
68};
² cpu_wq:本队列包含的工作者线程;
² name:所有本队列包含的线程的公共名称部分,创建工作队列时的唯一用户标识;
² list:链接本队列的各个工作线程。
在早期的版本中,cpu_wq是用数组维护的,即对每个工作队列,每个CPU包含一个此线程。改成链表的优势在于,创建工作队列的时候可以指定只创建一个内核线程,这样消耗的资源较少。
在该结构体里面,给每个线程分配一个cpu_workqueue_struct,因而也就是给每个处理器分配一个,因为每个处理器都有一个该类型的工作者线程。
2.2.2 工作者线程cpu_workqueue_struct
这个结构是针对每个CPU的,属于内核维护的结构,用户不可见。
43struct cpu_workqueue_struct {
44
45 spinlock_t lock;
46
47 long remove_sequence; /* Least-recently added (next to run) */
48 long insert_sequence; /* Next to add */
49
50 struct list_head worklist;
51 wait_queue_head_t more_work;
52 wait_queue_head_t work_done;
53
54 struct workqueue_struct *wq;
55 struct task_struct *thread;
56
57 int run_depth; /* Detect run_workqueue() recursion depth */
58} ____cacheline_aligned;
² lock:操作该数据结构的互斥锁
² remove_sequence:下一个要执行的工作序号,用于flush
² insert_sequence:下一个要插入工作的序号
² worklist:待处理的工作的链表头
² more_work:标识有工作待处理的等待队列,插入新工作后唤醒对应的内核线程
² work_done:处理完的等待队列,没完成一个工作后,唤醒可能等待通知处理完成通知的线程
² wq:所属的工作队列节点
² thread:关联的内核线程指针
² run_depth:run_workqueue()循环深度,多处可能调用此函数
所有的工作者线程都是用普通的内核线程实现的,它们都要执行worker thread()函数。在它初始化完以后,这个函数执行一个死循环并开始休眠。当有操作被插入到队列里的时候,线程就会被唤醒,以便执行这些操作。当没有剩余的操作时,它又会继续休眠。
2.2.3 工作work_struct
工作用work_struct结构体表示:
linux+v2.6.19/include/linux/workqueue.h
14struct work_struct {
15 unsigned long pending;
16 struct list_head entry;
17 void (*func)(void *);
18 void *data;
19 void *wq_data;
20 struct timer_list timer;
21};
² Pending:这个工作是否正在等待处理标志,加入到工作队列后置此标志
² Entry:该工作在链表中的入口点,连接所有工作
² Func:该工作执行的回调函数
² Data:传递给处理函数的参数
² wq_data:本工作所挂接的cpu_workqueue_struct;若需要使用定时器,则其为工作队列传递给timer
² timer:延迟的工作队列所用到的定时器,无需延迟是初始化为NULL
2.2.4 三者的关系
位于最高一层的是工作队列。系统允许有多种类型的工作队列存在。每一个工作队列具备一个workqueue_struct,而SMP机器上每个CPU都具备一个该类的工作者线程cpu_workqueue_struct,系统通过CPU号和workqueue_struct 的链表指针及第一个成员cpu_wq可以得到每个CPU的cpu_workqueue_struct结构。
而每个工作提交时,将链接在当前CPU的cpu_workqueue_struct结构的worklist链表中。通常情况下由当前所注册的CPU执行此工作,但在flush_work中可能由其他CPU来执行。或者CPU热插拔后也将进行工作的转移。
内核中有些部分可以根据需要来创建工作队列。而在默认情况下内核只有events这一种类型的工作队列。大部分驱动程序都使用的是现存的默认工作者线程。它们使用起来简单、方便。可是,在有些要求更严格的情况下,驱动程序需要自己的工作者线程。
2.3 工作队列执行的细节
工作结构体被连接成链表,对于某个工作队列,在每个处理器上都存在这样一个链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。
此为工作者线程的标准模板,所以工作者线程都使用此函数。对于用户自定义的内核线程可以参考此函数。
233static int worker_thread(void *__cwq)
234{
235 struct cpu_workqueue_struct *cwq = __cwq;
// 与该工作者线程关联的cpu_workqueue_struct结构
236 DECLARE_WAITQUEUE(wait, current);
// 声明一个等待节点,若无工作,则睡眠
237 struct k_sigaction sa;
238 sigset_t blocked;
239
240 current->flags |= PF_NOFREEZE;
241
242 set_user_nice(current, -5);
// 设定较低的进程优先级, 工作进程不是个很紧急的进程,不和其他进程抢占CPU,通常在系统空闲时运行
244 /* 禁止并清除所有信号 */
245 sigfillset(&blocked);
246 sigprocmask(SIG_BLOCK, &blocked, NULL);
247 flush_signals(current);
248
255 /* SIG_IGN makes children autoreap: see do_notify_parent(). */
// 允许SIGCHLD信号,并设置处理函数
256 sa.sa.sa_handler = SIG_IGN;
257 sa.sa.sa_flags = 0;
258 siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
259 do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
260
261 set_current_state(TASK_INTERRUPTIBLE);
// 可被信号中断,适当的时刻可被杀死,若收到停止命令则退出返回,否则进程就一直运行,无工作可执行时,主动休眠
262 while (!kthread_should_stop()) {
// 为了便于remove_wait_queue的统一处理,将当前内核线程添加到cpu_workqueue_struct的more_work等待队列中,当有新work结构链入队列中时会激活此等待队列
263 add_wait_queue(&cwq->more_work, &wait);
// 判断是否有工作需要作,无则调度让出CPU等待唤醒
264 if (list_empty(&cwq->worklist))
265 schedule();
266 else
267 __set_current_state(TASK_RUNNING);
268 remove_wait_queue(&cwq->more_work, &wait);
// 至此,线程肯定处于TASK_RUNNING,从等待队列中移出
//需要再次判断是因为可能从schedule中被唤醒的。如果有工作做,则执行
270 if (!list_empty(&cwq->worklist))
271 run_workqueue(cwq);
// 无工作或者全部执行完毕了,循环整个过程,接着一般会休眠
272 set_current_state(TASK_INTERRUPTIBLE);
273 }
274 __set_current_state(TASK_RUNNING);
275 return 0;
276}
该函数在死循环中完成了以下功能:
² 线程将自己设置为休眠状态TASK_INTERRUPTIBLE并把自己加人到等待队列上。
² 如果工作链表是空的,线程调用schedule()函数进入睡眠状态。
² 如果链表中有对象,线程不会睡眠。相反,它将自己设置成TASK_RUNNING,脱离等待队列。
² 如果链表非空,调用run_workqueue函数执行被推后的工作。
run_workqueue执行具体的工作,多处会调用此函数。在调用Flush_work时为防止死锁,主动调用run_workqueue,此时可能导致多层次递归。
196static void run_workqueue(struct cpu_workqueue_struct *cwq)
197{
198 unsigned long flags;
199
204 spin_lock_irqsave(&cwq->lock, flags);
// 统计已经递归调用了多少次了
205 cwq->run_depth++;
206 if (cwq->run_depth > 3) {
207 /* morton gets to eat his hat */
208 printk("%s: recursion depth exceeded: %d\n",
209 __FUNCTION__, cwq->run_depth);
210 dump_stack();
211 }
212 while (!list_empty(&cwq->worklist)) {
213 struct work_struct *work = list_entry(cwq->worklist.next,
214 struct work_struct, entry);
215 void (*f) (void *) = work->func;
216 void *data = work->data;
217 //将当前节点从链表中删除并初始化其entry
218 list_del_init(cwq->worklist.next);
219 spin_unlock_irqrestore(&cwq->lock, flags);
220
221 BUG_ON(work->wq_data != cwq);
222 clear_bit(0, &work->pending); //清除pengding位,标示已经执行
223 f(data);
224
225 spin_lock_irqsave(&cwq->lock, flags);
226 cwq->remove_sequence++;
// // 唤醒可能等待的进程,通知其工作已经执行完毕
227 wake_up(&cwq->work_done);
228 }
229 cwq->run_depth--;
230 spin_unlock_irqrestore(&cwq->lock, flags);
231}
3 工作队列的API
3.1 API列表
功能描述
对应API函数
附注
静态定义一个工作
DECLARE_WORK(n, f, d)
动态创建一个工作
INIT_WORK(_work, _func, _data)
工作原型
void work_handler(void *data)
将工作添加到指定的工作队列中
queue_work(struct workqueue_struct *wq, struct work_struct *work)
将工作添加到keventd_wq队列中
schedule_work(struct work_struct *work)
延迟delay个tick后将工作添加到指定的工作队列中
queue_delayed_work(struct workqueue_struct *wq,
struct work_struct *work, unsigned long delay)
延迟delay个tick后将工作添加到keventd_wq队列中
schedule_delayed_work(struct work_struct *work, unsigned long delay)
刷新等待指定队列中的所有工作完成
flush_workqueue(struct workqueue_struct *wq)
刷新等待keventd_wq中的所有工作完成
flush_scheduled_work(void)
取消指定队列中所有延迟工作
cancel_delayed_work(struct work_struct *work)
创建一个工作队列
create_workqueue(name)
创建一个单线程的工作队列
create_singlethread_workqueue(name)
销毁指定的工作队列
destroy_workqueue(struct workqueue_struct *wq)
3.2 如何创建工作
首先要做的是实际创建一些需要推后完成的工作。可以通过DECLARE_WORK在编译时静态地创建该结构体:
27#define __WORK_INITIALIZER(n, f, d) { \
28 .entry = { &(n).entry, &(n).entry }, \
29 .func = (f), \
30 .data = (d), \
31 .timer = TIMER_INITIALIZER(NULL, 0, 0), \
32 }
33
34#define DECLARE_WORK(n, f, d) \
35 struct work_struct n = __WORK_INITIALIZER(n, f, d)
这样就会静态地创建一个名为name,处理函数为func,参数为data的work_struct结构体。
同样,也可以在运行时通过指针创建一个工作:
40#define PREPARE_WORK(_work, _func, _data) \
41 do { \
42 (_work)->func = _func; \
43 (_work)->data = _data; \
44 } while (0)
45
49#define INIT_WORK(_work, _func, _data) \
50 do { \
51 INIT_LIST_HEAD(&(_work)->entry); \
52 (_work)->pending = 0; \
53 PREPARE_WORK((_work), (_func), (_data)); \
54 init_timer(&(_work)->timer); \
55 } while (0)
这会动态地初始化一个由work指向的工作,处理函数为func,参数为data。
无论是动态还是静态创建,默认定时器初始化为0,即不进行延时调度。
3.3 工作队列处理函数
工作队列处理函数的原型是:
void work_handler(void *data)
这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管操作处理函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。
在工作队列和内核其他部分之间使用锁机制就像在其他的进程上下文中使用锁机制一样方便。这使编写处理函数变得相对容易。
3.4 调度工作
3.4.1 queue_work
创建一个工作的时候无须考虑工作队列的类型。在创建之后,可以调用下面列举的函数。这些函数与schedule-work()以及schedule-delayed-Work()相近,惟一的区别就在于它们针对给定的工作队列而不是默认的event队列进行操作。
将工作添加到当前处理器对应的链表中,但并不能保证此工作由提交该工作的CPU执行。Flushwork时可能执行所有CPU上的工作或者CPU热插拔后将进行工作的转移
107int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
108{
109 int ret = 0, cpu = get_cpu();
// 工作结构还没在队列, 设置pending标志表示把工作结构挂接到队列中
111 if (!test_and_set_bit(0, &work->pending)) {
112 if (unlikely(is_single_threaded(wq)))
113 cpu = singlethread_cpu;
114 BUG_ON(!list_empty(&work->entry));
115 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
////////////////////////////////
84static void __queue_work(struct cpu_workqueue_struct *cwq,
85 struct work_struct *work)
86{
87 unsigned long flags;
88
89 spin_lock_irqsave(&cwq->lock, flags);
//// 指向CPU工作队列
90 work->wq_data = cwq;
// 加到队列尾部
91 list_add_tail(&work->entry, &cwq->worklist);
92 cwq->insert_sequence++;
// 唤醒工作队列的内核处理线程
93 wake_up(&cwq->more_work);
94 spin_unlock_irqrestore(&cwq->lock, flags);
95}
////////////////////////////////////
116 ret = 1;
117 }
118 put_cpu();
119 return ret;
120}
121EXPORT_SYMBOL_GPL(queue_work);
一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。
3.4.2 schedule_work
在大多数情况下, 并不需要自己建立工作队列,而是只定义工作, 将工作结构挂接到内核预定义的事件工作队列中调度, 在kernel/workqueue.c中定义了一个静态全局量的工作队列static struct workqueue_struct *keventd_wq;
调度工作结构, 将工作结构添加到全局的事件工作队列keventd_wq,调用了queue_work通用模块。对外屏蔽了keventd_wq的接口,用户无需知道此参数,相当于使用了默认参数。keventd_wq由内核自己维护,创建,销毁。
455static struct workqueue_struct *keventd_wq;
463int fastcall schedule_work(struct work_struct *work)
464{
465 return queue_work(keventd_wq, work);
466}
467EXPORT_SYMBOL(schedule_work);
3.4.3 queue_delayed_work
有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,
同时也可以利用timer来进行延时调度,到期后才由默认的定时器回调函数进行工作注册。
延迟delay后,被定时器唤醒,将work添加到工作队列wq中。
143int fastcall queue_delayed_work(struct workqueue_struct *wq,
144 struct work_struct *work, unsigned long delay)
145{
146 int ret = 0;
147 struct timer_list *timer = &work->timer;
148
149 if (!test_and_set_bit(0, &work->pending)) {
150 BUG_ON(timer_pending(timer));
151 BUG_ON(!list_empty(&work->entry));
152
153 /* This stores wq for the moment, for the timer_fn */
154 work->wq_data = wq;
155 timer->expires = jiffies + delay;
156 timer->data = (unsigned long)work;
157 timer->function = delayed_work_timer_fn;
////////////////////////////////////
定时器到期后执行的默认函数,其将某个work添加到一个工作队列中,需两个重要信息:
Work:__data定时器的唯一参数
待添加至的队列:由work->wq_data提供
123static void delayed_work_timer_fn(unsigned long __data)
124{
125 struct work_struct *work = (struct work_struct *)__data;
126 struct workqueue_struct *wq = work->wq_data;
127 int cpu = smp_processor_id();
128
129 if (unlikely(is_single_threaded(wq)))
130 cpu = singlethread_cpu;
131
132 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
133}
////////////////////////////////////
158 add_timer(timer);
159 ret = 1;
160 }
161 return ret;
162}
163EXPORT_SYMBOL_GPL(queue_delayed_work);
3.4.4 schedule_delayed_work
其利用queue_delayed_work实现了默认线程keventd_wq中工作的调度。
477int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
478{
479 return queue_delayed_work(keventd_wq, work, delay);
480}
481EXPORT_SYMBOL(schedule_delayed_work);
3.5 刷新工作
3.5.1 flush_workqueue
排入队列的工作会在工作者线程下一次被唤醒的时候执行。有时,在继续下一步工作之前,你必须保证一些操作已经执行完毕了。这一点对模块来说就很重要,在卸载之前,它就有可能需要调用下面的函数。而在内核的其他部分,为了防止竟争条件的出现,也可能需要确保不再有待处理的工作。
出于以上目的,内核准备了一个用于刷新指定工作队列的函数flush_workqueue。其确保所有已经调度的工作已经完成了,否则阻塞直到其执行完毕,通常用于驱动模块的关闭处理。其检查已经每个CPU上执行完的序号是否大于此时已经待插入的序号。对于新的以后插入的工作,其不受影响。
320void fastcall flush_workqueue(struct workqueue_struct *wq)
321{
322 might_sleep();
323
324 if (is_single_threaded(wq)) {
325 /* Always use first cpu's area. */
326 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
327 } else {
328 int cpu;
// 被保护的代码可能休眠,故此处使用内核互斥锁而非自旋锁
330 mutex_lock(&workqueue_mutex);
// 将同时调度其他CPU上的工作,这说明了工作并非在其注册的CPU上执行
331 for_each_online_cpu(cpu)
332 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
//////////////////////////
278static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
279{
280 if (cwq->thread == current) {
// keventd本身需要刷新所有工作时,手动调用run_workqueue,否则将造成死锁。
285 run_workqueue(cwq);
286 } else {
287 DEFINE_WAIT(wait);
288 long sequence_needed;
289
290 spin_lock_irq(&cwq->lock);
// 保存队列中当前已有的工作所处的位置,不用等待新插入的工作执行完毕
291 sequence_needed = cwq->insert_sequence;
292
293 while (sequence_needed - cwq->remove_sequence > 0) {
// 如果队列中还有未执行完的工作,则休眠
294 prepare_to_wait(&cwq->work_done, &wait,
295 TASK_UNINTERRUPTIBLE);
296 spin_unlock_irq(&cwq->lock);
297 schedule();
298 spin_lock_irq(&cwq->lock);
299 }
300 finish_wait(&cwq->work_done, &wait);
301 spin_unlock_irq(&cwq->lock);
302 }
303}
//////////////////////////
333 mutex_unlock(&workqueue_mutex);
334 }
335}
336EXPORT_SYMBOL_GPL(flush_workqueue);
函数会一直等待,直到队列中所有对象都被执行以后才返回。在等待所有待处理的工作执行的时候,该函数会进入休眠状态,所以只能在进程上下文中使用它。
注意,该函数并不取消任何延迟执行的工作。就是说,任何通过schedule_delayed_work调度的工作,如果其延迟时间未结束,它并不会因为调用flush_scheduled_work()而被刷新掉。
3.5.2 flush_scheduled_work
刷新系统默认工作线程的函数为flush_scheduled_work,其调用了上面通用的函数
532void flush_scheduled_work(void)
533{
534 flush_workqueue(keventd_wq);
535}
536EXPORT_SYMBOL(flush_scheduled_work);
3.5.3 cancel_delayed_work
取消延迟执行的工作应该调用:
int cancel_delayed_work(struct work_struct *work);
这个函数可以取消任何与work_struct相关的挂起工作。
3.6 创建新的工作队列
如果默认的队列不能满足你的需要,你应该创建一个新的工作队列和与之相应的工作者线程。由于这么做会在每个处理器上都创建一个工作者线程,所以只有在你明确了必须要靠自己的一套线程来提高性能的情况下,再创建自己的工作队列。
创建一个新的任务队列和与之相关的工作者线程,只需调用一个简单的函数:create_workqueue。这个函数会创建所有的工作者线程(系统中的每个处理器都有一个)并且做好所有开始处理工作之前的准备工作。name参数用于该内核线程的命名。对于具体的线程会更加CPU号添加上序号。
create_workqueue和create_singlethread_workqueue都是创建一个工作队列,但是差别在于create_singlethread_workqueue可以指定为此工作队列只创建一个内核线程,这样可以节省资源,无需发挥SMP的并行处理优势。
create_singlethread_workqueue对外进行了封装,相当于使用了默认参数。二者同时调用了统一的处理函数__create_workqueue,其对外不可见。
59#define create_workqueue(name) __create_workqueue((name), 0)
60#define create_singlethread_workqueue(name) __create_workqueue((name), 1)
363struct workqueue_struct *__create_workqueue(const char *name,
364 int singlethread)
365{
366 int cpu, destroy = 0;
367 struct workqueue_struct *wq;
368 struct task_struct *p;
369
370 wq = kzalloc(sizeof(*wq), GFP_KERNEL);
371 if (!wq)
372 return NULL;
373
374 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
375 if (!wq->cpu_wq) {
376 kfree(wq);
377 return NULL;
378 }
379
380 wq->name = name;
381 mutex_lock(&workqueue_mutex);
382 if (singlethread) {
383 INIT_LIST_HEAD(&wq->list); //终止链表
384 p = create_workqueue_thread(wq, singlethread_cpu);
385 if (!p)
386 destroy = 1;
387 else
388 wake_up_process(p);
389 } else {
390 list_add(&wq->list, &workqueues);
391 for_each_online_cpu(cpu) {
392 p = create_workqueue_thread(wq, cpu);
/////////////////////////////////
338static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
339 int cpu)
340{
341 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
342 struct task_struct *p;
343
344 spin_lock_init(&cwq->lock);
345 cwq->wq = wq;
346 cwq->thread = NULL;
347 cwq->insert_sequence = 0;
348 cwq->remove_sequence = 0;
349 INIT_LIST_HEAD(&cwq->worklist);
350 init_waitqueue_head(&cwq->more_work);
351 init_waitqueue_head(&cwq->work_done);
352
353 if (is_single_threaded(wq))
354 p = kthread_create(worker_thread, cwq, "%s", wq->name);
355 else
356 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
357 if (IS_ERR(p))
358 return NULL;
359 cwq->thread = p;
360 return p;
361}
/////////////////////////////////
393 if (p) {
394 kthread_bind(p, cpu);
395 wake_up_process(p);
396 } else
397 destroy = 1;
398 }
399 }
400 mutex_unlock(&workqueue_mutex);
401
405 if (destroy) {//如果启动任意一个线程失败,则销毁整个工作队列
406 destroy_workqueue(wq);
407 wq = NULL;
408 }
409 return wq;
410}
411EXPORT_SYMBOL_GPL(__create_workqueue);
3.7 销毁工作队列
销毁一个工作队列,若有未完成的工作,则阻塞等待其完成。然后销毁对应的内核线程。
434void destroy_workqueue(struct workqueue_struct *wq)
435{
436 int cpu;
437
438 flush_workqueue(wq); //等待所有工作完成
439/// 利用全局的互斥锁锁定所有工作队列的操作
441 mutex_lock(&workqueue_mutex);
// 清除相关的内核线程
442 if (is_single_threaded(wq))
443 cleanup_workqueue_thread(wq, singlethread_cpu);
444 else {
445 for_each_online_cpu(cpu)
446 cleanup_workqueue_thread(wq, cpu);
/////////////////////////////////
413static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
414{
415 struct cpu_workqueue_struct *cwq;
416 unsigned long flags;
417 struct task_struct *p;
418
419 cwq = per_cpu_ptr(wq->cpu_wq, cpu);
420 spin_lock_irqsave(&cwq->lock, flags);
421 p = cwq->thread;
422 cwq->thread = NULL;
423 spin_unlock_irqrestore(&cwq->lock, flags);
424 if (p)
425 kthread_stop(p); //销毁该线程,此处可能休眠
426}
/////////////////////////////////
447 list_del(&wq->list);
448 }
449 mutex_unlock(&workqueue_mutex);
450 free_percpu(wq->cpu_wq);
451 kfree(wq);
452}
453EXPORT_SYMBOL_GPL(destroy_workqueue);