系统未建立
分类: LINUX
2014-01-09 07:55:13
原文地址:工作队列 作者:cainiaopei
首先来了解一下一个重要的数据结构:并发管理工作队列的后端 gcwq
struct global_cwq {
spinlock_t lock; /* the gcwq lock */
struct list_head worklist; /* L: list of pending works 需要处理的work_struct都挂载在这个链表上
unsigned int cpu; /* I: the associated cpu 与gcwq相关联的cpu
unsigned int flags; /* L: GCWQ_* flags */
int nr_workers; /* L: total number of workers 总的工作者数量
int nr_idle; /* L: currently idle ones 当前空闲的工作者数量
/* workers are chained either in the idle_list or busy_hash */
struct list_head idle_list; /* X: list of idle workers 空闲的工作者链表
struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];正在执行的工作者线程的哈希表
/* L: hash of busy workers */
struct timer_list idle_timer; /* L: worker idle timeout */
struct timer_list mayday_timer; /* L: SOS timer for dworkers */
struct ida worker_ida; /* L: for worker IDs */
struct task_struct *trustee; /* L: for gcwq shutdown */
unsigned int trustee_state; /* L: trustee state */
wait_queue_head_t trustee_wait; /* trustee wait */
struct worker *first_idle; /* L: first idle worker */
} ____cacheline_aligned_in_smp;
这个结构每个CPU都有一个,通常我们可以根据CPU在系统中的编号来获取到这个结构,并且 gcwq会将与它关联的CPU编号保存在其成员cpu当中。Gcwqs几乎会处理所有的工作队列的任务。
在系统启动过程中,会对gcwqs做一个初始化工作,在workqueue.c的init_workqueues(void)中:
{
unsigned int cpu;
int i;
cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
cpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);/*主要用于支持热插拔CPU系统,注册CPU热插拔事件,这样可以在CPU上线离线的时候创建或者销毁工作线程
/* initialize gcwqs */
for_each_gcwq_cpu(cpu) {//遍历每个CPU,初始化每个CPU的gcwq
struct global_cwq *gcwq = get_gcwq(cpu);根据CPU编号获取它的gcwq结构
spin_lock_init(&gcwq->lock);
INIT_LIST_HEAD(&gcwq->worklist);//初始化挂载未处理工作项的链表
gcwq->cpu = cpu;//保存CPU编号
gcwq->flags |= GCWQ_DISASSOCIATED;//设置当前gcwq暂时不可用
INIT_LIST_HEAD(&gcwq->idle_list);初始化空闲工作者线程链表
for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
/*初始化定时器*/
init_timer_deferrable(&gcwq->idle_timer);
gcwq->idle_timer.function = idle_worker_timeout;
gcwq->idle_timer.data = (unsigned long)gcwq;
setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,
(unsigned long)gcwq);
ida_init(&gcwq->worker_ida);
gcwq->trustee_state = TRUSTEE_DONE;
init_waitqueue_head(&gcwq->trustee_wait);
}
/* create the initial worker 为当前活动的CPU创建工作者线程*/
for_each_online_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);
struct worker *worker;
/*遍历每个CPU,根据CPU编号获取对应的gcwq,然后为每个CPU创建一个工作者线程,并利用gcwq及参数true将工作者线程与CPU绑定起来*/
if (cpu != WORK_CPU_UNBOUND)
gcwq->flags &= ~GCWQ_DISASSOCIATED;
worker = create_worker(gcwq, true);
BUG_ON(!worker);
spin_lock_irq(&gcwq->lock);
start_worker(worker);/*启动工作者线程,开始处理工作项,因为工作项是挂载在gcwq链表上的,因此会对gcwq的数据进行改动,所以这里需要锁来对gcwq作保护*/
spin_unlock_irq(&gcwq->lock);
}
/*通过传递不同的参数来创建用途不同的工作队列:
events:普通工作队列,大部分的延迟处理函数都在这个工作队列上运行,要求任务执行时间短,避免互相影响。
events_long:需要长时间运行的工作项可在这个工作队列上运行。
events_nrt:该工作队列上的任务是不可重入的,也就是该工作队列上的任务在某处理上执行的时候,它不会在其他处理上再执行这些任务。
events_unbound:该工作队列上的任务不会绑定在特定CPU上,只要某处理器空闲,就可以处理该工作队列上的任务。
events_freezable:类似于events,但工作项都是被挂起的
events_nrt_freezable:类似events_nrt。
system_wq = alloc_workqueue("events", 0, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);
system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
WQ_UNBOUND_MAX_ACTIVE);
system_freezable_wq = alloc_workqueue("events_freezable",
WQ_FREEZABLE, 0);
system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable",
WQ_NON_REENTRANT | WQ_FREEZABLE, 0);
BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||
!system_unbound_wq || !system_freezable_wq ||
!system_nrt_freezable_wq);
return 0;
}
在上面我们看到了,系统在启动的时候会为每个gcwq创建一个工作者线程管理者worker,这个worker就是用来管理gcwq上挂载的工作项的:
struct worker {
/* on idle list while idle, on busy hash table while busy */
union {
struct list_head entry; /* L: while idle */
struct hlist_node hentry; /* L: while busy */
};
//与工作者线程的状态有关,当工作者线程空闲时,使用entry,但忙碌时,使用哈希节点hentry,它们分别对应gcwq的idle_list和busy_hash[]。
struct work_struct *current_work; /* L: work being processed 当前正在处理的工作项
struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq worker所对应的cpu_workqueue_struct。
struct list_head scheduled; /* L: scheduled works 被调度的工作项,只有进入该链表,工作项才能被工作队列处理。
struct task_struct *task; /* I: worker task 被内核调度的线程实体,通过它参与到内核调度当中去,
struct global_cwq *gcwq; /* I: the associated gcwq 相关联的gcwq
/* 64 bytes boundary on 64bit, 32 on 32bit */
unsigned long last_active; /* L: last active timestamp 最后的活动时间,用于决定该worker的生命周期
unsigned int flags; /* X: flags */
int id; /* I: worker id */
struct work_struct rebind_work; /* L: rebind worker to cpu */
};
在初始化gcwq过程当中会创建一个worker,而且worker中又有一个成员task_struct,因此我们可以想像的到在创建worker时会发生什么:
static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
{
.........
worker = alloc_worker();//为一个worker分配内存并完成一些初始化工作
if (!worker)
goto fail;
worker->gcwq = gcwq;//绑定gcwq
worker->id = id;
//以下会根据worker的gcwq的cpu标示是否on_unbound来创建worker的线程,使worker能参与到进程调度当中去。
if (!on_unbound_cpu)
worker->task = kthread_create_on_node(worker_thread,
worker,
cpu_to_node(gcwq->cpu),
"kworker/%u:%d", gcwq->cpu, id);
else
worker->task = kthread_create(worker_thread, worker,
"kworker/u:%d", id);
if (IS_ERR(worker->task))
goto fail;
.........
}
为worker创建的工作者线程会在创建好后启动,开始处理工作项。工作者线程的主要工作在worker_thread中完成:
static int worker_thread(void *__worker)
{
struct worker *worker = __worker;
struct global_cwq *gcwq = worker->gcwq;
/* tell the scheduler that this is a workqueue worker */
worker->task->flags |= PF_WQ_WORKER;//告诉调度器这是一个工作者线程
woke_up:
spin_lock_irq(&gcwq->lock);//锁住gcwq,每个cpu只有一个gcwq结构,该CPU的其他线程同样有可能对它进行改变,因此这里需要加锁保护
/* DIE can be set only while we're idle, checking here is enough */
if (worker->flags & WORKER_DIE) {
spin_unlock_irq(&gcwq->lock);
worker->task->flags &= ~PF_WQ_WORKER;
return 0;
}
//首先让工作者线程的管理者从空闲状态退出,因为它现在要work了,这里的主要工作是清除worker的空闲状态标志,减少gcwq的nr_idle数量,并将worker的entry删除并重新初始化。
worker_leave_idle(worker);
recheck:
/* no more worker necessary? */
if (!need_more_worker(gcwq))//这里会对gcwq进行检查:gcwq上的worklist上是否挂载有未处理的工作项,如果没有,说明当前工作者线程无事可做,睡眠。检查当前CPU的gcwq的worker如果有更高优先级的工作要处理,且系统的全局队列中已经没有空闲的worker了,那么这时应该需要一个新的worker。
goto sleep;
if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
goto recheck;
BUG_ON(!list_empty(&worker->scheduled));
worker_clr_flags(worker, WORKER_PREP);
//这里做一些处理工作项work_struct的准备工作如查看gcwq的空闲工作者数量,提示worker上不能有准备调度的工作项,清除和设置worker的一些标志。
do {
struct work_struct *work =
list_first_entry(&gcwq->worklist,
struct work_struct, entry);//从gcwq的worklist链表上获取未处理的工作项work_struct
if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
/* optimization path, not strictly necessary */
process_one_work(worker, work);
if (unlikely(!list_empty(&worker->scheduled)))
process_scheduled_works(worker);
} else {
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
} while (keep_working(gcwq));
在这个do while()循环里处理挂载在gcwq上的未处理的工作项。一般情况下,会在do while()循环里把gcwq的worklist的所有未处理完的工作项都处理完后终止。在循环中,它通过process_one_work(worker, work)逐个的将工作项处理掉,process_scheduled_works(worker)内部同样是调用process_one_work(worker, work):
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&gcwq->lock)
__acquires(&gcwq->lock)
{
........
work_func_t f = work->func;
........
trace_workqueue_execute_start(work);
f(work);
trace_workqueue_execute_end(work);
.........
}
因此,process_one_work()的最终目的是执行work_struct的func函数,也就是每个工作项需要完成的工作。
工作队列的使用:
工作项:
当然首先我们先来了解一下工作项:
struct work_struct {
atomic_long_t data;//work_struct 的参数
struct list_head entry;//work_struct 的连接链表
work_func_t func;//工作项处理函数
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
有两种方法来创建工作项:
静态:DECLARE_WORK(struct work_struct *work,void(*func) (void *));
动态:INIT_WORK(struct work_struct *work,void(*func)(void *));
无论是静态还是动态的方法,它们的主要任务就是初始化好一个work_struct结构:
初始化work_struct的data和entry值,并且将func指向一个可执行函数:
do { \
__init_work((_work), _onstack); \
(_work)->data = (atomic_long_t) WORK_DATA_INIT(); \
INIT_LIST_HEAD(&(_work)->entry); \
PREPARE_WORK((_work), (_func)); \
} while (0);
当一个工作项被初始化好之后,表明它可以进入工作状态了,这时候我们就可以调度它了,如果我们把它交给系统默认线程去执行:
Schedule_work(work);
{
return queue_work(system_wq, work);
}
从代码可知,当调度执行工作项的时候,其实是去queue_work(system_wq,work),而从前面的初始化gcwqs过程中可知,system_wq是在系统启动过程中创建的一个普通的工作队列,也就是说,我们这里初始化的工作项会交给
统启动过程中的普通工作队列的工作者线程去处理。
当然,但我们有足够的理由需要去创建一个新的工作队列的时候我们可以:
Create_workqueue(const char *name);
创建一个名字为name的工作队列;
#define create_workqueue(name) \
alloc_workqueue((name), WQ_MEM_RECLAIM, 1) //在初始化gcwqs时使用同样的方式创建了几个默认的系统工作队列。
创建工作队列的主要工作在struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
unsigned int flags,
int max_active,
struct lock_class_key *key,
const char *lock_name, ...)
{
.........
wq = kzalloc(sizeof(*wq) + namelen, GFP_KERNEL);
if (!wq)
goto err;
//为一个workqueue_struct分配内存。
vsnprintf(wq->name, namelen, fmt, args1);//初始化wq的name
/* init wq */
wq->flags = flags;
wq->saved_max_active = max_active;
mutex_init(&wq->flush_mutex);
atomic_set(&wq->nr_cwqs_to_flush, 0);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);//初始化wq的主要成员
......
for_each_cwq_cpu(cpu, wq) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
struct global_cwq *gcwq = get_gcwq(cpu);
BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
cwq->gcwq = gcwq;
cwq->wq = wq;
cwq->flush_color = -1;
cwq->max_active = max_active;
INIT_LIST_HEAD(&cwq->delayed_works);
}
//用per-cpu workqueue将wq和gcwq关联起来。Cpu_workqueque_struct中保存了与CPU关联的gcwq,在这里,又关联了wq。
......
list_add(&wq->list, &workqueues);//最后把wq放到一个workqueues的链表上。
.........
Return wq;
}
它会返回一个初始化好的工作队列wq。
在整个创建workqueue的流程当中我们可以看到,它并没有为新的workqueue去创建一个工作者线程,而是将wq与cpu_workqueue_struct关联起来,在这个cpu_workqueue_struct结构中还关联了gcwq,然后把workqueue放到workqueues链表上去。
创建好了我们自己的工作队列之后,我们可以用自己的工作队列去执行工作项:
Queue_work(struct workqueue_struct *wq,struct wori_struct *work);
与上面的使用系统默认的工作队列调度工作项比较,就是使用了自己创建的而不是系统开机启动时创建的工作队列。
现在我们来看看系统到底是怎么去执行工作队列的吧:
Queue_work()->queue_work_on()->__queue_work()
{
struct global_cwq *gcwq;
struct cpu_workqueue_struct *cwq;
struct list_head *worklist;
unsigned int work_flags;
unsigned long flags;
debug_work_activate(work);
/* if dying, only works from the same workqueue are allowed */
if (unlikely(wq->flags & WQ_DRAINING) &&
WARN_ON_ONCE(!is_chained_work(wq)))
return;
/* determine gcwq to use */
if (!(wq->flags & WQ_UNBOUND)) {
struct global_cwq *last_gcwq;
if (unlikely(cpu == WORK_CPU_UNBOUND))
cpu = raw_smp_processor_id();
/*
* It's multi cpu. If @wq is non-reentrant and @work
* was previously on a different cpu, it might still
* be running there, in which case the work needs to
* be queued on that cpu to guarantee non-reentrance.
*/
gcwq = get_gcwq(cpu);
if (wq->flags & WQ_NON_REENTRANT &&
(last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
struct worker *worker;
spin_lock_irqsave(&last_gcwq->lock, flags);
worker = find_worker_executing_work(last_gcwq, work);
if (worker && worker->current_cwq->wq == wq)
gcwq = last_gcwq;
else {
/* meh... not running there, queue here */
spin_unlock_irqrestore(&last_gcwq->lock, flags);
spin_lock_irqsave(&gcwq->lock, flags);
}
} else
spin_lock_irqsave(&gcwq->lock, flags);
} else {
gcwq = get_gcwq(WORK_CPU_UNBOUND);//
spin_lock_irqsave(&gcwq->lock, flags);
}
//前面这部分代码的主要功能是根据wq的标志位来获取合适的gcwq。
/* gcwq determined, get cwq and queue */
cwq = get_cwq(gcwq->cpu, wq);
trace_workqueue_queue_work(cpu, cwq, work);
if (WARN_ON(!list_empty(&work->entry))) {
spin_unlock_irqrestore(&gcwq->lock, flags);
return;
}
cwq->nr_in_flight[cwq->work_color]++;
work_flags = work_color_to_flags(cwq->work_color);
if (likely(cwq->nr_active < cwq->max_active)) {
trace_workqueue_activate_work(work);
cwq->nr_active++;
worklist = gcwq_determine_ins_pos(gcwq, cwq);//在gcwq的worklist上(也就是未执行工作项链表)找到一个合适的位置
} else {
work_flags |= WORK_STRUCT_DELAYED;
worklist = &cwq->delayed_works;
}
insert_work(cwq, work, worklist, work_flags);//将工作项插入到对应gcwq的worklist的合适位置上。
spin_unlock_irqrestore(&gcwq->lock, flags);
}
因此,用一句简单的句可以概括queue_work(),就是把工作项work放到与wq关联的gcwq的未执行工作项链表上去。前面有分析,gcwq对应的工作者线程在被调度的时候会把gcwq上的未执行的工作项都执行。