打算换工作了,求职北京地区内核职位..
分类: LINUX
2013-06-23 14:01:04
lock free机制分析 (by chishanmingshen)
http://chishanmingshen.blog.chinaunix.net
1.先说说几个概念
锁只是同步方法的一种,同步分阻塞同步和非阻塞同步。
lockfree就可以保证非阻塞同步,同时也尽量保证了waitfree,但依然做不到完全的waitfree。
2.既然讨论无锁,就得说说有锁会导致的问题
a)死锁
多个锁不按顺序的话
b)优先级反转
可能高优先级的等待低优先级的
c)影响实时性
等锁时间不定
d)信号安全
信号处理中不能用锁
e)崩溃处理
崩溃时可能占着锁
f)抢占的影响
被抢占时可能还占着锁
g)影响整体性能
切换进程影响性能
最后,锁的实现是跟硬件相关的,自然影响移植性。
3.无锁首先得根据业务逻辑,这个是首要的。
脱离业务去有锁或无锁是没有意义的。
根据业务流程去简化,一般可以将锁的存在压缩到最小。另外锁只是同步机制的一种,应该还有其它选择。
4.如果已经将有锁的存在压缩到最小的数据结构了,就可以考虑无锁算法了。
通用的lock free算法都是针对基本的数据结构的:buffer,list,queue,map
基本原理就是CAS机制了。现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是CMPXCHG汇编指令。(可见,lockfree是依赖于机器体系结构的,而且lockfree其实还是有锁的,只是CAS给压缩含义了)
一般CAS会封装成下面的形式:(以32bit机为例)
bool cas32( int * pVal, int oldVal, int newVal );
pVal 表示要比较和替换数值的地址,oldVal表示期望的值,newVal表示希望替换成的值。
gcc中:
[code]bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
These builtins perform an atomic compare and swap. That is, if the current value of *ptr is oldval, then write newval into *ptr.
The “bool” version returns true if the comparison is successful and newval was written. The “val” version returns the contents of *ptr before the operation.
[/code]
另外,gcc的对于内核中内存屏障也有封装:(内核的kfifo实现可以看到,内核在kfifo时有比CAS机制更加地lock free,用mb即可。当然mb实现也跟机器架构有关)
[code]__sync_synchronize (...)
This builtin issues a full memory barrier.[/code]
[code] #define LOCK_PREFIX "lock;"
#define __sync_bool_compare_and_swap(mem, oldval, newval) \
({ __typeof (*mem) ret; \
__asm __volatile (LOCK_PREFIX "cmpxchgl %2, %1;sete %%al; movzbl %%al,%%eax" \
: "=a" (ret), "=m" (*mem) \
: "r" (newval), "m" (*mem), "a" (oldval)\
:"memory"); \
ret; }) [/code]
returns true if the comparison is successful and newval was written.
5.下面以说说无锁队列的实现,从中可以抓着lock free的核心机制。
EnQueue(x)
{
q = newrecord();
q->value = x;
q->next = NULL;
do{
p = tail;
}while( CAS(p->next, NULL, q) != TRUE);
CAS(tail, p, q);
}
DeQueue()
{
do{
p = head;
if(p->next == NULL){
returnERR_EMPTY_QUEUE;
}
while( CAS(head, p, p->next) != TRUE );
returnp->next->value;
}
上面的加入队列有一个问题:如果T1在成功完成第一个CAS,还没开始第二个CAS时挂掉了。那么如果其它threads就会全死循环了,都在等着next字段为NUL的tail,但其实这个tail的next恒指向T1新增的节点了。
修改后的:
EnQueue(x)
{
q = newrecord();
q->value = x;
q->next = NULL;
p = tail;
oldp = p
do{
while(p->next != NULL)
p = p->next;
}while( CAS(p.next, NULL, q) != TRUE);
CAS(tail, oldp, q);
}
6.CAS的ABA问题
thread 1在共享内存空间中读到的值为A,此时
thread 1被抢占了,thread 2执行
thread 2把共享变量里的值从A改成了B,再改回到A,此时被thread 1抢占。
thread 1回来看到共享变量里的值没有被改变,于是继续执行。
这个就是lock free的ABA问题,CAS无法判断目标内容从A变为B,然后又变为A这种情况。(这个A就是保护共享空间的内容,很可能就是指针:像队列、链表实现中,就是节点的地址!)
解决的办法通常是使用一个额外的tag来记录这种情况,并且使用CAS2(double
CAS)同时检查tag和目标内存内容两个值都没有发生变化。注意:CAS2最多支持检查一个64
bit长度的内存指并原子交换他们的内容。但是,可以想象,那个tag也有溢出的问题,所以还是没有彻底解决ABA问题。其实,保证每次添加的节点内存唯一,这样就彻底了。
小结:lock free不是真正的无锁,只是将锁的存在含义压缩了!依然不能完全的wait free,在极端情况下还是会有饥饿问题!