https://github.com/zytc2009/BigTeam_learning
分类: LINUX
2010-11-13 16:44:06
semaphore是内核中比较重要和常用的同步方式之一,他主要的特点是实现了Sleep机制下的同步。也就是当获取 一个semaphore但是又不能立刻获取的时候,他使当前的执行进程进入到Sleep状态中等待,当semaphore可以获取的时候,从新开始运行, 而不像splin lock在获取锁的时候是BusyWait。
首先看其定义:
struct semaphore {
atomic_t count; // 原子变量,是后续的实际代码中,我们能看到其即为我们在初始化时所设置的信号量。
int sleepers; // 有几个等待者。
wait_queue_head_t wait; // 等待队列
};
初始化函数:
static inline void sema_init (struct semaphore *sem, int val)
{
atomic_set(&sem->count, val); // 原子操作,把信号量的值设为原子操作的值。
sem->sleepers = 0; // 设为等待者为0。
init_waitqueue_head(&sem->wait); // 初始化等待队列。
}
好看完了初始化函数,我们来看一下一个特例PV操作:
static inline void init_MUTEX (struct semaphore *sem)
{
sema_init(sem, 1);
}
static inline void init_MUTEX_LOCKED (struct semaphore *sem)
{
sema_init(sem, 0);
}
从中我们可以看到,其实我们所说的PV操作就是调用sema_init来把其中的原子变量分别置0或者1。
下面我们来看下具体的操作函数:
static inline void down(struct semaphore * sem)
{
might_sleep();
__asm__ __volatile__(
"# atomic down operation\n\t"
LOCK_PREFIX "decl %0\n\t" /* --sem->count */
"jns 2f\n"
"\tlea %0,%%eax\n\t"
"call __down_failed\n"
"2:"
:"+m" (sem->count)
:
:"memory","ax");
}
关于might_sleep():
#define might_sleep() \
do { __might_sleep(__FILE__, __LINE__); might_resched(); } while (0)
从中可以看到其根本是调用might_resched():
#define might_resched() cond_resched()
其调用了cond_resched():
int __sched cond_resched(void)
{
if (need_resched() && !(preempt_count() & PREEMPT_ACTIVE) &&
system_state == SYSTEM_RUNNING) {
__cond_resched();
return 1;
}
return 0;
}
其中:
static inline int need_resched(void)
{
return unlikely(test_thread_flag(TIF_NEED_RESCHED));
// TIF_NEED_RESCHED 这个值在arm是2,在i386中是3。
}
其是判断进程的状态是否是需要调度。
其具体实现是:
#define test_thread_flag(flag) \
test_ti_thread_flag(current_thread_info(), flag)
得到当前进程的状态,然后和flag做位&操作,判断是否可以做调度。
看下关于抢占机制的判断:
#define preempt_count() (current_thread_info()->preempt_count)
调用preempt_count()得到当前进程关于抢占的状态,preempt_count & PREEMPT_ACTIVE 为TRUE,取反为当前不可以抢占,就不能调度,否则为可调度。
看些系统状态:system_state 为运行状态。
综合上面的几个条件,我们看出当前进程可以被调度或者说抢占,于是当前进程被调度放弃CPU资源。
might_sleep宏就是检查是否需要重新调度。
下面我们进入汇编代码区:
__asm__ __volatile__(
"# atomic up operation\n\t"
LOCK_PREFIX "decl %0\n\t" /* --sem->count */
"jns 2f\n" /* 当符号标志位为0的时候跳转到标号2地方,标志获取锁,结束本次操作。 */
"\tlea %0,%%eax\n\t"
"call __down_failed\n" /* 获取不到的话,压栈,调用 __down_failed */
"2:"
:"+m" (sem->count)
:
:"memory","ax");
LOCK_PREFIX宏:
在多处理器环境中 LOCK_PREFIX 实际被定义为 “lock”前缀。
x86 处理器使用“lock”前缀的方式提供了在指令执行期间对总线加锁的手段。芯片上有一条引线 LOCK,如果在一条汇编指令(ADD, ADC, AND, BTC, BTR, BTS, CMPXCHG, CMPXCH8B, DEC, INC, NEG, NOT, OR, SBB, SUB, XOR, XADD, XCHG)前加上“lock” 前缀,经过汇编后的机器代码就使得处理器执行该指令时把引线 LOCK 的电位拉低,从而把总线锁住,这样其它处理器或使用DMA的外设暂时无法通过同一总线访问内存。
注意:从 P6 处理器开始,如果指令访问的内存区域已经存在于处理器的内部缓存中,则“lock” 前缀并不将引线 LOCK 的电位拉低,而是锁住本处理器的内部缓存,然后依靠缓存一致性协议保证操作的原子性。
在比较新的内核版本中Version >2.6.20, i386平台找不到关于__down_failed的具体定义:
linux-2.6.13.4>arch>i386>kernel>semaphore.c
asm(
".section .sched.text\n"
".align 4\n"
".globl __down_failed\n"
"__down_failed:\n\t"
#if defined(CONFIG_FRAME_POINTER)
"pushl %ebp\n\t"
"movl %esp,%ebp\n\t"
#endif
"pushl %edx\n\t"
"pushl %ecx\n\t"
"call __down\n\t"
"popl %ecx\n\t"
"popl %edx\n\t"
#if defined(CONFIG_FRAME_POINTER)
"movl %ebp,%esp\n\t"
"popl %ebp\n\t"
#endif
"ret"
);
从汇编代码中我们可以看到其调用了__down()来实现其等待获取锁的过程。
fastcall void __sched __down(struct semaphore * sem)
{
struct task_struct *tsk = current; // 当前进程task_struct。
DECLARE_WAITQUEUE(wait, tsk); // 定义等待队列。
unsigned long flags;
tsk->state = TASK_UNINTERRUPTIBLE; // 设置当前进程状态为不可中断状态。
spin_lock_irqsave(&sem->wait.lock, flags);
// spin_loc_irqsave 在获得自旋锁之前禁止中断(只在当前处理器);之前的中断状态保存在 flags 里。
add_wait_queue_exclusive_locked(&sem->wait, &wait);
// 将当前的进程添加到等待队列的队尾。操作是不需要锁,因为此函数一般是在获取锁之后调用。
sem->sleepers++;
for (;;) {
int sleepers = sem->sleepers;
/*
* Add "everybody else" into it. They aren't
* playing, because we own the spinlock in
* the wait_queue_head.
*/
if (!atomic_add_negative(sleepers - 1, &sem->count)) {
// 将(sleepers - 1) + ( &sem->count),如果当结果值为负数时返回真。
/* First:初始化的时候,count = val, sleeper = 0,
* down()中, count -- 成为 val - 1,以PV操作为例,其val值为1,count = 0,
* __down()中,sleeper ++ 为 1,
* 此时(sleepers - 1, &sem->count) 为0,不为负值,故此为FALSE,跳出循环。
*
* Two:由于在跳出的时候sleepers设置为0,故此Other进程尝试想再次获取锁资源的时候,
* (sleepers - 1, &sem->count) 为 ( 0 - 1, 0 )为负值,此时为真,进入下次循环。
*/
sem->sleepers = 0; //
break;
}
sem->sleepers = 1; /* us - see -1 above */
/* 设置sleeper为1,标识有进程在等待。*/
spin_unlock_irqrestore(&sem->wait.lock, flags);
/* 解开锁,开始准备被调度。 */
schedule();
spin_lock_irqsave(&sem->wait.lock, flags);
tsk->state = TASK_UNINTERRUPTIBLE;
}
remove_wait_queue_locked(&sem->wait, &wait);
/* 获取到了锁资源,把当前进程从等待队列中删掉。 */
wake_up_locked(&sem->wait);
/* 唤醒等待队列 */
spin_unlock_irqrestore(&sem->wait.lock, flags);
tsk->state = TASK_RUNNING;
}
这里要注意的是对sleeper成员的理解结合后面的代码:
进程在没有获得锁时会将其设为1,在获得了锁的时候将其设为0。
所以,sleeper = 1时表示有进程在等待,sleeper = 0时,有进程获得信号量退出,或者没有进程在等待此信号量。
如果在等待队列中进程被唤醒并获得了锁,则将sleeper设为0,然后用break退出循环,再用wake_up_locked(&sem->wait)唤醒等待队列中的一个进程。
这个进程下次被调度的时候从shedule()后面开始运行,更改进程状态。
调用if (!atomic_add_negative(sleepers - 1, &sem->count))判断,此时sleeper = 0,sleep - 1 = -1,经过atomic_add_negative()使sem->count - 1了。
因此前面论述过,进程进来取信号量 down()里count有减1,然后如果有等待线程在获取到这个信号,刚又会将count + 1,使其变到了原值。
现在在我们这个情景中,已经有线程释放信号了,应该要 + 1。
那思考一下:
为什么要对count sleeper做这样的处理呢?直接使用使用计数不就完了吗?进程等待获取信号量的时候sleep+1 count-1.释放信号量的时候sleep-1.count+1不可以吗?
表面上上述的方法也能工作的很好,但是如果进程数目一多就要考虑到溢出问题了。
static inline void up(struct semaphore * sem)
{
__asm__ __volatile__(
"# atomic up operation\n\t"
LOCK_PREFIX "incl %0\n\t" /* ++sem->count */
"jg 1f\n\t"
"lea %0,%%eax\n\t"
"call __up_wakeup\n"
"1:"
:"+m" (sem->count)
:
:"memory","ax");
}
asm(
".section .sched.text\n"
".align 4\n"
".globl __up_wakeup\n"
"__up_wakeup:\n\t"
"pushl %edx\n\t"
"pushl %ecx\n\t"
"call __up\n\t"
"popl %ecx\n\t"
"popl %edx\n\t"
"ret"
);
fastcall void __up(struct semaphore *sem)
{
wake_up(&sem->wait);
}
释放锁资源就比较简单了,首先是利用汇编判断其是否是有符号大于,我们这里count是为0,不是大于则call __up_wakeup(),最终执行__up(),唤醒等待队列。
至于相应信号的分析如下:
static inline int down_interruptible(struct semaphore * sem)
{
int result;
might_sleep();
__asm__ __volatile__(
"# atomic interruptible down operation\n\t"
"xorl %0,%0\n\t"
LOCK_PREFIX "decl %1\n\t" /* --sem->count */
"jns 2f\n\t"
"lea %1,%%eax\n\t"
"call __down_failed_interruptible\n"
"2:"
:"=&a" (result), "+m" (sem->count)
:
:"memory");
return result;
}
fastcall int __sched __down_interruptible(struct semaphore * sem)
{
int retval = 0; /* 返回值 */
struct task_struct *tsk = current;
DECLARE_WAITQUEUE(wait, tsk);
unsigned long flags;
tsk->state = TASK_INTERRUPTIBLE;
spin_lock_irqsave(&sem->wait.lock, flags);
add_wait_queue_exclusive_locked(&sem->wait, &wait);
sem->sleepers++;
for (;;) {
int sleepers = sem->sleepers;
/*
* With signals pending, this turns into
* the trylock failure case - we won't be
* sleeping, and we* can't get the lock as
* it has contention. Just correct the count
* and exit.
*/
if (signal_pending(current)) {
/* 判断是否有中断触发,如果有则返回非0值,否则返回0。*/
retval = -EINTR;
sem->sleepers = 0;
/* 重置sleepers = 0 */
atomic_add(sleepers, &sem->count);
/* 让等待者等于count。*/
break;
}
/*
* Add "everybody else" into it. They aren't
* playing, because we own the spinlock in
* wait_queue_head. The "-1" is because we're
* still hoping to get the semaphore.
*/
if (!atomic_add_negative(sleepers - 1, &sem->count)) {
sem->sleepers = 0;
break;
}
sem->sleepers = 1; /* us - see -1 above */
spin_unlock_irqrestore(&sem->wait.lock, flags);
schedule();
spin_lock_irqsave(&sem->wait.lock, flags);
tsk->state = TASK_INTERRUPTIBLE;
}
remove_wait_queue_locked(&sem->wait, &wait);
wake_up_locked(&sem->wait);
spin_unlock_irqrestore(&sem->wait.lock, flags);
tsk->state = TASK_RUNNING;
return retval;
}
static inline int down_trylock(struct semaphore * sem)
{
int result;
__asm__ __volatile__(
"# atomic interruptible down operation\n\t"
"xorl %0,%0\n\t"
LOCK_PREFIX "decl %1\n\t" /* --sem->count */
"jns 2f\n\t" /* 可以获取锁,锁之。*/
"lea %1,%%eax\n\t"
"call __down_failed_trylock\n\t"
"2:\n"
:"=&a" (result), "+m" (sem->count)
:
:"memory");
return result;
}
fastcall int __down_trylock(struct semaphore * sem)
{
int sleepers;
unsigned long flags;
spin_lock_irqsave(&sem->wait.lock, flags);
sleepers = sem->sleepers + 1;
/* 不能获取锁,把sleepers + 1,判断是否有等待进程,如果有唤醒其。*/
sem->sleepers = 0;
/*
* Add "everybody else" and us into it. They aren't
* playing, because we own the spinlock in the
* wait_queue_head.
*/
if (!atomic_add_negative(sleepers, &sem->count)) {
wake_up_locked(&sem->wait);
}