对并发的管理是操作系统编程中核心的问题之一。
所谓并发,可以这样来理解。 并发:一个人(CPU)喂2个孩子(程序),轮换着每人喂一口饭,表面上2个孩子都在吃饭。 而并行:就是2个人喂2个孩子,2个孩子也同时在吃饭。
并发产生竞态,竞态导致共享数据的非法访问。
一、并发及其管理
竞争情况来自对资源的共享存取的结果。当2个执行的线路有机会操作同一个数据结构(或硬件资源),混合的可能性就一直存在。
第一个经验法则是在你设计驱动时在任何可能的时候记住避免共享的资源。如果没有并发存取,就没有竞争情况。这个想法最明显的应用是使用全局变量。
但是,实际情况是,共享是常常需要的。硬件资源的特性是共享的。软件代码传递一个指针给内核的其他部分,潜在的也创造了一个共享。
资源共享的硬规则:任何时候,一个硬件或软件资源被超过一个单一执行线程共享,并且,可能一个线程看到那个共享资源的不一致(即:或者看到内存已经分配了,或者知道没有内存,或者将要被其他人分配),你必须明确地管理对那个资源的存取。存取管理的常用技术是加锁或者互斥----确保在任何时间只有一个执行线程可以操作一个共享资源。
资源共享的另一个重要规则是:当内核代码创建一个会被内核其他部分共享的对象时,这个对象必须一直存在(而且功能正常)直到对他的外部引用都不存在为止,也就是不存在任何对他的外部引用为止。
二、信号量和互斥体
当一个LINUX进程到了一个它无法做出进一步处理的地方时,即必须等待资源的时候,它去睡眠(也就是一般说的阻塞)。从而让出处理器给别的进程,直到以后某个时间它能够再做事情。这样也是为了不把CPU浪费在进程的等待时间上,提高系统的性能。
信号量(semaphore,旗语,信号灯)。
信号量(LDD3称之为旗标)是一个单个整型值,结合一对函数联合使用。一对函数是P和V。
一个想进入临界区的进程将在相关信号量上调用P;如果信号量的值大于零,这个值递减1并且进程继续。
相反,如果信号量的值是0(或者负数),进程必须等待直到别人释放信号量。解锁一个信号量通过调用V完成;这个函数递增信号量的值,并且,如果需要,唤醒等待的进程。
当信号量用作互斥----阻止多个进程同时在同一个临界区内运行;信号量的值将初始化为1.这样的信号量在任何给定时间只能由一个单个进程或者线程持有。
以这种模式使用的信号量有时称为一个互斥锁,就是互斥。
LINUX内核中几乎所有的信号量都是用于互斥。
使用信号量,内核代码必须包含头文件
。
信号量初始化的方法:
/*直接创建一个信号量*/
void sema_init(struct semaphore *sem, int val);
由于信号量通常是被用于互斥模式,所以内核提供了一组辅助函数和宏:
/*方法一、声明+初始化宏*/
DECLARE_MUTEX(name);
DECLARE_MUTEX_LOCKED(name);
/*结果是一个信号量变量(称为name),初始化为1(使用DECLARE_MUTEX)或者0(使用DECLARE_MUTEX_LOCKED)。在后一种情况,互斥锁开始于上锁的状态;在允许任何线程存取之前必须显式解锁它*/
/*方法二、初始化函数(这里是在运行时间初始化,即动态分配它的情况)*/
void init_MUTEX(struct semaphore *sem);
void init_MUTEX_LOCKED(struct semphore *sem);
/*带有“_LOCKED”的是将信号量初始化为0,也就是进行锁定,线程访问时必须先解锁。没带“_LOCKED”的,初始化为1*/
P函数为:
void down(struct semaphore *sem); /*不推荐使用,此函数会建立不可杀的进程,因为它的操作是不可中断的*/
int down_interruptible(struct semaphore *sem); /*推荐使用,使用down_interruptible需要格外小心,若操作被中断,该函数会返回非零值,而调用者不会拥有该信号量。正确使用是:需要始终检查返回值,并做出相应的响应。*/
int down_trylock(struct semphore *sem);/*从不睡眠;如果信号量在调用时不可用,down_trylock立刻返回一个非零值。*/
一旦一个线程成功调用了down各个版本中的一个,我们就说它持有信号量(或者已经“取得”或者“获得”信号量)。这个线程现在有权利存取这个信号量保护的临界区。当这个需要互斥的操作完成时,信号量必须被返回。V的LINUX对应的是up,对应于P的down。
void up(struct semaphore *sem);/*一旦up被调用,调用者就不再拥有信号量*/
/*任何拿到信号量的线程都必须通过一次(只有一次)对up的调用而释放该信号量。在出错时,要特别的小心;若在拥有一个信号量时发生错误,必须在将错误状态返回前释放信号量*/
读者/写者信号量
信号量为所有调用者进行互斥,不管每个线程可能想做什么。
然而,很多任务分为2中类型:只需要读取被保护的数据结构的类型和必须做改变的类型。允许多个并发读者常常来说是可能的,只要这些读者中没有人试图对数据做出修改。只读的任务可以并行进行它们的工作而不必等待其他读者退出临界区。
LINUX内核提供了读者/写者信号量“rwsem”,必须包括头文件。
初始化:
void init_rwsem(struct rw_semaphore *sem);
只读接口:
void down_read(struct rw_semaphore *sem);
int down_read_trylock(struct rw_semaphore *sem);
void up_read(struct rw_semaphore *sem);
只写接口:
void down_write(struct rw_semaphore *sem);
int down_write_trylock(struct rw_semaphore *sem);
void up_write(struct rw_semaphore *sem);
void downgrade_write(struct rw_semaphore *sem);/*该函数用于把写者降级为读者,这个操作有时是必要的。因为写者是排他性的,即不可并发的,因此在写者保持读写信号量期间,任何读者或写者都将无法访问该读写信号量保护的共享资源,对于那些当前条件下不需要写访问的写者,降级为读者,将使得等待访问的读者能够立即访问,从而增加了并发性,提高了效率。*/
一个rwsem允许一个写者或无限多个读者来看拥有该信号量。写者有优先权;当某个写者试图进入临界区,就不会允许读者进入直到写者完成了它的工作。如果有大量的写者竞争该信号量,则这个实现可能导致读者“饿死”,即可能会长期拒绝读者访问。因此,rwsem最好用在很少请求写的时候,并且写者只占用较短的时间。
completion
completion是一种轻量级的机制,它允许一个线程告诉另一个线程某个工作已经完成。代码中必须包含头文件。使用的代码如下:
DECLARE_COMPLETION(my_completion);/*创建completion(声明+初始化)*/
struct completion my_completion; /*动态声明completion结构体*/
static inline void init_completion(&my_completion);/*动态初始化completion*/
void wait_for_completion(struct completion *c);/*等待completion*/
void completion(struct completion *c); /*唤醒一个等待completion的线程*/
void completion_all(struct completion *c);/*唤醒所有等待completion的线程*/
/*如果未使用completion_all,completion可重复利用;否则必须使用以下的函数重新初始化completion*/
INIT_COMPLETION(struct completion c);/*快速重新初始化completion*/
completion的典型应用是模块退出时的内核线程终止。在这种运行中,某些驱动程序的内部工作有一个内核线程在while(1)循环中完成。当内核准备清除该模块时,exit函数会告诉该线程退出并等待completion。为此内核包含了用于这种线程的一个特殊函数:
void completion_and_exit(struct completion *c, long retval);
自旋锁
自旋锁的使用:
示例:A,B两个人要去将一个房子的里面墙刷上油漆,A刷蓝色,B刷红色。要求是保证这个房子的颜色在某一个时间内是同一种颜色。
如果A,B分别刷就没有问题了。但是一定会出现:A去刷的同时,B也去刷的情况,这样就可能会出现2个颜色。所以,为了达到要求(保证房子的颜色在某一个时间内是同一种颜色),我们就在A或者B进入房间后把门锁上,等它刷完后才开门让另一个人进去刷漆。这样的解决方法:把门锁上;就是自旋锁。A进去后,B就得等锁的打开,B等的方式就是自旋锁的特点。这个房子可能是内存或某个变量。
注意:若A刷房子的时候,A“睡着”了,那么就会导致B需要等待很长的一段时间才能得到锁。在程序中也是一样的,当线程或进程处于睡眠状态的话,如果某个线程(B)想得到该锁,最好的情况是B在处理器上自旋等待该锁很长时间才获得该锁;最坏的情况是系统进入了死锁情况。
自旋锁遵守的核心规则是:任何拥有自旋锁的代码必须是原子的。(所谓原子:就是不可被打断一口气要实现完)它不能睡眠,不能因为任何原因放弃处理器。自旋锁的时间越短越好,长时间占着茅坑,搁谁都是很不爽的。
其实上面介绍的几种信号量和互斥机制,其底层源码都是使用自旋锁,可以理解为自旋的再包装。这就是为什么自旋锁通常可以提供比信号量更高的性能。
自旋锁是一个互斥设备,他只能会两个值:“锁定”和“解锁”。它通常实现为某个整数之中的单个位。
“测试并设置”的操作必须以原子方式完成。
任何时候,只要内核代码拥有自旋锁,在相关CPU上的抢占就会被禁止。
适合于自旋锁的核心规则:
(1)任何拥有自旋锁的代码都必须是原子的,除服务中断外(某些情况下也不能放弃CPU,如中断服务也要获得自旋锁。为避免这种锁陷阱,需要在拥有自旋锁时禁止中断),不能放弃CPU(如休眠,休眠可能发生在许多无法预期的地方)。否则CPU将有可能永远自旋下去(即死机)。
(2)拥有自旋锁的时间越短越好。
自旋锁原语所需包含的头文件:,以下是自旋锁的内核API:
spinlock_t my_lock = SPIN_LOCK_UNLOCKED; /*编译时初始化spinlock*/
void spin_lock_init(spinlock_t *lock); /*运行时初始化spinlock*/
/*所有spinlock等待,本质上是不可中断的,一旦调用spin_lock,在获得锁之前一直处于自旋状态*/
void spin_lock(spinlock_t *lock); /*获得spinlock*/
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags);/*获得spinlock,禁止本地CPU中断,保存中断标志于flags*/
void spin_lock_irq(spinlock_t *lock);/*获得spinlock,禁止本地CPU中断*/
void spin_lock_bh(spinlock_t *lock); /*获得spinlock,禁止软件中断,保持硬件中断打开*/
/*以下是对应的锁释放函数*/
void spin_unlock(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);
/*以下非阻塞自旋锁函数,成功获得,则返回非零值;失败,则返回零*/
int spin_trylock(spinlock_t *lock);
int spin_trylock_bh(spinlock_t *lock);
/*新内核的包含了更多的函数*/
读取者/写入者自旋锁:
rwlock_t my_rwlock = RW_LOCK_UNLOCKED;/*编译时初始化*/
rwlock_t my_rwlock;
rwlock_init(&my_rwlock); /*运行时初始化*/
void read_lock(rwlock_t *lock);
void read_lock_irqsave(rwlock_t *lock, unsigned long flags);
void read_lock_irq(rwlock_t *lock);
void read_lock_bh(rwlock_t *lock);
void read_unlock(rwlock_t *lock);
void read_unlock_irqsave(rwlock_t *lock, unsigned long flags);
void read_unlock_irq(rwlock_t *lock);
void read_unlock_bh(rwlock_t *lock);
/*新内核已经有了read_trylock*/
void write_lock(rwlock_t *lock);
void write_lock_irqsave(rwlock_t *lock, unsigned long flags);
void write_lock_irq(rwlock_t *lock);
void write_lock_bh(rwlock_t *lock);
int write_trylock(rwlock_t *lock);
void write_unlock(rwlock_t *lock);
void write_unlock_irqsave(rwlock_t *lock, unsigned long flags);
void write_unlock_irq(rwlock_t *lock);
void write_unlock_bh(rwlock_t *lock);
/*新内核的包含了更多函数*/
锁陷阱
锁定模式必须在一开始就安排好,否则其后的改进将会非常困难。
模糊的规则:如果某个获得锁的函数要调用其他同样试图获取这个锁的函数,代码就会锁死。(不允许锁的拥有者第二次获得同个锁)。为了锁的正确工作,不得不编写一些函数,假定它们的调用者已经获取了相关的锁。
锁的顺序规则:当多个锁必须获得时,它们应当一直以同样顺序获得。可以避免简单死锁。
如果你必须获得一个对你的代码来说的本地锁(假如,一个设备锁),以及一个属于内核更中心部分的锁,先获取你的。如果你有一个信号量和自旋锁的组合,你必须先获得信号量;
调用down(可能睡眠)在持有一个自旋锁时是一个严重的错误。但是最重要的,尽力避免需要多于一个锁的情况。
细颗粒度和粗颗粒度的对比
应该在最初使用粗颗粒度的锁,除非有真正的原因相信竞争会导致问题。
四、加锁的各种选择
(1)不加锁算法
经常可以对无锁的生产者/消费者任务有用的数据结构是环形缓冲。它在设备驱动程序中相当普遍,如网卡驱动程序。内核中有一个通用的环形缓冲的实现在。
(2)原子变量
完整的锁机制对一个简单的整数来讲显得浪费。内核提供了一种原子的整数类型,称为atomic_t,定义在。原子变量操作是非常快的,因为它们在任何可能时编译成一条单一机器指令。
以下是其接口函数:
void atomic_set(atomic_t *v, int i);/*设置原子变量v为整数值i*/
atomic_t v = ATOMIC_INIT(0); /*编译时使用宏定义ATOMIC_INIT初始化原子值*/
int atomic_read(atomic_t *v); /*返回v的当前值*/
void atomic_add(int i, atomic_t *v);/*由v指向的原子变量加i。返回值类型是void*/
void atomic_sub(int i, atomic_t *v);/*从*v减去i*/
void atomic_inc(atomic_t *v);
void atomic_dec(atomic_t *v);/*递增或递减一个原子变量*/
int atomic_inc_and_test(atomic_t *v);
int atomic_dec_and_test(atomic_t *v);
int atomic_sub_and_test(int i, atomic_t *v);
/*进行一个特定的操作并且测试结果;如果,在操作后,原子值是0,那么返回值是真;否则,它是假。注意没有atomic_add_and_test*/
int atomic_add_negative(int i, atomic_t *v);
/*加整数变量i到v。如果结果是负数,返回值是真,否则为假*/
int atomic_add_return(int i, atomic_t *v);
int atomic_sub_return(int i, atomic_t *v);
int atomic_inc_return(atomic_t *v);
int atomic_dec_return(atomic_t *v);
/*像atomic_add和其类似函数,除了它们返回原子变量的新值给调用者*/(不明白这句话的意思)
atomic_t数据项必须通过这些函数存取。如果你传递一个原子项给一个期望一个整数参数的函数,你会得到一个编译错误。需要多个atomic_t变量的操作仍然需要某种其他种类的加锁。
(3)位操作
内核提供了一套函数来原子地修改或测试单个位。因为整个操作在单步内发生,没有中断(或者其它处理器)能干扰。原子位操作非常快,因为它们使用单个机器指令来进行操作,而在任何时候低层平台做的时候不用禁止中断。函数的体系依赖在中声明。以下函数中的数据是体系依赖的。nr参数(描述要操作哪个位)在ARM体系中定义为unsigned int:
void set_bit(nr, void *addr);/*设置第nr位在addr指向的数据项中*/
void clear_bit(nr, void *addr);/*清除指定位在addr处的无符号长型数据*/
void change_bit(nr, void *addr);/*翻转nr位*/
test_bit(nr, void *addr);/*这个函数是唯一一个不需要是原子的位操作;它简单地返回这个位的当前值*/
/*一下原子操作如同前面列出的,除了它们还返回这个位以前的值*/
int test_and_set_bit(nr, void *addr);
int test_and_clear_bit(nr, void *addr);
int test_and_change_bit(nr, void *addr);
以下是一个使用范例:
/*try to set lock*/
while ( test_and_set_bit(nr, addr) != 0 )
wait_for_a_while();
/*do your work*/
/*release lock, and check */
if ( test_and_clear_bit(nr, addr)==0 )
something_went_wrong(); /*alreadly release error*/
(4) seqlock
2.6内核包含了一对新机制打算来提供快速地,无锁地存取一个共享资源。seqlock要保护的资源小,简单,并且常常被存取,并且很少写存取但是必须要快。seqlock通常不能用在保护包含指针的数据结构。seqlock定义在
/*两种初始化方法*/
seqlock_t lock1 = SEQLOCK_UNLOCKED;
seqlock_t lock2;
seqlock_init(&lock2);
这个类型的锁常常用在保护某种简单计算,读存取通过在进入临界区入口获取一个(无符号的)整数序列来工作。在退出时,那个序列值与当前值比较;如果不匹配,读存取必须重试。读者代码形式:
unsigned int seq;
do {
seq = read_seqbegin(&the_lock);
/*Do what you need to do*/
}while read_seqretry(&the_lock, seq);
如果你的seqlock可能从一个中断处理里存取,你应当使用IRQ安全的版本来代替:
unsigned int read_seqbegin_irqsave(seqlock_t *lock, unsigned long flags);
int read_seqretry_irqrestore(seqlock_t *lock, unsigned int seq, unsigned long flags);
写者必须获取一个排他锁来进入由一个seqlock保护的临界区,写锁由一个自旋锁实现,调用:
void write_seqlock(seqlock_t *lock);
void write_sequnlock(seqlock_t *lock);
因为自旋锁用来控制写存取,所有通常的变体都可用:
void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags);
void write_seqlock_irq(seqlock_t *lock);
void write_seqlock_bh(seqlock_t *lock);
void write_sequnlock_irqrestore(seqlock_t *lock, unsigned long flags);
void write_sequnlock_irq(seqlock_t *lock);
void write_sequnlock_bh(seqlock_t *lock);
还有一个write_tryseqlock在它能够获得锁时返回非零。
(5)读取-复制-更新
读取-拷贝-更新(RCU)是一个高级的互斥方法,在合适的情况下能够有高效率。它在驱动中的使用很少。