你必须非常努力,才能看起来毫不费力!
分类: LINUX
2008-08-18 17:07:19
级别: 初级
杨沙洲 ()国防科技大学计算机学院
2002 年 6 月 01 日
本文将Linux内核中用于同步的几种机制集中起来分析,强调了它们之间在实现和使用上的不同。
同步通常是为了达到多线程协同的目的而设计的一种机制,通常包含异步信号机制和互斥机制作为其实现的底层。在Linux 2.4内核中也有相应的技术实现,包括信号量、自旋锁、原子操作和等待队列,其中原子操作和等待队列又是实现信号量的底层。
wait queue很早就作为一个基本的功能单位出现在Linux内核里了,它以队列为基础数据结构,与进程调度机制紧密结合,能够用于实现核心的异步事件通知机制。我们从它的使用范例着手,看看等待队列是如何实现异步信号功能的。
在核心运行过程中,经常会因为某些条件不满足而需要挂起当前线程,直至条件满足了才继续执行。在2.4内核中提供了一组新接口来实现这样的功能,下面的代码节选自kernel/printk.c:
unsigned long log_size; 1: DECLARE_WAIT_QUEUE_HEAD(log_wait);... 4: spinlock_t console_lock = SPIN_LOCK_UNLOCKED;... int do_syslog(int type,char *buf,int len){ ... 2: error=wait_event_interruptible(log_wait,log_size); if(error) goto out; ... 5: spin_lock_irq(&console_lock); ... log_size--; ... 6: spin_unlock_irq(&console_lock); ... } asmlinkage int printk(const char *fmt,...){ ... 7: spin_lock_irqsave(&console_lock,flags); ... log_size++;... 8: spin_unlock_irqrestore(&console_lock,flags); 3: wake_up_interruptible(&log_wait); ... } |
这段代码实现了printk调用和syslog之间的同步,syslog需要等待printk送数据到缓冲区,因此,在2:处等待log_size非0;而printk一边传送数据,一边增加log_size的值,完成后唤醒在log_wait上等待的所有线程(这个线程不是用户空间的线程概念,而是核内的一个执行序列)。执行了3:的wake_up_interruptible()后,2:处的wait_event_interruptible()返回0,从而进入syslog的实际动作。
1:是定义log_wait全局变量的宏调用。
在实际操作log_size全局变量的时候,还使用了spin_lock自旋锁来实现互斥,关于自旋锁,这里暂不作解释,但从这段代码中已经可以清楚的知道它的使用方法了。
所有wait queue使用上的技巧体现在wait_event_interruptible()的实现上,代码位于include/linux/sched.h中,前置数字表示行号:
779 #define __wait_event_interruptible(wq, condition, ret) \ 780 do { \ 781 wait_queue_t __wait; \ 782 init_waitqueue_entry(&__wait, current); \ 783 \ 784 add_wait_queue(&wq, &__wait); \ 785 for (;;) { \ 786 set_current_state(TASK_INTERRUPTIBLE); \ 787 if (condition) \ 788 break; \ 789 if (!signal_pending(current)) { \ 790 schedule(); \ 791 continue; \ 792 } \ 793 ret = -ERESTARTSYS; \ 794 break; \ 795 } \ 796 current->state = TASK_RUNNING; \ 797 remove_wait_queue(&wq, &__wait); \ 798 } while (0) 799 800 #define wait_event_interruptible(wq, condition) \ 801 ({ \ 802 int __ret = 0; \ 803 if (!(condition)) \ 804 __wait_event_interruptible(wq, condition, __ret); \ 805 __ret; \ 806 }) |
在wait_event_interruptible()中首先判断condition是不是已经满足,如果是则直接返回0,否则调用__wait_event_interruptible(),并用__ret来存放返回值。__wait_event_interruptible()首先定义并初始化一个wait_queue_t变量__wait,其中数据为当前进程结构current(struct task_struct),并把__wait入队。在无限循环中,__wait_event_interruptible()将本进程置为可中断的挂起状态,反复检查condition是否成立,如果成立则退出,如果不成立则继续休眠;条件满足后,即把本进程运行状态置为运行态,并将__wait从等待队列中清除掉,从而进程能够调度运行。如果进程当前有异步信号(POSIX的),则返回-ERESTARTSYS。
挂起的进程并不会自动转入运行的,因此,还需要一个唤醒动作,这个动作由wake_up_interruptible()完成,它将遍历作为参数传入的log_wait等待队列,将其中所有的元素(通常都是task_struct)置为运行态,从而可被调度到,执行__wait_event_interruptible()中的代码。
DECLARE_WAIT_QUEUE_HEAD(log_wait)经过宏展开后就是定义了一个log_wait等待队列头变量:
struct __wait_queue_head log_wait = { lock: SPIN_LOCK_UNLOCKED, task_list: { &log_wait.task_list, &log_wait.task_list } } |
其中task_list是struct list_head变量,包括两个list_head指针,一个next、一个prev,这里把它们初始化为自身,属于队列实现上的技巧,其细节可以参阅关于内核list数据结构的讨论,add_wait_queue()和remove_wait_queue()就等同于list_add()和list_del()。
wait_queue_t结构在include/linux/wait.h中定义,关键元素即为一个struct task_struct变量表征当前进程。
除了wait_event_interruptible()/wake_up_interruptible()以外,与此相对应的还有wait_event()和wake_up()接口,interruptible是更安全、更常用的选择,因为可中断的等待可以接收信号,从而挂起的进程允许被外界kill。
wait_event*()接口是2.4内核引入并推荐使用的,在此之前,最常用的等待操作是interruptible_sleep_on(wait_queue_head_t *wq),当然,与此配套的还有不可中断版本sleep_on(),另外,还有带有超时控制的*sleep_on_timeout()。sleep_on系列函数的语义比wait_event简单,没有条件判断功能,其余动作与wait_event完全相同,也就是说,我们可以用interruptible_sleep_on()来实现wait_event_interruptible()(仅作示意〉:
do{ interruptible_sleep_on(&log_wait); if(condition) break; }while(1); |
相对而言,这种操作序列有反复的入队、出队动作,更加耗时,而很大一部分等待操作的确是需要判断一个条件是否满足的,因此2.4才推荐使用wait_event接口。
在wake_up系列接口中,还有一类wake_up_sync()和wake_up_interruptible_sync()接口,保证调度在wake_up返回之后进行。
|
POSIX有信号量,SysV IPC有信号量,核内也有信号量,接口很简单,一个down(),一个up(),分别对应P操作和V操作,down()调用可能引起线程挂起,因此和sleep_on类似,也有interruptible系列接口。down意味着信号量减1,up意味着信号量加1,这两个操作显然需要互斥。在Linux 2.4中,并没有如想象中的用锁实现,而是使用了原子操作。
在include/asm/atomic.h中定义了一系列原子操作,包括原子读、原子写、原子加等等,大多是直接用汇编语句来实现的,这里就不详细解释。
我们从信号量数据结构开始,它定义在include/asm/semaphore.h中:
struct semaphore { atomic_t count; int sleepers; wait_queue_head_t wait; } |
down()操作可以理解为申请资源,up()操作可以理解为释放资源,因此,信号量实际表示的是资源的数量以及是否有进程正在等待。在semaphore结构中,count相当于资源计数,为正数或0时表示可用资源数,-1则表示没有空闲资源且有等待进程。而等待进程的数量并不关心。这种设计主要是考虑与信号量的原语相一致,当某个进程执行up()函数释放资源,点亮信号灯时,如果count恢复到0,则表示尚有进程在等待该资源,因此执行唤醒操作。一个典型的down()-up()流程是这样的:
down()-->count做原子减1操作,如果结果不小于0则表示成功申请,从down()中返回;
-->如果结果为负(实际上只可能是-1),则表示需要等待,则调用__down_fail();
__down_fail()调用__down(),__down()用C代码实现,要求已不如down()和__down_fail()严格,在此作实际的等待(arch/i386/kernel/semaphore.c):
void __down(struct semaphore * sem) { struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait); spin_lock_irq(&semaphore_lock); sem->sleepers++; for (;;) { int sleepers = sem->sleepers; /* * Add "everybody else" into it. They aren't * playing, because we own the spinlock. */ if (!atomic_add_negative(sleepers - 1, &sem->count)) { sem->sleepers = 0; break; } sem->sleepers = 1; /* us - see -1 above */ spin_unlock_irq(&semaphore_lock); schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irq(&semaphore_lock); } spin_unlock_irq(&semaphore_lock); remove_wait_queue(&sem->wait, &wait); tsk->state = TASK_RUNNING; wake_up(&sem->wait); } |
__down()-->当前进程进入wait等待队列,状态为不可中断的挂起,sleepers++,如果这是第一次申请失败,则sleepers值为1,否则为2--这个设置纯粹是为了下面这句原子加而安排的。
在真正进入休眠以前,__down()还是需要判断一下是不是确实没有资源可用,因为在spin_lock之前什么都可能发生。atomic_add_negative()将sleepers-1(只可能是0或者1,分别表示仅有一个等待进程或是多个)加到count(如果有多个进程申请资源失败进入__down(),count可能为-2、-3等)之上,这个加法完成后,结果为0只可能是在sleepers等于1的时候发生(因为如果sleepers等于2,表示有多个进程执行了down(),则count必然小于-1,因此sleepers-1+count必然小于0),表示count在此之前已经变为0了,也就是说已经有进程释放了资源,因此本进程不用休眠而是获得资源退出__down(),从而也从down()中返回;如果没有进程释放资源,那么在所有等待进程的这一加法完成后,count将等于-1。因此,从down()调用外面看(无论是在down()中休眠还是获得资源离开down()),count为负时只可能为-1(此时sleepers等于1),这么设计使得up()操作只需要对count加1,判断是否为0就可以知道是否有必要执行唤醒操作__up_wakeup()了。
获得了资源的进程将把sleepers设为0,并唤醒所有其他等待进程,这个操作实际上只是起到恢复count为-1,并使它们再次进入休眠的作用,因为第一个被唤醒的等待进程执行atomic_add_negative()操作后会将count恢复为-1,然后将sleepers置为1;以后的等待进程则会像往常一样重新休眠。
将down()操作设计得如此复杂的原因和结果就是up操作相当简单。up()利用汇编原子地将count加1,如果小于等于0表示有等待进程,则调用__up_wakeup()-->__up()唤醒wait;否则直接返回。
在down()中竞争获得资源的进程并不是按照优先级排序的,只是在up()操作完成后第一个被唤醒或者正在__down()中运行而暂未进入休眠的进程成功的可能性稍高一些。
尽管可以将信号量的count初始化为1从而实现一种互斥锁(mutex),但Linux并不保证这个count不会超过1,因为up操作并不考虑count的初值,所以只能依靠程序员自己来控制不要无谓的执行up()从而破坏mutex的语义。相关的初始化接口定义在include/asm/semaphore.h中,但一般程序员可以通过sema_init()接口来初始化信号量:
#define DECLARE_MUTEX(name) __DECLARE_SEMAPHORE_GENERIC(name,1) #define DECLARE_MUTEX_LOCKED(name) __DECLARE_SEMAPHORE_GENERIC(name,0) static inline void sema_init (struct semaphore *sem, int val) static inline void init_MUTEX (struct semaphore *sem) static inline void init_MUTEX_LOCKED (struct semaphore *sem) |
除了down()以外,Linux还提供了一个down_interruptible(),操作序列与down()基本相同,仅在休眠状态为可中断和信号处理上有所不同。在标准的信号量以外,还有一套读写信号量,用于将资源的读写区分开来进行同步以提高效率,采用读写锁来实现,有兴趣的可以参阅文后列出的参考资料。
|
锁是一个概念,正如上面提到的mutex互斥锁仅仅是其中的一种。互斥锁被锁定时进入休眠,而系统还能正常运转,但有很多时候,锁应该不仅仅互斥访问,甚至应该让系统挂起,直至锁成功,也就是说在锁操作中"自旋",这就是Linux中的spinlock机制。
从实现上来说,自旋锁比较简单,主要分为两部分,一部分是中断处理,一部分是自旋处理,最基础的部分在spin_lock_string和spin_unlock_string这两段汇编代码中:
#define spin_lock_string \ "\n1:\t" \ "lock ; decb %0\n\t" \ "js 2f\n" \ ".section .text.lock,\"ax\"\n" \ "2:\t" \ "cmpb $0,%0\n\t" \ "rep;nop\n\t" \ "jle 2b\n\t" \ "jmp 1b\n" \ ".previous" #define spin_unlock_string \ "movb $1,%0" |
不详细解释这段汇编代码的语义,spin_lock_string对锁原子减1,循环检查锁值,直到锁值大于0;而spin_unlock_string则是对锁赋值1。spin_lock_string用于构成spin_lock()函数,spin_unlock_string用于构成spin_unlock()函数。
spin_lock()/spin_unlock()构成了自旋锁机制的基础,它们和关中断local_irq_disable()/开中断local_irq_enable()、关bh local_bh_disable()/开bh local_bh_enable()、关中断并保存状态字local_irq_save()/开中断并恢复状态字local_irq_restore()结合就形成了整套自旋锁机制,接口定义在include/linux/spinlock.h中,这里就不列举了。
实际上,以上提到的spin_lock()都是在CONFIG_SMP的前提下才会生成的,也就是说,在单处理机系统中,spin_lock()是一条空语句,因为在处理机执行它的语句时,不可能受到打扰,语句肯定是串行的。在这种简单情况下,spin_lock_irq()就只需要锁中断就可以完成任务了。在include/linux/spinlock.h中就用#ifdef CONFIG_SMP来区分两种不同的情况。
自旋锁有很多种,信号量也可以用来构成互斥锁,原子操作也有锁功能,而且还有与标准锁机制类似的读写锁变种,在不同的应用场合应该选择不同的锁,下一节就是介绍如何选择。
|
如果所访问的共享资源仅在用户上下文中使用,最高效的办法就是使用信号量。在net/core/netfilter.c中有一处使用信号量的例子:
static DECLARE_MUTEX(nf_sockopt_mutex);
int nf_register_sockopt(struct nf_sockopt_ops *reg)
{
...
if (down_interruptible(&nf_sockopt_mutex) != 0)
return -EINTR;
...
out:
up(&nf_sockopt_mutex);
return ret;
}
|
此时有两种情况需要使用锁,一是用户上下文被bottom half中断,二是多个处理机同时进入一个临界段。一般情况下,使用spin_lock_bh()/spin_unlock_bh()可以满足要求,它将关闭当前CPU的bottom half,然后再获取锁,直至离开临界段再释放并对bottom half重新使能。
tasklet与bottom half的实现机制是一样的,实际上spin_lock_bh()也同时关闭了tasklet的执行,因此,在这种情况下,用户上下文与tasklet之间的同步也使用spin_lock_bh()/spin_unlock_bh()。
bottom half本身的机制就保证了不会有多于1个的bottom half同时处于运行态,即使对于SMP系统也是如此,因此,在设计共享数据的bottom half时无需考虑互斥。
tasklet和bottom half类似,也是受到local_bh_disable()保护的,因此,同一个tasklet不会同时在两个CPU上运行;但不同的tasklet却有可能,因此,如果需要同步不同的tasklet访问共享数据的话,就应该使用spin_lock()/spin_unlock()。正如上面提到的,这种保护仅对SMP系统有意义,UP系统中tasklet的运行不会受到另一个tasklet(不论它是否与之相同)的打扰,因此也就没有必要上锁。
softirq是实现tasklet和bottom half的基础,限制较后二者都少,允许两个softirq同时运行于不同的CPU之上,而不论它们是否来自同一个softirq代码,因此,在这种情况下,都需要用spin_lock()/spin_unlock()来同步。
硬中断是指硬件中断的处理程序上下文,软中断包括softirq和在它基础上实现的tasklet和bottom half等,此时,为了防止硬件中断软中断的运行,同步措施必须包括关闭硬件中断,spin_lock_irq()/spin_unlock_irq()就包括这个动作。还有一对API,spin_lock_irqsave()/spin_unlock_irqrestore(),不仅关闭中断,还保存机器状态字,并在打开中断时恢复。
首先需要提醒的是"死锁",这在操作系统原理的课本上都做过介绍,无论是使用信号量还是使用自旋锁,都有可能产生死锁,特别是自旋锁,如果死锁在spin_lock上,整个系统就会挂起。如何避免死锁是理论课的问题,这里就不多说了。
另外一点就是尽可能短时间的锁定,因此,"对数据上锁,而不是对代码上锁"就成了一个简单的原则;在有可能的情况下,使用读写锁,而不要总是使用互斥锁;对读写排序,使用原子操作,从而完全避免使用锁,也是一个不错的设计思想。
不要在锁定状态下调用可能引起休眠的操作,以下这些操作就是目前可能因此休眠的函数:
杨沙洲,现为国防科技大学计算机学院博士生,主要研究领域为操作系统技术 |