同步是linux内核中一种很重要的操作.它为内核提供了一种临界区和SMP系统中的数据保护机制.今天就来分析一下在linux内核是怎么样实现这些操作的.
一:原子操作(摘自《understanding the linux kernel 2.4》)
原子操作是指在执行过程中不能被打断的操作.它包括以下几种类型:
进行一次或者零次对齐内存的访问操作都是原子操作.因为这些指令一般都是单指令.不可能单指令在执行过程中被抢占。
如果在读操作之后,写操作之前没有其他处理器占用内存总线。那么从内存中读取数据,更新数据并把更新后的数据写回内存的这些“读—修改—写”汇编语言指令是原子的。例如:inc或dec
操作码前缀是lock字节的(0xf0)的“读—修改—写”汇编程序指令即使在SMP系统中也是原子的。当控制单元检测lock后,就会把总线锁住,一直到这些加锁指令执行完为止.
操作码前缀是一个rep字节的汇编语言指令都不是原子的。这条指令强行让控制单元执行相同的指令.控制单元在执行新的指令之前要检查挂起的中断.
1.1: 在内核中提供了以下的几组操作接口:
void atomic_set(atomic_t *v, int i)
int atomic_read(atomic_t *v)
void atomic_add(int i, atomic_t *v)
void atomic_sub(int i, atomic_t *v)
void atomic_inc(atomic_t *v)
void atomic_dec(atomic_t *v)
int atomic_inc_and_test(atomic_t *v)
int atomic_dec_and_test(atomic_t *v)
int atomic_sub_and_test(int i, atomic_t *v)
int atomic_add_negative(int i, atomic_t *v)
int atomic_add_return(int i, atomic_t *v)
int atomic_sub_return(int i, atomic_t *v)
int atomic_inc_return(atomic_t *v)
int atomic_dec_return(atomic_t *v)
atomic_sub(amount, &first_atomic)
atomic_add(amount, &second_atomic)
上述原子操作对象是一个32位数。之所以定义成这样,是为了和一般的操作区别。提醒用户这是一个原子操作接口。
1.2: 原子的位操作:
void set_bit(nr, void *addr)
void clear_bit(nr, void *addr)
void change_bit(nr, void *addr)
test_bit(nr, void *addr)
int test_and_set_bit(nr, void *addr)
int test_and_clear_bit(nr, void *addr)
int test_and_change_bit(nr, void *addr)
以atomic_inc(atomic_t *v), atomic_dec(atomic_t *v)为例来分析内核中对应的代码.
static __inline__ void atomic_dec(atomic_t *v)
{
__asm__ __volatile__(
LOCK_PREFIX "decl %0"
:"+m" (v->counter));
}
LOCK_PREFIX为锁操作前缀
Atomic_inc的代码如下:
static __inline__ void atomic_inc(atomic_t *v)
{
__asm__ __volatile__(
LOCK_PREFIX "incl %0"
:"+m" (v->counter));
}
二:自旋锁
上述的原子操作操作接口提供了单变量的保护操作,但是在实际项目中,情况并没有这么简单,我们需要保护的是一个临界区.所以我们需要用到其它的加锁方法.自旋锁就是其中之一.
2.1:自旋锁的特性
自旋锁只能被一个控制路径所持有.进入临界区,自旋锁锁住.其后如果有其它线程来取自旋锁的自己就会原地“自旋”,一直到持有者放开锁为止.
很显然,当得不到锁的时候,CPU就会在一直在做轮询判断.也就是说如果一个进程取不到自旋锁.也会一直判断下去,不会主动去调度其它进程代替当前进程.但是这样锁机制的开销极少.可以快速的开锁和加锁.持有自旋锁的进程一般都会对临界区有快速的处理
另外,内核还规定持有自旋锁的进程不能被抢占,这样做主要是出于系统性能考虑.如果一个持自旋锁的进程被调度出CPU.那其它需要这些锁的进程被调度进来之后,也只做 “自旋”工作.
2.2: 自旋锁的实现
自旋锁在内核中对应的类型为: spinlock_t
typedef struct {
raw_spinlock_t raw_lock;
#if defined(CONFIG_PREEMPT) && defined(CONFIG_SMP)
//配置了内核抢占与SMP
unsigned int break_lock;
#endif
//自旋锁DEBUG
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} spinlock_t;
typedef struct {
unsigned int slock;
} raw_spinlock_t;
在接下来的代码中再来分析结构中各成员的含义.
2.2.1:spin_lock_init :自旋锁的初始化
代码如下:
# define spin_lock_init(lock) \
do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)
#define SPIN_LOCK_UNLOCKED __SPIN_LOCK_UNLOCKED(old_style_spin_init)
# define __SPIN_LOCK_UNLOCKED(lockname) \
(spinlock_t) { .raw_lock = __RAW_SPIN_LOCK_UNLOCKED, \
SPIN_DEP_MAP_INIT(lockname) }
#define __RAW_SPIN_LOCK_UNLOCKED { 1 }
上述操作就是把sipinlock_t中的raw_lock置为__RAW_SPIN_LOCK_UNLOCKED.即为1.
此时锁是打开状态.
2.2.2: spin_lock() :获取自旋锁.
#define spin_lock(lock) _spin_lock(lock)
_spin_lock()定义如下:
void __lockfunc _spin_lock(spinlock_t *lock)
{
//禁止抢占
preempt_disable();
//这个函数在没有定义自旋锁调试的时候是空函数
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
//相当于_raw_spin_lock(lock)
LOCK_CONTENDED(lock, _raw_spin_trylock, _raw_spin_lock);
}
LOCK_CONTENDED的定义如下:
#define LOCK_CONTENDED(_lock, try, lock) \
lock(_lock)
_raw_spin_lock()的操作如下:
# define _raw_spin_lock(lock) __raw_spin_lock(&(lock)->raw_lock)
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
asm volatile(
"\n1:\t"
//decl: lock->slock减1
LOCK_PREFIX " ; decl %0\n\t"
//如果不为负.跳转到2f.2f后面没有任何指令.即为退出
"jns 2f\n"
"3:\n"
//重复执行nop.nop是x86的小延迟函数
"rep;nop\n\t"
//比较0与lock->slock的值,如果lock->slock不大于0.跳转到标号3.即继续重复执行nop
"cmpl $0,%0\n\t"
//如果lock->slock不大于0.跳转到标号3.即继续重复执行nop
"jle 3b\n\t"
//如果lock->slock大于0.跳转到标号1.重新判断锁的slock成员
"jmp 1b\n"
"2:\t" : "=m" (lock->slock) : : "memory");
}
在上面的函数中,可能对"jmp 1b\n"比较难以理解.在我们一般的观念里.获得一个锁.将其值减1.释放锁时将其值加1.实际上在自旋锁的实现中lock->slock只有两个可能值,一个是0. 一个是1.释放锁的时候并不是将lock->slock加1.而是将其赋为1.在后面的自旋锁释放代码中再详细分析.
# define spin_unlock(lock) _spin_unlock(lock)
void __lockfunc _spin_unlock(spinlock_t *lock)
{
//在没有配置自旋锁调试的时候.该函数为空函数
spin_release(&lock->dep_map, 1, _RET_IP_);
//
_raw_spin_unlock(lock);
//释放自旋锁了.允许内核抢占
preempt_enable();
}
# define _raw_spin_unlock(lock) __raw_spin_unlock(&(lock)->raw_lock)
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
//将lock->slock赋为1
asm volatile("movb $1,%0" : "+m" (lock->slock) :: "memory");
}
总体来说,释放自旋锁的分为两步:
第一步:将锁的slock字段回归到1.注意这里不是加1,而是赋值1
第二步:允许内核抢占,与加锁时的禁止内核抢占相对应
在内核中还有几个与硬中断和软中断有关的几个自旋锁API.如下所示:
void spin_lock(spinlock_t *lock)
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags)
void spin_lock_irq(spinlock_t *lock)
void spin_lock_bh(spinlock_t *lock)
void spin_unlock(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);
代码都不复杂,可自行结合前面分析的IRQ中断部份理解这几个API的操作.
三:信号量的操作
3.1:信号量的特点:
根据前面的分析看到.自旋锁是一种轻快但效率极其低下的一种加锁,对于需要对临界区长时间操作的情况下不太合适.因此,需要一种等待获锁的机制.这就是信号量.信号量在请求锁的时候,如果不成功,便会将进程投入睡眠.因此要在允许睡眠的场合使用信号量.
3.2:信号量的实现
信号量的结构如下所示:
struct semaphore {
//信号量资源数
atomic_t count;
//是否有一些进程在睡眠
int sleepers;
//信号的等待队列
wait_queue_head_t wait;
}
3.3: sema_init() :信号量的初始化
static inline void sema_init (struct semaphore *sem, int val)
{
//设置信号量的资源数为val.sem->count表示允许同时有多少个进程占有此信号量
atomic_set(&sem->count, val);
//sleepers成员置0,表示没有进程在等待
sem->sleepers = 0;
//初始化等待队列
init_waitqueue_head(&sem->wait);
}
3.4:down :获取信号量
static inline void down(struct semaphore * sem)
{
//可能会引起睡眠
might_sleep();
__asm__ __volatile__(
"# atomic down operation\n\t"
//LOCK:锁定总线标志
//使sem->count -1 如果小于0.跳转到2
LOCK "decl %0\n\t" /* --sem->count */
"js 2f\n"
//后面没有指令了,退出
"1:\n"
LOCK_SECTION_START("")
//如果sem->count -1小于0.说明该信号已经没有足够的资源了.调用__down_failed
"2:\tcall __down_failed\n\t"
"jmp 1b\n"
LOCK_SECTION_END
:"=m" (sem->count)
:"c" (sem)
:"memory");
}
__down_failed的代码如下:
asm(
".section .sched.text\n"
".align 4\n"
".globl __down_failed\n"
"__down_failed:\n\t"
#if defined(CONFIG_FRAME_POINTER)
"pushl %ebp\n\t"
"movl %esp,%ebp\n\t"
#endif
//之所以把eax edx ecx压栈,是因为__down()函数的参数就是eax edx ecx.它从堆栈中取参数.所以//将其入栈.这样做是从2.4遗留下来的,因为2.4的__down函数为fastcall型
"pushl %eax\n\t"
"pushl %edx\n\t"
"pushl %ecx\n\t"
"call __down\n\t"
"popl %ecx\n\t"
"popl %edx\n\t"
"popl %eax\n\t"
#if defined(CONFIG_FRAME_POINTER)
"movl %ebp,%esp\n\t"
"popl %ebp\n\t"
#endif
"ret"
)
__down的代码如下:
asmlinkage void __sched __down(struct semaphore * sem)
{
struct task_struct *tsk = current;
//声明一个等待队列
DECLARE_WAITQUEUE(wait, tsk);
unsigned long flags;
//将进程状态置为不可中断型.外部信号不可以将其中断
tsk->state = TASK_UNINTERRUPTIBLE;
//获取自旋锁且关中断
spin_lock_irqsave(&sem->wait.lock, flags);
//加入等待队列,并将等待标志高为WQ_FLAG_EXCLUSIVE
add_wait_queue_exclusive_locked(&sem->wait, &wait);
//sleepers之前只能为0或者是1
sem->sleepers++;
for (;;) {
int sleepers = sem->sleepers;
/*
* Add "everybody else" into it. They aren't
* playing, because we own the spinlock in
* the wait_queue_head.
*/
//如果之前有进程在线程,则使sem->count+1 (刚好使count等于down进来的值)
//因为之前在down()中有减1操作
//负数返回真
if (!atomic_add_negative(sleepers - 1, &sem->count)) {
//已经有信号了,将sleeps置零,退出循环后会唤醒等列中的一个进程
sem->sleepers = 0;
break;
}
//将sleepers 置1
sem->sleepers = 1; /* us - see -1 above */
//释放自旋量并恢复之前的中断标志
spin_unlock_irqrestore(&sem->wait.lock, flags);
//重新调度
schedule();
//重新调度后,因为其状态是TASK_UNINTERRUPTIBLE .所以要释放信号量的进程将其唤醒
//才能继续运行
spin_lock_irqsave(&sem->wait.lock, flags);
//将状态再设为TASK_UNINTERRUPTIBLE . 重新判断是否有信号量资源
tsk->state = TASK_UNINTERRUPTIBLE;
}
//已经有信号量资源了,将等待队列删除
remove_wait_queue_locked(&sem->wait, &wait);
//唤醒其它队列
wake_up_locked(&sem->wait);
//恢复中断标志和释放自旋锁
spin_unlock_irqrestore(&sem->wait.lock, flags);
tsk->state = TASK_RUNNING;
}
这里要注意的是对sleeper成员的理解.结合后面的代码.进程在没有获得锁时会将其设为1.在获得了锁的时候将其设为0.所以,sleeper=1时表示有进程在等待.sleeper=0时,有进程获得信号量退出,或者没有进程在等待此信号量.
如果在等待队列中进程被唤醒,并获得了锁.则将sleeper设为0.然后用break退出循环,再用wake_up_locked(&sem->wait)唤醒等待队列中的一个进程,这个进程下次被调度的时候从shedule()后面开始运行.更改进程状态好,调用if (!atomic_add_negative(sleepers - 1, &sem->count))判断,此时sleeper=0.sleep-1 = -1.经过atomic_add_negative()使sem->count-1了.因此前面论述过.进程进来取信号量 down()里count有减1.然后如果有等待线程在获取这个信号,刚又会将count+1,使其变到了原值.现在在我们这个情景中,已经有线程进程去了,应该要+1.
那思考一下:为什么要对count sleeper做这样的处理呢?直接使用使用计数不就完了吗?进程等待获取信号量的时候sleep+1 count-1.释放信号量的时候sleep-1.count+1不可以吗?
表面上上述的方法也能工作的很好,但是如果进程数目一多就要考虑到溢出问题了.
Linux在这里设计的很巧妙,可以好好的回味.
3.5:up(): 释放信号
static inline void up(struct semaphore * sem)
{
__asm__ __volatile__(
"# atomic up operation\n\t"
//sem->count 计数加1
LOCK "incl %0\n\t" /* ++sem->count */
//如果大于0的话,说明信号量资源充足,没有进程在等待
//无需唤醒等待队列.直接退出即可
"jle 2f\n"
"1:\n"
LOCK_SECTION_START("")
//否则唤醒队列
"2:\tcall __up_wakeup\n\t"
"jmp 1b\n"
LOCK_SECTION_END
".subsection 0\n"
:"=m" (sem->count)
:"c" (sem)
:"memory");
}
asm(
".section .sched.text\n"
".align 4\n"
".globl __up_wakeup\n"
"__up_wakeup:\n\t"
"pushl %eax\n\t"
"pushl %edx\n\t"
"pushl %ecx\n\t"
"call __up\n\t"
"popl %ecx\n\t"
"popl %edx\n\t"
"popl %eax\n\t"
"ret"
);
同理调用__up()进行处理
asmlinkage void __up(struct semaphore *sem)
{
wake_up(&sem->wait);
}
这个函数很简单,就是将信号量中的等待队列唤醒.注意之前提到过的,加入等待队列中所用的标志WQ_FLAG_EXCLUSIVE.这个标志表示只需将挂成队列前面的进程唤醒.无需全部唤醒.
信号量的使用内核中还有很多的变体API.究其原理都差不多.可以自行了解^_^
四:完成变量
4.1:完全变量的特点
在内核中经常有这样的需要,在一个进程中要等待某个进程完成某个事情.内核为其提供了一个简单的接口,其实它只是一个简化版的信号量而已.
4.2:完全变量的结构
完全变量在代码中的结构如下:
struct completion {
//等待标志
unsigned int done;
//等待队列
wait_queue_head_t wait;
};
4.3:完全变量的操作与实现
wait_for_completion():等待条件的完成,它的代码如下:
void __sched wait_for_completion(struct completion *x)
{
wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_UNINTERRUPTIBLE);
}
wait_for_common() à do_wait_for_common():
static inline long __sched
do_wait_for_common(struct completion *x, long timeout, int state)
{
//!x->done:条件末完成.投入睡眠等待
if (!x->done) {
DECLARE_WAITQUEUE(wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
//加入等待队列
__add_wait_queue_tail(&x->wait, &wait);
do {
//如果是可中断状态且有末处理的信号。将其移出等待队列
if (state == TASK_INTERRUPTIBLE &&
signal_pending(current)) {
__remove_wait_queue(&x->wait, &wait);
return -ERESTARTSYS;
}
__set_current_state(state);
spin_unlock_irq(&x->wait.lock);
timeout = schedule_timeout(timeout);
spin_lock_irq(&x->wait.lock);
if (!timeout) {
//延时到了.将其移出等待队列
__remove_wait_queue(&x->wait, &wait);
return timeout;
}
} while (!x->done);
//运行到这里,条件已经完成了,将其从等待队列移出
__remove_wait_queue(&x->wait, &wait);
}
//如果条件完成了,将x->done-- 直接返回
x->done--;
return timeout;
}
Complete()来用通告已经完成了这个条件.代码如下:
void complete(struct completion *x)
{
unsigned long flags;
//持完全变量的自旋锁且禁中断
spin_lock_irqsave(&x->wait.lock, flags);
//x-
x->done++;
__wake_up_common(&x->wait, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE,
1, 0, NULL);
//释放自旋锁且恢复中断到以前的状态
spin_unlock_irqrestore(&x->wait.lock, flags);
}
__wake_up_common()的代码如下所示:
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int sync, void *key)
{
wait_queue_t *curr, *next;
//遍历等待队列且唤醒等待的进程
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
if (curr->func(curr, mode, sync, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
分析完了信号量之后,看完全变量的实现还是挺简单.另外需要注意的是信号量与完成变量里的自旋锁的使用.信号量是在其中的一段操作中使用了自旋锁.这是为了在这一段操作中保持串行化.而完全变量是一开始就加锁.不允许wait_for_completion()和completion()同时调用.
五:读写信号量
5.1:读写信号量的特点
通常受保护的数据是允许多个进程去读的。只需要保护写操作就可以了。如果多个读者去排队等待其它读者的操作完成的话,显然是有失效率的。读写信号量就是为了解瘊这个问题而产生的。读写信号量的等待队列是一个严格的FIFO.有以下几个特点:
1):读者去获取信号量的时候:
1:如果临界区是空的或者有其它读者在操作,以时如果没有写者在排队,就把读者放进去。如果有 写者在排队,就把读者加入到等待队列。
2:如果临界区有写者在操作。就把这个读者加入等待队列。
2):写者去获取信号量的时候:
1:如果临界区是空的且等待队列为空。放这个写者进去。如果等待队不为空,就将其加至等待队列
2:如果临界区中有读者在操作,就将其加入到等待队列.
3):释放信号量的时候:
1:如果这个进程是读者。检查是不是临界区内的最后一个读者。如果是最后一个读者,检查是否有写者在等待。如果有。将写者唤醒。如果没有,直接退出。
2:如果释放信号量是一个写者,通常会唤醒等待队列中的第一个进程
2.1:如果这个进程是读者。那它后面的所有读者也会被唤醒(不包含写者后面的)
2.2:如果这个进程是写者。那么它后面的所有等待进程继续睡眠.
总之:在这里要注意对待读者与写者的区别。在临界区中可以允许有多个读者,但不允许有多个写者。
5.2:读写信号量结构:
读写信号量在内核中对应的数据结构为:
struct rw_semaphore {
//activity = 0:表示没有对象在操作信号量
//activity > 0:表示有一个或者多个读者在操作信号量
//activity < 0:表示有一个写者在操作信号量
__s32 activity;
//读写信号量对应的自旋锁
spinlock_t wait_lock;
//对应的等待队列
struct list_head wait_list;
#if RWSEM_DEBUG
int debug;
#endif
}
5.3:读写信号量的操作与实现:
5.3.1:down_read():读者获得信号量
down_read()用来在读者操作临界区的情况。它的代码如下:
static inline void down_read(struct rw_semaphore *sem)
{
//可能会引起睡眠
might_sleep();
//调试用
rwsemtrace(sem,"Entering down_read");
__down_read(sem);
//调试用
rwsemtrace(sem,"Leaving down_read");
}
__down_read()的代码如下:
void fastcall __sched __down_read(struct rw_semaphore *sem)
{
struct rwsem_waiter waiter;
struct task_struct *tsk;
rwsemtrace(sem, "Entering __down_read");
//获取自旋锁
spin_lock(&sem->wait_lock);
//sem->activeity>=0:说明有读者进入了临界区
//sem->wait_list:等待队列
//如果activeity>=0. wait_list不为空:说明有写者在等待
//如果activeity<0:说明有写者在对临界区操作
//如果临界区中有读者然后没有写者在等待.可以直接获得此信号量
if (sem->activity >= 0 && list_empty(&sem->wait_list)) {
/* granted */
sem->activity++;
spin_unlock(&sem->wait_lock);
goto out;
}
//否则,将读者加入等待队列末尾
tsk = current;
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
/* set up my own style of waitqueue */
waiter.task = tsk;
//对应的等待标志是RWSEM_WAITING_FOR_READ
waiter.flags = RWSEM_WAITING_FOR_READ;
get_task_struct(tsk);
list_add_tail(&waiter.list, &sem->wait_list);
/* we don't need to touch the semaphore struct anymore */
spin_unlock(&sem->wait_lock);
/* wait to be given the lock */
//一个死循环,这样是为了 是被期待的唤醒
for (;;) {
//如果waiter.task==NULL.确实是被唤醒的.因为唤醒的时候,会将waiter,task清空
if (!waiter.task)
break;
//睡眠
schedule();
//设置进程状态为不可中断型
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
}
//已经获得信号量了.设置状态为RUNNING 退出
tsk->state = TASK_RUNNING;
out:
rwsemtrace(sem, "Leaving __down_read");
}
5.3.2: up_read():读者释放信号量
读者操作完临界区了之后,使用up_read()将信号量释放。代码如下:
up_read()à
void fastcall __up_read(struct rw_semaphore *sem)
{
rwsemtrace(sem, "Entering __up_read");
spin_lock(&sem->wait_lock);
//--sem->activity == 0:此读者是临界区中的最后一个对象
//sem->wait_list不为空说明临界区外有写者在等待
if (--sem->activity == 0 && !list_empty(&sem->wait_list))
//唤醒等待队列中的第一个进程
sem = __rwsem_wake_one_writer(sem);
//如果不是临界区中的最后一个读者或者是没有写者在等待,退出
spin_unlock(&sem->wait_lock);
rwsemtrace(sem, "Leaving __up_read");
}
__rwsem_wake_one_writer()代码如下:
static inline struct rw_semaphore *
__rwsem_wake_one_writer(struct rw_semaphore *sem)
{
struct rwsem_waiter *waiter;
struct task_struct *tsk;
//将activity置为-1.因为临界区是被写者占据了
sem->activity = -1;
//取等待队列中的第一个进程
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
//将其从等待链表中删除
list_del(&waiter->list);
tsk = waiter->task;
mb();
//将waiter->task清空
waiter->task = NULL;
//唤醒过程
wake_up_process(tsk);
//减少进程引用计数,因为在把它投入队列的时候被+1 了.这样做是为了防止task结构被释放掉
put_task_struct(tsk);
return sem;
}
注意里面对activity和waiter->task的操作.
5.3.3: down_write():写者获得信号量
在更改临界区的情况下,使用down_write()获得信号量。代码如下:
down_write() à __down_write():
void fastcall __sched __down_write(struct rw_semaphore *sem)
{
struct rwsem_waiter waiter;
struct task_struct *tsk;
rwsemtrace(sem, "Entering __down_write");
spin_lock(&sem->wait_lock);
//如果临界区没有对象在操作.且没有其它的写者在等待
if (sem->activity == 0 && list_empty(&sem->wait_list)) {
/* granted */
//获得信号
sem->activity = -1;
spin_unlock(&sem->wait_lock);
goto out;
}
//如果临界区被其它对象占据或者有其它的写者在等待,将其加到等待队列末尾
tsk = current;
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
/* set up my own style of waitqueue */
waiter.task = tsk;
//置标志为RWSEM_WAITING_FOR_WRITE
waiter.flags = RWSEM_WAITING_FOR_WRITE;
get_task_struct(tsk);
list_add_tail(&waiter.list, &sem->wait_list);
/* we don't need to touch the semaphore struct anymore */
spin_unlock(&sem->wait_lock);
/* wait to be given the lock */
//判断是否被正常唤醒
for (;;) {
if (!waiter.task)
break;
schedule();
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
}
tsk->state = TASK_RUNNING;
out:
rwsemtrace(sem, "Leaving __down_write");
}
注意读者进入临界的判断条件和写者进入临界区的判断条件的不同.读者只需要sem->activity >= 0且没有等待队列就可以进行。写者必须要求 sem->activity == 0且无等待队列.
5.3.4:up_write():写者释放信号量
写者操作完了之后,调用up_write()释放信号量。代码如下:
Up_write() à __up_write():
void fastcall __up_write(struct rw_semaphore *sem)
{
rwsemtrace(sem, "Entering __up_write");
spin_lock(&sem->wait_lock);
//写者释放信号量.将activity重置为0.因为临界区已经没有对象在操作了
sem->activity = 0;
//如果有进程在等待,将其唤醒
if (!list_empty(&sem->wait_list))
sem = __rwsem_do_wake(sem, 1);
spin_unlock(&sem->wait_lock);
rwsemtrace(sem, "Leaving __up_write");
}
写进程释放信号唤醒其它进程的处理有点特别,代码如下:
static inline struct rw_semaphore *
__rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
{
struct rwsem_waiter *waiter;
struct task_struct *tsk;
int woken;
rwsemtrace(sem, "Entering __rwsem_do_wake");
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
//在__up_write()调用的时候.wakewrite是置为1的.表示是一个写者在进行唤醒
if (!wakewrite) {
if (waiter->flags & RWSEM_WAITING_FOR_WRITE)
goto out;
goto dont_wake_writers;
}
/* if we are allowed to wake writers try to grant a single write lock
* if there's a writer at the front of the queue
* - we leave the 'waiting count' incremented to signify potential
* contention
*/
//如果等待队列的第一个进程是一个写等待,只需要将这个写者唤醒就行了
if (waiter->flags & RWSEM_WAITING_FOR_WRITE) {
sem->activity = -1;
list_del(&waiter->list);
tsk = waiter->task;
/* Don't touch waiter after ->task has been NULLed */
mb();
waiter->task = NULL;
wake_up_process(tsk);
put_task_struct(tsk);
goto out;
}
/* grant an infinite number of read locks to the front of the queue */
dont_wake_writers:
//如果第一个进程是读等待,将将其后所有为读等待的进程唤醒
woken = 0;
while (waiter->flags & RWSEM_WAITING_FOR_READ) {
struct list_head *next = waiter->list.next;
list_del(&waiter->list);
tsk = waiter->task;
mb();
waiter->task = NULL;
wake_up_process(tsk);
put_task_struct(tsk);
woken++;
if (list_empty(&sem->wait_list))
break;
waiter = list_entry(next, struct rwsem_waiter, list);
}
sem->activity += woken;
out:
rwsemtrace(sem, "Leaving __rwsem_do_wake");
return sem;
}
六:小结
在linux内核中还有其它的几种同步机制,在这里没有一一列出分析。
阅读(1675) | 评论(0) | 转发(0) |