分类: LINUX
2011-09-28 15:04:46
Workqueue的名字就和他的功能一样:需要处理的工作列表和工作的添加删除(貌似没有看到如何删除的)、以及工作的调度执行。
需要处理的工作列表通常都维护在内核对象workqueue_struct里面。系统里面可以有多个workqueue_struct。内核部分的工作添加到了工作队列keventd_wq。而fs/aio.c里面实现了自己的工作队列aio_wq。
workqueue_struct是双向循环链表。里面的单元是work_struct。
驱动接口:
create_workqueue:创建工作队列结构和内核处理线程。
schedule_work/schedule_delayed_work:调度执行一个具体的任务,执行的任务将会被挂入Linux系统提供的workqueue keventd_wq。请注意,调度执行并不等于立刻执行。而是指示worker_thread在下次处理工作队列的时候执行该工作;
queue_work/queue_delayed_work:调度执行一个指定workqueue中的任务。内核本身提供了一个工作队列keventd_wq。但是,系统里面也可以有其他的工作队列。所以就有了schedule_work和queue_work的区分。
1.1 schedule_work从schedule_work的实现看,其和queue_work的实现区别并不大:
int schedule_work(struct work_struct *work)
{
return queue_work(keventd_wq, work);
}
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
int ret;
ret = queue_work_on(get_cpu(), wq, work);
put_cpu();
return ret;
}
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
int ret = 0;
if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
BUG_ON(!list_empty(&work->entry));
__queue_work(wq_per_cpu(wq, cpu), work);
ret = 1;
}
return ret;
}
static void __queue_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{
unsigned long flags;
spin_lock_irqsave(&cwq->lock, flags);
insert_work(cwq, work, &cwq->worklist);
spin_unlock_irqrestore(&cwq->lock, flags);
}
我觉得很郁闷。因为我没有看到工作是如何删除的。
1.2 worker_thread1.2.1 队列的创建就是创建workqueue对象和处理线程
worker_thread是在create_workqueue创建的。
#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
__create_workqueue的实现也很简单:
#define __create_workqueue(name, singlethread, freezeable, rt) /
({ /
static struct lock_class_key __key; /
const char *__lock_name; /
/
if (__builtin_constant_p(name)) /
__lock_name = (name); /
else /
__lock_name = #name; /
/
__create_workqueue_key((name), (singlethread), /
(freezeable), (rt), &__key, /
__lock_name); /
})
struct workqueue_struct *__create_workqueue_key(const char *name,
int singlethread,
int freezeable,
int rt,
struct lock_class_key *key,
const char *lock_name)
{
struct workqueue_struct *wq;
struct cpu_workqueue_struct *cwq;
…
err = create_workqueue_thread(cwq, cpu);
start_workqueue_thread(cwq, cpu);
…
}
static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
struct workqueue_struct *wq = cwq->wq;
const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
struct task_struct *p;
p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
…
if (IS_ERR(p))
return PTR_ERR(p);
if (cwq->wq->rt)
sched_setscheduler_nocheck(p, SCHED_FIFO, ¶m);
cwq->thread = p;
trace_workqueue_creation(cwq->thread, cpu);
return 0;
}
1.2.2 工作对垒处理线程worker_thread的主循环
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DEFINE_WAIT(wait);
if (cwq->wq->freezeable)
set_freezable();
set_user_nice(current, -5);
for (;;) {
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
if (!freezing(current) &&
!kthread_should_stop() &&
list_empty(&cwq->worklist))
schedule();
finish_wait(&cwq->more_work, &wait);
try_to_freeze();
if (kthread_should_stop())
break;
run_workqueue(cwq);
}
return 0;
}
run_workqueue会执行工作的函数指针:
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
spin_lock_irq(&cwq->lock);
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
work_func_t f = work->func;
…
trace_workqueue_execution(cwq->thread, work);
cwq->current_work = work;
list_del_init(cwq->worklist.next);
spin_unlock_irq(&cwq->lock);
BUG_ON(get_wq_data(work) != cwq);
work_clear_pending(work);
lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
f(work);
…
}