8.工作队列
工作队列是另一种中断下半部实现方式,它与软中断和tasklet最主要区别在于其不在中断上下文执行而是在进程上下文执行。因此它拥有在进程上下文执行的所有优势,例如允许被重新调度以及睡眠等。一般的,如果推后执行的任务需要睡眠则选择工作队列来实现,否则选择软中断或tasklet。
工作队列子系统可以让你的驱动程序创建一个专门的工作者线程来处理需要推后的工作。不过,工作队列子系统提供了一个默认的工作者线程来处理这些工作。因此,工作队列最基本的表现形式就转变成了一个把需要推后执行的任务交给特定的通用线程这样一种接口。
8.1 相关数据结构
首先介绍一下与工作队列相关的几个数据结构。内核将工作队列抽象成数据结构workqueue_struct,定义如下:
-
struct workqueue_struct {
-
struct cpu_workqueue_struct *cpu_wq; /*指向CPU工作队列管理结构,根据该指针,系统中的每个CPU都可以通过per_cpu_ptr来获得属于自己的CPU工作队列管理结构的对象*/
-
struct list_head list; /*双向链表,用于将workqueue_struct 加入到内核全局变量workqueues中*/
-
const char *name; /*工作队列的名称*/
-
int singlethread; /*标识该工作队列中拥有线程的数量*/
-
int freezeable; /*表示进程可否处于冻结状态*/
-
int rt; /*用来调整worker_thread线程所在进程的调度策略*/
-
};
每个工作队列都包含一个cpu_workqueue_struct结构体链表,该链表的每个成员都对应系统中的一个CPU,cpu_workqueue_struct定义如下:
-
struct cpu_workqueue_struct {
-
spinlock_t lock;
-
struct list_head worklist; /*该工作队列上待处理的工作链表*/
-
wait_queue_head_t more_work;
-
struct work_struct *current_work; /*指向当前工作*/
-
struct workqueue_struct *wq; /*指向包含自己的workqueue_struct结构体*/
-
struct task_struct *thread; /*指向工作者线程*/
-
} ____cacheline_aligned;
通过上述两个结构体可知,workqueue_struct向外界提供工作队列的抽象,而cpu_workqueue_struct是针对系统中每个CPU设计的工作队列实体,其包含负责处理工作的工作者线程以及挂在该工作队列上的所有work。每一个work对应的数据结构为work_struct,定义如下:
-
struct work_struct {
-
atomic_long_t data; /*传递给处理函数的参数*/
-
struct list_head entry; /*将多个待处理工作节点形成链表*/
-
work_func_t func; /*处理函数*/
-
};
图8-1显示了3个工作队列相关的数据结构之间的关系。
图8-1 工作队列相关数据结构关系图
8.2 使用工作队列
内核通过封装一系列的工作队列操作函数,使开发人员在使用工作队列时非常方便。大致分三步:创建工作队列,提交工作节点,销毁工作队列。内核在启动时会创建一个默认的工作队列“events”,因此如果开发人员不需要建立一个自己的工作队列的话,完全可以使用默认的工作队列,这样就省去了创建和销毁工作队列的工作。下面将对工作队列相关的接口函数进行分析。
8.2.1 创建工作队列
内核对外提供了4种创建工作队列的接口(宏定义):
#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
#define create_rt_workqueue(name) __create_workqueue((name), 0, 0,
1)
#define create_freezeable_workqueue(name) __create_workqueue((name),
1, 1, 0)
#define create_singlethread_workqueue(name)
__create_workqueue((name), 1, 0, 0)
通过对宏展开发现,这4个接口最终都是调用__create_workqueue_key函数,其定义如下:
-
<kernel/workqueue.c>
-
struct workqueue_struct *__create_workqueue_key(const char *name,
-
int singlethread,
-
int freezeable,
-
int rt,
-
struct lock_class_key *key,
-
const char *lock_name)
-
{
-
struct workqueue_struct *wq;
-
struct cpu_workqueue_struct *cwq;
-
int err = 0, cpu;
-
-
wq = kzalloc(sizeof(*wq), GFP_KERNEL); /*处理函数*/
-
if (!wq)
-
return NULL;
-
-
wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
-
if (!wq->cpu_wq) {
-
kfree(wq);
-
return NULL;
-
}
-
-
/*初始化工作队列各项参数*/
-
wq->name = name;
-
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
-
wq->singlethread = singlethread;
-
wq->freezeable = freezeable;
-
wq->rt = rt;
-
INIT_LIST_HEAD(&wq->list);
-
-
if (singlethread) { //如果singlethread不为0
-
cwq = init_cpu_workqueue(wq, singlethread_cpu);//初始化cpu_workqueue_struct结构体
-
err = create_workqueue_thread(cwq, singlethread_cpu);//创建工作者线程
-
start_workqueue_thread(cwq, -1);//开启工作这线程等待被调度执行
-
} else {//如果singlethread为0
-
cpu_maps_update_begin();
-
-
spin_lock(&workqueue_lock);
-
list_add(&wq->list, &workqueues); //将工作队列加入全局链表workqueues
-
spin_unlock(&workqueue_lock);
-
-
for_each_possible_cpu(cpu) {//为每个CPU
-
cwq = init_cpu_workqueue(wq, cpu); //初始化cpu_workqueue_struct结构体
-
if (err || !cpu_online(cpu))
-
continue;
-
err = create_workqueue_thread(cwq, cpu); //创建工作者线程
-
start_workqueue_thread(cwq, cpu); //开启工作这线程等待被调度执行
-
}
-
cpu_maps_update_done();
-
}
-
-
if (err) {
-
destroy_workqueue(wq);//如果创建过程中发生错误,则销毁工作队列
-
wq = NULL;
-
}
-
return wq;
-
}
其中create_workqueue_thread函数创建了工作这线程worker_thread实例,并将cpu_workqueue_struct结构体的thread成员指向该线程。worker_thread的定义如下:
-
<kernel/workqueue.c>
-
static int worker_thread(void *__cwq)
-
{
-
struct cpu_workqueue_struct *cwq = __cwq;
-
DEFINE_WAIT(wait);
-
-
if (cwq->wq->freezeable)
-
set_freezable();
-
-
for (;;) {
-
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);//将当前进程状态修改为TASK_INTERRUPTIBLE并加入到cwq->more_work等待队列中。
-
if (!freezing(current) &&
-
!kthread_should_stop() &&
-
list_empty(&cwq->worklist))
-
schedule();//如果没有should_stop被设置且没有待处理的工作节点,则挂起该线程
-
finish_wait(&cwq->more_work, &wait);//由于先前先将该线程加入等待队列且设置为TASK_INTERRUPTIBLE状态,所以当不需要挂起线程或该线程再次被唤醒时,在此处需要将该线程从等待队列中删除并设置为TASK_RUNNING状态。
-
-
try_to_freeze();
-
-
if (kthread_should_stop())
-
break;//如果should_stop被设置,则跳出for(;;)循环,线程函数所在的进程将会终结。
-
-
run_workqueue(cwq);//处理cwq->worklist上的工作节点
-
}
-
-
return 0;
-
}
run_workqueue函数是负责遍历挂在cwq->worklist上的所有工作节点,调用工作节点的处理函数,以下是run_workqueue函数定义的主要部分:
-
<kernel/workqueue.c>
-
static void run_workqueue(struct cpu_workqueue_struct *cwq)
-
{
-
spin_lock_irq(&cwq->lock);
-
while (!list_empty(&cwq->worklist)) {//遍历cwq->worklist上的所有工作节点
-
struct work_struct *work = list_entry(cwq->worklist.next,
-
struct work_struct, entry);//获取当前处理的工作节点
-
work_func_t f = work->func; //获取处理函数指针
-
trace_workqueue_execution(cwq->thread, work);
-
cwq->current_work = work; //使cwq->current_work指向当前处理的工作节点
-
list_del_init(cwq->worklist.next);//将工作节点从cwq->worklist链表中删除
-
spin_unlock_irq(&cwq->lock);
-
-
BUG_ON(get_wq_data(work) != cwq);
-
work_clear_pending(work);//清除work->data的WORK_STRUCT_PENDING位,使驱动程序可以再次提交该工作节点到工作队列中。
-
lock_map_acquire(&cwq->wq->lockdep_map);
-
lock_map_acquire(&lockdep_map);
-
f(work); //调用工作节点的处理函数
-
lock_map_release(&lockdep_map);
-
lock_map_release(&cwq->wq->lockdep_map);
-
-
spin_lock_irq(&cwq->lock);
-
cwq->current_work = NULL; //当前工作节点已被处理,将cwq->current_work指向NULL
-
}
-
spin_unlock_irq(&cwq->lock);
-
}
8.2.2 提交工作节点
创建完工作队列后,主要工作是将构造好的工作节点提交到工作队列处理。这部分工作其实很简单,内核为开发者提供了很多方便易用的接口。首先看一下内核提供的构造工作节点的接口:
-
<kernel/workqueue.h>
-
#define __WORK_INITIALIZER(n, f) { \
-
.data = WORK_DATA_INIT(), \
-
.entry = { &(n).entry, &(n).entry }, \
-
.func = (f), \
-
__WORK_INIT_LOCKDEP_MAP(#n, &(n)) \
-
}
-
#define DECLARE_WORK(n, f) \
-
struct work_struct n = __WORK_INITIALIZER(n, f)
DECLARE_WORK用于在驱动程序中静态定义一个work_struct工作节点实例,其接收两个参数,分别是工作节点实例的名称n和工作节点的处理函数名。
-
<kernel/workqueue.h>
-
#define PREPARE_WORK(_work, _func) \
-
do { \
-
(_work)->func = (_func); \
-
} while (0)
-
#define INIT_WORK(_work, _func) \
-
do { \
-
(_work)->data = (atomic_long_t) WORK_DATA_INIT(); \
-
INIT_LIST_HEAD(&(_work)->entry); \
-
PREPARE_WORK((_work), (_func)); \
-
} while (0)
INIT_WORK用于在驱动程序中动态初始化一个工作节点实例,其接收两个参数,分别是已定义的工作节点和处理函数名。
完成工作节点初始化工作后,就可以通过queue_work函数将该工作节点提交到工作队列中,如果要将工作节点提交到内核默认的工作队列“events”中,则使用schedule_work或schedule_delayed_work等函数。
-
<kernel/workqueue.c>
-
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
-
{
-
int ret;
-
//将工作节点work提交到当前CPU的工作队列的worklist上
-
ret = queue_work_on(get_cpu(), wq, work);
-
put_cpu();
-
-
return ret;
-
}
-
int queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
-
{
-
int ret = 0;
-
/*首先检测work->data的WORK_STRUCT_PENDING 是否置1,如果置1则说明该工作节点此前已被提交但未处理,内核禁止一个工作节点还没处理完再次提交该节点。如果置0则可以提交该工作节点*/
-
if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
-
BUG_ON(!list_empty(&work->entry));
-
__queue_work(wq_per_cpu(wq, cpu), work);
-
ret = 1;
-
}
-
return ret;
-
}
-
static void __queue_work(struct cpu_workqueue_struct *cwq,
-
struct work_struct *work)
-
{
-
unsigned long flags;
-
-
spin_lock_irqsave(&cwq->lock, flags);
-
insert_work(cwq, work, &cwq->worklist);//将工作节点插入到cpu_workqueue_struct的worklisk中
-
spin_unlock_irqrestore(&cwq->lock, flags);
-
}
-
static void insert_work(struct cpu_workqueue_struct *cwq,
-
struct work_struct *work, struct list_head *head)
-
{
-
trace_workqueue_insertion(cwq->thread, work);
-
-
set_wq_data(work, cwq);
-
-
smp_wmb();
-
list_add_tail(&work->entry, head);//将工作节点插入worklist链表
-
wake_up(&cwq->more_work); //唤醒在cwq->more_work上等待的工作者线程worker_thread
-
}
内核还提供了将工作节点推迟某个时间提交的函数接口queue_delayed_work,该函数的基本流程与queue_ work类似,区别在于在提交的过程中添加定时器功能,将__queue_work函数封装到定时器的超时函数delayed_work_timer_fn中。此处不再贴出相关源码。
8.2.3 销毁工作队列
当驱动程序不再需要先前创建的工作队列时,就需要使用destroy_workqueue函数来销毁工作队列,其主要工作是释放在__create_workqueue_key函数中分配的资源以及终结worker_thread线程。
-
<kernel/workqueue.c>
-
void destroy_workqueue(struct workqueue_struct *wq)
-
{
-
const struct cpumask *cpu_map = wq_cpu_map(wq);
-
int cpu;
-
-
cpu_maps_update_begin();
-
spin_lock(&workqueue_lock);
-
list_del(&wq->list);//将工作队列从链表中删除
-
spin_unlock(&workqueue_lock);
-
-
for_each_cpu(cpu, cpu_map)//遍历每个CPU
-
cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));//终止工作者线程
-
cpu_maps_update_done();
-
-
free_percpu(wq->cpu_wq);//释放资源
-
kfree(wq);
-
}
-
static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
-
{
-
if (cwq->thread == NULL)
-
return;
-
-
lock_map_acquire(&cwq->wq->lockdep_map);
-
lock_map_release(&cwq->wq->lockdep_map);
-
/*由于在销毁工作队列时,工作者线程worker_thread可能还有一些工作节点没有处理,因此需要调用flush_cpu_workqueue 函数来确保在worker_thread终止前所有的工作节点都被处理完毕。*/
-
flush_cpu_workqueue(cwq);
-
-
trace_workqueue_destruction(cwq->thread);
-
kthread_stop(cwq->thread); //结束worker_thread,即kthread->should_stop = 1
-
cwq->thread = NULL;
-
}
-
static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
-
{
-
int active = 0;
-
struct wq_barrier barr;
-
-
WARN_ON(cwq->thread == current);
-
-
spin_lock_irq(&cwq->lock);
-
if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
-
insert_wq_barrier(cwq, &barr, &cwq->worklist);//如果工作队列有未处理完的工作节点,则插入一个“结束节点”,该节点的处理函数将调用complete函数通知flush_cpu_workqueue——这是最后一个节点了,可以终止工作者线程了。
-
active = 1;
-
}
-
spin_unlock_irq(&cwq->lock);
-
-
if (active)//当工作队列有未处理完的工作节点
-
wait_for_completion(&barr.done);//在此处睡眠,等待“结束节点”的处理函数发来通知再继续运行。
-
-
return active;
-
}
参考资料:
[1] Andrew N.Sloss, Dominic Symes, Chris Wright. ARM嵌入式系统开发——软件设计与优化. 沈建华, 译.
[2] Daniel P.Bovet, Marco Cesati. 深入理解Linux内核(第三版). 陈莉君, 张琼声, 张宏伟, 译.
[3] Pobert Love. Linux内核设计与实现. 陈莉君, 康华, 张波, 译.
[4] Wolfgang Mauerer. 深入Linux内核架构. 郭旭, 译.
[5] 陈学松. 深入Linux设备驱动程序内核机制.
[6] ARM Architecture Reference Manual
[7] The ARM-THUMB Procedure Call Standard
阅读(3619) | 评论(0) | 转发(4) |