Chinaunix首页 | 论坛 | 博客
  • 博客访问: 688369
  • 博文数量: 207
  • 博客积分: 1743
  • 博客等级: 上尉
  • 技术积分: 2044
  • 用 户 组: 普通用户
  • 注册时间: 2012-08-20 14:36
文章分类

全部博文(207)

文章存档

2016年(24)

2015年(10)

2014年(50)

2013年(45)

2012年(78)

分类: LINUX

2012-09-11 14:26:28

困惑我几天的问题终于解决了



工作队列实际就是用create_workqueue创建了两个线程(两个,其中一个叫more_work, 另一个叫work_done,我猜就是一个是完成队列,一个是添加队列),然后这两个线程都可以访问最开始建的工作队列,他们俩都参与调度,别的工作线程把东西做好后就queue_work,然后就该干嘛干嘛,这边调度到这俩线程,他俩就把那个work拿出来用事先设置好的callback来处理这个work,一切需要的东西都在work_struct里,如果需要换callback就用PREPARE_WORK宏换,第一次用INIT_WORK初始化,以后用PREPAR_WORK换就行。shedule__delayed_work是里面加了timer的,就是定时多少tick或jiffies后去执行你那个东西。

 

flush_workqueue(wq); 清除当前工作队列中的所有工作

 

 

慎用Linux系统中的default WorkQueue

文章发表于:2009-04-13 11:33

       在设计 Linux 内核程序时,有些小的任务可以直接交给系统的 default-workqueue ,这样的处理简单方便,受到了很多内核开发者的喜爱。但是,在使用 default-workqueue 时需要注意内核死锁问题,一不小心就会导致系统死锁。

       实际上我是知道上述设计原则的,并且在使用 default-workqueue 时也考虑了死锁问题,但是最近的设计还是发生了上述问题,真是防不甚防啊!最近遇到的问题可以描述如下图所示:

       系统中存在两个 work :第一个 work (可以称之为 IO-work )实现数据的 I/O 通路,在 IO 通路中可能会睡眠,当系统资源不够的时候 IO 通路的 work 就会睡眠;另一个 work (可以称之为 timer-work )实现 IO 通路的控制、处理,该 work 不会睡眠。考虑到这两个 work 发生的概率不是很高,所以都采用了 Linux 系统提供的default-workqueue ,在 wrok 调度时采用 schedule_work 直接将 work 派遣到 default-workqueue 中,让 default-workqueue 的内核线程进行 work 的具体处理。通过实际测试发现,上述处理在单核平台上导致了系统死锁问题,当 IO-work 的处理的 IO 过多时会导致 IO-work 的睡眠,由于 timer-work 无法得到执行,所以 IO 通路无法打通, IO-work 所需要的资源无法得到释放,因此系统死锁。

       解决上述死锁的方法比较简单, IO-work  timer-work 不能在同一个 workqueue 中运行,可以为其中的一个work 单独创建一个运行上下文( workqueue )。由于多核平台上的 default-workqueue 会存在多个(与 CPU 的个数相同),所以上述问题才不会出现。不管怎么样,在设计内核程序时一定要慎用 default-workqueue 了,该问题的本质是共享上下文问题。

 

 

  我们先来看一下默认的events任务队列,然后再看看创建新的工作者线程。

     1.创建推后的工作

      首先要做的是实际创建一些需要推后完成的工作。可以通过DECLARE_WORK在编译时静态地创建该结构体:

  1. #define DECLARE_WORK(n, f)                  /
  2.      struct  work_struct n = __WORK_INITIALIZER(n, f)
  3. #define __WORK_INITIALIZER(n, f) {              /
  4.     .data = WORK_DATA_INIT(0),              /
  5.         .entry  = { &(n).entry, &(n).entry },           /
  6.     .func = (f),                        /
  7.     }

也可以在运行时通过指针创建一个工作:

  1. #define INIT_WORK(_work, _func)                 /
  2.      do  {                            /
  3.         (_work)->data = (atomic_long_t) WORK_DATA_INIT(0);  /
  4.         INIT_LIST_HEAD(&(_work)->entry);        /
  5.         PREPARE_WORK((_work), (_func));         /
  6.     }  while  (0)
  7. /*
  8.  * initialize a work item's function pointer
  9.  */
  10. #define PREPARE_WORK(_work, _func)              /
  11.      do  {                            /
  12.         (_work)->func = (_func);            /
  13.     }  while  (0)

2.工作队列处理函数

 

       工作队列对立函数的原型是:

         void work_handler(void *data)

        这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,运行响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的 是,尽管操作处理函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间 的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。

3.对工作进行调度

 

    要把给定工作的处理函数提交给默认的events工作线程,只须调用

    schedule_work(&work);

  1. /**
  2.  * schedule_work - put work task in global workqueue
  3.  * @work: job to be done
  4.  *
  5.  * This puts a job in the kernel-global workqueue.
  6.  */
  7. int  fastcall schedule_work( struct  work_struct *work)
  8. {
  9.      return  queue_work(keventd_wq, work);
  10. }
  11. /**
  12.  * queue_work - queue work on a workqueue
  13.  * @wq: workqueue to use
  14.  * @work: work to queue
  15.  *
  16.  * Returns 0 if @work was already on a queue, non-zero otherwise.
  17.  *
  18.  * We queue the work to the CPU it was submitted, but there is no
  19.  * guarantee that it will be processed by that CPU.
  20.  */
  21. int  fastcall queue_work( struct  workqueue_struct *wq,  struct  work_struct *work)
  22. {
  23.      int  ret = 0, cpu = get_cpu();
  24.      if  (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
  25.          if  (unlikely(is_single_threaded(wq)))
  26.             cpu = singlethread_cpu;
  27.         BUG_ON(!list_empty(&work->entry));
  28.         __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
  29.         ret = 1;
  30.     }
  31.     put_cpu();
  32.      return  ret;
  33. }

 

  1. /* Preempt must be disabled. */
  2. static   void  __queue_work( struct  cpu_workqueue_struct *cwq,
  3.               struct  work_struct *work)
  4. {
  5.     unsigned  long  flags;
  6.     spin_lock_irqsave(&cwq->lock, flags);
  7.     set_wq_data(work, cwq);
  8.     list_add_tail(&work->entry, &cwq->worklist);
  9.     cwq->insert_sequence++;
  10.     wake_up(&cwq->more_work);
  11.     spin_unlock_irqrestore(&cwq->lock, flags);
  12. }

     work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。如不希望工作马上被执行,延迟一段时间之后再执行,可以调度它在指定的时间执行:

    schedule_delayed_work(&work,delay);

  1. /**
  2.  * schedule_delayed_work - put work task in global workqueue after delay
  3.  * @dwork: job to be done
  4.  * @delay: number of jiffies to wait or 0 for immediate execution
  5.  *
  6.  * After waiting for a given time this puts a job in the kernel-global
  7.  * workqueue.
  8.  */
  9. int  fastcall schedule_delayed_work( struct  delayed_work *dwork,
  10.                     unsigned  long  delay)
  11. {
  12.     timer_stats_timer_set_start_info(&dwork->timer);
  13.      return  queue_delayed_work(keventd_wq, dwork, delay);
  14. }
  15. /**
  16.  * queue_delayed_work - queue work on a workqueue after delay
  17.  * @wq: workqueue to use
  18.  * @dwork: delayable work to queue
  19.  * @delay: number of jiffies to wait before queueing
  20.  *
  21.  * Returns 0 if @work was already on a queue, non-zero otherwise.
  22.  */
  23. int  fastcall queue_delayed_work( struct  workqueue_struct *wq,
  24.              struct  delayed_work *dwork, unsigned  long  delay)
  25. {
  26.      int  ret = 0;
  27.      struct  timer_list *timer = &dwork->timer;
  28.      struct  work_struct *work = &dwork->work;
  29.     timer_stats_timer_set_start_info(timer);
  30.      if  (delay == 0)
  31.          return  queue_work(wq, work);
  32.      if  (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
  33.         BUG_ON(timer_pending(timer));
  34.         BUG_ON(!list_empty(&work->entry));
  35.          /* This stores wq for the moment, for the timer_fn */
  36.         set_wq_data(work, wq);
  37.         timer->expires = jiffies + delay;
  38.         timer->data = (unsigned  long )dwork;
  39.         timer->function = delayed_work_timer_fn;
  40.         add_timer(timer);
  41.         ret = 1;
  42.     }
  43.      return  ret;
  44. }
  45. struct  delayed_work {
  46.      struct  work_struct work;
  47.      struct  timer_list timer;
  48. };

 

4.刷新操作

 

    排入队列的工作会在工作者线程下一次被唤醒的时候执行。有时,在继续下一步工作之前,你必须保证一些操作已经执行完毕了。这一点对模块来说很重要,在卸载 之前,它就有可能需要调用下面的函数;而在内核的其他部分,为了防止竞争条件的出现,也可能需要确保不在有待处理的工作。

   出于以上目的,内核准备了一个用于刷新指定工作队列的函数:

    void flush_scheduled_work(void);

  1. void  flush_scheduled_work( void )
  2. {
  3.     flush_workqueue(keventd_wq);
  4. }
  5. /**
  6.  * flush_workqueue - ensure that any scheduled work has run to completion.
  7.  * @wq: workqueue to flush
  8.  *
  9.  * Forces execution of the workqueue and blocks until its completion.
  10.  * This is typically used in driver shutdown handlers.
  11.  *
  12.  * This function will sample each workqueue's current insert_sequence number and
  13.  * will sleep until the head sequence is greater than or equal to that.  This
  14.  * means that we sleep until all works which were queued on entry have been
  15.  * handled, but we are not livelocked by new incoming ones.
  16.  *
  17.  * This function used to run the workqueues itself.  Now we just wait for the
  18.  * helper threads to do it.
  19.  */
  20. void  fastcall flush_workqueue( struct  workqueue_struct *wq)
  21. {
  22.     might_sleep();
  23.      if  (is_single_threaded(wq)) {
  24.          /* Always use first cpu's area. */
  25.         flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
  26.     }  else  {
  27.          int  cpu;
  28.         mutex_lock(&workqueue_mutex);
  29.         for_each_online_cpu(cpu)
  30.             flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
  31.         mutex_unlock(&workqueue_mutex);
  32.     }
  33. }

     函数会一直等待,直到队列中所有对象都被执行以后才返回。在等待所有待处理的工作执行的时候,该函数会进入休眠状态,所以只能在进程上下文中使用它。

     注意,该函数并不取消任何延迟执行的工作。就是说,任何通过schedule_delayed_work()调度的工作,如果其延迟时间未结束,它并不会因为调用flush_scheduled_work()而被刷新掉。

      取消延迟执行的工作应该调用:

 int cancle_delayed_work(sruct work_struc work);

       这个函数可以取消任何与work_struct相关的挂起工作。

  1. /*
  2.  * Kill off a pending schedule_delayed_work().  Note that the work callback
  3.  * function may still be running on return from cancel_delayed_work().  Run
  4.  * flush_scheduled_work() to wait on it.
  5.  */
  6. static   inline   int  cancel_delayed_work( struct  delayed_work *work)
  7. {
  8.      int  ret;
  9.     ret = del_timer_sync(&work->timer);
  10.      if  (ret)
  11.         work_release(&work->work);
  12.      return  ret;
  13. }
  14. /**
  15.  * work_release - Release a work item under execution
  16.  * @work: The work item to release
  17.  *
  18.  * This is used to release a work item that has been initialised with automatic
  19.  * release mode disabled (WORK_STRUCT_NOAUTOREL is set).  This gives the work
  20.  * function the opportunity to grab auxiliary data from the container of the
  21.  * work_struct before clearing the pending bit as the work_struct may be
  22.  * subject to deallocation the moment the pending bit is cleared.
  23.  *
  24.  * In such a case, this should be called in the work function after it has
  25.  * fetched any data it may require from the containter of the work_struct.
  26.  * After this function has been called, the work_struct may be scheduled for
  27.  * further execution or it may be deallocated unless other precautions are
  28.  * taken.
  29.  *
  30.  * This should also be used to release a delayed work item.
  31.  */
  32. #define work_release(work) /
  33.     clear_bit(WORK_STRUCT_PENDING, work_data_bits(work))

5.创建新的工作队列

 

         如果默认的队列不能满足需要,可以创建一个新的工作对列和与之相应的工作者线程。

         创建一个新的任务队列和与之相关的工作者线程,只须调用一个简单的函数:

         struct workqueue_struct *create_workqueue(const char *name);

  1. #define create_workqueue(name) __create_workqueue((name), 0, 0)
  2. struct  workqueue_struct *__create_workqueue( const   char  *name,
  3.                          int  singlethread,  int  freezeable)
  4. {
  5.      int  cpu, destroy = 0;
  6.      struct  workqueue_struct *wq;
  7.      struct  task_struct *p;
  8.     wq = kzalloc( sizeof (*wq), GFP_KERNEL);
  9.      if  (!wq)
  10.          return  NULL;
  11.     wq->cpu_wq = alloc_percpu( struct  cpu_workqueue_struct);
  12.      if  (!wq->cpu_wq) {
  13.         kfree(wq);
  14.          return  NULL;
  15.     }
  16.     wq->name = name;
  17.     mutex_lock(&workqueue_mutex);
  18.      if  (singlethread) {
  19.         INIT_LIST_HEAD(&wq->list);
  20.         p = create_workqueue_thread(wq, singlethread_cpu, freezeable);
  21.          if  (!p)
  22.             destroy = 1;
  23.          else
  24.             wake_up_process(p);
  25.     }  else  {
  26.         list_add(&wq->list, &workqueues);
  27.         for_each_online_cpu(cpu) {
  28.             p = create_workqueue_thread(wq, cpu, freezeable);
  29.              if  (p) {
  30.                 kthread_bind(p, cpu);
  31.                 wake_up_process(p);
  32.             }  else
  33.                 destroy = 1;
  34.         }
  35.     }
  36.     mutex_unlock(&workqueue_mutex);
  37.      /*
  38.      * Was there any error during startup? If yes then clean up:
  39.      */
  40.      if  (destroy) {
  41.         destroy_workqueue(wq);
  42.         wq = NULL;
  43.     }
  44.      return  wq;
  45. }

     这个函数会创建所有的工作者线程(系统中的每个处理器都有一个),并且做好所有开始处理工作之前的准备工作。

     创建一个工作的时候无须考虑工作队列的类型。可以使用下列函数对给定工作而不是默认的event队列进行操作。

int queue_work(struct workqueue_struct *wq, struct work_struct *work);

int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay);

flush_workqueue(struct workqueue_struct *wq);

  1. /**
  2.  * queue_work - queue work on a workqueue
  3.  * @wq: workqueue to use
  4.  * @work: work to queue
  5.  *
  6.  * Returns 0 if @work was already on a queue, non-zero otherwise.
  7.  *
  8.  * We queue the work to the CPU it was submitted, but there is no
  9.  * guarantee that it will be processed by that CPU.
  10.  */
  11. int  fastcall queue_work( struct  workqueue_struct *wq,  struct  work_struct *work)
  12. {
  13.      int  ret = 0, cpu = get_cpu();
  14.      if  (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
  15.          if  (unlikely(is_single_threaded(wq)))
  16.             cpu = singlethread_cpu;
  17.         BUG_ON(!list_empty(&work->entry));
  18.         __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
  19.         ret = 1;
  20.     }
  21.     put_cpu();
  22.      return  ret;
  23. }
  24. /**
  25.  * queue_delayed_work - queue work on a workqueue after delay
  26.  * @wq: workqueue to use
  27.  * @dwork: delayable work to queue
  28.  * @delay: number of jiffies to wait before queueing
  29.  *
  30.  * Returns 0 if @work was already on a queue, non-zero otherwise.
  31.  */
  32. int  fastcall queue_delayed_work( struct  workqueue_struct *wq,
  33.              struct  delayed_work *dwork, unsigned  long  delay)
  34. {
  35.      int  ret = 0;
  36.      struct  timer_list *timer = &dwork->timer;
  37.      struct  work_struct *work = &dwork->work;
  38.     timer_stats_timer_set_start_info(timer);
  39.      if  (delay == 0)
  40.          return  queue_work(wq, work);
  41.      if  (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
  42.         BUG_ON(timer_pending(timer));
  43.         BUG_ON(!list_empty(&work->entry));
  44.          /* This stores wq for the moment, for the timer_fn */
  45.         set_wq_data(work, wq);
  46.         timer->expires = jiffies + delay;
  47.         timer->data = (unsigned  long )dwork;
  48.         timer->function = delayed_work_timer_fn;
  49.         add_timer(timer);
  50.         ret = 1;
  51.     }
  52.      return  ret;
  53. }
  54. /**
  55.  * flush_workqueue - ensure that any scheduled work has run to completion.
  56.  * @wq: workqueue to flush
  57.  *
  58.  * Forces execution of the workqueue and blocks until its completion.
  59.  * This is typically used in driver shutdown handlers.
  60.  *
  61.  * This function will sample each workqueue's current insert_sequence number and
  62.  * will sleep until the head sequence is greater than or equal to that.  This
  63.  * means that we sleep until all works which were queued on entry have been
  64.  * handled, but we are not livelocked by new incoming ones.
  65.  *
  66.  * This function used to run the workqueues itself.  Now we just wait for the
  67.  * helper threads to do it.
  68.  */
  69. void  fastcall flush_workqueue( struct  workqueue_struct *wq)
  70. {
  71.     might_sleep();
  72.      if  (is_single_threaded(wq)) {
  73.          /* Always use first cpu's area. */
  74.         flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
  75.     }  else  {
  76.          int  cpu;
  77.         mutex_lock(&workqueue_mutex);
  78.         for_each_online_cpu(cpu)
  79.             flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
  80.         mutex_unlock(&workqueue_mutex);
  81.     }
  82. }
  1. static   void  flush_cpu_workqueue( struct  cpu_workqueue_struct *cwq)
  2. {
  3.      if  (cwq-> thread  == current) {
  4.          /*
  5.          * Probably keventd trying to flush its own queue. So simply run
  6.          * it by hand rather than deadlocking.
  7.          */
  8.         run_workqueue(cwq);
  9.     }  else  {
  10.         DEFINE_WAIT(wait);
  11.          long  sequence_needed;
  12.         spin_lock_irq(&cwq->lock);
  13.         sequence_needed = cwq->insert_sequence;
  14.          while  (sequence_needed - cwq->remove_sequence > 0) {
  15.             prepare_to_wait(&cwq->work_done, &wait,
  16.                     TASK_UNINTERRUPTIBLE);
  17.             spin_unlock_irq(&cwq->lock);
  18.             schedule();
  19.             spin_lock_irq(&cwq->lock);
  20.         }
  21.         finish_wait(&cwq->work_done, &wait);
  22.         spin_unlock_irq(&cwq->lock);
  23.     }
  24. }
  1. static   void  run_workqueue( struct  cpu_workqueue_struct *cwq)
  2. {
  3.     unsigned  long  flags;
  4.      /*
  5.      * Keep taking off work from the queue until
  6.      * done.
  7.      */
  8.     spin_lock_irqsave(&cwq->lock, flags);
  9.     cwq->run_depth++;
  10.      if  (cwq->run_depth > 3) {
  11.          /* morton gets to eat his hat */
  12.         printk( "%s: recursion depth exceeded: %d/n" ,
  13.             __FUNCTION__, cwq->run_depth);
  14.         dump_stack();
  15.     }
  16.      while  (!list_empty(&cwq->worklist)) {
  17.          struct  work_struct *work = list_entry(cwq->worklist.next,
  18.                          struct  work_struct, entry);
  19.         work_func_t f = work->func;
  20.         list_del_init(cwq->worklist.next);
  21.         spin_unlock_irqrestore(&cwq->lock, flags);
  22.         BUG_ON(get_wq_data(work) != cwq);
  23.          if  (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
  24.             work_release(work);
  25.         f(work);
  26.          if  (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
  27.             printk(KERN_ERR  "BUG: workqueue leaked lock or atomic: "
  28.                      "%s/0x%08x/%d/n" ,
  29.                     current->comm, preempt_count(),
  30.                         current->pid);
  31.             printk(KERN_ERR  "    last function: " );
  32.             print_symbol( "%s/n" , (unsigned  long )f);
  33.             debug_show_held_locks(current);
  34.             dump_stack();
  35.         }
  36.         spin_lock_irqsave(&cwq->lock, flags);
  37.         cwq->remove_sequence++;
  38.         wake_up(&cwq->work_done);
  39.     }
  40.     cwq->run_depth--;
  41.     spin_unlock_irqrestore(&cwq->lock, flags);
  42. }
阅读(2595) | 评论(0) | 转发(0) |
0

上一篇:linux 工作队列

下一篇:softlockup 解决思路

给主人留下些什么吧!~~