1. When responding to an interrupt, the work that is strictly time-critical, hardware-related, or must not be interrupted is done in the top half (the interrupt handler); everything else is deferred to the bottom half. While the bottom half runs, interrupts can be enabled.
2. The deferred part is usually called the bottom half, and under Linux all mechanisms that defer work for later execution are collectively referred to as bottom halves.
3. Bottom halves in the kernel
(1) Softirqs: a statically defined set of bottom halves. Softirqs can run concurrently on different processors, even two softirqs of the same type.
(2) Tasklets: dynamically created bottom halves built on top of softirqs. Different tasklets can run concurrently on different processors, but two tasklets of the same type cannot.
(3) Work queues: similar to the old task queues; they defer work into process context for later execution.
4. Softirqs
(1) Implementation
Softirqs are statically allocated at compile time; they cannot be registered or removed dynamically.
struct softirq_action {
    void (*action)(struct softirq_action *);
};

static struct softirq_action softirq_vec[NR_SOFTIRQS];

/* prototype of a softirq handler (the action field): */
void softirq_handler(struct softirq_action *);

/* e.g., invoking a handler: */
my_softirq->action(my_softirq);
A softirq never preempts another softirq; interrupt handlers are the only thing that can preempt a softirq.
A softirq must be marked before it runs; this is called raising the softirq. Usually an interrupt handler raises the softirq it wants run before returning, so that the softirq executes at a suitable later time. Pending softirqs are checked and executed: on return from the hardware interrupt code, in the ksoftirqd kernel thread, and in any code that explicitly checks for and executes pending softirqs, such as the networking subsystem.
do_softirq()----->__do_softirq()
u32 pending;

pending = local_softirq_pending();
if (pending) {
    struct softirq_action *h;

    /* reset the pending bitmask */
    set_softirq_pending(0);

    h = softirq_vec;
    do {
        if (pending & 1)
            h->action(h);
        h++;
        pending >>= 1;
    } while (pending);
}
Softirqs are reserved for the most timing-critical and important bottom-half processing in the system. Currently only the networking and block device subsystems use softirqs directly; kernel timers and tasklets are built on top of softirqs.
softirq types: softirqs with a lower numeric priority run before those with a higher number.
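For reference, the softirq index enumeration looks roughly like the sketch below in the 2.6.3x kernels these notes are based on (the exact set of entries varies by kernel version); a lower index means higher priority:

/* approximate contents of the softirq enum in <linux/interrupt.h> (circa 2.6.3x) */
enum {
    HI_SOFTIRQ = 0,      /* high-priority tasklets */
    TIMER_SOFTIRQ,       /* kernel timers */
    NET_TX_SOFTIRQ,      /* network transmit */
    NET_RX_SOFTIRQ,      /* network receive */
    BLOCK_SOFTIRQ,       /* block devices */
    TASKLET_SOFTIRQ,     /* regular tasklets */
    SCHED_SOFTIRQ,       /* scheduler */
    HRTIMER_SOFTIRQ,     /* high-resolution timers */
    RCU_SOFTIRQ,         /* RCU callbacks */

    NR_SOFTIRQS
};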
(2) Usage
registering softirqs:
open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);
Softirq handlers run with interrupts enabled and cannot sleep. While a handler runs, softirqs on the current processor are disabled, but another processor may execute other softirqs, including another instance of the same one.
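As an illustration only (my_softirq_handler, my_event_lock, and my_pending_events are made-up names), a handler therefore protects shared data with a spinlock or per-processor data, never with anything that can sleep; registering such a handler would also require adding a new entry to the softirq enum, which is why most drivers use tasklets instead:

#include <linux/interrupt.h>
#include <linux/spinlock.h>

static DEFINE_SPINLOCK(my_event_lock);
static unsigned long my_pending_events;

/* softirq handler: interrupts are enabled, but it must never sleep,
 * and the same handler may run on another processor at the same time */
static void my_softirq_handler(struct softirq_action *action)
{
    unsigned long events;

    spin_lock(&my_event_lock);
    events = my_pending_events;
    my_pending_events = 0;
    spin_unlock(&my_event_lock);

    /* ... process 'events' here ... */
}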
raising softirqs:
raise_softirq(NET_TX_SOFTIRQ);        /* disables interrupts, raises the softirq, then restores interrupts */

/*
 * interrupts must already be off!
 */
raise_softirq_irqoff(NET_TX_SOFTIRQ); /* a small optimization when interrupts are already known to be disabled */
5. Tasklets
(1) Implementation
Tasklets are represented by two softirqs: HI_SOFTIRQ and TASKLET_SOFTIRQ.
struct tasklet_struct {
    struct tasklet_struct *next;   /* next tasklet in the list */
    unsigned long state;           /* state of the tasklet */
    atomic_t count;                /* reference counter */
    void (*func)(unsigned long);   /* tasklet handler function */
    unsigned long data;            /* argument to the tasklet function */
};

state of the tasklet:
    0: neither scheduled nor running
    TASKLET_STATE_SCHED: a tasklet that is scheduled to run
    TASKLET_STATE_RUN: a tasklet that is running, used only on multiprocessor machines
count:
    if count > 0, the tasklet is disabled and cannot run;
    otherwise, it can run if marked pending.
Scheduled tasklets (the equivalent of raised softirqs) are stored in two per-processor structures: tasklet_vec (for regular tasklets) and tasklet_hi_vec (for high-priority tasklets).
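Their layout is roughly the following sketch, modeled on older 2.6 kernels (newer kernels keep separate head and tail pointers):

/* per-processor lists of scheduled tasklets (older 2.6 layout, sketch) */
struct tasklet_head {
    struct tasklet_struct *list;
};

static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);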
Scheduling tasklets: call tasklet_schedule() or tasklet_hi_schedule().
tasklet_schedule():
    (1) If the tasklet's state is already TASKLET_STATE_SCHED, return.
    (2) Otherwise call __tasklet_schedule().
    (3) Save the state of the interrupt system, then disable local interrupts.
    (4) Add the tasklet to be scheduled to the head of the tasklet_vec or tasklet_hi_vec linked list, which is unique to each processor in the system.
    (5) Raise the TASKLET_SOFTIRQ or HI_SOFTIRQ softirq.
    (6) Restore interrupts to their previous state and return.
    (A simplified C sketch of these steps follows the list.)
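The following sketch, close to the older 2.6.x implementation and using the single-pointer tasklet_head sketched above (not verbatim kernel code), shows how those steps map to C:

/* sketch of tasklet_schedule() / __tasklet_schedule() */
static inline void sketch_tasklet_schedule(struct tasklet_struct *t)
{
    /* step (1): if TASKLET_STATE_SCHED is already set, the tasklet is already scheduled */
    if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
        unsigned long flags;

        /* steps (3)-(6) correspond to __tasklet_schedule() */
        local_irq_save(flags);                       /* save and disable local interrupts */
        t->next = __get_cpu_var(tasklet_vec).list;   /* add to the head of this CPU's list */
        __get_cpu_var(tasklet_vec).list = t;
        raise_softirq_irqoff(TASKLET_SOFTIRQ);       /* mark the tasklet softirq as pending */
        local_irq_restore(flags);                    /* restore the previous interrupt state */
    }
}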
tasklet handlers: tasklet_action() and tasklet_hi_action()
    1. Disable local interrupt delivery (there is no need to first save their state because the code here is always called as a softirq handler and interrupts are always enabled) and retrieve the tasklet_vec or tasklet_hi_vec list for this processor.
    2. Clear the list for this processor by setting it equal to NULL.
    3. Enable local interrupt delivery. Again, there is no need to restore them to their previous state because this function knows that they were always originally enabled.
    4. Loop over each pending tasklet in the retrieved list.
    5. If this is a multiprocessing machine, check whether the tasklet is running on another processor by checking the TASKLET_STATE_RUN flag. If it is currently running, do not execute it now and skip to the next pending tasklet. (Recall that only one tasklet of a given type may run concurrently.)
    6. If the tasklet is not currently running, set the TASKLET_STATE_RUN flag, so another processor will not run it.
    7. Check for a zero count value, to ensure that the tasklet is not disabled. If the tasklet is disabled, skip it and go to the next pending tasklet.
    8. We now know that the tasklet is not running elsewhere, is marked as running so it will not start running elsewhere, and has a zero count value. Run the tasklet handler.
    9. After the tasklet runs, clear the TASKLET_STATE_RUN flag in the tasklet's state field.
    10. Repeat for the next pending tasklet, until there are no more scheduled tasklets waiting to run.
    (A simplified C sketch of this loop follows the list.)
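A simplified C sketch of this loop (tasklet_hi_action() is identical except that it operates on tasklet_hi_vec; again not verbatim kernel code):

static void sketch_tasklet_action(struct softirq_action *a)
{
    struct tasklet_struct *list;

    local_irq_disable();                          /* steps 1-3: atomically take this CPU's list */
    list = __get_cpu_var(tasklet_vec).list;
    __get_cpu_var(tasklet_vec).list = NULL;
    local_irq_enable();

    while (list) {                                /* step 4: walk the pending tasklets */
        struct tasklet_struct *t = list;

        list = list->next;

        if (tasklet_trylock(t)) {                 /* steps 5-6: sets TASKLET_STATE_RUN on SMP */
            if (!atomic_read(&t->count)) {        /* step 7: skip disabled tasklets */
                clear_bit(TASKLET_STATE_SCHED, &t->state);
                t->func(t->data);                 /* step 8: run the handler */
                tasklet_unlock(t);                /* step 9: clear TASKLET_STATE_RUN */
                continue;
            }
            tasklet_unlock(t);
        }
        /* otherwise the real code re-queues the tasklet and re-raises the softirq */
    }
}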
(2) Usage
Static creation:
    DECLARE_TASKLET(name, func, data)
    DECLARE_TASKLET_DISABLED(name, func, data)

Dynamic creation:
    tasklet_init(t, tasklet_handler, dev)
Writing the tasklet handler:
    void tasklet_handler(unsigned long data);

Scheduling the tasklet (a tasklet always runs on the processor that scheduled it):
    tasklet_schedule(&my_tasklet);   /* mark my_tasklet as pending */

Disabling and enabling the tasklet:
    /* if the tasklet is currently running, tasklet_disable() waits for it to finish
     * before returning, while tasklet_disable_nosync() returns immediately */
    tasklet_disable()
    tasklet_disable_nosync()
    tasklet_enable()

    /* waits for the tasklet to finish, then removes it from the pending queue;
     * may sleep, so it must not be called from interrupt context */
    tasklet_kill()

(A complete usage sketch follows.)
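Putting the pieces together, a minimal sketch of how a driver might use a tasklet; all of the names (my_device, my_tasklet, my_tasklet_handler, my_irq_handler) are hypothetical:

#include <linux/init.h>
#include <linux/interrupt.h>

struct my_device {
    unsigned long pending;     /* hypothetical per-device state */
};

static struct my_device my_dev;
static struct tasklet_struct my_tasklet;

/* tasklet handler: runs in softirq context shortly after being scheduled, must not sleep */
static void my_tasklet_handler(unsigned long data)
{
    struct my_device *dev = (struct my_device *)data;

    /* ... process whatever the interrupt handler left behind ... */
    dev->pending = 0;
}

/* interrupt handler: do only the time-critical work, then defer the rest */
static irqreturn_t my_irq_handler(int irq, void *dev_id)
{
    my_dev.pending = 1;
    tasklet_schedule(&my_tasklet);   /* runs later on this same processor */
    return IRQ_HANDLED;
}

static int __init my_init(void)
{
    tasklet_init(&my_tasklet, my_tasklet_handler, (unsigned long)&my_dev);
    /* request_irq(...) with my_irq_handler would go here */
    return 0;
}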
6. ksoftirqd
The problem with softirqs: a softirq is raised before the interrupt handler returns, so it is executed immediately afterwards, and a softirq can reactivate (re-raise) itself. If softirqs keep being raised at a high rate and keep reactivating themselves, user-space processes can be starved of processor time.
Solution: introduce the ksoftirqd kernel threads, one per processor, named ksoftirqd/n, where n is the processor number. Each ksoftirqd thread runs an infinite loop:
for (;;) {
    if (!softirq_pending(cpu))
        schedule();

    set_current_state(TASK_RUNNING);

    while (softirq_pending(cpu)) {
        do_softirq();
        if (need_resched())
            schedule();
    }

    set_current_state(TASK_INTERRUPTIBLE);
}
The softirq kernel threads are awakened whenever do_softirq() detects an executed softirq reactivating itself.
7. Work queues
Work queues hand deferred work to worker threads, which execute it in process context.
(1) Worker threads are grouped by type; each type is described by one workqueue_struct, and each worker thread in that group by one cpu_workqueue_struct. There is one worker thread per processor. The default type is events, whose worker threads are named events/n, where n is the processor number.
/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
    struct cpu_workqueue_struct cpu_wq[NR_CPUS];
    struct list_head list;
    const char *name;
    int singlethread;
    int freezeable;
    int rt;
};

struct cpu_workqueue_struct {
    spinlock_t lock;                     /* lock protecting this structure */
    struct list_head worklist;           /* list of work */
    wait_queue_head_t more_work;
    struct work_struct *current_struct;
    struct workqueue_struct *wq;         /* associated workqueue_struct */
    task_t *thread;                      /* associated thread */
};
(2) Every worker thread is an ordinary kernel thread running the worker_thread() function, which is an infinite loop:
/* worker_thread() */
for (;;) {
    prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
    if (list_empty(&cwq->worklist))
        schedule();
    finish_wait(&cwq->more_work, &wait);
    run_workqueue(cwq);
}

/* run_workqueue(cwq) */
while (!list_empty(&cwq->worklist)) {
    struct work_struct *work;
    work_func_t f;
    void *data;

    work = list_entry(cwq->worklist.next, struct work_struct, entry);
    f = work->func;
    list_del_init(cwq->worklist.next);
    work_clear_pending(work);
    f(work);
}
A unit of work to be deferred is represented by a work_struct:
struct work_struct {
    atomic_long_t data;
    struct list_head entry;
    work_func_t func;
};
(3) Function flow
(4) Usage
Using the default work queue:
1) creating work
DECLARE_WORK(name, void (*func)(void *), void *data);                    /* static creation */
INIT_WORK(struct work_struct *work, void (*func)(void *), void *data);   /* dynamic creation; the result is stored in work */
2) work queue handler
void work_handler(void *data)
Because worker threads are kernel threads, a work handler cannot access user-space memory.
3) scheduling work
schedule_work(&work);                  /* runs as soon as the worker thread on the current processor wakes up */
schedule_delayed_work(&work, delay);   /* runs after a delay of at least 'delay' timer ticks */
4) flushing work
/* waits until all entries in the queue are executed before returning; does not flush delayed work */
void flush_scheduled_work(void);

/* cancels delayed work that has not yet started running */
int cancel_delayed_work(struct work_struct *work);
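A minimal sketch of using the default queue, following the older three-argument INIT_WORK()/DECLARE_WORK() interface shown above (since roughly 2.6.20 the handler takes a struct work_struct * instead); my_work_handler and my_defer_something are made-up names:

#include <linux/workqueue.h>

/* runs in process context in a worker thread (events/n): may sleep, but cannot touch user space */
static void my_work_handler(void *data)
{
    /* ... do the deferred work ... */
}

static DECLARE_WORK(my_work, my_work_handler, NULL);   /* static creation, pre-2.6.20 style */

void my_defer_something(void)
{
    schedule_work(&my_work);                       /* run once the events/n worker thread wakes up */
    /* schedule_delayed_work(&my_work, 5 * HZ);       or: run no sooner than 5 seconds from now */
    flush_scheduled_work();                        /* wait for all queued (non-delayed) work to finish */
}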
Using a custom work queue:
/* create the work queue */
struct workqueue_struct *create_workqueue(const char *name);

struct workqueue_struct *keventd_wq;
keventd_wq = create_workqueue("events");

/* analogous to schedule_work() */
int queue_work(struct workqueue_struct *wq, struct work_struct *work);

/* analogous to schedule_delayed_work() */
int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay);
With a custom work queue you must create the queue yourself; work items are created exactly as before, but they are submitted with queue_work()/queue_delayed_work() instead of the schedule_*() functions. Everything else is the same as with the default queue.
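A minimal sketch of a custom queue, reusing the hypothetical my_work_handler from the previous sketch; my_queue, my_setup, and my_teardown are also made-up names:

#include <linux/errno.h>
#include <linux/workqueue.h>

static struct workqueue_struct *my_wq;

static void my_work_handler(void *data) { /* ... */ }
static DECLARE_WORK(my_work, my_work_handler, NULL);

static int my_setup(void)
{
    my_wq = create_workqueue("my_queue");   /* spawns one worker thread per processor: my_queue/n */
    if (!my_wq)
        return -ENOMEM;

    queue_work(my_wq, &my_work);            /* used instead of schedule_work() */
    return 0;
}

static void my_teardown(void)
{
    flush_workqueue(my_wq);                 /* wait for work queued on this queue to finish */
    destroy_workqueue(my_wq);
}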
8. Comparing the three bottom-half mechanisms
- Softirqs: run in interrupt context; the same softirq can run concurrently on several processors, so handlers must use per-processor data or explicit locking; best for the most performance-critical, highly threaded uses (networking, the block layer).
- Tasklets: run in interrupt context; two tasklets of the same type never run concurrently, so they are simpler to use and the usual default choice.
- Work queues: run in process context in a worker thread, so they may sleep; use them when the deferred work needs to sleep.
9. Locking
If a bottom half shares data with process-context or interrupt-context code, locking is required.
local_bh_disable() can be called multiple times (it nests); bottom halves are re-enabled only after local_bh_enable() has been called the same number of times.
/*
 * disable local bottom halves by incrementing the preempt_count
 */
void local_bh_disable(void)
{
    struct thread_info *t = current_thread_info();

    t->preempt_count += SOFTIRQ_OFFSET;
}

/*
 * decrement the preempt_count - this will 'automatically' enable
 * bottom halves if the count returns to zero
 *
 * optionally run any bottom halves that are pending
 */
void local_bh_enable(void)
{
    struct thread_info *t = current_thread_info();

    t->preempt_count -= SOFTIRQ_OFFSET;

    /*
     * is preempt_count zero and are any bottom halves pending?
     * if so, run them
     */
    if (unlikely(!t->preempt_count && softirq_pending(smp_processor_id())))
        do_softirq();
}
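As a usage sketch (hypothetical names), process-context code that shares data with a tasklet or softirq can use spin_lock_bh(), which combines local_bh_disable() with taking the lock:

#include <linux/spinlock.h>

static DEFINE_SPINLOCK(shared_lock);
static int shared_counter;

/* process-context side: keep bottom halves off this CPU while holding the lock */
void touch_from_process_context(void)
{
    spin_lock_bh(&shared_lock);      /* local_bh_disable() + spin_lock() */
    shared_counter++;
    spin_unlock_bh(&shared_lock);    /* spin_unlock() + local_bh_enable() */
}

/* bottom-half side (e.g. a tasklet handler): the plain spinlock is enough,
 * because the process-context side already disabled bottom halves locally */
void touch_from_bottom_half(unsigned long data)
{
    spin_lock(&shared_lock);
    shared_counter++;
    spin_unlock(&shared_lock);
}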