对并发的管理是操作系统编程中核心的问题之一。并发产生竞态,竞态导致对共享数据的非控制访问,会产生非预期的结果。竞态常会导致系统崩溃,数据被破坏或者产生安全问题。因为竞态是一种极端低可能性的事件,因此程序员往往会忽视竞态。但是在计算机世界中,百万分之一的事件可能没几秒就会发生,而其结果是灾难性的。
1.并发及其管理
在设计自己的驱动程序时,第一个要记住的规则是:只要可能,就应该避免资源的共享。若没有并发访问,就不会有竞态。这种思想的最明显的应用是避免使用全局变量。
但是,资源的共享是不可避免的 ,如硬件资源本质上就是共享、指针传递等等。
资源共享的硬性规则:
(1)在单个执行线程之外共享硬件或软件资源的任何时候,因为另外一个线程可能产生对该资源的不一致观察,因此必须显示地管理对该资源的访问。访问管理的常见技术成为“锁定”或者“互斥”:确保一次只有一个执行线程可操作共享资源。
(2)当内核代码创建了一个可能和其他内核部分共享的对象时,该对象必须在还有其他组件引用自己时保持存在(并正确工作)。对象尚不能正确工作时,不能将其对内核可用。
2.信号量和互斥体
一个信号量(semaphore)本质上是一个整数值,它和一对函数联合使用,这一对函数通常称为P和V(操作系统原理里)。希望进入临界区的进程将在相关信号量上调用P;如果信号量的值大于零,则该值会减一,而进程可以继续。相反,如果信号量的值为零(或更小),进程必须等待直到其他人释放该信号。对信号量的解锁通过调用V完成;该函数增加信号量的值,并在必要时唤醒等待的进程。
当信号量用于互斥时(即避免多个进程同时在一个临界区运行),信号量的值应初始化为1。这种信号量在任何给定时刻只能由单个进程或线程拥有。在这种使用模式下,一个信号量也被称为一个“互斥体(mutex)”,它是互斥(mutual exclusion)的简称。Linux内核中几乎所有的信号量均用于互斥。
- void sema_init(struct semaphore *sem, int val);
由于信号量通常被用于互斥模式。所以以下是内核提供的一组辅助函数和宏:
- /*方法一、声明+初始化宏,编译时,初始化*/
- DECLARE_MUTEX(name);
- DECLARE_MUTEX_LOCKED(name);
- /*方法二、初始化函数,动态初始化,运行时*/
- void init_MUTEX(struct semaphore *sem);
- void init_MUTEX_LOCKED(struct semaphore *sem);
- /*带有“_LOCKED”的是将信号量初始化为0,即锁定,允许任何线程访问时必须先解锁。没带的为1。*/
P函数:
- void down(struct semaphore *sem); /*一般不用,会建立不可杀进程*/
- int down_interruptible(struct semaphore *sem);/*推荐使用,使用down_interruptible需要格外小心,若操作被中断,该函数会返回非零值,而调用者不会拥有该信号量。对down_interruptible的正确使用需要始终检查返回值,并做出相应的响应。*/
- if (down_interruptible(&dev->sem))
- return -ERESTARTSYS;
- 若返回非零值,说明操作被中断,通常返回-ERESTARTSYS。kernel会对这个返回重新启动内核调用,要么会将该错误返回给用户,若返回-ERESTARTSYS,这必须首先撤销已经做出的任何用户可见的修改,这样系统调用可以重试,如无法撤销,则应该返回-EINTR。
- int down_trylock(struct semaphore *sem);/*带有“_trylock”的永不休眠,若信号量在调用是不可获得,会返回非零值。*/
V函数为:
- void up(struct semaphore *sem);/*任何拿到信号量的线程都必须通过一次(只有一次)对up的调用而释放该信号量。在出错时,要特别小心;若在拥有一个信号量时发生错误,必须在将错误状态返回前释放信号量。*/
在scull中使用信号量
在初始化scull_dev的地方:
- /* Initialize each device. */
- for (i = 0; i < scull_nr_devs; i++) {
- scull_devices[i].quantum = scull_quantum;
- scull_devices[i].qset = scull_qset;
- init_MUTEX(&scull_devices[i].sem);/* 注意顺序:先初始化好互斥信号量 ,再使scull_devices可用。*/
- scull_setup_cdev(&scull_devices[i], i);
- }
而且要确保在不拥有信号量的时候不会访问scull_dev结构体
3.读取者/写入者信号量
只读任务可并行完成它们的工作,而不需要等待其他读取者退出临界区。Linux内核提供了读取者/写入者信号量“rwsem”,使用是必须包括 。
初始化:
- void init_rwsem(struct rw_semaphore *sem);
只读接口:
- void down_read(struct rw_semaphore *sem);
- int down_read_trylock(struct rw_semaphore *sem);
- void up_read(struct rw_semaphore *sem);
写入接口:
- void down_write(struct rw_semaphore *sem);
- int down_write_trylock(struct rw_semaphore *sem);
- void up_write(struct rw_semaphore *sem);
- void downgrade_write(struct rw_semaphore *sem);/*该函数用于把写者降级为读者,这有时是必要的。因为写者是排他性的,因此在写者保持读写信号量期间,任何读者或写者都将无法访问该读写信号量保护的共享资源,对于那些当前条件下不需要写访问的写者,降级为读者将,使得等待访问的读者能够立刻访问,从而增加了并发性,提高了效率。×/
一个 rwsem 允许一个写者或无限多个读者来拥有该信号量. 写者有优先权; 当某个写者试图进入临界区, 就不会允许读者进入直到写者完成了它的工作. 如果有大量的写者竞争该信号量,则这个实现可能导致读者“饿死”,即可能会长期拒绝读者访问。因此, rwsem 最好用在很少请求写的时候, 并且写者只占用短时间.
4.completion
completion是一种轻量级的机制,它允许一个线程告诉另一个线程某个工作已经完成。代码必须包含。使用的代码如下:
- DECLARE_COMPLETION(my_completion);/* 创建completion(声明+初始化) */
- struct completion my_completion;/* 动态声明completion 结构体*/
- static inline void init_completion(&my_completion);/*动态初始化completion*/
- void wait_for_completion(struct completion *c);/* 等待completion */
- void complete(struct completion *c);/*唤醒一个等待completion的线程*/
- void complete_all(struct completion *c);/*唤醒所有等待completion的线程*/
- /*如果未使用completion_all,completion可重复使用;否则必须使用以下函数重新初始化completion*/
- INIT_COMPLETION(struct completion c);/*快速重新初始化completion*/
completion的典型应用是模块退出时的内核线程终止。在这种原型中,某些驱动程序的内部工作由一个内核线程在while(1)循环中完成。当内核准备清除该模块时,exit函数会告诉该线程退出并等待completion。为此内核包含了用于这种线程的一个特殊函数:
- void complete_and_exit(struct completion *c, long retval);
5.自旋锁
自旋锁可在不能休眠的代码中使用,比如中断例程。
自旋锁是一个互斥设备,他只能会两个值:“锁定”和“解锁”。它通常实现为某个整数值中的单个位。
“测试并设置”的操作必须以原子方式完成。当存在自旋锁时,等待执行忙循环的处理器做不了任何有用的工作。任何时候,只要内核代码拥有自旋锁,在相关CPU上的抢占就会被禁止。所有的自旋锁等待在本质上都是不可中断的。
适用于自旋锁的核心规则:
① 任何拥有自旋锁的代码都必须是原子的,他不能休眠,事实上,它不能因为任何原因放弃处理器。除了服务中断外。(某些情况下也不能放弃CPU,如中断服务也要获得自旋锁。为了避免这种锁陷阱,需要在拥有自旋锁时禁止中断)。只要内核代码拥有自旋锁,在相关处理器上的抢占就应该禁止(如休眠,休眠可发生在许多无法预期的地方)。否则CPU将有可能永远自旋下去(死机)。
② 拥有自旋锁的时间越短越好(会导致高优先级线程等待,降低系统性能)。
自旋锁原语所需包含的文件是 ,以下是自旋锁的内核API:
- spinlock_t my_lock = SPIN_LOCK_UNLOCKED;/* 编译时初始化spinlock*/
- void spin_lock_init(spinlock_t *lock);/* 运行时初始化spinlock*/
- /* 所有spinlock等待本质上是不可中断的,一旦调用spin_lock,在获得锁之前一直处于自旋状态*/
- void spin_lock(spinlock_t *lock);/* 获得spinlock*/
- void spin_lock_irqsave(spinlock_t *lock, unsigned long flags);/* 获得spinlock,禁止本地cpu中断,保存中断标志于flags*/
- void spin_lock_irq(spinlock_t *lock);/* 获得spinlock,禁止本地cpu中断*/
- void spin_lock_bh(spinlock_t *lock)/* 获得spinlock,禁止软件中断,保持硬件中断打开*/
- /* 以下是对应的锁释放函数*/
- void spin_unlock(spinlock_t *lock);
- void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
- void spin_unlock_irq(spinlock_t *lock);
- void spin_unlock_bh(spinlock_t *lock);
- /* 以下非阻塞自旋锁函数,成功获得,返回非零值;否则返回零*/
- int spin_trylock(spinlock_t *lock);
- int spin_trylock_bh(spinlock_t *lock);
- /*新内核的包含了更多函数,spin_lock_nest_lock/spin_lock_irqsave_nested等*/
读取者/写入者自旋锁
- rwlock_t my_rwlock = RW_LOCK_UNLOCKED;/* 编译时初始化*/
- rwlock_t my_rwlock;
- rwlock_init(&my_rwlock); /* 运行时初始化*/
- void read_lock(rwlock_t *lock);
- void read_lock_irqsave(rwlock_t *lock, unsigned long flags);
- void read_lock_irq(rwlock_t *lock);
- void read_lock_bh(rwlock_t *lock);
- void read_unlock(rwlock_t *lock);
- void read_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
- void read_unlock_irq(rwlock_t *lock);
- void read_unlock_bh(rwlock_t *lock);
- /* 新内核已经有了read_trylock*/
- void read_trylock(rwlock_t * lock);
- void write_lock(rwlock_t *lock);
- void write_lock_irqsave(rwlock_t *lock, unsigned long flags);
- void write_lock_irq(rwlock_t *lock);
- void write_lock_bh(rwlock_t *lock);
- int write_trylock(rwlock_t *lock);
- void write_unlock(rwlock_t *lock);
- void write_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
- void write_unlock_irq(rwlock_t *lock);
- void write_unlock_bh(rwlock_t *lock);
锁陷阱
锁定模式必须在一开始就安排好,否则其后的改进将会非常困难。
不明确规则:①如果某个获得锁的函数要调用其他同样试图获取这个锁的函数,代码就会死锁。②不管是信号量还是自旋锁,都不允许锁拥有者第二次获得这个锁,否则系统将挂起。
锁的顺序规则:①在必须获取多个锁时,应该始终以相同的顺序获得;②如果必须获取一个局部锁(比如一个设备),以及一个属于内核更中心位置的锁,这应该首先获取自己的局部锁;③如果拥有信号量和自旋锁的组合,则必须先获得信号量,在拥有自旋锁时调用down()(可导致休眠)是个严重的错误。
作为通常的规则,我们应该在最初使用粗粒度的锁,除非有真正的原因相信竞争会导致问题。我们需要抑制自己过早考虑优化的欲望,因为真正的性能约束通常出现在非预期的情况下。
信号量与自旋锁差别
信号量:①可有多个持有者,当只有一个时,是互斥信号量;②保持较长时间。
自旋锁:①一个持有者;②时间非常短.
6.除了锁之外的办法
⑴免锁算法:经常用于免锁的生产者/消费者任务的数据结构之一是循环缓冲区(circular buffer).循环缓冲区的使用在设备驱动程序中相当普遍,特别是网络适配器,经常使用循环缓冲区和处理器交换数据。内核里有一个通用的循环缓冲区实现,在
⑵原子变量:完整的锁机制对一个简单的整数来讲显得有些浪费,针对这种情况,内核提供了一种原子的证书类型,atomic_t,定义在中
- void atomic_set(atomic_t *v, int i); /*设置原子变量 v 为整数值 i.*/
- atomic_t v = ATOMIC_INIT(0); /*编译时使用宏定义 ATOMIC_INIT 初始化原子值.*/
- int atomic_read(atomic_t *v); /*返回 v 的当前值.*/
- void atomic_add(int i, atomic_t *v);/*由 v 指向的原子变量加 i. 返回值是 void*/
- void atomic_sub(int i, atomic_t *v); /*从 *v 减去 i.*/
- void atomic_inc(atomic_t *v);
- void atomic_dec(atomic_t *v); /*递增或递减一个原子变量.*/
- int atomic_inc_and_test(atomic_t *v);
- int atomic_dec_and_test(atomic_t *v);
- int atomic_sub_and_test(int i, atomic_t *v);
- /*进行一个特定的操作并且测试结果; 如果, 在操作后, 原子值是 0, 那么返回值是真; 否则, 它是假. 注意没有 atomic_add_and_test.*/
- int atomic_add_negative(int i, atomic_t *v);
- /*加整数变量 i 到 v. 如果结果是负值返回值是真, 否则为假.*/
- int atomic_add_return(int i, atomic_t *v);
- int atomic_sub_return(int i, atomic_t *v);
- int atomic_inc_return(atomic_t *v);
- int atomic_dec_return(atomic_t *v);
- /*像 atomic_add 和其类似函数, 除了它们返回原子变量的新值给调用者.*/
atomic_t数据项必须只能通过上述函数来访问,如果将原子变量传递给了需要整形参数的函数,则会遇到编译错误。只有原子变量的数目是原子的,atomic_t变量才能工作,需要多个atomic_t变量的操作,仍然需要某种类型的锁
⑶位操作:内核提供了一套函数来原子地修改或测试单个位。原子位操作非常快, 因为它们使用单个机器指令来进行操作, 并且不需要禁止中断. 函数依赖于具体的框架,在 中声明. nr 参数(描述要操作哪个位)在ARM体系中定义为unsigned int:
- void set_bit(nr, void *addr); /*设置第 nr 位在 addr 指向的数据项中。*/
- void clear_bit(nr, void *addr); /*清除指定位在 addr 处的无符号长型数据.*/
- void change_bit(nr, void *addr);/*翻转nr位.*/
- test_bit(nr, void *addr); /*这个函数是唯一一个不需要是原子的位操作; 它简单地返回这个位的当前值.*/
- /*以下原子操作如同前面列出的, 除了它们还返回这个位以前的值.*/
- int test_and_set_bit(nr, void *addr);
- int test_and_clear_bit(nr, void *addr);
- int test_and_change_bit(nr, void *addr);
⑷seqlock:2.6内核包含了一对新机制来提供快速地, 无锁地存取一个共享资源。 seqlock要保护的资源小, 简单,并且常常被存取,并且很少写存取但是必须要快。seqlock 通常不能用在保护包含指针的数据结构。seqlock 定义在 。
- /*两种初始化方法*/
- seqlock_t lock1 = SEQLOCK_UNLOCKED;
- seqlock_t lock2;
- seqlock_init(&lock2);
如果你的 seqlock 可能从一个中断处理里存取, 你应当使用 IRQ 安全的版本来代替:
- unsigned int read_seqbegin_irqsave(seqlock_t *lock, unsigned long flags);
- int read_seqretry_irqrestore(seqlock_t *lock, unsigned int seq, unsigned long flags);
写者必须获取一个排他锁来进入由一个 seqlock 保护的临界区,写锁由一个自旋锁实现, 调用:
- void write_seqlock(seqlock_t *lock);
- void write_sequnlock(seqlock_t *lock);
因为自旋锁用来控制写存取, 所有通常的变体都可用:
- void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags);
- void write_seqlock_irq(seqlock_t *lock);
- void write_seqlock_bh(seqlock_t *lock);
- void write_sequnlock_irqrestore(seqlock_t *lock, unsigned long flags);
- void write_sequnlock_irq(seqlock_t *lock);
- void write_sequnlock_bh(seqlock_t *lock);
如果write_tryseqlock可以获得自旋锁,它也会返回非零值。
⑸RCU:读取-复制-更新(read-copy-update)是一个高级的互斥方法, 在正确的条件下可获得高的性能.很知名,但在驱动中的使用很少。一个应用实例是,网络路由表。
阅读(527) | 评论(0) | 转发(0) |