2008-12-09 18:59:36
11.6 Thread synchronization
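Race conditions between threads arise for the following reasons:
(I) Reasons rooted in the architecture
1. On some architectures a write takes more than one memory cycle. If another thread reads the location before all of those cycles have completed, it sees inconsistent data.
2. If two threads modify the same variable, the modification itself is not atomic. It breaks down into three steps: load the value into a register, modify the register, and write the register back to memory. When the steps of the two threads interleave, the result is a race condition.
(II) Reasons rooted in the program's own logic
Reading a variable and then acting on its value is not atomic. Between the read and the action, another thread may change the state, which again produces a race condition.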
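The read-modify-write problem described above can be made concrete with a shared counter. The following is only a minimal sketch: the names counter, counter_lock, worker and run_counter_demo, and the two constants, are made up for illustration. Holding a mutex around the increment keeps the load, add and store of one thread from interleaving with those of another.

```c
#include <pthread.h>

#define N_THREADS    4        /* hypothetical values chosen for this sketch */
#define N_INCREMENTS 100000

long counter = 0;
pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;

/* Each worker performs N_INCREMENTS read-modify-write steps on counter. */
static void *worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < N_INCREMENTS; i++) {
        pthread_mutex_lock(&counter_lock);
        counter++;            /* load, add, store happen under the lock */
        pthread_mutex_unlock(&counter_lock);
    }
    return NULL;
}

/* Starts the workers, waits for them, and returns the final counter value
 * (or -1 if not all threads could be created). */
long run_counter_demo(void)
{
    pthread_t tid[N_THREADS];
    int created = 0;

    counter = 0;
    for (; created < N_THREADS; created++)
        if (pthread_create(&tid[created], NULL, worker, NULL) != 0)
            break;
    for (int i = 0; i < created; i++)
        pthread_join(tid[i], NULL);
    return created == N_THREADS ? counter : -1;
}
```

Calling run_counter_demo() from main should yield N_THREADS * N_INCREMENTS. If the lock and unlock calls are removed, the total may come out lower, because increments from different threads can overwrite each other.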
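Synchronization mechanisms:
1. Mutex
Avoiding deadlock:
(1) When a program uses more than one mutex, every place that takes the locks must take them in the same order. With two mutexes A and B, every thread must lock A first and then B; as soon as one thread reverses the order, deadlock becomes likely.
(2) Use trylock. This is generally for more complex situations: if you already hold some locks, use trylock to test whether a new one can be taken. If it succeeds, you now hold the new lock as well; otherwise, release the locks you already hold.
For example: there are two resources, r1 and r2; r1 is protected by mutex1 and r2 by mutex2.
There are four threads, A, B, C and D. A and B access r1 through mutex1.
C and D also access r1 through mutex1,
and at the same time access r2 through mutex2, with the requirement that whenever C or D accesses r1 it must access r2 as well. In addition there is a thread E that accesses only r2. C and D must lock in a consistent order: mutex2 first, then mutex1. Suppose A or B is using r1 and C wants r1 as well. C first locks mutex2 and then tries to lock mutex1. If that fails, C should release mutex2 and try again later, because thread E may want r2 and C should not hold on to a resource it cannot use yet. This is the benefit of tentative locking: with a blocking lock, C would wait for mutex1 while holding mutex2, and E could not get mutex2 for that whole time.
Result:
A, B: lock only mutex1
C, D: lock mutex2 and then mutex1, in that order; the lock on mutex1 is a trylock
E: locks only mutex2
In a situation like this, tentative locking is a better choice than a plain blocking lock.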
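The locking scheme for threads C and D described above could look roughly like this. It is a sketch under assumptions: try_use_both and use_both are hypothetical helper names, r1 and r2 are reduced to plain integers, and yielding with sched_yield() between attempts is just one possible back-off policy.

```c
#include <errno.h>
#include <pthread.h>
#include <sched.h>

pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;   /* protects r1 */
pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER;   /* protects r2 */
int r1 = 0, r2 = 0;

/* One attempt by C or D. Returns 0 if both resources were used, or EBUSY
 * if mutex1 was taken; in that case mutex2 has been released again. */
int try_use_both(void)
{
    int err = pthread_mutex_lock(&mutex2);     /* always mutex2 first */
    if (err != 0)
        return err;
    err = pthread_mutex_trylock(&mutex1);      /* tentative lock on mutex1 */
    if (err != 0) {
        pthread_mutex_unlock(&mutex2);         /* back off so E can use r2 */
        return err;
    }
    r1++;                                      /* use both resources */
    r2++;
    pthread_mutex_unlock(&mutex1);
    pthread_mutex_unlock(&mutex2);
    return 0;
}

/* Retries until both locks could be taken together. */
int use_both(void)
{
    int err;
    while ((err = try_use_both()) == EBUSY)
        sched_yield();     /* let the current holder of mutex1 run */
    return err;
}
```

Threads A and B would simply call pthread_mutex_lock(&mutex1), and thread E pthread_mutex_lock(&mutex2). Only C and D need the back-off, because only they ever hold one mutex while asking for the other.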
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
                       const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
Both return: 0 if OK, error number on failure

#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
All return: 0 if OK, error number on failure
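2. Reader-writer locks
Reader-writer locks are also called shared-exclusive locks. They pay off only when readers are numerous and frequent while writers are few and infrequent. A steady stream of readers could also keep a writer blocked indefinitely and starve it. For that reason, once a writer is blocked waiting for the lock, implementations usually stop granting the read lock to readers that arrive later, until the blocked writer has acquired the lock and finished its work.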
#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
                        const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
Both return: 0 if OK, error number on failure

#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
All return: 0 if OK, error number on failure
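3. Condition variables
A condition variable is always used together with a mutex. The mutex protects the condition: a thread may change the state of the condition only while it holds the mutex. To wait on a condition variable, a thread first locks the mutex, tests the condition itself, and calls pthread_cond_wait if the condition does not hold yet. pthread_cond_wait does not test anything; it atomically releases the mutex and puts the thread to sleep. When the thread is woken up, pthread_cond_wait reacquires the mutex before it returns. Because the condition may have changed again by then, the caller should test it again, normally in a loop. The thread that calls pthread_cond_signal or pthread_cond_broadcast may or may not hold the mutex at that moment, as long as it changed the condition while holding it. Signalling while holding the mutex is always safe.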
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond,
                      const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);
Both return: 0 if OK, error number on failure

#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *restrict cond,
                      pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
                           pthread_mutex_t *restrict mutex,
                           const struct timespec *restrict timeout);
Both return: 0 if OK, error number on failure

#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
Both return: 0 if OK, error number on failure
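The usual shape of a wait on a condition variable, stripped down to one flag, might look like the sketch below. The names ready, payload, setter and wait_for_payload are hypothetical. The point is the while loop around pthread_cond_wait and the fact that the flag is changed only while the mutex is held.

```c
#include <pthread.h>
#include <stdbool.h>

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t ready_cond = PTHREAD_COND_INITIALIZER;
bool ready = false;          /* the condition, protected by lock */
int payload = 0;             /* data handed over together with the flag */

/* Changes the condition under the mutex, then wakes the waiter. */
static void *setter(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&lock);
    payload = 42;
    ready = true;
    pthread_mutex_unlock(&lock);
    pthread_cond_signal(&ready_cond);
    return NULL;
}

/* Starts the setter thread and waits until it has published the payload.
 * Returns the payload, or -1 if the thread could not be created. */
int wait_for_payload(void)
{
    pthread_t tid;
    int value;

    if (pthread_create(&tid, NULL, setter, NULL) != 0)
        return -1;
    pthread_mutex_lock(&lock);
    while (!ready)                           /* retest after every wakeup */
        pthread_cond_wait(&ready_cond, &lock);
    value = payload;
    pthread_mutex_unlock(&lock);
    pthread_join(tid, NULL);
    return value;
}
```

If the setter happens to run first, ready is already true when the waiter takes the mutex, the loop body is skipped, and no wakeup can be missed. That only works because the flag is tested and set under the same mutex.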
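Enough talk. Below are four examples I wrote with these functions:
1. 1 producer and 2 consumers synchronized with a single mutex
2. 1 producer and 2 consumers (readers) synchronized with a reader-writer lock
3. 1 producer and 1 reader synchronized with condition variables
4. 1 producer and 2 readers synchronized with condition variables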
Code 1: 1 producer and 2 consumers synchronized with a single mutex
/* 1 producer, 2 consumers, using only one mutex */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define WORK_CNT 10

/* The shutdown flags are read and written by different threads without
   holding the mutex, so they are atomic. */
atomic_bool quit = false;
atomic_bool producerQuited = false;

char queue[WORK_CNT] = {0};
int work_cnt = 0;                 /* number of items in queue, guarded by m */
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

void produceWork(char cont)
{
    /* add cont to the queue; if the queue is full the item is dropped */
    pthread_mutex_lock(&m);
    if (work_cnt < WORK_CNT)
    {
        queue[work_cnt] = cont;
        work_cnt++;
        printf("producer added '%c' to work queue\n", cont);
    }
    pthread_mutex_unlock(&m);
}

int customWork(void)
{
    /* consume the most recently added item; return -1 if the queue is empty */
    int c;
    pthread_mutex_lock(&m);
    if (work_cnt > 0)
    {
        c = queue[work_cnt - 1];
        work_cnt--;
        printf("consumer %lu ate '%c' from queue\n",
               (unsigned long)pthread_self(), c);
    }
    else
    {
        c = -1;
    }
    pthread_mutex_unlock(&m);
    return c;
}

void *producer(void *para)
{
    (void)para;
    printf("producer %lu started...\n", (unsigned long)pthread_self());
    /* the producer puts work into the queue until main asks it to quit */
    while (!quit)
    {
        char c = (char)(rand() % 26 + 'a');
        produceWork(c);
    }
    puts("producer end");
    producerQuited = true;
    return (void *)0;
}

void *customer(void *para)
{
    (void)para;
    printf("consumer %lu runs\n", (unsigned long)pthread_self());
    /* fetch work and process it until the producer has quit */
    while (!quit || !producerQuited)
        customWork();
    /* the producer has quit; consume whatever is left in the queue */
    while (customWork() > 0)
        ;
    printf("consumer %lu end\n", (unsigned long)pthread_self());
    return (void *)0;
}

int main(void)
{
    pthread_t p, c1, c2;
    void *pstatus, *status1, *status2;

    if (pthread_create(&p, NULL, producer, NULL) != 0)
    {
        puts("producer create error");
        return 1;
    }
    if (pthread_create(&c1, NULL, customer, NULL) != 0)
    {
        puts("consumer 1 create error");
        return 1;
    }
    if (pthread_create(&c2, NULL, customer, NULL) != 0)
    {
        puts("consumer 2 create error");
        return 1;
    }
    sleep(1);
    quit = true;
    if (pthread_join(p, &pstatus) != 0)
        puts("join producer failed");
    if (pthread_join(c1, &status1) != 0)
        puts("join consumer 1 failed");
    if (pthread_join(c2, &status2) != 0)
        puts("join consumer 2 failed");
    printf("consumer 1 returns: %ld\n", (long)status1);
    printf("consumer 2 returns: %ld\n", (long)status2);
    printf("producer returns: %ld\n", (long)pstatus);
    pthread_mutex_destroy(&m);
    puts("all sub threads exited");
    return 0;
}
Code 2: 1 producer and 2 consumers (readers) synchronized with a reader-writer lock
/* 1 producer, 2 readers, using a reader-writer lock */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define WORK_CNT 10

/* The shutdown flags are read and written by different threads without
   holding the lock, so they are atomic. */
atomic_bool quit = false;
atomic_bool producerQuited = false;

char queue[WORK_CNT] = {0};
int work_cnt = 0;                 /* number of items in queue, guarded by m */
pthread_rwlock_t m = PTHREAD_RWLOCK_INITIALIZER;

void produceWork(char cont)
{
    /* add cont to the queue under the write lock; start over at slot 0
       when the queue is full */
    pthread_rwlock_wrlock(&m);
    if (work_cnt >= WORK_CNT)
        work_cnt = 0;
    queue[work_cnt] = cont;
    work_cnt++;
    printf("producer added '%c' to work queue\n", cont);
    pthread_rwlock_unlock(&m);
}

void readerWork(void)
{
    /* print the filled part of the queue under the read lock; readers do
       not modify the queue, so both of them may hold the lock at once */
    pthread_rwlock_rdlock(&m);
    printf("reader %lu :", (unsigned long)pthread_self());
    for (int i = 0; i < work_cnt; i++)
        printf(" %c ", queue[i]);
    puts("");
    pthread_rwlock_unlock(&m);
}

void *producer(void *para)
{
    (void)para;
    printf("producer %lu started...\n", (unsigned long)pthread_self());
    /* the producer puts work into the queue until main asks it to quit */
    while (!quit)
    {
        char c = (char)(rand() % 26 + 'a');
        produceWork(c);
        sleep(1);
    }
    puts("producer end");
    producerQuited = true;
    return (void *)0;
}

void *reader(void *para)
{
    (void)para;
    printf("reader %lu runs\n", (unsigned long)pthread_self());
    /* read the queue once per second until the producer has quit */
    while (!quit || !producerQuited)
    {
        readerWork();
        sleep(1);
    }
    /* the producer has quit; print the final contents once more */
    readerWork();
    printf("reader %lu end\n", (unsigned long)pthread_self());
    return (void *)0;
}

int main(void)
{
    pthread_t p, c1, c2;
    void *pstatus, *status1, *status2;

    if (pthread_create(&p, NULL, producer, NULL) != 0)
    {
        puts("producer create error");
        return 1;
    }
    if (pthread_create(&c1, NULL, reader, NULL) != 0)
    {
        puts("reader 1 create error");
        return 1;
    }
    if (pthread_create(&c2, NULL, reader, NULL) != 0)
    {
        puts("reader 2 create error");
        return 1;
    }
    sleep(10);
    quit = true;
    if (pthread_join(p, &pstatus) != 0)
        puts("join producer failed");
    if (pthread_join(c1, &status1) != 0)
        puts("join reader 1 failed");
    if (pthread_join(c2, &status2) != 0)
        puts("join reader 2 failed");
    printf("reader 1 returns: %ld\n", (long)status1);
    printf("reader 2 returns: %ld\n", (long)status2);
    printf("producer returns: %ld\n", (long)pstatus);
    pthread_rwlock_destroy(&m);
    puts("all sub threads exited");
    return 0;
}
Code 3: 1 producer and 1 reader synchronized with condition variables
/* 1 producer, 1 reader, using condition variables */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define WORK_CNT 10

atomic_bool quit = false;
atomic_bool producerQuited = false;   /* set while holding m, see producer() */

char queue[WORK_CNT] = {0};
int work_cnt = 0;                     /* number of items in queue, guarded by m */
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t canp = PTHREAD_COND_INITIALIZER;   /* there is room to produce */
pthread_cond_t canr = PTHREAD_COND_INITIALIZER;   /* there is something to read */

void produceWork(char cont)
{
    /* add cont to the queue */
    pthread_mutex_lock(&m);
    /* if the queue is full, we need to wait; work_cnt is rechecked after
       every wakeup because it may have changed again in the meantime */
    while (work_cnt >= WORK_CNT)
        pthread_cond_wait(&canp, &m);
    /* now we hold the lock and there is room to produce */
    queue[work_cnt] = cont;
    work_cnt++;
    printf("producer added '%c' to work queue\n", cont);
    pthread_mutex_unlock(&m);
    /* we need to notify the reader */
    pthread_cond_signal(&canr);
}

int readerWork(void)
{
    /* consume one item; return -1 if the queue is empty and the producer
       has quit */
    int c;
    pthread_mutex_lock(&m);
    /* The queue is empty, so we need to wait. Without care this could wait
       forever: the producer may stop filling the queue while work_cnt
       happens to be 0, and then nobody signals canr any more. To avoid
       this, producerQuited is part of the wait condition, and the producer
       sets it under the mutex and signals canr once more before it exits. */
    while (work_cnt <= 0 && !producerQuited)
        pthread_cond_wait(&canr, &m);
    /* now we hold the mutex and can read */
    if (work_cnt > 0)
    {
        c = queue[work_cnt - 1];
        work_cnt--;
        printf("reader %lu : get %c\n", (unsigned long)pthread_self(), c);
    }
    else
        c = -1;
    pthread_mutex_unlock(&m);
    pthread_cond_signal(&canp);
    return c;
}

void *producer(void *para)
{
    (void)para;
    printf("producer %lu started...\n", (unsigned long)pthread_self());
    /* the producer puts work into the queue until main asks it to quit */
    while (!quit)
    {
        char c = (char)(rand() % 26 + 'a');
        produceWork(c);
        sleep(1);
    }
    puts("producer end");
    /* After we leave the loop the reader may still be about to call
       pthread_cond_wait(&canr), and we will never produce anything again.
       Setting producerQuited under the mutex and then signalling canr makes
       sure the reader either sees the flag before it waits or is woken up.
       pthread_cond_timedwait would be another way to avoid waiting forever. */
    pthread_mutex_lock(&m);
    producerQuited = true;
    pthread_mutex_unlock(&m);
    pthread_cond_signal(&canr);
    return (void *)0;
}

void *reader(void *para)
{
    (void)para;
    printf("reader %lu runs\n", (unsigned long)pthread_self());
    /* fetch work and process it until the producer has quit */
    while (!quit || !producerQuited)
    {
        readerWork();
        sleep(1);
    }
    /* the producer has quit; consume whatever is left in the queue */
    while (readerWork() > 0)
        ;
    printf("reader %lu end\n", (unsigned long)pthread_self());
    return (void *)0;
}

int main(void)
{
    pthread_t p, c1;
    void *pstatus, *status1;

    if (pthread_create(&p, NULL, producer, NULL) != 0)
    {
        puts("producer create error");
        return 1;
    }
    if (pthread_create(&c1, NULL, reader, NULL) != 0)
    {
        puts("reader 1 create error");
        return 1;
    }
    sleep(10);
    quit = true;
    if (pthread_join(p, &pstatus) != 0)
        puts("join producer failed");
    if (pthread_join(c1, &status1) != 0)
        puts("join reader 1 failed");
    printf("reader 1 returns: %ld\n", (long)status1);
    printf("producer returns: %ld\n", (long)pstatus);
    pthread_mutex_destroy(&m);
    pthread_cond_destroy(&canr);
    pthread_cond_destroy(&canp);
    puts("all sub threads exited");
    return 0;
}
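The comments in code 3 mention pthread_cond_timedwait as another way to keep a reader from waiting forever. A minimal sketch of such a bounded wait follows. The names data_ready, wait_ready_ms and set_ready are hypothetical, and the deadline is computed against CLOCK_REALTIME, which is the default clock of a condition variable created without special attributes.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
bool data_ready = false;             /* the condition, protected by lock */

/* Waits at most timeout_ms milliseconds for data_ready to become true.
 * Returns 0 if it did, ETIMEDOUT if the deadline passed first. */
int wait_ready_ms(long timeout_ms)
{
    struct timespec deadline;
    int err = 0;

    /* pthread_cond_timedwait takes an absolute time, not an interval */
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec += timeout_ms / 1000;
    deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec++;
        deadline.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(&lock);
    while (!data_ready && err == 0)
        err = pthread_cond_timedwait(&cond, &lock, &deadline);
    if (data_ready)
        err = 0;                     /* the condition wins over a late timeout */
    pthread_mutex_unlock(&lock);
    return err;
}

/* Sets the condition under the mutex and wakes all waiters. */
void set_ready(void)
{
    pthread_mutex_lock(&lock);
    data_ready = true;
    pthread_mutex_unlock(&lock);
    pthread_cond_broadcast(&cond);
}
```

A reader built this way wakes up on its own after the timeout and can recheck the shutdown flag, at the price of polling. The examples in this post rely on a final wakeup from the producer instead.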
Code 4: 1 producer and 2 readers synchronized with condition variables
/* 1 producer, 2 readers, using condition variables */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define WORK_CNT 10

atomic_bool quit = false;
atomic_bool producerQuited = false;   /* set while holding m, see producer() */

char queue[WORK_CNT] = {0};
int work_cnt = 0;                     /* number of items in queue, guarded by m */
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t canp = PTHREAD_COND_INITIALIZER;   /* there is room to produce */
pthread_cond_t canr = PTHREAD_COND_INITIALIZER;   /* there is something to read */

void produceWork(char cont)
{
    /* add cont to the queue */
    pthread_mutex_lock(&m);
    /* if the queue is full, we need to wait; work_cnt is rechecked after
       every wakeup because it may have changed again in the meantime */
    while (work_cnt >= WORK_CNT)
        pthread_cond_wait(&canp, &m);
    /* now we hold the lock and there is room to produce */
    queue[work_cnt] = cont;
    work_cnt++;
    printf("producer added '%c' to work queue\n", cont);
    pthread_mutex_unlock(&m);
    /* we need to notify the readers */
    pthread_cond_broadcast(&canr);
}

int readerWork(void)
{
    /* consume one item; return -1 if the queue is empty and the producer
       has quit */
    int c;
    pthread_mutex_lock(&m);
    /* The queue is empty, so we need to wait. Without care this could wait
       forever: the producer may stop filling the queue while work_cnt
       happens to be 0, and then nobody signals canr any more. To avoid
       this, producerQuited is part of the wait condition, and the producer
       sets it under the mutex and broadcasts canr once more before it
       exits. The loop also covers the case where the other reader took the
       item first. */
    while (work_cnt <= 0 && !producerQuited)
        pthread_cond_wait(&canr, &m);
    /* now we hold the mutex and can read */
    if (work_cnt > 0)
    {
        c = queue[work_cnt - 1];
        work_cnt--;
        printf("reader %lu : get %c\n", (unsigned long)pthread_self(), c);
    }
    else
        c = -1;
    pthread_mutex_unlock(&m);
    pthread_cond_broadcast(&canp);
    return c;
}

void *producer(void *para)
{
    (void)para;
    printf("producer %lu started...\n", (unsigned long)pthread_self());
    /* the producer puts work into the queue until main asks it to quit */
    while (!quit)
    {
        char c = (char)(rand() % 26 + 'a');
        produceWork(c);
    }
    puts("producer end");
    /* After we leave the loop a reader may still be about to call
       pthread_cond_wait(&canr), and we will never produce anything again.
       Setting producerQuited under the mutex and then broadcasting canr
       makes sure every reader either sees the flag before it waits or is
       woken up. A single pthread_cond_signal would wake only one of the two
       readers. pthread_cond_timedwait would be another way to avoid waiting
       forever. */
    pthread_mutex_lock(&m);
    producerQuited = true;
    pthread_mutex_unlock(&m);
    pthread_cond_broadcast(&canr);
    return (void *)0;
}

void *reader(void *para)
{
    (void)para;
    printf("reader %lu runs\n", (unsigned long)pthread_self());
    /* fetch work and process it until the producer has quit */
    while (!quit || !producerQuited)
        readerWork();
    /* the producer has quit; consume whatever is left in the queue */
    while (readerWork() > 0)
        ;
    printf("reader %lu end\n", (unsigned long)pthread_self());
    return (void *)0;
}

int main(void)
{
    pthread_t p, c1, c2;
    void *pstatus, *status1, *status2;

    if (pthread_create(&p, NULL, producer, NULL) != 0)
    {
        puts("producer create error");
        return 1;
    }
    if (pthread_create(&c1, NULL, reader, NULL) != 0)
    {
        puts("reader 1 create error");
        return 1;
    }
    if (pthread_create(&c2, NULL, reader, NULL) != 0)
    {
        puts("reader 2 create error");
        return 1;
    }
    sleep(1);
    quit = true;
    if (pthread_join(p, &pstatus) != 0)
        puts("join producer failed");
    if (pthread_join(c1, &status1) != 0)
        puts("join reader 1 failed");
    if (pthread_join(c2, &status2) != 0)
        puts("join reader 2 failed");
    printf("reader 1 returns: %ld\n", (long)status1);
    printf("reader 2 returns: %ld\n", (long)status2);
    printf("producer returns: %ld\n", (long)pstatus);
    pthread_mutex_destroy(&m);
    pthread_cond_destroy(&canr);
    pthread_cond_destroy(&canp);
    puts("all sub threads exited");
    return 0;
}