Chinaunix首页 | 论坛 | 博客
  • 博客访问: 162791
  • 博文数量: 84
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1
  • 用 户 组: 普通用户
  • 注册时间: 2014-03-09 10:55
文章分类
文章存档

2014年(84)

我的朋友

分类: LINUX

2014-05-15 15:33:33

摘要:本文主要讲述linux如何处理ARM cortex A9多核处理器的中断延迟处理部分。主要包括工作队列。

 

法律声明LINUX3.0内核源代码分析》系列文章由谢宝友()发表于http://xiebaoyou.blog.chinaunix.net,文章中的LINUX3.0源代码遵循GPL协议。除此以外,文档中的其他内容由作者保留所有版权。谢绝转载。

本连载文章并不是为了形成一本适合出版的书籍,而是为了向有一定内核基本的读者提供一些linux3.0源码分析。因此,请读者结合《深入理解LINUX内核》第三版阅读本连载。

 

1.1 工作队列 1.1.1      概述

在中断、软中断上下文执行的代码都是不允许睡眠的,但是有些代码确实需要阻塞运行。比如需要申请信号号、申请资源、进行IO操作等。这些代码不能放到中断上下文执行,为此,驱动可以在中断处理代码中注册一个工作队列事务,这样的事务可以由内核线程延迟执行。工作队列就是用于执行这些延迟任务。

与老版本内核相比,工作队列变化较大,主要的变化有:

1、             并不会为每个工作队列创建线程。这是为了适应1024个以上的CPU,在这样的大型系统中,如果每个工作队列都创建一个线程的话,很容易就达到32K个线程。这会迅速将系统可用线程ID用完。在LINUX3.0中,每个核可以有一个gcwq线程,它处理所有工作队列中的任务。只有紧急工作队列才会单独创建一个线程,用于处理内存回收等紧急任务。

2、             工作队列被分类,如CPU密集型任务、高优先级任务、绑定任务、非绑定任务。当一个任务是绑定的高优先级任务时,它可以将优于其他任务被调度。这在老版本内核中是没有实现的。

 

系统在初始化阶段默认创建了五个工作队列:eventsevents_longevents_nrtevents_unboundevents_freezable

events:普通工作队列。大部分延迟执行的函数都在这个工作队列中运行。要求任务执行时间尽量短,避免相互影响。

events_long:需要长时间运行的任务可以放到本工作队列中。

events_nrt:处于该队列中的任务不会在多个CPU上并发执行。即其任务是不可重入的。

events_unbound:本工作队列不会绑定到特定CPU上运行。也没有并发管理。只要其中的任务数量没有达到限制,其任务就可以立即运行。

events_freezable:与event类似。用于系统挂起时。

 

1.1.2      初始化

创建工作队列的函数是alloc_workqueue

/**

 * 创建一个工作队列

 *             name:                工作队列的名称,如果工作队列用于内存回收,则会创建一个单独的线程,此参数也用于线程名称。

 *              flags:                  max_active一起控制如何为工作队列分配资源、调度、执行。标志值如WQ_NON_REENTRANT

 *              max_active:     用于确定每CPU上的执行上下文中,可以分配多少工作任务。例如,当其值为16时,表示该工作队列中最多有16个任务可以在同一个CPU上运行。

 *                                          当前,对绑定的工作队列来说,最大值是512,当指定为0时,表示使用默认值256.

 *                                          对非绑定队列来说,最大值是5124*num_possible_cpus().

 *              key:           用于lockdep

 *              lock_name:      用于lockdep

 */

struct workqueue_struct *__alloc_workqueue_key(const char *name,

                                                      unsigned int flags,

                                                      int max_active,

                                                      struct lock_class_key *key,

                                                      const char *lock_name)

{

         struct workqueue_struct *wq;

         unsigned int cpu;

 

         /*

          * Workqueues which may be used during memory reclaim should

          * have a rescuer to guarantee forward progress.

          */

         if (flags & WQ_MEM_RECLAIM)/* 此工作队列可能用于内存回收,这些进程应该有较高的优先权 */

                   flags |= WQ_RESCUER;/* WQ_RESCUER是内部标志,表示该工作队列用于内存回收等关键任务 */

 

         /*

          * Unbound workqueues aren't concurrency managed and should be

          * dispatched to workers immediately.

          */

         if (flags & WQ_UNBOUND)/* 没有进行CPU绑定的工作队列,可以设置为高优先级,因为它不受并发管理,应当立即被工作线程调度。 */

                   flags |= WQ_HIGHPRI;

 

         /**

          * 如果传入的max_active0,表示使用默认的值,目前是256。表示某个CPU上最多同时有256个任务被运行。

          */

         max_active = max_active ?: WQ_DFL_ACTIVE;

         /**

          * 根据队列属性计算工作任务限制数。对绑定和非绑定队列来说,有不同的计算方法。

          */

         max_active = wq_clamp_max_active(max_active, flags, name);

 

         /**

          * 分配工作队列数据结构并初始化为0.

          */

         wq = kzalloc(sizeof(*wq), GFP_KERNEL);

         if (!wq)/* 分配内存失败,错误 */

                   goto err;

 

         /**

          * 初始化数据结构中的字段。

          */

         wq->flags = flags;

         wq->saved_max_active = max_active;

         mutex_init(&wq->flush_mutex);

         atomic_set(&wq->nr_cwqs_to_flush, 0);

         INIT_LIST_HEAD(&wq->flusher_queue);

         INIT_LIST_HEAD(&wq->flusher_overflow);

 

         wq->name = name;

         lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);

         INIT_LIST_HEAD(&wq->list);

 

         /**

          * 为工作队列分配每CPU结构。

          */

         if (alloc_cwqs(wq) < 0)

                   goto err;

 

         /**

          * 初始化每CPU结构中的字段。

          * 注意:对非绑定工作队列来说,没有每CPU数据结构,请注意for_each_cwq_cpu的实现方法

          */

         for_each_cwq_cpu(cpu, wq) {

                   struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

                   struct global_cwq *gcwq = get_gcwq(cpu);

 

                   BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);

                   cwq->gcwq = gcwq;

                   cwq->wq = wq;

                   cwq->flush_color = -1;

                   cwq->max_active = max_active;

                   INIT_LIST_HEAD(&cwq->delayed_works);

         }

 

         if (flags & WQ_RESCUER) {/* 对内存回收等关键任务,需要特殊处理,特别是创建单独的工作线程 */

                   struct worker *rescuer;

 

                   /**

                    * mayday_mask是正在等待关键工作任务的CPU掩码,为紧急工作队列分配该结构。

                    */

                   if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL))

                            goto err;

 

                   /**

                    * rescuer用于保存工作线程,先分配该结构。alloc_worker会初始化其字段

                    */

                   wq->rescuer = rescuer = alloc_worker();

                   if (!rescuer)/* 分配内存失败 */

                            goto err;

 

                   /**

                    * 为紧急工作队列创建单独的工作线程。

                    */

                   rescuer->task = kthread_create(rescuer_thread, wq, "%s", name);

                   if (IS_ERR(rescuer->task))/* 创建线程失败 */

                            goto err;

 

                   /**

                    * PF_THREAD_BOUND标志表示该线程绑定到特定CPU.

                    */

                   rescuer->task->flags |= PF_THREAD_BOUND;

                   wake_up_process(rescuer->task);/* 唤醒工作线程,该线程先处理一些初始化工作如设置优先级,再遍历工作队列上的任务 */

         }

 

         /*

          * workqueue_lock protects global freeze state and workqueues

          * list.  Grab it, set max_active accordingly and add the new

          * workqueue to workqueues list.

          */

         spin_lock(&workqueue_lock);/* workqueue_lock用于保护freeze状态和全局工作列队链表 */

 

         if (workqueue_freezing && wq->flags & WQ_FREEZABLE)/* freeze类型的工作队列 */

                   for_each_cwq_cpu(cpu, wq)/* 这种类型的工作队列,其每CPU队列上的任务限制数为0 */

                            get_cwq(cpu, wq)->max_active = 0;

 

         /**

          * 将工作队列加到全局链表中。

          */

         list_add(&wq->list, &workqueues);

 

         spin_unlock(&workqueue_lock);

 

         return wq;

err:

         if (wq) {/* 创建队列失败,释放已经申请的数据结构。 */

                   free_cwqs(wq);

                   free_mayday_mask(wq->mayday_mask);

                   kfree(wq->rescuer);

                   kfree(wq);

         }

         return NULL;

}

 

其中创建标志如下:

enum {

         /**

          * 默认的,一个工作队列保证在同一个CPU上不会重入。

          * 一个工作任务在同一个CPU上不会被多个工作线程并发执行,但是允许在多个CPU上并发执行。

          * 本标志确保在所有CPU上工作任务都不会重入。

          * 在一个特定时刻,在不可重入队列中的工作任务保证最多在系统内的一个工作线程中运行。

          */

         WQ_NON_REENTRANT   = 1 << 0, /* guarantee non-reentrance */

         /**

          * unbound队列中的任务被特定的gcwq线程运行,这些线程没有被绑定到特定CPU

          * 这使得工作队列的表现得就象是一个简单的没有并发管理的执行上下文。

          * 只要有可能,gcwq就试图执行一个工作任务。以下情况下,这种工作队列是有用的:

          *              当使用绑定的工作队列会创建大量线程时,使用unbound队列可以少创建工作线程,这会节省系统资源。

          *              CPU负载的系统中,调度器可以更好的管理线程。

          */

         WQ_UNBOUND                 = 1 << 1, /* not bound to any cpu */

         /**

          * 一个可冻结工作队列用于系统挂起操作的冻结阶段。其中的工作任务被被全部执行,并且新的任务不被执行。

          */

         WQ_FREEZABLE                = 1 << 2, /* freeze during suspend */

         /**

          * 所有用于内存回收的任务必须设置这个标志。

          * 系统确保工作队列至少被一个执行上下文运行,而不管系统当前内存紧张程度。

          */

         WQ_MEM_RECLAIM                = 1 << 3, /* may be used for memory reclaim */

         /**

          * 高优先级的工作任务被加到目标gswq工作链表的前端。并且不管当前并发级别。

          * 换句话说,处于可运行状态的任务将防止其他任务被运行。对unbond队列来说,这个标志没有意义。

          */

         WQ_HIGHPRI           = 1 << 4, /* high priority */

         /**

          * CPU密集型队列中任务对并发级别是无帮助的。换句话说,可运行的CPU密集型工作任务将不能防止其他任务被运行。

          * 对于在绑定CPU的工作队列上,并且预期会占用较多CPU周期的任务来说,这是有用的。

          * 对非绑定的工作队列来说,这个标志无用。

          */

         WQ_CPU_INTENSIVE       = 1 << 5, /* cpu instensive workqueue */

 

         WQ_DYING               = 1 << 6, /* internal: workqueue is dying */

         WQ_RESCUER          = 1 << 7, /* internal: workqueue has rescuer */

 

         WQ_MAX_ACTIVE            = 512,         /* I like 512, better ideas? */

         WQ_MAX_UNBOUND_PER_CPU    = 4,    /* 4 * #cpus for unbound wq */

         WQ_DFL_ACTIVE              = WQ_MAX_ACTIVE / 2,

};

 

工作队列初始化过程是:

/**

 * 工作队列初始化函数

 */

static int __init init_workqueues(void)

{

         unsigned int cpu;

         int i;

 

         /**

          * CPU热插拨的系统中,注册CPU事件函数。

          * 这样,在CPU上线离线过程中,可以创建需要的工作线程。

          */

         cpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);

 

         /* initialize gcwqs */

         /**

          * 初始化gcwqs,每个CPU上都有一个gcwq,除了以下任务外,它几乎处理了所有工作队列任务:

          *              .用于内存回收的紧急工作队列任务。

          *              .CPU下线过程中的工作任务。

          * for_each_gcwq_cpu遍历处理每CPU上的gcwq

          */

         for_each_gcwq_cpu(cpu) {

                   /* 取得当前CPU上的gcwq对象 */

                   struct global_cwq *gcwq = get_gcwq(cpu);

 

                   /* 初始锁、链表 */

                   spin_lock_init(&gcwq->lock);

                   INIT_LIST_HEAD(&gcwq->worklist);

                   gcwq->cpu = cpu;

                   /* GCWQ_DISASSOCIATED标志表示暂时不执行其上的任务 */

                   gcwq->flags |= GCWQ_DISASSOCIATED;

 

                   INIT_LIST_HEAD(&gcwq->idle_list);

                   /* 初始化该CPU上的忙工作线程哈希表。应该是用于工作任务调度管理 */

                   for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)

                            INIT_HLIST_HEAD(&gcwq->busy_hash[i]);

 

                   /**

                    * 初始化定时器。

                    */

                   init_timer_deferrable(&gcwq->idle_timer);

                   gcwq->idle_timer.function = idle_worker_timeout;

                   gcwq->idle_timer.data = (unsigned long)gcwq;

 

                   setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,

                                (unsigned long)gcwq);

 

                   /* worker_ida用于跟踪本CPU上处于忙状态的工作线程 */

                   ida_init(&gcwq->worker_ida);

 

                   gcwq->trustee_state = TRUSTEE_DONE;

                   init_waitqueue_head(&gcwq->trustee_wait);

         }

 

         /* create the initial worker */

         /**

          * 为已经在线的CPU创建工作线程,并启动这些线程。

          * 请注意:LINUX3.0中,不会为每个工作队列创建线程,而是在每个CPU上创建线程并执行几乎所有任务。

          */

         for_each_online_gcwq_cpu(cpu) {

                   struct global_cwq *gcwq = get_gcwq(cpu);

                   struct worker *worker;

 

                   if (cpu != WORK_CPU_UNBOUND)

                            gcwq->flags &= ~GCWQ_DISASSOCIATED;

                   /* 创建工作线程,参数true表示根据gcwq将线程绑定到指定CPU */

                   worker = create_worker(gcwq, true);

                   BUG_ON(!worker);

                   /* 这里使用锁,是因为工作任务会从gcwq上获取任务,而start_worker会操作gcwq的字段。为避免start_worker函数与工作任务冲突而使用锁 */

                   spin_lock_irq(&gcwq->lock);

                   start_worker(worker);/* 启动工作线程 */

                   spin_unlock_irq(&gcwq->lock);

         }

 

         /**

          * 传递不同的参数,以创建几个默认的工作队列。

          * 除非没有特殊用途,使用这几个工作队列就足够了。

          * 工作队列的主要用户API schedule_work就是针对system_wq进行的。

          * queue_work可以指定将工作任务挂在特定的工作队列上。

          */

         system_wq = alloc_workqueue("events", 0, 0);

         system_long_wq = alloc_workqueue("events_long", 0, 0);

         system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);

         system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,

                                                   WQ_UNBOUND_MAX_ACTIVE);

         system_freezable_wq = alloc_workqueue("events_freezable",

                                                     WQ_FREEZABLE, 0);

         BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||

                !system_unbound_wq || !system_freezable_wq);

         return 0;

}

 

1.1.3      创建工作线程

系统在以下几个点创建工作线程:

1、系统初始化时创建默认工作线程。

2、CPU上线时调用create_worker创建工作线程。

3、CPU下线时调用kthread_create创建托管工作队列线程。

4、在工作线程运行过程中,发现需要新创建线程时,调用create_worker创建工作线程。

5、在创建工作队列时,如果工作队列用于内存回收等紧急任务,则在__alloc_workqueue_key函数中调用kthread_create创建单独的工作队列线程。

 

/**

 * 创建新的工作队列线程

 *              gcwq:                 本函数只为gcwq创建线程,其他工作线程是由kthread_create直接创建的。

 *              bind:                   是否将工作线程绑定到特定CPU.托管线程创建的线程以及新上线的CPU创建的线程是没有绑定的。

 */

static struct worker *create_worker(struct global_cwq *gcwq, bool bind)

{

         bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND;

         struct worker *worker = NULL;

         int id = -1;

 

         /**

          * 获得一个可用工作者线程ID,注意,这个ID不是pid

          * 在获取ID前,先获得gcwq的锁。

          */

         spin_lock_irq(&gcwq->lock);

         /**

          * ida_get_new返回0表示成功获得ID。如果返回错误值如EAGAINENOSPC表示需要重试。

          */

         while (ida_get_new(&gcwq->worker_ida, &id)) {

                   spin_unlock_irq(&gcwq->lock);

                   /**

                    * 如果ida_pre_get返回0,表示相应的内存是存在的,ida_get_new返回的错误表示没有ID可用了。

                    * 此时返回失败。

                    */

                   if (!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL))

                            goto fail;

                   /**

                    * ida_pre_get分配了内存资源,可以重新调用ida_get_new分配ID了。

                    * 在调用ida_get_new之前,重新获取锁。

                    */

                   spin_lock_irq(&gcwq->lock);

         }

         spin_unlock_irq(&gcwq->lock);

 

         /**

          * 分配工作线程数据结构并初始化。

          */

         worker = alloc_worker();

         if (!worker)

                   goto fail;

 

         worker->gcwq = gcwq;

         worker->id = id;

 

         /**

          * 调用kthread_create创建线程。

          */

         if (!on_unbound_cpu)

                   worker->task = kthread_create_on_node(worker_thread,

                                                              worker,

                                                              cpu_to_node(gcwq->cpu),

                                                              "kworker/%u:%d", gcwq->cpu, id);

         else

                   worker->task = kthread_create(worker_thread, worker,

                                                     "kworker/u:%d", id);

         if (IS_ERR(worker->task))/* 创建线程失败,可能是资源不足 */

                   goto fail;

 

         /*

          * A rogue worker will become a regular one if CPU comes

          * online later on.  Make sure every worker has

          * PF_THREAD_BOUND set.

          */

         if (bind && !on_unbound_cpu)/* 如果需要,就将线程绑定到特定CPU */

                   kthread_bind(worker->task, gcwq->cpu);

         else {

                   worker->task->flags |= PF_THREAD_BOUND;

                   if (on_unbound_cpu)

                            worker->flags |= WORKER_UNBOUND;

         }

 

         return worker;

fail:

         if (id >= 0) {

                   spin_lock_irq(&gcwq->lock);

                   ida_remove(&gcwq->worker_ida, id);

                   spin_unlock_irq(&gcwq->lock);

         }

         kfree(worker);

         return NULL;

}

 

1.1.4      运行工作任务 1.1.4.1         Worker_thread函数

/**

 * 工作线程函数,绝大部分工作任务都在此函数中被回调

 */

static int worker_thread(void *__worker)

{

         struct worker *worker = __worker;

         struct global_cwq *gcwq = worker->gcwq;

 

         /* tell the scheduler that this is a workqueue worker */

         /* 该标志表示本线程是一个工作队列线程 */

         worker->task->flags |= PF_WQ_WORKER;

/**

 * 每当工作线程被唤醒,都会跳转到此处运行。

 */

woke_up:

         /**

          * 这里要读取工作任务,判断工作队列标志,因此先关中断关使用自旋锁保护。

          */

         spin_lock_irq(&gcwq->lock);

 

         /* DIE can be set only while we're idle, checking here is enough */

         if (worker->flags & WORKER_DIE) {/* 工作者已经被销毁,退出 */

                   spin_unlock_irq(&gcwq->lock);

                   worker->task->flags &= ~PF_WQ_WORKER;/* 取消此标志,说明线程没有工作于工作队列了 */

                   return 0;

         }

 

         /**

          * 开始执行工作任务,调用worker_leave_idle进行统计计数。

          */

         worker_leave_idle(worker);

recheck:

         /* no more worker necessary? */

         if (!need_more_worker(gcwq))/* 没有更多的任务需要处理,睡眠 */

                   goto sleep;

 

         /* do we need to manage? */

         /**

          * 没有空闲线程,并且创建了新线程,则继续检查是否需要创建新线程。

          */

         if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))

                   goto recheck;

 

         /*

          * ->scheduled list can only be filled while a worker is

          * preparing to process a work or actually processing it.

          * Make sure nobody diddled with it while I was sleeping.

          */

         /* 工作线程当前处理任务应当为空 */

         BUG_ON(!list_empty(&worker->scheduled));

 

         /*

          * When control reaches this point, we're guaranteed to have

          * at least one idle worker or that someone else has already

          * assumed the manager role.

          */

         /* 运行到这里,说明至少有一个空闲工作线程或者其他线程正在担当管理者角色,现在开始执行任务了 */

         worker_clr_flags(worker, WORKER_PREP);

 

         do {/* 处理gcwq上的所有任务 */

                   /* 取工作链表中的第一个任务 */

                   struct work_struct *work =

                            list_first_entry(&gcwq->worklist,

                                                struct work_struct, entry);

 

                   if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {/* 没有下一个任务 */

                            /* optimization path, not strictly necessary */

                            /**

                             * 这里处理任务,调用任务的回调函数。process_one_work会释放gcwq的锁并开中断。

                             */

                            process_one_work(worker, work);/* 优化处理,不是必须的,可以将两个分支合并。 */

                            if (unlikely(!list_empty(&worker->scheduled)))/* 还有任务需要继续处理 */

                                     process_scheduled_works(worker);

                   } else {

                            /**

                             * gcwq链表中的任务移到工作线程链表中

                             */

                            move_linked_works(work, &worker->scheduled, NULL);

                            /**

                             * 遍历工作线程任务链表,并回调每一个任务的回调函数

                             */

                            process_scheduled_works(worker);

                   }

         } while (keep_working(gcwq));/* 如果工作队列上还有任务并且本线程是最后一个执行的线程,或者还有挂起的高优先级任务,则继续处理 */

 

         /**

          * 任务开始睡眠了,清除WORKER_PREP标志

          */

         worker_set_flags(worker, WORKER_PREP, false);

sleep:

         /**

          * 再次检查是否有更多的任务需要处理,并且需要创建新线程。

          */

         if (unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker))

                   goto recheck;

 

         /*

          * gcwq->lock is held and there's no work to process and no

          * need to manage, sleep.  Workers are woken up only while

          * holding gcwq->lock or from local cpu, so setting the

          * current state before releasing gcwq->lock is enough to

          * prevent losing any event.

          */

         /**

          * 进入睡眠状态前,更新计数,并唤醒等待任务。

          */

         worker_enter_idle(worker);

         /**

          * 这里处于关中断状态,设置状态为TASK_INTERRUPTIBLE是安全的。

          * 如果在开关中断以后再设置状态,那么在中断里面唤醒任务的事件就会丢失。

          */

         __set_current_state(TASK_INTERRUPTIBLE);

         spin_unlock_irq(&gcwq->lock);

         /**

          * 这里调度出去,等待其他任务将工作线程唤醒后,执下一轮循环。

          */

         schedule();

         goto woke_up;

}

 

1.1.4.2         Rescuer_thread函数

/**

 * 紧急工作队列线程函数

 *              wq:            紧急工作队列,该线程将处理这个队列上的任务。

 */

static int rescuer_thread(void *__wq)

{

         struct workqueue_struct *wq = __wq;

         struct worker *rescuer = wq->rescuer;

         struct list_head *scheduled = &rescuer->scheduled;

         bool is_unbound = wq->flags & WQ_UNBOUND;

         unsigned int cpu;

 

         /**

          * 由于本线程处理紧急事务,因此优先级设置为非实时任务的最高优先级。

          */

         set_user_nice(current, RESCUER_NICE_LEVEL);

repeat:

         set_current_state(TASK_INTERRUPTIBLE);

 

         if (kthread_should_stop())/* 线程被终止了,退出 */

                   return 0;

 

         /*

          * See whether any cpu is asking for help.  Unbounded

          * workqueues use cpu 0 in mayday_mask for CPU_UNBOUND.

          */

         for_each_mayday_cpu(cpu, wq->mayday_mask) {/* 遍历所有需要运行紧急任务的CPU,这些CPU在运行队列上放置了任务 */

                   unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu;

                   /**

                    * 获取特定CPU上的工作队列,如果是非绑定工作队列,则从WORK_CPU_UNBOUND队列中获取任务。

                    */

                   struct cpu_workqueue_struct *cwq = get_cwq(tcpu, wq);

                   struct global_cwq *gcwq = cwq->gcwq;

                   struct work_struct *work, *n;

 

                   /* 开始处理工作任务了,将任务状态设置为TASK_RUNNING */

                   __set_current_state(TASK_RUNNING);

                   /* 清除CPU掩码,因为我们马上就要处理这个CPU上的任务。 */

                   mayday_clear_cpu(cpu, wq->mayday_mask);

 

                   /* migrate to the target cpu if possible */

                   rescuer->gcwq = gcwq;

                   /**

                    * 将线程绑定到指定的CPU上。

                    */

                   worker_maybe_bind_and_lock(rescuer);

 

                   /*

                    * Slurp in all works issued via this workqueue and

                    * process'em.

                    */

                   BUG_ON(!list_empty(&rescuer->scheduled));/* 工作线程当前处理任务应当为空 */

                   list_for_each_entry_safe(work, n, &gcwq->worklist, entry)

                            if (get_work_cwq(work) == cwq)/* 将工作队列中,属于当前CPU(线程正在处理的CPU)上的任务移到线程任务链表中 */

                                     move_linked_works(work, scheduled, &n);

 

                   /**

                    * 处理本工作线程scheduled队列上的所有任务。

                    */

                   process_scheduled_works(rescuer);

 

                   /*

                    * Leave this gcwq.  If keep_working() is %true, notify a

                    * regular worker; otherwise, we end up with 0 concurrency

                    * and stalling the execution.

                    */

                   if (keep_working(gcwq))/* 如果有高优先级任务,或者队列中有更多任务需要处理,则唤醒第一个空闲任务 */

                            wake_up_worker(gcwq);

 

                   spin_unlock_irq(&gcwq->lock);

         }

 

         /**

          * 如果进行了一轮任务处理,那么此时任务状态为运行状态,只要没有高优先级任务就调度不出去。

          * 这样会接着处理下一轮任务。

          * 如果所有任务都被处理完了,则没有进入前面的循环,此时任务状态为TASK_INTERRUPTIBLE,接下来就会真的调度出去了。

          */

         schedule();

         goto repeat;

}

 

1.1.4.3         trustee_thread函数

这个函数在CPU离线的时候,进行一些工作队列的收尾工作。这里不详述。

 

1.1.5      用户API

用户除了可以使用create_workqueue(实际上是调用alloc_workqueue)创建工作队列外,还可以使用以下API调度工作任务:

函数名称

用途

schedule_work

在当前CPU上开始一个工作任务

schedule_work_on

在特定CPU上开始一个工作任务

schedule_delayed_work

启用一个定时器,以便在随后某个时刻在当前CPU上开始一个工作任务

schedule_delayed_work_on

启用一个定时器,以便在随后某个时刻在特定CPU上开始一个工作任务

queue_work

在当前CPU的特定工作队列上开始一个工作任务

queue_work_on

在特定CPU的特定工作队列上开始一个工作任务

queue_delayed_work

启用一个定时器,以便在随后某个时刻在当前CPU的特定工作队列上开始一个工作任务

queue_delayed_work_on

启用一个定时器,以便在随后某个时刻在特定CPU的特定工作队列上开始一个工作任务

cancel_work_sync

终止一个工作任务

flush_work

等待任务完成

flush_work_sync

等待任务完成

flush_workqueue

等待工作队列中的任务完成

 

调度工作任务的函数最终会调用__queue_work函数:

/**

 * 将工作任务挂到工作队列链表中。

 */

static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,

                             struct work_struct *work)

{

         struct global_cwq *gcwq;

         struct cpu_workqueue_struct *cwq;

         struct list_head *worklist;

         unsigned int work_flags;

         unsigned long flags;

 

         debug_work_activate(work);

 

         /* if dying, only works from the same workqueue are allowed */

         if (unlikely(wq->flags & WQ_DYING) &&/* 工作队列正在被销毁,不能再调度任务 */

             WARN_ON_ONCE(!is_chained_work(wq)))

                   return;

 

         /* determine gcwq to use */

         if (!(wq->flags & WQ_UNBOUND)) {/* 工作队列是绑定到CPU */

                   struct global_cwq *last_gcwq;

 

                   if (unlikely(cpu == WORK_CPU_UNBOUND))/* 如果指定的CPU是无效的,则将其放到当前CPU的工作队列链表中 */

                            cpu = raw_smp_processor_id();

 

                   /*

                    * It's multi cpu.  If @wq is non-reentrant and @work

                    * was previously on a different cpu, it might still

                    * be running there, in which case the work needs to

                    * be queued on that cpu to guarantee non-reentrance.

                    */

                  gcwq = get_gcwq(cpu);

                   if (wq->flags & WQ_NON_REENTRANT &&/* 队列是不可重入的,即有并发管理 */

                       (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {/* 任务所在的gcwq不在本CPU */

                            struct worker *worker;

 

                            spin_lock_irqsave(&last_gcwq->lock, flags);

 

                            /**

                             * 找到任务所在的工作线程

                             */

                            worker = find_worker_executing_work(last_gcwq, work);

 

                            if (worker && worker->current_cwq->wq == wq)/* 工作线程正在执行要操作的工作队列 */

                                     gcwq = last_gcwq;/* 需要将工作任务放到正在操作的工作队列上,按序执行避免重入。 */

                            else {

                                     /* meh... not running there, queue here */

                                     /**

                                      * 当前工作队列没有运行。释放gcwq的锁。

                                      */

                                     spin_unlock_irqrestore(&last_gcwq->lock, flags);

                                     /**

                                      * 我觉得这里有一个BUG,此时last_gcwq锁已经释放,那么工作可能被某个工作线程运行。

                                      * 而随后我们工作放到另一个gcwq上,那么就可能造成工作任务重入

                                      * 后续版本是否会有所改动,可关注。

                                      */

                                     spin_lock_irqsave(&gcwq->lock, flags);

                            }

                   } else

                            spin_lock_irqsave(&gcwq->lock, flags);

         } else {/* 工作队列是没有绑定的,也就没有并发管理,直接放到WORK_CPU_UNBOUND队列中即可 */

                   gcwq = get_gcwq(WORK_CPU_UNBOUND);

                   spin_lock_irqsave(&gcwq->lock, flags);

         }

 

         /* gcwq determined, get cwq and queue */

         /**

          * 放到gcwq的指定CPU队列上去。

          */

         cwq = get_cwq(gcwq->cpu, wq);

         trace_workqueue_queue_work(cpu, cwq, work);

 

         BUG_ON(!list_empty(&work->entry));

 

         cwq->nr_in_flight[cwq->work_color]++;

         work_flags = work_color_to_flags(cwq->work_color);

 

         if (likely(cwq->nr_active < cwq->max_active)) {/* 工作队列上的任务还没有超过限制 */

                   trace_workqueue_activate_work(work);

                   cwq->nr_active++;/* 活动任务计数加1 */

                   worklist = gcwq_determine_ins_pos(gcwq, cwq);/* 在工作队列中为任务找到一个适合的位置。优先级高的任务将位于前面 */

         } else {

                   work_flags |= WORK_STRUCT_DELAYED;/* 超过限制了,设置任务的延迟标记并将其加到延迟队列中 */

                   worklist = &cwq->delayed_works;

         }

 

         /**

          * 将任务链接到链表中。如果有必要,唤醒工作线程。

          */

         insert_work(cwq, work, worklist, work_flags);

 

         spin_unlock_irqrestore(&gcwq->lock, flags);

}

阅读(612) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~