Chinaunix首页 | 论坛 | 博客
  • 博客访问: 110182
  • 博文数量: 45
  • 博客积分: 1810
  • 博客等级: 上尉
  • 技术积分: 345
  • 用 户 组: 普通用户
  • 注册时间: 2009-09-03 21:57
文章分类
文章存档

2010年(26)

2009年(19)

我的朋友

分类: LINUX

2010-05-14 15:18:06

第五章、并发与竞态

信号量和互斥体:

信号量和互斥体是用来实现对共享资源的管理的一种方法,一个信号量也可以称为一个互斥体,
linux 信号量的实现:
对信号量的操作的函数定义在文件 中,类型是 struct semaphore;
声明并初始化信号量: void sema_init(struct semaphore *sem , int val);
其中val 是 赋予一个信号量的初始值;
也可以使用下面的宏来声明和初始化一个互斥体:

DECLAER_MUTEX(name); //声明一个 name 的信号量并将他初始化为 1 ;
DECLAER_MUTEX_LOCKED(name); // 声明一个 name 的信号量并将他初始化为 0; 在这种状态下信号量是锁定的,在允许任何线程访问之前必须显式的解锁该互斥体;

如果互斥体必须在运行时被初始化(例如:动态的分配互斥体的情况下),应应用下面的函数之一:

void init_MUTEX(struct semaphore *sem);
void init_MUTEX_LOCKED(struct semaphore *sem);

在处理信号量时,我们通常用一族函数:P 和 V 。
在这里,当希望进入临界区的线程在相关信号量上调用 P 函数,如果信号量的值大于 0 , 则 该值减一,表示信号量可用,进程可以继续;
相反, 当我们想释放一个信号量时,进程在相关的信号量上调用 V 函数,是信号量的值 加一,并在必要时唤醒在等待信号量的值。
在这里,P 函数相当于 down ,down 的三个版本:

void down(struct semaphore *sem);
int down_interruptible(struct semaphore *sem);
int down_trylock(struct semaphore *sem);

down 减小信号量的值,并在必要时等待,down_interruptible 实现相同的功能,但操作是可中断的。如果进程在使用down_interruptible 时 操作被中断,函数会返回非零值,线程不会得到该信号量,我们要对这种情况进行必要的返回值检查,并做出相应的相应。
down_trylock 永远不会休眠,如果信号量在该函数调用时不可获得,该函数会立刻返回一个非零值。
当我们调用以上函数成功后,我们就得到了信号量,也就得到了访问由该信号量保护的临界区的权利,当我们对此临界区的操作完成之后,我们应释放该信号量,通过调用V函数来释放信号量(相当于up):
void up(struct semaphore *sem);
注意:当我们在拥有信号量时发生错误,我们必须在错误状态返回给调用者之前释放该信号量。

在scull 中使用信号量
在使用信号量时,我们必须明确我们的信号量所保护的资源,并对其进行正确的锁定,在scull中,所有的信息都在 struct scull_dev 中,所以该结构就是我们锁定机构的逻辑范围。
在 scull_dev 结构中有一个 struct semaphore sem 的成员,我们就是对其进行操作来进行锁定的:
scull 在转载前进行的初始化操作:

for (i = 0; i < scull_nr_devs; i++){
scull_devices[i].quantum = scull_quantum ; 
scull_devices[i].qset  = scull_qset ; 
init_MUTEX(&scull_devices[i].sem);
scull_setup_cdev(&scull_devices[i], i);
}
接下来我们必须在方位 scull_dev 结构之前检查操作是否具有该信号量:
if(down_interruptible(&dev->sem)
return -ERESTARTSYS ; 
同时,对于这种 对 down_interruptible 的调用,我们必须对返回值进行检查,以确保我们获得了信号量。

释放该信号量: 
out: 
up(&dev->sem);
return retval ; 
读取者/写入者信号量:
我们允许多个并发的读取者读取资源,只要他们之中没有哪个要对资源进行修改,linux 内核位这种情形提供了一种特殊的信号量类型: reader/writer semaphore ,即读取者/写入者信号量:"rwsem"。
头文件:
数据类型: struct rw_semphore ;
一个rwsem 对象必须在运行时通过以下的方式显式的初始化:
void init_rwsem(struct rw_semaphore *sem);

只读访问:

void down_read(struct rw_semaphore *sem); //该函数可能会将调用进程置于不可中断的休眠之中。
int down_read_trylock(struct rw_semaphore *sem);//不会在读取访问不可获得情况下等待,正确调用后返回非零,错误调用返回零值,与其他函数不同
void up_read(struct rw_semaphore *sem);//对 down_read 操作获得 rwsem 必须用 up_read 来释放 ; 

写入者的接口:

void down_write(struct rw_semaphore *sem);
int down_write_trylock(struct rw_semaphore *sem);
void up_write(struct rw_semaphore *sem);
void downgrade_write(struct rw_semaphore *sem);

函数的动能对应与 上面对 read的描述,其中加入一个快速的写入操作完成之后我们又要执行更长时间的读操作,我们可以结束写入操作之后调用downgrade_write 操作,来允许其他读取者的访问。 

completion: 
completion 是一种轻量级的机制,它允许一个线程告诉另一个线程某个工作已经完成。
头文件:
创建completion :  DECLARE_COMPLETION(my_completion);
或者,动态的创建和初始化 completion :

struct completion my_completion ;
init_completion(&my_completion);

等待 completion : 
void wait_for_completion(struct completion *c);//该函数执行一个非中断的等待,如果调用了该函数并且没有人会完成人物,则将会产生一个不可杀进程

实际的completion 时间通过以下函数之一开触发:

void complete(struct completion *c);
void complete_all(struct completion *c);
这两个函数在有多个进程在等待相同的completion 是有所不同,complete 会唤醒一个等待线程,而 completion_all 会唤醒所有的等待线程。

completion 可以被多次重复使用,没有使用complete_all 我们可以重复的使用一个completion,如果我们complete_all,我们必须在重新使用该 completion 之前对其进行初始化:
INIT_COMPLETION(struct completion c);
实例代码:

DECLARE_COMPLETION(comp);
ssize_t complete_read(struct file *filp, char __user *buf, size_t count, loff_t *pos)
{
printk(KERN_DEBUG "process %i (%s)\n",current->pid, current->comm);
wait_for_completion(&comp);
printk(KERN_DEBUG "awoken %i (%s) \n",current->pid , current->comm);
return 0;
}

ssize_t complete_write(struct file *filp, const char __user *buf,size_t count, loff_t *pos)
{
printk(KERN_DEBUG "process %i (%s) awakening the readers...\n",current->compid,current->comm);
complete(&comp);
return count;
}
completion 机制的典型使用是模块退出时的内核线程终止。在这种原型中,某些驱动程序的内部工作由一个内核线程在while(1) 循环中完成,当内核准备清除模块时,exit 函数会告诉线程退出并等待completion。为了实现这个目的,内核包含了可用的这种线程的一个特殊函数:
void complete_and_exit(struct completion *c,long retval);

自旋锁

自旋锁可以在不能休眠的代码中使用,比如终端处理函数,只有锁定和解锁两个值,通常通过某个整数值的单个位来实现。希望获得锁的代码测试这个数字的相关的位,如果可用,就”锁定“这个位,如果不可用,那么代码就进入忙碌环并重复检查这个锁,知道这个锁可用。这个循环就是自旋锁的“自旋“部分。

测试并设置以原子的操作完成;

自旋锁API介绍:

自旋锁的原语所包含的头文件是, 实际的锁具有类型是 spinglock_t 类型。
自旋锁的初始化:
1、在编译时初始化: spinlock_t my_lock = SPIN_LOCK_UNLOCKED;
2、在运行时初始化: void spin_lock_init(spinlock_t *lock);

在进入临界区资源之前我们必须调用以下的函数获得自旋锁:

void spin_lock(spinlock_t *lock);

释放已经获得的锁

void spin_unlockd(spinlock_t *lock);

自旋锁和原子上下文:

任何拥有自旋锁的代码都必须是原子的,它不能休眠,它不能以任何形式放弃处理器,除了服务中断之外。
内核抢占的情况由自旋锁代码本身解决,在任何时候,只要内核代码拥有自旋锁,在相关处理器上的抢占就会被禁止。
我们需要在拥有自旋锁时禁止中断(仅在本地CPU上),因为这样可能会导致处理器永远自旋下去。(详见 P121)
自旋锁必须在最短的时间内拥有,谨记,拥有锁的时间越短越好。

自旋锁函数:

锁定一个自旋锁的函数:

void spin_lock(spinlock_t *lock);
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags);
void spin_lock_irq(spinlock_t *lock);
void spin_lock_bh(spinlock_t *lock);

spin_lock_irqsave 会在获得自旋锁之前禁止中断(只在本地处理器上),先前的中断状态保存在flags中。
spin_lock_bh 在获得自旋锁之前会禁止软件中断,但会保持硬件中断打开。

释放自旋锁的函数,对函数的选取应严格对应于获取自旋锁的那些函数:

void spin_unlock(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);

传递给spin_unlock_irqestore 的 flags 参数应和传递给spin_lock_irqsave 的 flags 参数一致,我们必须在一个函数中调用spin_lock_irqsave 和 spin_unlock_irqrestore 。

非阻塞的自旋锁操作:
int spin_trylock(spinlock_t *lock);
int spin_trylock_bh(spinlock_t *lock);
这两个函数在成功(即获得自旋锁)时返回非零值,否则返回零值。对禁止中断的情况没有对应的“try”版本。

读取者/写入者锁
自旋锁提供读取者/写入者锁,跟之前的读取者/写入者信号量相似。这种锁允许任意数量的读取者同时进入临界区,但写入者必须互斥的访问。
类型: rwlock_t ;
头文件:

声明和初始化:
rwlock_t my_rwlock  = RW_LOCK_UNLOCKED ; 
rwlock_t my_rwlock;
rwlock_init(&my_rwlock);

可使用的函数清单:

void read_lock(rwlock_t *lock);
void read_lock_irqsave(rwlock_t *lock, unsigned long flags);
void read_lock_irq(rwlock_t *lock);
void read_lock_bh(rwlock_t *lock);

void read_unlock(rwlock_t *lock);
void read_unlock_irqsave(rwlock_t *lock);
void read_unlock_irq(rwlock_t *lock);
void read_unlock_bh(rwlock_t *lock);

在read函数组中没有 read_trylock , 因为*_trylock 的功能是尝试获得一个锁,在锁当前状态不可用时不等待的返回,但是,read操作在这种情况下总是可用的,所以人read_trylock 就失去了他的价值。

用于写入者的函数类似于读取者,如下:
void write_lock(rwlock_t *lock);
void write_lock_irqsave(rwlock_t *lock, unsigned long flags);
void write_lock_irq(rwlock_t *lock);
void write_lock_bh(rwlock_t *lock);
int  write_trylock(rwlock_t *lock);

锁陷阱
……(p123) ……

除了锁之外的免锁算法
1、循环缓冲区;
前提是 读取者和写入者锁看到的数据结构始终一直,我们就可以使用循环缓冲区的方法来避免对锁的使用。

2、原子变量;
类型 atomic_t 头文件:
atomic_t 类型的数据在所有的内核支持的结构上都保存为一个int 类型的值。但是由于某些限制,不能使用完整的整数范围,atomic_t 类型的数据不能记录超过24位的整数。
特点:对执行整数算数来将比较有用。
void atomic_set(atomic_t *v, int i);//将原子变量v 的值设置为整数i。
atomic_t v = ATOMIC_INIT(0);//在编译时利用这个宏来初始化原子变量的值

int atomic_read(atomic_t *v);//返回v当前的值
void atomic_add(int i, atomic_t *v) // 将i累加到v指向的原子变量上。无返回值
void atomic_sub(int i, atomic_t *v)//从 *v中减去 i 
void atomic_inc(atomic_t *v);
void atomic_dec(atomic_t *v); //增加活减小一个原子变量。

int atomic_inc_and_test(atomic_t *v);
int atomic_dec_and_test(atomic_t *v);
int atomic_sub_and_test(int i, atomic_t *v);
执行特定的操作并测试,如果在操作结束之后,原子值是0,则返回值位true;否则返回值位为false;

int atomic_add_negative(int i, atomic_t *v);//将整数变量i累加到v。返回值在结果为负时位true,否则为false

int atomic_add_return(int i, atomic_t *v);
int atomic_sub_return(int i, atomic_t *v);
int atomic_inc_return(atomic_t *v);
int atomic_dec_return(atomic_t *v);
类似于atomic_add ,不同之处在于这些函数会将新的值返回给调用者

atomic_t 类型的数据只能通过上述的函数来访问。

需要多个atomic_t 变量的操作,仍然需要某种类型的锁,如:
atomic_add(amount, &first_atomic);
atomic_sub(amount, &first_atomic);
在amount已经从第一个原子值中减去,到还没有增加到第二个原子值之间,会存在一段时间,如果可能在者两个操作之间运行的代码会导致问题的发生,则必须使用锁。

位操作:
以原子的形式操作单个的位,整个(位操作)操作发生在单个步骤中,因此,不会受到中断。
头文件:
函数的参数中 nr 通常被定义位int型,也有少数架构上被定义为 unsigned long 型,用来描述要操作的位,一般用unsigned long 型的指针来表示要修该的地址,某些架构上用 void * 来表示。
可用的位操作:

void set_bit(nr,void *addr);//设置addr 指向的数据项的第nr位
void clear_bit(nr,void *addr)//清除addr指向的数据项的第nr位,与set 相反
void change_bit(nr,void *addr)//切换指定的位
test_bit(nr,void *addr) // 返回位的当前的值,唯一一个不能原子完成的操作

int test_and_set_bit(nr,void *addr);
int test_and_clear_bit(nr,void *addr);
int test_and_change_bit(nr,void *addr);
想前面的函数一样具有原子化的行为,特点是同时返回这个位的先前值。

对位操作的使用:
以原子方式获得锁并访问某个共享数据项的代码,可使用test_and_set_bit 或者 test_and_clear_bit 。常见的用法:
该方法假定锁就是addr地址上的第nr位,并假定当锁在0是空闲,而在非零是忙。
/*试着设置锁*/
while(test_and_set_bit(nr,addr) != 0)
wait_for_a_while();

/*完成自己的工作*/

/*释放锁,并检查*/
if(test_and_clear_bit(nr,addr) == 0)
something_went_wrong();

seqlock:

当要保护的资源很次、很简单、会被频繁访问而且写入访问很少发生且必须快速时我们可以使用seqlock,seqlock会允许读取者对临界区资源的自由访问,通常seqlock 锁保护的临界区资源不能含有包含指针的数据结构,这是因为当写入者在修改数据结构的同时,读取者可能会追随一个无效的指针。
头文件:
初始化seqlock 的两种方法:
seqlock_t lock1 = SEQLOCK_UNLOCKED ;

seqlock_t lock2 ; 
seqlock_init(&lock2);

读取访问会获得一个整数顺序值来并进入临界区进行访问,在退出时,该顺序值会和当前值比较,如果不相等,则必须重新读取访问。
读取者代码:
unsigned long seq ;
do{
seq = read_seqbegin(&the_lock);
/*要完成的代码*/
}while read_seqretry(&the_lock, seq);

如果在中断处理例程中使用seqlock, 则应该使用IRQ安全的版本:
unsigned long read_seqbegin_irqsave(seqlock_t *lock, unsigned long flags);
int read_seqretry_irqerstore(seqlock_t *lock, unsigned int seq, unsigned long flags);

写入者必须在进入临界区之前获得一个互斥锁。影调用下面的函数:

void write_seqlock(seqlock_t *lock);

写入锁以自旋锁的方式实现。
释放锁:

void write_sequnlock(seqlock_t *lock);

其他自旋锁的变种:

void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags);
void write_seqlock_irq(seqlock_t *lock);
void write_seqlock_bh(seqlock_t *lock);

void write_sequnlock_irqrestore(seqlock_t *lock, unsigned long flags);
void write_sequnlock_irq(seqlock_t *lock);
void write_sequnlock_bh(seqlock_t *lock);

如果 write_tryseqlock 可以获得自旋锁,它也会返回非零值。

读取-复制-更新(read-copy-update : RCU):
RCU 对保护的数据进行了一些限定,被保护的资源应该通过指针进行访问,而对资源的访问必须是原子代码拥有。再需要修改数据结构时,写入线程先复制,然后修改副本,之后用新的版本替代相关指针。但确定旧的版本没有用了的时候,就释放掉该结构。
头文件:
在读取端,代码使用受RCU 保护的数据结构时,必须将引用数据结构的操作代码放在rcu_read_lock 和 rcu_read_unlock 调用之间。如下:
struct my_stuff *stuff;

rcu_read_lock();
stuff = find_the _stuff(args...);
do_something_with(stuff);
rcu_read_unlock();

rcp_read_lock 会禁止内核抢占,但不会等待任何东西。用来检查锁的代码是原子的。

修改手 RCU 保护的数据结构的代码必须通过分配一个struct rcp_head 数据结构来获得清除用的回调函数,在修改完资源之后,因该作如下调用:

void call_rcu(struct rcu_head *head, void (*func)(void *arg), void *arg);
在可安全释放该资源时,给定的func会被调用,传递给call_rcu 的相同参数也会传递给这个函数。
阅读(1263) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~