Chinaunix首页 | 论坛 | 博客
  • 博客访问: 190134
  • 博文数量: 75
  • 博客积分: 2773
  • 博客等级: 少校
  • 技术积分: 765
  • 用 户 组: 普通用户
  • 注册时间: 2010-08-10 13:09
文章分类
文章存档

2019年(3)

2017年(4)

2015年(3)

2013年(5)

2012年(11)

2011年(45)

2010年(4)

分类: LINUX

2012-03-26 13:58:04

锁机制(lock) 是多线程编程中最常用的同步机制,用来对多线程间共享的临界区(Critical Section) 进行保护。

Pthreads提供了多种锁机制,常见的有:
1) Mutex(互斥量):pthread_mutex_***
2) Spin lock(自旋锁):pthread_spin_***
3) Condition Variable(条件变量):pthread_con_***
4) Read/Write lock(读写锁):pthread_rwlock_***

在多线程编中,根据应用场合的不同,选择合适的锁来进行同步,对多线程程序的性能影响非常大. 本文主要对 pthread_mutex 和 pthread_spinlock 两种锁制机进行比较,并讨论其适用的场合.

1 Pthread mutex

Mutex属于sleep-waiting类型的锁. 从 2.6.x 系列稳定版内核开始, Linux 的 mutex 都是 futex (Fast-Usermode-muTEX)锁.
futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁如信号量和POSIX互斥的基本工具。它们第一次出现在内核开发的2.5.7版;其语义在2.5.40固定下来,然后在2.6.x系列稳定版内核中出现。
Futex 是由Hubertus Franke(IBM Thomas J. Watson 研究中心), Matthew Kirkwood,Ingo Molnar(Red Hat)和 Rusty Russell(IBM Linux 技术中心)等人创建的。
Futex 是由用户空间的一个对齐的整型变量和附在其上的内核空间等待队列构成. 多进程或多线程绝大多数情况下对位于用户空间的futex 的整型变量进行操作(汇编语言调用CPU提供的原子操作指令来增加或减少),而其它情况下,则需要通过代价较大的系统调用来对位于内核空间的等待队列进行操作(如唤醒等待的进程/线程,或 将当前进程/线程放入等待队列). 除了多个线程同时竞争锁的少数情况外,基于 futex 的 lock 操作是不需要进行代价昂贵的系统调用操作的.
.
这种机制的核心思想是通过将大多数情况下非同时竞争 lock 的操作放到在用户空间来执行,而不是代价昂贵的内核系统调用方式来执行,从而提高了效率.

Pthreads提供的Mutex锁操作相关的API主要有:
1、 pthread_mutex_lock (pthread_mutex_t *mutex);
2、 pthread_mutex_trylock (pthread_mutex_t *mutex);
3、 pthread_mutex_unlock (pthread_mutex_t *mutex);

因为源代码比较长,这里不做摘录,大家可以参考:
glibc-2.12.2/nptl/pthread_mutex_lock.c

2 Pthread spinlock

spinlock,也称自旋锁,是属于busy-waiting类型的锁.在多处理器环境中, 自旋锁最多只能被一个可执行线程持有。如果一个可执行线程试图获得一个被争用(已经被持有的)自旋锁,那么该线程就会一直进行忙等待,自旋,也就是空转,等待锁重新可用。如果锁未被争用,请求锁的执行线程便立刻得到它,继续执行。

一个被争用的自旋锁使得请求它的线程在等待锁重新可用时自旋,特别的浪费CPU时间,所以自旋锁不应该被长时间的持有。实际上,这就是自旋锁的设计初衷,在短时间内进行轻量级加锁。

Kernel中的自旋锁不能够在能够导致睡眠的环境中使用。举个例子,一个线程A获得了自旋锁L;这个时候,发生了中断,在对应的中断处理函数B中,也尝试获得自旋锁L,就会中断处理程序进行自旋。但是原先锁的持有者只有在中断处理程序结束后,采用机会释放自旋锁,从而导致死锁。
由于涉及到多个处理器环境下,spin lock的效率非常重要。因为在等待spin lock的过程,处理器只是不停的循环检查,并不执行其他指令。但即使这样, 一般来说,spinlock的开销还是比进程调度(context switch)少得多。这就是spin lock 被广泛应用在多处理器环境的原因

Pthreads提供的与Spin Lock锁操作相关的API主要有:
pthread_spin_lock (pthread_spinlock_t *lock);
pthread_spin_trylock (pthread_spinlock_t *lock);
pthread_spin_unlock (pthread_spinlock_t *lock);

下面,来看一下spinlock在pthread中的实现:

1) spin lock的数据结构

glibc-2.12.2\nptl\sysdeps\unix\sysv\linux\i386\bits\pthreadtypes.h

1
typedef volatile int pthread_spinlock_t;

2) pthread_spin_lock

glibc-2.12.2\nptl\sysdeps\i386\pthread_spin_lock.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#ifndef LOCK_PREFIX
# ifdef UP
#  define LOCK_PREFIX    /* nothing */
# else
#  define LOCK_PREFIX    "lock;"
# endif
#endif
 
int
pthread_spin_lock (lock)
     pthread_spinlock_t *lock;
{
  asm ("\n"
       "1:\t" LOCK_PREFIX "decl %0\n\t"
       "jne 2f\n\t"
       ".subsection 1\n\t"
       ".align 16\n"
       "2:\trep; nop\n\t"
       "cmpl $0, %0\n\t"
       "jg 1b\n\t"
       "jmp 2b\n\t"
       ".previous"
       : "=m" (*lock)
       : "m" (*lock));
 
  return 0;
}

a、 LOCK_PREFIX: 是为了在SMP下锁总线,保证接下来一条指令的原子性。

b、 %0: 这里是*lock的值,先将lock的值减一,如果ZF=0(lock值不为0),跳到下面的2标签处继续执行;否则执行结束(lock值为0)。
c、 jne: Jump near if not equal (ZF=0). Not supported in 64-bit mode.

下面继续看2标签处的代码:
d、 rep; nop: 为实际上为多个nop指令,实际上这条指令可以降低CPU的运行频率,减低电的消耗量,但最重要的是,提高了整体的效率。因为这段指令执行太快的话,会生成很多读取内存变量的指令,另外的一个CPU可能也要写这个内存变量,现在的CPU经常需要重新排序指令来提高效率,如果读指令太多的话,为了保证指令之间的依赖性,CPU会以牺牲流水线执行(pipeline)所带来的好处。从pentium 4以后,intel引进了一条pause指令,专门用于spin lock这种情况,据intel的文档说,加上pause可以提高25倍的效率!。
e、 cmpl $0, %0 :比较lock与0的大小,当发现Lock大于0的时候,跳回到1标签,尝试重新获得锁;否则,跳回到标签2继续进行循环。

f、 标签1处的代码,在尝试获得锁的时候,直接将lock值减1,如果获得锁操作失败的时候,实际上lock值已经被减了1。这样会不会有问题呢?实际上,这个问题不用担心,因为在释放锁的时候,lock的值还会被重新设置为1。

.subsection和.previous之间的这段代码用来检测spin lock何时被释放. 这段代码与其它的常用指令代码并不是放在同一个代码段中的,因为大部分情况下,lock都会成功返回,将这段lock失败后的操作代码与其它的代码分开,会提高高速缓存的效率(有限的高速缓存可以放置更多的数据)。

3) pthread_spin_unlock

glibc-2.12.2\nptl\sysdeps\i386\pthread_spin_unlock.S

1
2
3
4
5
6
7
8
9
10
11
12
13
.globl    pthread_spin_unlock
    .type    pthread_spin_unlock,@function
    .align    16
pthread_spin_unlock:
    movl    4(%esp), %eax
    movl    $1, (%eax)
    xorl    %eax, %eax
    ret
    .size    pthread_spin_unlock,.-pthread_spin_unlock
 
    /* The implementation of pthread_spin_init is identical.  */
    .globl    pthread_spin_init
pthread_spin_init = pthread_spin_unlock

pthread_spin_unlock()就简单很多了,只是简单的将lock值设置为1,并返回0

3 性能测试对比

测试环境

Memory: 16G
Cpu: 8 core

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
processor       : 7
vendor_id       : GenuineIntel
cpu family      : 6
model           : 23
model name      : Intel(R) Xeon(R) CPU           E5410  @ 2.33GHz
stepping        : 6
cpu MHz         : 2327.529
cache size      : 6144 KB
physical id     : 1
siblings        : 4
core id         : 7
cpu cores       : 4
fpu             : yes
fpu_exception   : yes
cpuid level     : 10
wp              : yes
flags           : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm syscall nx lm pni monitor ds_cpl est tm2 cx16 xtpr lahf_lm
bogomips        : 4655.07
clflush size    : 64
cache_alignment : 64
address sizes   : 38 bits physical, 48 bits virtual
power management:

 OS: Rhel 5U4 , Linux Kernel : 2.6.18-164.el5

 测试方法:

在 case1~case3 中,通过逐渐增加临界区(Critical Section)的长度来比较在此情况下 pthread spinlock 与 pthread mutex 的性能.
Case4: 是在 case1 的基础上,给各个线程增加非临界区的工作,以减少冲突. 在此情况下 pthread spinlock 与 pthread mutex 的性能.

在每个 case 中,从线程数依次从 1个线程增加到 15个线程,
并重复执行 10 次以保证测试结果不受意外情况的影响.

3.1 Case 1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "TimeHelper.h"
 
#define MAX_ARRAY_NUM 10000000
 
using namespace std;
 
union AlignInt32
{
    uint32_t _member;
    char _align[64]; // for false sharing for multi-core
 
};
 
struct StatItem
{
    uint32_t    _times;
    uint32_t    _id;
};
union AlignStat
{
    StatItem _item;
    char _align[64]; // for false sharing for multi-core
};
 
AlignInt32 g_Array[10000000];
volatile uint32_t g_Index=0;
 
#ifdef USE_SPINLOCK
pthread_spinlock_t spinlock;
#else
pthread_mutex_t mutex;
#endif
 
pid_t gettid() { return syscall( __NR_gettid ); }
 
void *consumer(void *arg)
{
    AlignStat* pItem=(AlignStat*)arg;
    while (1)
    {
#ifdef USE_SPINLOCK
        pthread_spin_lock(&spinlock);
#else
        pthread_mutex_lock(&mutex);
#endif
 
        if (g_Index>=MAX_ARRAY_NUM)
        {
#ifdef USE_SPINLOCK
            pthread_spin_unlock(&spinlock);
#else
            pthread_mutex_unlock(&mutex);
#endif
            break;
        }
        ++(pItem->_item._times);
        g_Array[g_Index]._member=g_Index;
        ++g_Index;
 
#ifdef USE_SPINLOCK
        pthread_spin_unlock(&spinlock);
#else
        pthread_mutex_unlock(&mutex);
#endif
 
    }
 
    return NULL;
}
 
int main(int argc, char *argv[])
{
 
    uint64_t t1,t2;
    uint64_t nTimeSum=0;
    uint32_t nThreadNum=0;
 
#ifdef USE_SPINLOCK
    pthread_spin_init(&spinlock, 0);
    fprintf(stderr,"case for spinlock: ");
#else
    pthread_mutex_init(&mutex, NULL);
    fprintf(stderr,"case for mutex: ");
#endif
 
    int32_t nCpuNum = (int)sysconf( _SC_NPROCESSORS_ONLN )*2;
    fprintf(stderr,"cpu_num=%d\n",nCpuNum/2);
    for(int32_t j=1; j< nCpuNum; j++)
    {
        nTimeSum=0;
        nThreadNum=j;
        AlignStat *pStatArray= new AlignStat[nThreadNum];
        memset(pStatArray,0x0,nThreadNum*sizeof(AlignStat));
 
        for(uint32_t nLoop=10; nLoop> 0 ; nLoop--)
        {
            g_Index=0;
            pthread_t * pThreadArray=new pthread_t[nThreadNum];
            // Measuring time before starting the threads...
            t1=TimeHelper::nowTime();
            for(uint32_t i=0; i
            {
                pStatArray[i]._item._id=i;
                if ( pthread_create(&pThreadArray[i], NULL, consumer, (void *)(&pStatArray[i]) ))
                {
                    perror( "error: pthread_create" );
                    nThreadNum = i;
                    break;
                }
            }
            for(uint32_t i=0; i
            {
                pthread_join(pThreadArray[i], NULL);
            }
            // Measuring time after threads finished...
            t2=TimeHelper::nowTime();
            nTimeSum+=t2-t1;
            delete [] pThreadArray;
        }
 
        fprintf(stderr,"RepeatTimes=%d, ThreadNum=%d, UsedTime=%.6lf s\n",10, nThreadNum,(double(nTimeSum))/1000000);
        for(uint32_t i=0; i
        {
            fprintf(stderr,"thread_id=%u\t times=%u\n",pStatArray[i]._item._id,pStatArray[i]._item._times);
        }
        delete [] pStatArray;
      }
 
#ifdef USE_SPINLOCK
    pthread_spin_destroy(&spinlock);
#else
    pthread_mutex_destroy(&mutex);
#endif
 
    return 0;
}

3.2 Case2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void *consumer(void *arg)
{
    AlignStat* pItem=(AlignStat*)arg;
    while (1)
    {
#ifdef USE_SPINLOCK
        pthread_spin_lock(&spinlock);
#else
        pthread_mutex_lock(&mutex);
#endif
 
        if (g_Index>=MAX_ARRAY_NUM)
        {
#ifdef USE_SPINLOCK
            pthread_spin_unlock(&spinlock);
#else
            pthread_mutex_unlock(&mutex);
#endif
            break;
        }
        ++(pItem->_item._times);
        g_Array[g_Index]._member=g_Index;
        ++g_Index;
        // add critical section's length
        list tmpList;
        for(uint32_t i=0; i< 3; i++)
        {
            tmpList.push_back(i);
        }
 
#ifdef USE_SPINLOCK
        pthread_spin_unlock(&spinlock);
#else
        pthread_mutex_unlock(&mutex);
#endif
 
    }
 
    return NULL;
}

3.3 Case3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void *consumer(void *arg)
{
    AlignStat* pItem=(AlignStat*)arg;
    while (1)
    {
#ifdef USE_SPINLOCK
        pthread_spin_lock(&spinlock);
#else
        pthread_mutex_lock(&mutex);
#endif
 
        if (g_Index>=MAX_ARRAY_NUM)
        {
#ifdef USE_SPINLOCK
            pthread_spin_unlock(&spinlock);
#else
            pthread_mutex_unlock(&mutex);
#endif
            break;
        }
        ++(pItem->_item._times);
        g_Array[g_Index]._member=g_Index;
        ++g_Index;
        // add critical section's length
        list tmpList;
        for(uint32_t i=0; i< 6; i++)
        {
            tmpList.push_back(i);
        }
 
#ifdef USE_SPINLOCK
        pthread_spin_unlock(&spinlock);
#else
        pthread_mutex_unlock(&mutex);
#endif
 
    }
 
    return NULL;
}

3.4 Case4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17