接着上节的来,我们在上节说了软中断和tasklet,那这最后就是工作队列了哦..
工作队列和前面讨论的其他形式都不相同,它可以把工作推后,交由一个内核线程去执行----该工作总是会在进程上下文执行。这样,通过工作队列执行代码能占尽进程上下文的所有优势,最重要的就是工作队列允许重新调度甚至是睡眠。相比较前边两个,这个选择起来就很容易了。我说过,前边两个是不允许休眠的,这个是允许休眠的,这就很明白了是不?这意味着在你需要获得大量内存的时候,在你需要获取信号量时,在你需要执行阻塞式的I/O操作时,它都会非常有用(先说话, 这个不是我说的,是书上这么说的哦)。
工作队列子系统是一个用于创建内核线程的接口,通过它创建的进程负责执行由内核其他部分排到队列里的任务。它创建的这些内核线程被称作工作者线程(worker threads).工作队列可以让你的驱动程序创建一个专门的工作者线程来处理需要推后的工作。不过,工作队列子系统提供了一个缺省的工作者线程来处理这些工作。因此,工作队列最基本的表现形式就转变成一个把需要推后执行的任务交给特定的通用线程这样一种接口。缺省的工作线程叫做event/n.每个处理器对应一个线程,这里的n代表了处理器编号。除非一个驱动程序或者子系统必须建立一个属于自己的内核线程,否则最好还是使用缺省线程。
1.工作这线程结构用下面的结构表示:
1 | struct workqueue_struct{ |
2 | struct cpu_workqueue_struct cpu_wq[NR_CPUS]; |
结构中数组的每一项对应系统的一个CPU.接下来,在看看在kernel/workqueue.c中的核心数据结构cpu_workqueue_struct:
01 | struct cpu_workqueue_struct{ |
04 | struct list_head worklist; |
05 | wait_queue_head_t more_work; |
06 | wait_queue_head_t work_done; |
07 | struct workqueue_struct *wq; |
09 | struct completion exti; |
2.表示工作的数据结构:所有的工作者线程都是用普通的内核线程来实现的,它们都要执行worker_thread()函数。在它初始化完以后,这个函数执行一个死循环执行一个循环并开始休眠,当有操作被插入到队列的时候,线程就会被唤醒,以便执行这些操作。当没有剩余的时候,它又会继续休眠。工作有work_struct(linux/workqueue)结构表示:
3 | struct list_head entry; //连接所有工作的链表 |
4 | void (*func)(void *); //处理函数 |
5 | void *data; //传递给处理函数的参数 |
7 | struct timer_list timer; //y延迟工作队列所用到的定时器 |
当一个工作线程被唤醒时,它会执行它的链表上的所有工作。工作一旦执行完毕,它就将相应的work_struct对象从链表上移去,当链表不再有对象时,它就继续休眠。woker_thread函数的核心流程如下:
02 | set_task_state(current,TASK_INTERRUPTIBLE); |
03 | add_wait_queue(&cwq->more_work,&wait); |
04 | if(list_empty(&cwq->worklist)) |
07 | set_task_state(current,TASK_RUNNING); |
08 | remove_wait_queue(&cwq->more_work,&wait); |
09 | if(!list_empty(&cwq->worklist)) |
分析一下上面的代码。首先线程将自己设置为休眠状态并把自己加入等待队列。如果工作对列是空的,线程调用schedule()函数进入睡眠状态。如果链表有对象,线程就将自己设为运行态,脱离等待队列。然后,再次调用run_workqueue()执行推后的工作。好了,接下来,问题就纠结在run_workqueue(),它完成实际推后到此的工作:
1 | while(!list_empty(&cwq->worklist)){ |
2 | struct work_struct *work = list_entry(cwq->worklist.next,struct work_struct,entry); |
3 | void (*f)(void *) = work->func; |
4 | void *data = work->data; |
5 | list_del_init(cwq->worklist.next); |
6 | clear_bit(0,&work->pending); |
该函数循环遍历链表上每个待处理的工作,执行链表上每个结点上的work_struct的func成员函数:
1.当链表不为空时,选取下一个节点对象。 2.获取我们希望执行的函数func及其参数data。 3.把该结点从链表上接下来,将待处理标志位pending清0。 4.调用函数。 5.重复执行。 |
老师说的好:光说不练,不是好汉。现在我们继续来看看怎么用吧:
1.首先,实际创建一些需要推后完成的工作,可以在编译时静态地创建该数据结构:
1 | DECLARE_WORK(name,void (*func)(void *),void *data); |
当然了,如果愿意,我们当然可以在运行时通过指针动态创建一个工作:
1 | INIT_WORK(struct work_struct *work, void (*func)(void *),void *data); |
2.工作队列处理函数,会由一个工作者线程执行,因此,函数会运行在进程上下文中,默认情况下,允许相应中断,并且不持有锁。如果需要,函数可以睡眠。需要注意的是,尽管处理函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相应的内存映射。函数原型如下:
1 | void work_hander(void *data); |
3.对工作进行调度。前面的准备工作做完以后,下面就可以开始调度了,只需调用schedule_work(&work).这样work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。当然如果不想快速执行,而是想延迟一段时间执行,按就用schedule_delay_work(&work,delay);delay是要延迟的时间节拍,后面讲.
4.刷新操作。插入队列的工作会在工作者线程下一次被唤醒的时候执行。有时,在继续下一步工作之前,你必须保证一些操作已经执行完毕等等。由于这些原因,内核提供了一个用于刷新指定工作队列的函数:void flush_scheduled_work(void); 这个函数会一直等待,直到队列中所有的对象都被执行后才返回。在等待所有待处理的工作执行的时候,该函数会进入休眠状态,所以只能在进程上下文中使用它。需要说明的是,该函数并不取消任何延迟执行的工作。取消延迟执行的工作应该调用:int cancel_delayed_work(struct work_struct *work);这个函数可以取消任何与work_struct 相关挂起的工作。
5.创建新的工作队列。前边说过最好使用缺省线程,可如果你坚持要使用自己创建的线程,咋办?这时你就应该创建一个新的工作队列和与之相应的工作者线程,方法很简单,用下面的函数:struct workqueue_struct *create_workqueue(const char *name);name是新内核线程的名字。这样就会创建所有的工作者线程(系统中的每个处理器都有一个)并且做好所有开始处理工作之前的准备工作。在创建之后,就调用下面的函数吧:
1 | int queue_work(struct workqueue_struct *wq, struct work_struct *work); |
2 | int queue_delayed_work(struct workqueue_struct *wq,struct work_struct *work,unsigned long delay); |
这两个函数和schedule_work()和schedule_delayed_work()相近,唯一的区别在于它们可以针对特定的工作队列而不是缺省的event队列进行操作。
好了,工作队列也说完了,我还是结合前边一篇,把这三个地板不实现的策略比较一下,方便以后选择.
首先,tasklet是基于软中断实现的,两者相近,工作队列机制与它们完全不同,靠内核线程来实现。软中断提供的序列化的保障最少,这就要求中断处理函数必须格外小心地采取一些步骤确保共享数据的安全,两个甚至更多相同类别的软中断有可能在不同的处理器上同时执行。如果被考察的代码本身多线索化的工作做得非常好,它完全使用单处理器变量,那么软中断就是非常好的选择。对于时间要求严格和执行效率很高的应用来说,它执行的也最快。否则选择tasklets意义更大。tasklet接口简单,而且两种同种类型的tasklet不能同时执行,所以实现起来也会简单一些。如果需要把任务推迟到进程上下文中完成,那你只能选择工作队列了。如果不需要休眠,那软中断和tasklet可能更合适。另外就是工作队列造成的开销最大,当然这是相对的,针对大部分情况,工作队列都能提供足够的支持。从方便度上考虑就是:工作队列,tasklets,最后才是软中断。我们在做驱动的时候,关于这三个下半部实现,需要考虑两点:首先,是不是需要一个可调度的实体来执行需要推后完成的工作(即休眠的需要),如果有,工作队列就是唯一的选择,否则最好用tasklet。性能如果是最重要的,那还是软中断吧。
最后,就是一些禁止下半部的相关部分了,给一个表:
函数 | 描述 |
void local_bh_disable() | 禁止本地处理器的软中断和tasklet的处理 |
void local_bh_enable() | 激活本地处理器的软中断和tasklet的处理 |
这些函数有可能被嵌套使用----最后被调用的local_bh_enable()最终激活下半部。函数通过preempt_count为每个进程维护一个计数器。当计数器变为0时,下半部才能够被处理。因为下半部的处理已经被禁止了,所以local_bh_enable()还需要检查所有现存的待处理的下半部并执行它们。
好了,这一次讲完了,画了两次,我们在这两次中提到了一些同时发生的问题,这时可能存在数据共享互斥访问的问题,这个就是内核同步方面的事情了,我们后面再慢慢说这个事。