Chinaunix首页 | 论坛 | 博客
  • 博客访问: 611504
  • 博文数量: 353
  • 博客积分: 1104
  • 博客等级: 少尉
  • 技术积分: 1457
  • 用 户 组: 普通用户
  • 注册时间: 2008-12-23 23:02
个人简介

1、刚工作时做Linux 流控;后来做安全操作系统;再后来做操作系统加固;现在做TCP 加速。唉!没离开过类Unix!!!但是水平有限。。

文章存档

2015年(80)

2013年(4)

2012年(90)

2011年(177)

2010年(1)

2009年(1)

分类:

2011-10-14 13:59:11

原文地址:linux内核同步 作者:xgr180

------------------------------------------
本文系本站原创,欢迎转载!
转载请注明出处:http://ericxiao.cublog.cn/
------------------------------------------
同步是linux内核中一种很重要的操作.它为内核提供了一种临界区和SMP系统中的数据保护机制.今天就来分析一下在linux内核是怎么样实现这些操作的.
一:原子操作(摘自《understanding the linux kernel 2.4》)
原子操作是指在执行过程中不能被打断的操作.它包括以下几种类型:
进行一次或者零次对齐内存的访问操作都是原子操作.因为这些指令一般都是单指令.不可能单指令在执行过程中被抢占。
如果在读操作之后,写操作之前没有其他处理器占用内存总线。那么从内存中读取数据,更新数据并把更新后的数据写回内存的这些“读—修改—写”汇编语言指令是原子的。例如:inc或dec
操作码前缀是lock字节的(0xf0)的“读—修改—写”汇编程序指令即使在SMP系统中也是原子的。当控制单元检测lock后,就会把总线锁住,一直到这些加锁指令执行完为止.
操作码前缀是一个rep字节的汇编语言指令都不是原子的。这条指令强行让控制单元执行相同的指令.控制单元在执行新的指令之前要检查挂起的中断.
1.1: 在内核中提供了以下的几组操作接口:
void atomic_set(atomic_t *v, int i)
int atomic_read(atomic_t *v)
void atomic_add(int i, atomic_t *v)
void atomic_sub(int i, atomic_t *v)
void atomic_inc(atomic_t *v)
void atomic_dec(atomic_t *v)
int atomic_inc_and_test(atomic_t *v)
int atomic_dec_and_test(atomic_t *v)
int atomic_sub_and_test(int i, atomic_t *v)
int atomic_add_negative(int i, atomic_t *v)
int atomic_add_return(int i, atomic_t *v)
int atomic_sub_return(int i, atomic_t *v)
int atomic_inc_return(atomic_t *v)
int atomic_dec_return(atomic_t *v)
atomic_sub(amount, &first_atomic)
atomic_add(amount, &second_atomic)
上述原子操作对象是一个32位数。之所以定义成这样,是为了和一般的操作区别。提醒用户这是一个原子操作接口。
1.2: 原子的位操作:
void set_bit(nr, void *addr)
void clear_bit(nr, void *addr)
void change_bit(nr, void *addr)
test_bit(nr, void *addr)
int test_and_set_bit(nr, void *addr)
int test_and_clear_bit(nr, void *addr)
int test_and_change_bit(nr, void *addr)
以atomic_inc(atomic_t *v), atomic_dec(atomic_t *v)为例来分析内核中对应的代码.
static __inline__ void atomic_dec(atomic_t *v)
{
    __asm__ __volatile__(
         LOCK_PREFIX "decl %0"
         :"+m" (v->counter));
}
LOCK_PREFIX为锁操作前缀
Atomic_inc的代码如下:
static __inline__ void atomic_inc(atomic_t *v)
{
     __asm__ __volatile__(
         LOCK_PREFIX "incl %0"
         :"+m" (v->counter));
}
二:自旋锁
上述的原子操作操作接口提供了单变量的保护操作,但是在实际项目中,情况并没有这么简单,我们需要保护的是一个临界区.所以我们需要用到其它的加锁方法.自旋锁就是其中之一.
2.1:自旋锁的特性
自旋锁只能被一个控制路径所持有.进入临界区,自旋锁锁住.其后如果有其它线程来取自旋锁的自己就会原地“自旋”,一直到持有者放开锁为止.
很显然,当得不到锁的时候,CPU就会在一直在做轮询判断.也就是说如果一个进程取不到自旋锁.也会一直判断下去,不会主动去调度其它进程代替当前进程.但是这样锁机制的开销极少.可以快速的开锁和加锁.持有自旋锁的进程一般都会对临界区有快速的处理
另外,内核还规定持有自旋锁的进程不能被抢占,这样做主要是出于系统性能考虑.如果一个持自旋锁的进程被调度出CPU.那其它需要这些锁的进程被调度进来之后,也只做 “自旋”工作.
2.2: 自旋锁的实现
自旋锁在内核中对应的类型为: spinlock_t
typedef struct {
     raw_spinlock_t raw_lock;
#if defined(CONFIG_PREEMPT) && defined(CONFIG_SMP)
     //配置了内核抢占与SMP
     unsigned int break_lock;
#endif
     //自旋锁DEBUG
#ifdef CONFIG_DEBUG_SPINLOCK
     unsigned int magic, owner_cpu;
     void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
     struct lockdep_map dep_map;
#endif
} spinlock_t;
 
typedef struct {
     unsigned int slock;
} raw_spinlock_t;
在接下来的代码中再来分析结构中各成员的含义.
2.2.1:spin_lock_init :自旋锁的初始化
代码如下:
# define spin_lock_init(lock)                      \
     do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)
#define SPIN_LOCK_UNLOCKED  __SPIN_LOCK_UNLOCKED(old_style_spin_init)
# define __SPIN_LOCK_UNLOCKED(lockname) \
     (spinlock_t)  {    .raw_lock = __RAW_SPIN_LOCK_UNLOCKED, \
                   SPIN_DEP_MAP_INIT(lockname) }
#define __RAW_SPIN_LOCK_UNLOCKED { 1 }
上述操作就是把sipinlock_t中的raw_lock置为__RAW_SPIN_LOCK_UNLOCKED.即为1.
此时锁是打开状态.
 
2.2.2: spin_lock() :获取自旋锁.
#define spin_lock(lock)          _spin_lock(lock)
_spin_lock()定义如下:
void __lockfunc _spin_lock(spinlock_t *lock)
{
     //禁止抢占
     preempt_disable();
     //这个函数在没有定义自旋锁调试的时候是空函数
     spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
     //相当于_raw_spin_lock(lock)
     LOCK_CONTENDED(lock, _raw_spin_trylock, _raw_spin_lock);
}
LOCK_CONTENDED的定义如下:
#define LOCK_CONTENDED(_lock, try, lock) \
     lock(_lock)
_raw_spin_lock()的操作如下:
# define _raw_spin_lock(lock)        __raw_spin_lock(&(lock)->raw_lock)
 
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
     asm volatile(
         "\n1:\t"
         //decl: lock->slock减1
         LOCK_PREFIX " ; decl %0\n\t"
         //如果不为负.跳转到2f.2f后面没有任何指令.即为退出
         "jns 2f\n"
         "3:\n"
         //重复执行nop.nop是x86的小延迟函数
         "rep;nop\n\t"
         //比较0与lock->slock的值,如果lock->slock不大于0.跳转到标号3.即继续重复执行nop
"cmpl $0,%0\n\t"
//如果lock->slock不大于0.跳转到标号3.即继续重复执行nop
         "jle 3b\n\t"
         //如果lock->slock大于0.跳转到标号1.重新判断锁的slock成员
         "jmp 1b\n"
         "2:\t" : "=m" (lock->slock) : : "memory");
}
在上面的函数中,可能对"jmp 1b\n"比较难以理解.在我们一般的观念里.获得一个锁.将其值减1.释放锁时将其值加1.实际上在自旋锁的实现中lock->slock只有两个可能值,一个是0. 一个是1.释放锁的时候并不是将lock->slock加1.而是将其赋为1.在后面的自旋锁释放代码中再详细分析.
 
# define spin_unlock(lock)       _spin_unlock(lock)
void __lockfunc _spin_unlock(spinlock_t *lock)
{
     //在没有配置自旋锁调试的时候.该函数为空函数
     spin_release(&lock->dep_map, 1, _RET_IP_);
     //
     _raw_spin_unlock(lock);
     //释放自旋锁了.允许内核抢占
     preempt_enable();
}
# define _raw_spin_unlock(lock)      __raw_spin_unlock(&(lock)->raw_lock)
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
     //将lock->slock赋为1
     asm volatile("movb $1,%0" : "+m" (lock->slock) :: "memory");
}
总体来说,释放自旋锁的分为两步:
第一步:将锁的slock字段回归到1.注意这里不是加1,而是赋值1
第二步:允许内核抢占,与加锁时的禁止内核抢占相对应
 
在内核中还有几个与硬中断和软中断有关的几个自旋锁API.如下所示:
void spin_lock(spinlock_t *lock)
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags)
void spin_lock_irq(spinlock_t *lock)
void spin_lock_bh(spinlock_t *lock)
 
void spin_unlock(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);
代码都不复杂,可自行结合前面分析的IRQ中断部份理解这几个API的操作.
 
三:信号量的操作
3.1:信号量的特点:
根据前面的分析看到.自旋锁是一种轻快但效率极其低下的一种加锁,对于需要对临界区长时间操作的情况下不太合适.因此,需要一种等待获锁的机制.这就是信号量.信号量在请求锁的时候,如果不成功,便会将进程投入睡眠.因此要在允许睡眠的场合使用信号量.
3.2:信号量的实现
信号量的结构如下所示:
struct semaphore {
     //信号量资源数
     atomic_t count;
     //是否有一些进程在睡眠
     int sleepers;
     //信号的等待队列
     wait_queue_head_t wait;
}
3.3: sema_init() :信号量的初始化
static inline void sema_init (struct semaphore *sem, int val)
{
     //设置信号量的资源数为val.sem->count表示允许同时有多少个进程占有此信号量
     atomic_set(&sem->count, val);
     //sleepers成员置0,表示没有进程在等待
     sem->sleepers = 0;
     //初始化等待队列
     init_waitqueue_head(&sem->wait);
}
3.4:down :获取信号量
static inline void down(struct semaphore * sem)
{
     //可能会引起睡眠
might_sleep();
     __asm__ __volatile__(
         "# atomic down operation\n\t"
         //LOCK:锁定总线标志
         //使sem->count -1 如果小于0.跳转到2
         LOCK "decl %0\n\t"     /* --sem->count */
         "js 2f\n"
         //后面没有指令了,退出
         "1:\n"
         LOCK_SECTION_START("")
         //如果sem->count -1小于0.说明该信号已经没有足够的资源了.调用__down_failed
         "2:\tcall __down_failed\n\t"
         "jmp 1b\n"
         LOCK_SECTION_END
         :"=m" (sem->count)
         :"c" (sem)
         :"memory");
}
__down_failed的代码如下:
asm(
".section .sched.text\n"
".align 4\n"
".globl __down_failed\n"
"__down_failed:\n\t"
#if defined(CONFIG_FRAME_POINTER)
     "pushl %ebp\n\t"
     "movl  %esp,%ebp\n\t"
#endif
//之所以把eax edx ecx压栈,是因为__down()函数的参数就是eax edx ecx.它从堆栈中取参数.所以//将其入栈.这样做是从2.4遗留下来的,因为2.4的__down函数为fastcall型
     "pushl %eax\n\t"
     "pushl %edx\n\t"
     "pushl %ecx\n\t"
     "call __down\n\t"
     "popl %ecx\n\t"
     "popl %edx\n\t"
     "popl %eax\n\t"
#if defined(CONFIG_FRAME_POINTER)
     "movl %ebp,%esp\n\t"
     "popl %ebp\n\t"
#endif
     "ret"
)
__down的代码如下:
asmlinkage void __sched __down(struct semaphore * sem)
{
     struct task_struct *tsk = current;
     //声明一个等待队列
     DECLARE_WAITQUEUE(wait, tsk);
     unsigned long flags;
 
     //将进程状态置为不可中断型.外部信号不可以将其中断
     tsk->state = TASK_UNINTERRUPTIBLE;
     //获取自旋锁且关中断
     spin_lock_irqsave(&sem->wait.lock, flags);
     //加入等待队列,并将等待标志高为WQ_FLAG_EXCLUSIVE
     add_wait_queue_exclusive_locked(&sem->wait, &wait);
 
     //sleepers之前只能为0或者是1
     sem->sleepers++;
     for (;;) {
         int sleepers = sem->sleepers;
 
         /*
          * Add "everybody else" into it. They aren't
          * playing, because we own the spinlock in
          * the wait_queue_head.
          */
          //如果之前有进程在线程,则使sem->count+1 (刚好使count等于down进来的值)
          //因为之前在down()中有减1操作
 
         //负数返回真
         if (!atomic_add_negative(sleepers - 1, &sem->count)) {
              //已经有信号了,将sleeps置零,退出循环后会唤醒等列中的一个进程
              sem->sleepers = 0;
              break;
         }
         //将sleepers 置1
         sem->sleepers = 1; /* us - see -1 above */
         //释放自旋量并恢复之前的中断标志
         spin_unlock_irqrestore(&sem->wait.lock, flags);
 
         //重新调度
         schedule();
 
         //重新调度后,因为其状态是TASK_UNINTERRUPTIBLE .所以要释放信号量的进程将其唤醒
         //才能继续运行
         spin_lock_irqsave(&sem->wait.lock, flags);
         //将状态再设为TASK_UNINTERRUPTIBLE . 重新判断是否有信号量资源
         tsk->state = TASK_UNINTERRUPTIBLE;
     }
 
     //已经有信号量资源了,将等待队列删除
     remove_wait_queue_locked(&sem->wait, &wait);
     //唤醒其它队列
     wake_up_locked(&sem->wait);
     //恢复中断标志和释放自旋锁
     spin_unlock_irqrestore(&sem->wait.lock, flags);
     tsk->state = TASK_RUNNING;
}
这里要注意的是对sleeper成员的理解.结合后面的代码.进程在没有获得锁时会将其设为1.在获得了锁的时候将其设为0.所以,sleeper=1时表示有进程在等待.sleeper=0时,有进程获得信号量退出,或者没有进程在等待此信号量.
如果在等待队列中进程被唤醒,并获得了锁.则将sleeper设为0.然后用break退出循环,再用wake_up_locked(&sem->wait)唤醒等待队列中的一个进程,这个进程下次被调度的时候从shedule()后面开始运行.更改进程状态好,调用if (!atomic_add_negative(sleepers - 1, &sem->count))判断,此时sleeper=0.sleep-1 = -1.经过atomic_add_negative()使sem->count-1了.因此前面论述过.进程进来取信号量 down()里count有减1.然后如果有等待线程在获取这个信号,刚又会将count+1,使其变到了原值.现在在我们这个情景中,已经有线程进程去了,应该要+1.
那思考一下:为什么要对count sleeper做这样的处理呢?直接使用使用计数不就完了吗?进程等待获取信号量的时候sleep+1 count-1.释放信号量的时候sleep-1.count+1不可以吗?
表面上上述的方法也能工作的很好,但是如果进程数目一多就要考虑到溢出问题了.
Linux在这里设计的很巧妙,可以好好的回味.
 
3.5:up(): 释放信号
static inline void up(struct semaphore * sem)
{
     __asm__ __volatile__(
         "# atomic up operation\n\t"
         //sem->count 计数加1
         LOCK "incl %0\n\t"     /* ++sem->count */
         //如果大于0的话,说明信号量资源充足,没有进程在等待
         //无需唤醒等待队列.直接退出即可
         "jle 2f\n"
         "1:\n"
         LOCK_SECTION_START("")
         //否则唤醒队列
         "2:\tcall __up_wakeup\n\t"
         "jmp 1b\n"
         LOCK_SECTION_END
         ".subsection 0\n"
         :"=m" (sem->count)
         :"c" (sem)
         :"memory");
}
asm(
".section .sched.text\n"
".align 4\n"
".globl __up_wakeup\n"
"__up_wakeup:\n\t"
     "pushl %eax\n\t"
     "pushl %edx\n\t"
     "pushl %ecx\n\t"
     "call __up\n\t"
     "popl %ecx\n\t"
     "popl %edx\n\t"
     "popl %eax\n\t"
     "ret"
);
同理调用__up()进行处理
asmlinkage void __up(struct semaphore *sem)
{
     wake_up(&sem->wait);
}
这个函数很简单,就是将信号量中的等待队列唤醒.注意之前提到过的,加入等待队列中所用的标志WQ_FLAG_EXCLUSIVE.这个标志表示只需将挂成队列前面的进程唤醒.无需全部唤醒.
信号量的使用内核中还有很多的变体API.究其原理都差不多.可以自行了解^_^
 
四:完成变量
4.1:完全变量的特点
在内核中经常有这样的需要,在一个进程中要等待某个进程完成某个事情.内核为其提供了一个简单的接口,其实它只是一个简化版的信号量而已.
4.2:完全变量的结构
完全变量在代码中的结构如下:
struct completion {
     //等待标志
     unsigned int done;
     //等待队列
     wait_queue_head_t wait;
};
4.3:完全变量的操作与实现
wait_for_completion():等待条件的完成,它的代码如下:
void __sched wait_for_completion(struct completion *x)
{
     wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_UNINTERRUPTIBLE);
}
wait_for_common() à  do_wait_for_common():
static inline long __sched
do_wait_for_common(struct completion *x, long timeout, int state)
{
     //!x->done:条件末完成.投入睡眠等待
     if (!x->done) {
         DECLARE_WAITQUEUE(wait, current);
 
         wait.flags |= WQ_FLAG_EXCLUSIVE;
         //加入等待队列
         __add_wait_queue_tail(&x->wait, &wait);
         do {
              //如果是可中断状态且有末处理的信号。将其移出等待队列   
              if (state == TASK_INTERRUPTIBLE &&
                  signal_pending(current)) {
                   __remove_wait_queue(&x->wait, &wait);
                   return -ERESTARTSYS;
              }
              __set_current_state(state);
              spin_unlock_irq(&x->wait.lock);
              timeout = schedule_timeout(timeout);
              spin_lock_irq(&x->wait.lock);
              if (!timeout) {
                   //延时到了.将其移出等待队列
                   __remove_wait_queue(&x->wait, &wait);
                   return timeout;
              }
         } while (!x->done);
         //运行到这里,条件已经完成了,将其从等待队列移出
         __remove_wait_queue(&x->wait, &wait);
     }
     //如果条件完成了,将x->done-- 直接返回
     x->done--;
     return timeout;
}
 
Complete()来用通告已经完成了这个条件.代码如下:
void complete(struct completion *x)
{
     unsigned long flags;
 
     //持完全变量的自旋锁且禁中断
     spin_lock_irqsave(&x->wait.lock, flags);
     //x-
     x->done++;
     __wake_up_common(&x->wait, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE,
               1, 0, NULL);
     //释放自旋锁且恢复中断到以前的状态
     spin_unlock_irqrestore(&x->wait.lock, flags);
}
__wake_up_common()的代码如下所示:
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
                   int nr_exclusive, int sync, void *key)
{
     wait_queue_t *curr, *next;
 
     //遍历等待队列且唤醒等待的进程
     list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
         unsigned flags = curr->flags;
 
         if (curr->func(curr, mode, sync, key) &&
                   (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
              break;
     }
}
分析完了信号量之后,看完全变量的实现还是挺简单.另外需要注意的是信号量与完成变量里的自旋锁的使用.信号量是在其中的一段操作中使用了自旋锁.这是为了在这一段操作中保持串行化.而完全变量是一开始就加锁.不允许wait_for_completion()和completion()同时调用.
 
五:读写信号量
5.1:读写信号量的特点
通常受保护的数据是允许多个进程去读的。只需要保护写操作就可以了。如果多个读者去排队等待其它读者的操作完成的话,显然是有失效率的。读写信号量就是为了解瘊这个问题而产生的。读写信号量的等待队列是一个严格的FIFO.有以下几个特点:
1):读者去获取信号量的时候:
     1:如果临界区是空的或者有其它读者在操作,以时如果没有写者在排队,就把读者放进去。如果有  写者在排队,就把读者加入到等待队列。
     2:如果临界区有写者在操作。就把这个读者加入等待队列。
2):写者去获取信号量的时候:
     1:如果临界区是空的且等待队列为空。放这个写者进去。如果等待队不为空,就将其加至等待队列
     2:如果临界区中有读者在操作,就将其加入到等待队列.
3):释放信号量的时候:
     1:如果这个进程是读者。检查是不是临界区内的最后一个读者。如果是最后一个读者,检查是否有写者在等待。如果有。将写者唤醒。如果没有,直接退出。
     2:如果释放信号量是一个写者,通常会唤醒等待队列中的第一个进程
     2.1:如果这个进程是读者。那它后面的所有读者也会被唤醒(不包含写者后面的)
     2.2:如果这个进程是写者。那么它后面的所有等待进程继续睡眠.
总之:在这里要注意对待读者与写者的区别。在临界区中可以允许有多个读者,但不允许有多个写者。
 
5.2:读写信号量结构:
读写信号量在内核中对应的数据结构为:
struct rw_semaphore {
     //activity = 0:表示没有对象在操作信号量
     //activity > 0:表示有一个或者多个读者在操作信号量
     //activity < 0:表示有一个写者在操作信号量
     __s32              activity;
     //读写信号量对应的自旋锁
     spinlock_t         wait_lock;
     //对应的等待队列
     struct list_head   wait_list;
#if RWSEM_DEBUG
     int           debug;
#endif
}
 
5.3:读写信号量的操作与实现:
5.3.1:down_read():读者获得信号量
down_read()用来在读者操作临界区的情况。它的代码如下:
static inline void down_read(struct rw_semaphore *sem)
{
     //可能会引起睡眠
     might_sleep();
     //调试用
     rwsemtrace(sem,"Entering down_read");
     __down_read(sem);
     //调试用
     rwsemtrace(sem,"Leaving down_read");
}
__down_read()的代码如下:
void fastcall __sched __down_read(struct rw_semaphore *sem)
{
     struct rwsem_waiter waiter;
     struct task_struct *tsk;
 
     rwsemtrace(sem, "Entering __down_read");
 
     //获取自旋锁
     spin_lock(&sem->wait_lock);
 
     //sem->activeity>=0:说明有读者进入了临界区
     //sem->wait_list:等待队列
     //如果activeity>=0. wait_list不为空:说明有写者在等待
     //如果activeity<0:说明有写者在对临界区操作
 
     //如果临界区中有读者然后没有写者在等待.可以直接获得此信号量
     if (sem->activity >= 0 && list_empty(&sem->wait_list)) {
         /* granted */
         sem->activity++;
         spin_unlock(&sem->wait_lock);
         goto out;
     }
    
     //否则,将读者加入等待队列末尾
     tsk = current;
     set_task_state(tsk, TASK_UNINTERRUPTIBLE);
 
     /* set up my own style of waitqueue */
     waiter.task = tsk;
     //对应的等待标志是RWSEM_WAITING_FOR_READ
     waiter.flags = RWSEM_WAITING_FOR_READ;
     get_task_struct(tsk);
 
     list_add_tail(&waiter.list, &sem->wait_list);
 
     /* we don't need to touch the semaphore struct anymore */
     spin_unlock(&sem->wait_lock);
 
     /* wait to be given the lock */
     //一个死循环,这样是为了 是被期待的唤醒
     for (;;) {
         //如果waiter.task==NULL.确实是被唤醒的.因为唤醒的时候,会将waiter,task清空
         if (!waiter.task)
              break;
         //睡眠
         schedule();
         //设置进程状态为不可中断型
         set_task_state(tsk, TASK_UNINTERRUPTIBLE);
     }
 
     //已经获得信号量了.设置状态为RUNNING  退出
     tsk->state = TASK_RUNNING;
 
 out:
     rwsemtrace(sem, "Leaving __down_read");
}
5.3.2: up_read():读者释放信号量
读者操作完临界区了之后,使用up_read()将信号量释放。代码如下:
up_read()à
void fastcall __up_read(struct rw_semaphore *sem)
{
     rwsemtrace(sem, "Entering __up_read");
 
     spin_lock(&sem->wait_lock);
 
     //--sem->activity == 0:此读者是临界区中的最后一个对象
     //sem->wait_list不为空说明临界区外有写者在等待
     if (--sem->activity == 0 && !list_empty(&sem->wait_list))
         //唤醒等待队列中的第一个进程
         sem = __rwsem_wake_one_writer(sem);
 
     //如果不是临界区中的最后一个读者或者是没有写者在等待,退出
     spin_unlock(&sem->wait_lock);
 
     rwsemtrace(sem, "Leaving __up_read");
}
__rwsem_wake_one_writer()代码如下:
static inline struct rw_semaphore *
__rwsem_wake_one_writer(struct rw_semaphore *sem)
{
     struct rwsem_waiter *waiter;
     struct task_struct *tsk;
 
     //将activity置为-1.因为临界区是被写者占据了
     sem->activity = -1;
 
     //取等待队列中的第一个进程
     waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
     //将其从等待链表中删除
     list_del(&waiter->list);
 
     tsk = waiter->task;
     mb();
     //将waiter->task清空
     waiter->task = NULL;
     //唤醒过程
     wake_up_process(tsk);
     //减少进程引用计数,因为在把它投入队列的时候被+1 了.这样做是为了防止task结构被释放掉
     put_task_struct(tsk);
     return sem;
}
注意里面对activity和waiter->task的操作.
5.3.3: down_write():写者获得信号量
在更改临界区的情况下,使用down_write()获得信号量。代码如下:
down_write() à __down_write():
void fastcall __sched __down_write(struct rw_semaphore *sem)
{
     struct rwsem_waiter waiter;
     struct task_struct *tsk;
 
     rwsemtrace(sem, "Entering __down_write");
 
     spin_lock(&sem->wait_lock);
 
     //如果临界区没有对象在操作.且没有其它的写者在等待
     if (sem->activity == 0 && list_empty(&sem->wait_list)) {
         /* granted */
         //获得信号
         sem->activity = -1;
         spin_unlock(&sem->wait_lock);
         goto out;
     }
 
     //如果临界区被其它对象占据或者有其它的写者在等待,将其加到等待队列末尾
     tsk = current;
     set_task_state(tsk, TASK_UNINTERRUPTIBLE);
 
     /* set up my own style of waitqueue */
     waiter.task = tsk;
     //置标志为RWSEM_WAITING_FOR_WRITE
     waiter.flags = RWSEM_WAITING_FOR_WRITE;
     get_task_struct(tsk);
 
     list_add_tail(&waiter.list, &sem->wait_list);
 
     /* we don't need to touch the semaphore struct anymore */
     spin_unlock(&sem->wait_lock);
 
     /* wait to be given the lock */
     //判断是否被正常唤醒
     for (;;) {
         if (!waiter.task)
              break;
         schedule();
         set_task_state(tsk, TASK_UNINTERRUPTIBLE);
     }
 
     tsk->state = TASK_RUNNING;
 
 out:
     rwsemtrace(sem, "Leaving __down_write");
}
注意读者进入临界的判断条件和写者进入临界区的判断条件的不同.读者只需要sem->activity >= 0且没有等待队列就可以进行。写者必须要求 sem->activity == 0且无等待队列.
5.3.4:up_write():写者释放信号量
写者操作完了之后,调用up_write()释放信号量。代码如下:
Up_write() à __up_write():
void fastcall __up_write(struct rw_semaphore *sem)
{
     rwsemtrace(sem, "Entering __up_write");
 
     spin_lock(&sem->wait_lock);
 
     //写者释放信号量.将activity重置为0.因为临界区已经没有对象在操作了
     sem->activity = 0;
     //如果有进程在等待,将其唤醒
     if (!list_empty(&sem->wait_list))
         sem = __rwsem_do_wake(sem, 1);
 
     spin_unlock(&sem->wait_lock);
 
     rwsemtrace(sem, "Leaving __up_write");
}
写进程释放信号唤醒其它进程的处理有点特别,代码如下:
static inline struct rw_semaphore *
__rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
{
     struct rwsem_waiter *waiter;
     struct task_struct *tsk;
     int woken;
 
     rwsemtrace(sem, "Entering __rwsem_do_wake");
 
     waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
 
     //在__up_write()调用的时候.wakewrite是置为1的.表示是一个写者在进行唤醒
     if (!wakewrite) {
         if (waiter->flags & RWSEM_WAITING_FOR_WRITE)
              goto out;
         goto dont_wake_writers;
     }
 
     /* if we are allowed to wake writers try to grant a single write lock
      * if there's a writer at the front of the queue
      * - we leave the 'waiting count' incremented to signify potential
      *   contention
      */
 
     //如果等待队列的第一个进程是一个写等待,只需要将这个写者唤醒就行了
     if (waiter->flags & RWSEM_WAITING_FOR_WRITE) {
         sem->activity = -1;
         list_del(&waiter->list);
         tsk = waiter->task;
         /* Don't touch waiter after ->task has been NULLed */
         mb();
         waiter->task = NULL;
         wake_up_process(tsk);
         put_task_struct(tsk);
         goto out;
     }
 
     /* grant an infinite number of read locks to the front of the queue */
 dont_wake_writers:
     //如果第一个进程是读等待,将将其后所有为读等待的进程唤醒
     woken = 0;
     while (waiter->flags & RWSEM_WAITING_FOR_READ) {
         struct list_head *next = waiter->list.next;
 
         list_del(&waiter->list);
         tsk = waiter->task;
         mb();
         waiter->task = NULL;
         wake_up_process(tsk);
         put_task_struct(tsk);
         woken++;
         if (list_empty(&sem->wait_list))
              break;
         waiter = list_entry(next, struct rwsem_waiter, list);
     }
 
     sem->activity += woken;
 
 out:
     rwsemtrace(sem, "Leaving __rwsem_do_wake");
     return sem;
}
六:小结
在linux内核中还有其它的几种同步机制,我在这里没有一一列出分析。在上面分析的几种机制中,设得都很巧妙。需要慢慢的琢磨。
阅读(335) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~