Chinaunix首页 | 论坛 | 博客
  • 博客访问: 502251
  • 博文数量: 76
  • 博客积分: 4010
  • 博客等级: 上校
  • 技术积分: 1534
  • 用 户 组: 普通用户
  • 注册时间: 2007-04-10 16:28
文章分类
文章存档

2010年(1)

2009年(3)

2008年(72)

我的朋友

分类: LINUX

2008-03-11 20:08:29

同步通常是为了达到多线程协同的目的而设计的一种机制,通常包含异步信号机制和互斥机制作为其实现的底层。在Linux 2.4内核中也有相应的技术实现,包括信号量、自旋锁、原子操作和等待队列,其中原子操作和等待队列又是实现信号量的底层。

wait queue很早就作为一个基本的功能单位出现在Linux内核里了,它以队列为基础数据结构,与进程调度机制紧密结合,能够用于实现核心的异步事件通知机制。我们从它的使用范例着手,看看等待队列是如何实现异步信号功能的。

在核心运行过程中,经常会因为某些条件不满足而需要挂起当前线程,直至条件满足了才继续执行。在2.4内核中提供了一组新接口来实现这样的功能,下面的代码节选自kernel/printk.c:

    unsigned long log_size;
1:  DECLARE_WAIT_QUEUE_HEAD(log_wait);...
4:  spinlock_t console_lock = SPIN_LOCK_UNLOCKED;...
    int do_syslog(int type,char *buf,int len){
        ...
2:      error=wait_event_interruptible(log_wait,log_size);
        if(error)
             goto out;
        ...
5:      spin_lock_irq(&console_lock);
	...
        log_size--;
        ...
6:	spin_unlock_irq(&console_lock);
        ...
    }
    asmlinkage int printk(const char *fmt,...){
	...
7:	spin_lock_irqsave(&console_lock,flags);
        ...
        log_size++;...
8:	spin_unlock_irqrestore(&console_lock,flags);
3:      wake_up_interruptible(&log_wait);
        ...
    }
    

这段代码实现了printk调用和syslog之间的同步,syslog需要等待printk送数据到缓冲区,因此,在2:处等待log_size非0;而printk一边传送数据,一边增加log_size的值,完成后唤醒在log_wait上等待的所有线程(这个线程不是用户空间的线程概念,而是核内的一个执行序列)。执行了3:的wake_up_interruptible()后,2:处的wait_event_interruptible()返回0,从而进入syslog的实际动作。

1:是定义log_wait全局变量的宏调用。

在实际操作log_size全局变量的时候,还使用了spin_lock自旋锁来实现互斥,关于自旋锁,这里暂不作解释,但从这段代码中已经可以清楚的知道它的使用方法了。

所有wait queue使用上的技巧体现在wait_event_interruptible()的实现上,代码位于include/linux/sched.h中,前置数字表示行号:

779 #define __wait_event_interruptible(wq, condition, ret)                  \
780 do {                                                                    \
781         wait_queue_t __wait;                                            \
782         init_waitqueue_entry(&__wait, current);                         \
783                                                                         \
784         add_wait_queue(&wq, &__wait);                                   \
785         for (;;) {                                                      \
786                 set_current_state(TASK_INTERRUPTIBLE);                  \
787                 if (condition)                                          \
788                         break;                                          \
789                 if (!signal_pending(current)) {                         \
790                         schedule();                                     \
791                         continue;                                       \
792                 }                                                       \
793                 ret = -ERESTARTSYS;                                     \
794                 break;                                                  \
795         }                                                               \
796         current->state = TASK_RUNNING;                                  \
797         remove_wait_queue(&wq, &__wait);                                \
798 } while (0)
799         
800 #define wait_event_interruptible(wq, condition)                         \
801 ({                                                                      \
802         int __ret = 0;                                                  \
803         if (!(condition))                                               \
804                 __wait_event_interruptible(wq, condition, __ret);       \
805         __ret;                                                          \
806 })
 

在wait_event_interruptible()中首先判断condition是不是已经满足,如果是则直接返回0,否则调用__wait_event_interruptible(),并用__ret来存放返回值。__wait_event_interruptible()首先定义并初始化一个wait_queue_t变量__wait,其中数据为当前进程结构current(struct task_struct),并把__wait入队。在无限循环中,__wait_event_interruptible()将本进程置为可中断的挂起状态,反复检查condition是否成立,如果成立则退出,如果不成立则继续休眠;条件满足后,即把本进程运行状态置为运行态,并将__wait从等待队列中清除掉,从而进程能够调度运行。如果进程当前有异步信号(POSIX的),则返回-ERESTARTSYS。

挂起的进程并不会自动转入运行的,因此,还需要一个唤醒动作,这个动作由wake_up_interruptible()完成,它将遍历作为参数传入的log_wait等待队列,将其中所有的元素(通常都是task_struct)置为运行态,从而可被调度到,执行__wait_event_interruptible()中的代码。

DECLARE_WAIT_QUEUE_HEAD(log_wait)经过宏展开后就是定义了一个log_wait等待队列头变量:

struct __wait_queue_head log_wait = {
	lock:	SPIN_LOCK_UNLOCKED,
	task_list:      { &log_wait.task_list, &log_wait.task_list }
}
 

其中task_list是struct list_head变量,包括两个list_head指针,一个next、一个prev,这里把它们初始化为自身,属于队列实现上的技巧,其细节可以参阅关于内核list数据结构的讨论,add_wait_queue()和remove_wait_queue()就等同于list_add()和list_del()。

wait_queue_t结构在include/linux/wait.h中定义,关键元素即为一个struct task_struct变量表征当前进程。

除了wait_event_interruptible()/wake_up_interruptible()以外,与此相对应的还有wait_event()和wake_up()接口,interruptible是更安全、更常用的选择,因为可中断的等待可以接收信号,从而挂起的进程允许被外界kill。

wait_event*()接口是2.4内核引入并推荐使用的,在此之前,最常用的等待操作是interruptible_sleep_on(wait_queue_head_t *wq),当然,与此配套的还有不可中断版本sleep_on(),另外,还有带有超时控制的*sleep_on_timeout()。sleep_on系列函数的语义比wait_event简单,没有条件判断功能,其余动作与wait_event完全相同,也就是说,我们可以用interruptible_sleep_on()来实现wait_event_interruptible()(仅作示意〉:

do{
	interruptible_sleep_on(&log_wait);
        if(condition)
		break;
}while(1);
 

相对而言,这种操作序列有反复的入队、出队动作,更加耗时,而很大一部分等待操作的确是需要判断一个条件是否满足的,因此2.4才推荐使用wait_event接口。

在wake_up系列接口中,还有一类wake_up_sync()和wake_up_interruptible_sync()接口,保证调度在wake_up返回之后进行。





回页首


POSIX有信号量,SysV IPC有信号量,核内也有信号量,接口很简单,一个down(),一个up(),分别对应P操作和V操作,down()调用可能引起线程挂起,因此和sleep_on类似,也有interruptible系列接口。down意味着信号量减1,up意味着信号量加1,这两个操作显然需要互斥。在Linux 2.4中,并没有如想象中的用锁实现,而是使用了原子操作。

在include/asm/atomic.h中定义了一系列原子操作,包括原子读、原子写、原子加等等,大多是直接用汇编语句来实现的,这里就不详细解释。

我们从信号量数据结构开始,它定义在include/asm/semaphore.h中:

struct semaphore {
        atomic_t count;
        int sleepers;
        wait_queue_head_t wait;
}
 

down()操作可以理解为申请资源,up()操作可以理解为释放资源,因此,信号量实际表示的是资源的数量以及是否有进程正在等待。在semaphore结构中,count相当于资源计数,为正数或0时表示可用资源数,-1则表示没有空闲资源且有等待进程。而等待进程的数量并不关心。这种设计主要是考虑与信号量的原语相一致,当某个进程执行up()函数释放资源,点亮信号灯时,如果count恢复到0,则表示尚有进程在等待该资源,因此执行唤醒操作。一个典型的down()-up()流程是这样的:

down()-->count做原子减1操作,如果结果不小于0则表示成功申请,从down()中返回;
-->如果结果为负(实际上只可能是-1),则表示需要等待,则调用__down_fail();
__down_fail()调用__down(),__down()用C代码实现,要求已不如down()和__down_fail()严格,在此作实际的等待(arch/i386/kernel/semaphore.c):

void __down(struct semaphore * sem)
{
        struct task_struct *tsk = current;
        DECLARE_WAITQUEUE(wait, tsk);
        tsk->state = TASK_UNINTERRUPTIBLE;
        add_wait_queue_exclusive(&sem->wait, &wait);
        spin_lock_irq(&semaphore_lock);
        sem->sleepers++;
        for (;;) {
                int sleepers = sem->sleepers;
                /*
                 * Add "everybody else" into it. They aren't
                 * playing, because we own the spinlock.
                 */
                if (!atomic_add_negative(sleepers - 1, &sem->count)) {
                        sem->sleepers = 0;
                        break;
                }
                sem->sleepers = 1;      /* us - see -1 above */
                spin_unlock_irq(&semaphore_lock);
                schedule();
                tsk->state = TASK_UNINTERRUPTIBLE;
                spin_lock_irq(&semaphore_lock);
        }
        spin_unlock_irq(&semaphore_lock);
        remove_wait_queue(&sem->wait, &wait);
        tsk->state = TASK_RUNNING;
        wake_up(&sem->wait);
}
 

__down()-->当前进程进入wait等待队列,状态为不可中断的挂起,sleepers++,如果这是第一次申请失败,则sleepers值为1,否则为2--这个设置纯粹是为了下面这句原子加而安排的。

在真正进入休眠以前,__down()还是需要判断一下是不是确实没有资源可用,因为在spin_lock之前什么都可能发生。atomic_add_negative()将sleepers-1(只可能是0或者1,分别表示仅有一个等待进程或是多个)加到count(如果有多个进程申请资源失败进入__down(),count可能为-2、-3等)之上,这个加法完成后,结果为0只可能是在sleepers等于1的时候发生(因为如果sleepers等于2,表示有多个进程执行了down(),则count必然小于-1,因此sleepers-1+count必然小于0),表示count在此之前已经变为0了,也就是说已经有进程释放了资源,因此本进程不用休眠而是获得资源退出__down(),从而也从down()中返回;如果没有进程释放资源,那么在所有等待进程的这一加法完成后,count将等于-1。因此,从down()调用外面看(无论是在down()中休眠还是获得资源离开down()),count为负时只可能为-1(此时sleepers等于1),这么设计使得up()操作只需要对count加1,判断是否为0就可以知道是否有必要执行唤醒操作__up_wakeup()了。

获得了资源的进程将把sleepers设为0,并唤醒所有其他等待进程,这个操作实际上只是起到恢复count为-1,并使它们再次进入休眠的作用,因为第一个被唤醒的等待进程执行atomic_add_negative()操作后会将count恢复为-1,然后将sleepers置为1;以后的等待进程则会像往常一样重新休眠。

将down()操作设计得如此复杂的原因和结果就是up操作相当简单。up()利用汇编原子地将count加1,如果小于等于0表示有等待进程,则调用__up_wakeup()-->__up()唤醒wait;否则直接返回。

在down()中竞争获得资源的进程并不是按照优先级排序的,只是在up()操作完成后第一个被唤醒或者正在__down()中运行而暂未进入休眠的进程成功的可能性稍高一些。

尽管可以将信号量的count初始化为1从而实现一种互斥锁(mutex),但Linux并不保证这个count不会超过1,因为up操作并不考虑count的初值,所以只能依靠程序员自己来控制不要无谓的执行up()从而破坏mutex的语义。相关的初始化接口定义在include/asm/semaphore.h中,但一般程序员可以通过sema_init()接口来初始化信号量:

#define DECLARE_MUTEX(name) __DECLARE_SEMAPHORE_GENERIC(name,1)
#define DECLARE_MUTEX_LOCKED(name) __DECLARE_SEMAPHORE_GENERIC(name,0)
static inline void sema_init (struct semaphore *sem, int val)
static inline void init_MUTEX (struct semaphore *sem)
static inline void init_MUTEX_LOCKED (struct semaphore *sem)
 

除了down()以外,Linux还提供了一个down_interruptible(),操作序列与down()基本相同,仅在休眠状态为可中断和信号处理上有所不同。在标准的信号量以外,还有一套读写信号量,用于将资源的读写区分开来进行同步以提高效率,采用读写锁来实现,有兴趣的可以参阅文后列出的参考资料。





回页首


锁是一个概念,正如上面提到的mutex互斥锁仅仅是其中的一种。互斥锁被锁定时进入休眠,而系统还能正常运转,但有很多时候,锁应该不仅仅互斥访问,甚至应该让系统挂起,直至锁成功,也就是说在锁操作中"自旋",这就是Linux中的spinlock机制。

从实现上来说,自旋锁比较简单,主要分为两部分,一部分是中断处理,一部分是自旋处理,最基础的部分在spin_lock_string和spin_unlock_string这两段汇编代码中:

#define spin_lock_string \
        "\n1:\t" \
        "lock ; decb %0\n\t" \
        "js 2f\n" \
        ".section .text.lock,\"ax\"\n" \
        "2:\t" \
        "cmpb $0,%0\n\t" \
        "rep;nop\n\t" \
        "jle 2b\n\t" \
        "jmp 1b\n" \
        ".previous"
#define spin_unlock_string \
        "movb $1,%0"
 

不详细解释这段汇编代码的语义,spin_lock_string对锁原子减1,循环检查锁值,直到锁值大于0;而spin_unlock_string则是对锁赋值1。spin_lock_string用于构成spin_lock()函数,spin_unlock_string用于构成spin_unlock()函数。

spin_lock()/spin_unlock()构成了自旋锁机制的基础,它们和关中断local_irq_disable()/开中断local_irq_enable()、关bh local_bh_disable()/开bh local_bh_enable()、关中断并保存状态字local_irq_save()/开中断并恢复状态字local_irq_restore()结合就形成了整套自旋锁机制,接口定义在include/linux/spinlock.h中,这里就不列举了。

实际上,以上提到的spin_lock()都是在CONFIG_SMP的前提下才会生成的,也就是说,在单处理机系统中,spin_lock()是一条空语句,因为在处理机执行它的语句时,不可能受到打扰,语句肯定是串行的。在这种简单情况下,spin_lock_irq()就只需要锁中断就可以完成任务了。在include/linux/spinlock.h中就用#ifdef CONFIG_SMP来区分两种不同的情况。

自旋锁有很多种,信号量也可以用来构成互斥锁,原子操作也有锁功能,而且还有与标准锁机制类似的读写锁变种,在不同的应用场合应该选择不同的锁,下一节就是介绍如何选择。





回页首


如果所访问的共享资源仅在用户上下文中使用,最高效的办法就是使用信号量。在net/core/netfilter.c中有一处使用信号量的例子:

static DECLARE_MUTEX(nf_sockopt_mutex);
int nf_register_sockopt(struct nf_sockopt_ops *reg)
{
...
        if (down_interruptible(&nf_sockopt_mutex) != 0)
                return -EINTR;
...
out:
        up(&nf_sockopt_mutex);
        return ret;
}
 

此时有两种情况需要使用锁,一是用户上下文被bottom half中断,二是多个处理机同时进入一个临界段。一般情况下,使用spin_lock_bh()/spin_unlock_bh()可以满足要求,它将关闭当前CPU的bottom half,然后再获取锁,直至离开临界段再释放并对bottom half重新使能。

tasklet与bottom half的实现机制是一样的,实际上spin_lock_bh()也同时关闭了tasklet的执行,因此,在这种情况下,用户上下文与tasklet之间的同步也使用spin_lock_bh()/spin_unlock_bh()。

bottom half本身的机制就保证了不会有多于1个的bottom half同时处于运行态,即使对于SMP系统也是如此,因此,在设计共享数据的bottom half时无需考虑互斥。

tasklet和bottom half类似,也是受到local_bh_disable()保护的,因此,同一个tasklet不会同时在两个CPU上运行;但不同的tasklet却有可能,因此,如果需要同步不同的tasklet访问共享数据的话,就应该使用spin_lock()/spin_unlock()。正如上面提到的,这种保护仅对SMP系统有意义,UP系统中tasklet的运行不会受到另一个tasklet(不论它是否与之相同)的打扰,因此也就没有必要上锁。

softirq是实现tasklet和bottom half的基础,限制较后二者都少,允许两个softirq同时运行于不同的CPU之上,而不论它们是否来自同一个softirq代码,因此,在这种情况下,都需要用spin_lock()/spin_unlock()来同步。

硬中断是指硬件中断的处理程序上下文,软中断包括softirq和在它基础上实现的tasklet和bottom half等,此时,为了防止硬件中断软中断的运行,同步措施必须包括关闭硬件中断,spin_lock_irq()/spin_unlock_irq()就包括这个动作。还有一对API,spin_lock_irqsave()/spin_unlock_irqrestore(),不仅关闭中断,还保存机器状态字,并在打开中断时恢复。

首先需要提醒的是"死锁",这在操作系统原理的课本上都做过介绍,无论是使用信号量还是使用自旋锁,都有可能产生死锁,特别是自旋锁,如果死锁在spin_lock上,整个系统就会挂起。如何避免死锁是理论课的问题,这里就不多说了。

另外一点就是尽可能短时间的锁定,因此,"对数据上锁,而不是对代码上锁"就成了一个简单的原则;在有可能的情况下,使用读写锁,而不要总是使用互斥锁;对读写排序,使用原子操作,从而完全避免使用锁,也是一个不错的设计思想。

不要在锁定状态下调用可能引起休眠的操作,以下这些操作就是目前可能因此休眠的函数:

  1. 对用户内存的访问:copy_from_user()、copy_to_user()、get_user()、put_user()
  2. kmalloc(GFP_KERNEL)
  3. down_interruptible()和down(),如果需要在spinlock中使用信号量,可以选择down_trylock(),它不会引起挂起 printk()的灵巧设计使得它不会挂起,因此可以在任何上下文中使用。
阅读(1150) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~