
2008-12-09 18:59:36

11.6 thread synchronization

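Races between threads arise for the following reasons:

(1) Reasons inherent in the architecture

1. On some architectures a write takes more than one memory cycle. If another thread reads the location before all of those cycles have completed, it can see inconsistent data.

2. When two threads modify the same variable, the modification itself is not atomic.

The modification breaks down into three steps: load the value into a register, modify the register, and write the register back to memory. If the steps of the two threads interleave, the result is a race condition.

(2) Reasons in the program's own logic

Reading a variable and then acting on its value is not atomic: between the read and the action, another thread may change the state, which again produces a race condition.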
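As a minimal sketch of the load/modify/store problem and its usual fix: two threads increment one shared counter, and a mutex makes each increment indivisible with respect to the other thread. The names `counter`, `counter_lock`, `add_many`, `run_two_adders` and the count `INCREMENTS` are made up for this illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define INCREMENTS 100000            /* hypothetical iteration count */

static long counter = 0;             /* the shared variable */
static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;

/* Performs INCREMENTS read-modify-write steps, each under the mutex. */
static void *add_many(void *arg)
{
    (void)arg;
    for (int i = 0; i < INCREMENTS; i++) {
        pthread_mutex_lock(&counter_lock);
        counter++;                   /* load, add, store cannot interleave now */
        pthread_mutex_unlock(&counter_lock);
    }
    return NULL;
}

/* Runs two incrementing threads and returns the final counter value. */
long run_two_adders(void)
{
    pthread_t t1, t2;
    int rc;

    counter = 0;
    rc = pthread_create(&t1, NULL, add_many, NULL);
    assert(rc == 0);
    rc = pthread_create(&t2, NULL, add_many, NULL);
    assert(rc == 0);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return counter;
}
```

With the lock in place `run_two_adders()` returns exactly 2 * INCREMENTS. If the lock/unlock pair is removed, the result will typically come out smaller, because increments from the two threads overwrite each other.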

 

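Synchronization mechanisms:

1. Mutex

Avoiding deadlock:

1) To rule out deadlock when several mutexes are in use, every place in the program that takes them must take them in the same order. With two mutexes A and B, every thread must lock A first and then B; as soon as one thread reverses the order, deadlock becomes likely.

2) Use trylock, usually in more complicated situations: if you already hold some locks, call trylock to test whether the next one can be taken. If it succeeds, you now hold the new lock as well; otherwise, release the locks you already hold.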

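For example: there are two resources, r1 and r2, where r1 is protected by mutex1 and r2 by mutex2, and four threads A, B, C and D. A and B access r1 through mutex1. C and D also access r1 through mutex1 and additionally access r2 through mutex2, and they must hold r2 whenever they access r1. There is also a thread E that accesses only r2. C and D must lock in the same order: mutex2 first, then mutex1. Suppose A or B is using r1 and C wants r1 as well. C can lock mutex2 first and then try to lock mutex1. If the attempt fails, C should release mutex2, because thread E may want r2, and C should not hold on to something it cannot use. That is the benefit of try-locking: with a plain blocking lock, C would wait for mutex1 while holding mutex2, and E could not get mutex2 the whole time.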

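Result:

A, B: lock only mutex1

C, D: lock mutex2 and then mutex1, in that order, where mutex1 is taken with trylock

E: locks only mutex2

In a case like this, try-locking is preferable to a plain blocking lock.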
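The locking rule for C and D can be sketched roughly as follows. This is only an illustration of the back-off idea: the helper names `try_lock_both`, `lock_both` and `unlock_both` are invented here, and yielding with `sched_yield()` between attempts is just one possible retry policy.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>

pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;   /* protects r1 */
pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER;   /* protects r2 */

/*
 * One attempt by C or D: lock mutex2, then try mutex1.
 * Returns 1 with both mutexes held, or 0 with neither held.
 */
int try_lock_both(void)
{
    int rc = pthread_mutex_lock(&mutex2);
    assert(rc == 0);
    if (pthread_mutex_trylock(&mutex1) != 0) {
        /* mutex1 is busy: give mutex2 back so that E is not kept waiting */
        pthread_mutex_unlock(&mutex2);
        return 0;
    }
    return 1;
}

/* Retry until both mutexes are held, yielding the CPU between attempts. */
void lock_both(void)
{
    while (!try_lock_both())
        sched_yield();
}

/* Release in the reverse order of acquisition. */
void unlock_both(void)
{
    pthread_mutex_unlock(&mutex1);
    pthread_mutex_unlock(&mutex2);
}
```

A and B would keep calling `pthread_mutex_lock(&mutex1)` directly and E would call `pthread_mutex_lock(&mutex2)`; only C and D go through `lock_both()`.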

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *restrict mutex,
                       const pthread_mutexattr_t *restrict attr);

 

int pthread_mutex_destroy(pthread_mutex_t *mutex);

 

Both return: 0 if OK, error number on failure

 

#include <pthread.h>

 

int pthread_mutex_lock(pthread_mutex_t *mutex);

 

int pthread_mutex_trylock(pthread_mutex_t *mutex);

 

int pthread_mutex_unlock(pthread_mutex_t *mutex);

 

All return: 0 if OK, error number on failure

 

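2. Reader-writer locks

Reader-writer locks are also called shared-exclusive locks. They only pay off when readers are many and frequent while writers are few and infrequent. In addition, a steady stream of readers can keep a writer blocked indefinitely and starve it. For that reason, implementations usually block new readers once a writer is waiting for the lock, and admit readers again only after the blocked writer has acquired the lock and finished its work.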

#include <pthread.h>

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
                        const pthread_rwlockattr_t *restrict attr);

 

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

 

Both return: 0 if OK, error number on failure

 

#include <pthread.h>

 

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

 

int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

 

int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

 

All return: 0 if OK, error number on failure

 

 

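3. Condition variables

A condition variable is always used together with a mutex. The mutex protects the condition: only while holding it may you change the state of the condition. A thread that wants to wait on a condition variable first acquires the mutex, checks the condition, and calls pthread_cond_wait if the condition does not hold yet. pthread_cond_wait does not examine the condition itself; it atomically releases the mutex and puts the caller to sleep. When the thread is woken, pthread_cond_wait reacquires the mutex before it returns, and the caller should check the condition again. The thread that calls pthread_cond_signal or pthread_cond_broadcast may or may not hold the mutex; holding it is certainly safe.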
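The wait/notify protocol described above might look like the sketch below. The flag `ready`, the `payload` value and the function names are invented for the illustration; the point is only that the condition is tested in a `while` loop and changed under the mutex.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

static pthread_mutex_t ready_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  ready_cond = PTHREAD_COND_INITIALIZER;
static bool ready = false;           /* the condition, protected by ready_lock */
static int  payload = 0;

/* Waiting side: sleep until ready is true, then copy the payload out. */
static void *waiter(void *arg)
{
    int *out = arg;
    pthread_mutex_lock(&ready_lock);
    while (!ready)                   /* re-check after every wakeup */
        pthread_cond_wait(&ready_cond, &ready_lock);
    *out = payload;                  /* the mutex is held again at this point */
    pthread_mutex_unlock(&ready_lock);
    return NULL;
}

/* Notifying side: change the condition under the mutex, then signal. */
static void publish(int value)
{
    pthread_mutex_lock(&ready_lock);
    payload = value;
    ready = true;
    pthread_mutex_unlock(&ready_lock);
    pthread_cond_signal(&ready_cond);
}

/* Starts a waiter, publishes value, and returns what the waiter saw. */
int wait_for_value(int value)
{
    pthread_t t;
    int seen = -1;
    int rc;

    pthread_mutex_lock(&ready_lock);
    ready = false;                   /* reset so the function can be reused */
    pthread_mutex_unlock(&ready_lock);

    rc = pthread_create(&t, NULL, waiter, &seen);
    assert(rc == 0);
    publish(value);
    pthread_join(t, NULL);
    return seen;
}
```

Because the waiter tests `ready` under the mutex before it sleeps, it does not matter whether `publish` runs before or after the waiter reaches `pthread_cond_wait`.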

#include <pthread.h>

int pthread_cond_init(pthread_cond_t *restrict cond,
                      const pthread_condattr_t *restrict attr);

 

int pthread_cond_destroy(pthread_cond_t *cond);

 

Both return: 0 if OK, error number on failure

 

#include <pthread.h>

int pthread_cond_wait(pthread_cond_t *restrict cond,
                      pthread_mutex_t *restrict mutex);

int pthread_cond_timedwait(pthread_cond_t *restrict cond,
                           pthread_mutex_t *restrict mutex,
                           const struct timespec *restrict timeout);

 

Both return: 0 if OK, error number on failure
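One detail that is easy to get wrong: the timeout passed to pthread_cond_timedwait is an absolute time, not an interval. Below is a rough sketch of a bounded wait, assuming the condition variable uses its default clock (CLOCK_REALTIME). The names `tw_flag`, `tw_lock`, `tw_cond` and `wait_flag_ms` are made up for the example.

```c
#define _POSIX_C_SOURCE 200809L      /* for clock_gettime under strict -std modes */
#include <errno.h>
#include <pthread.h>
#include <time.h>

static pthread_mutex_t tw_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  tw_cond = PTHREAD_COND_INITIALIZER;
static int tw_flag = 0;              /* the condition, protected by tw_lock */

/*
 * Waits at most ms milliseconds for tw_flag to become nonzero.
 * Returns 0 if the flag was set, or the error from the last wait
 * (ETIMEDOUT when the deadline passed first).
 */
int wait_flag_ms(long ms)
{
    struct timespec deadline;
    int rc = 0;
    int flag_seen;

    /* build the absolute deadline: now + ms */
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec  += ms / 1000;
    deadline.tv_nsec += (ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec  += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(&tw_lock);
    while (!tw_flag && rc == 0)      /* spurious wakeups just loop again */
        rc = pthread_cond_timedwait(&tw_cond, &tw_lock, &deadline);
    flag_seen = tw_flag;
    pthread_mutex_unlock(&tw_lock);

    return flag_seen ? 0 : rc;
}
```

Since the deadline is computed once, a spurious wakeup does not extend the total waiting time.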

 

#include <pthread.h>

 

int pthread_cond_signal(pthread_cond_t *cond);

 

int pthread_cond_broadcast(pthread_cond_t *cond);

 

Both return: 0 if OK, error number on failure

 

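Talk is cheap, so here are four examples I wrote with these functions:

1. Synchronizing 1 producer and 2 consumers with one mutex

2. Synchronizing 1 producer and 2 readers with a reader-writer lock

3. Synchronizing 1 producer and 1 reader with condition variables

4. Synchronizing 1 producer and 2 readers with condition variables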

 

Code 1: synchronizing 1 producer and 2 consumers with one mutex

/*1 producer 2 customer, using only one mutex, */

 

#include <stdio.h>

#include <stdlib.h>

#include <stdbool.h>

#include <unistd.h>

#include <pthread.h>

 

#define WORK_CNT 10

 

int haha1 = 100;

int haha2 = 200;

bool quit = false;

bool producerQuited = false;

 

char queue[WORK_CNT] = {0};

int work_cnt = 0;

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

 

void produceWork( char cont )

{

       //add cont to the queue

       pthread_mutex_lock( &m );

       if( work_cnt < WORK_CNT )

       {

             

              queue[work_cnt] = cont;

              work_cnt ++;

              printf("producer added \'%c\' to work queue\n", cont );

       }

       else

       {

       //     puts("producer found no space, no work is pushed");

       }

       pthread_mutex_unlock( &m );

}

 

char customWork()

{

       //custom the work

       char c ;

       pthread_mutex_lock(&m);

       if( work_cnt>0 )

       {

              c = queue[work_cnt-1];

              work_cnt --;

              printf("customer %lu eat \'%c\' from queue\n", (unsigned long)pthread_self(), c);

       }

       else

       {

//            puts("customer found no work in the queue");

              c = -1;

       }

       pthread_mutex_unlock(&m);

       return c;

}

void* producer( void* para )

{

       printf("producer %lu started...\n", (unsigned long)pthread_self() );

 

       //producer put work to queue

       while( !quit )

       {

              char c = (char) (rand() % 26)+'a';

              produceWork(c);

       }

       puts("producer end");

       producerQuited = true;

       return (void*)0;

 

}

 

void* customer( void * para )

{

       printf("customer %lu runs\n", (unsigned long)pthread_self() );

       //fetch work and process

       while(!quit || !producerQuited )

       {

              customWork();

       }

      

       //the producer has quit; consume whatever is left in the queue

       while( customWork()>0 )

              ;   //each call already removes one item, so the body stays empty

 

       printf("customer %lu end\n", (unsigned long)pthread_self() );

       return ((void*)0);

}

 

 

 

int main()

{

       pthread_t p, c1,c2;

       void *pstatus, *status1, *status2;

 

       int err = pthread_create( &p, NULL, producer, NULL );

       if( err != 0 )

              puts("producer create error");

 

       err = pthread_create( &c1, NULL, customer, NULL );

       if( err != 0 )

              puts( "customer 1 create error" );

 

       err = pthread_create( &c2, NULL, customer, NULL );

       if( err != 0 )

              puts( "customer 2 create error" );

      

       sleep( 1 );

       quit = true;

       if( pthread_join( p, &pstatus ) != 0 )

              puts("join producer failed");

       if( pthread_join( c1, &status1 ) != 0 )

              puts( "join customer 1 failed");

       if( pthread_join( c2, &status2 ) != 0 )

              puts( "join customer 2 failed");

 

       printf("customer 1 returns: %ld\n", (long)status1);

       printf("customer 2 returns: %ld\n", (long)status2);

       printf("producer returns: %ld\n", (long)pstatus);

      

       pthread_mutex_destroy(&m);

       puts( "all sub threads exited" );

}

 

Code 2: synchronizing 1 producer and 2 readers with a reader-writer lock

/*1 producer 2 customer, using readerwriter lock, */

 

#include <stdio.h>

#include <stdlib.h>

#include <stdbool.h>

#include <unistd.h>

#include <pthread.h>

 

#define WORK_CNT 10

 

int haha1 = 100;

int haha2 = 200;

bool quit = false;

bool producerQuited = false;

 

char queue[WORK_CNT] = {0};

int work_cnt = 0;

//pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

pthread_rwlock_t m = PTHREAD_RWLOCK_INITIALIZER;

 

void produceWork( char cont )

{

       //add cont to the queue

       pthread_rwlock_wrlock( &m );

       if( work_cnt >= WORK_CNT )

              work_cnt = 0;

             

       queue[work_cnt] = cont;

       work_cnt ++;

       printf("producer added \'%c\' to work queue\n", cont );

       pthread_rwlock_unlock( &m );

}

 

char readerWork()

{

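       //print a snapshot of the queue; readers never remove anything

       char c = -1;   //nothing is consumed, so there is no item to return

       pthread_rwlock_rdlock(&m);

       printf("reader %lu : ", (unsigned long)pthread_self() );

       //the loop bound was cut off in the original; work_cnt is an assumption

       for( int i=0; i<work_cnt; i++ )

              printf(" %c ", queue[i] );

       puts("");

       pthread_rwlock_unlock(&m);

       return c;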

}

void* producer( void* para )

{

       printf("producer %lu started...\n", (unsigned long)pthread_self() );

 

       //producer put work to queue

       while( !quit )

       {

              char c = (char) (rand() % 26)+'a';

              produceWork(c);

              sleep(1);

       }

       puts("producer end");

       producerQuited = true;

       return (void*)0;

 

}

 

void* reader( void * para )

{

       printf("reader %lu runs\n", (unsigned long)pthread_self() );

       //fetch work and process

       while(!quit || !producerQuited )

       {

              readerWork();

              sleep(1);

       }

      

       //the producer has quit; readers consume nothing, so one last snapshot is enough

       readerWork();

 

       printf("reader %lu end\n", (unsigned long)pthread_self() );

       return ((void*)0);

}

 

 

 

int main()

{

       pthread_t p, c1,c2;

       void *pstatus, *status1, *status2;

 

       int err = pthread_create( &p, NULL, producer, NULL );

       if( err != 0 )

              puts("producer create error");

 

       err = pthread_create( &c1, NULL, reader, NULL );

       if( err != 0 )

              puts( "reader 1 create error" );

 

       err = pthread_create( &c2, NULL, reader, NULL );

       if( err != 0 )

              puts( "reader 2 create error" );

      

       sleep( 10 );

       quit = true;

       if( pthread_join( p, &pstatus ) != 0 )

              puts("join producer failed");

       if( pthread_join( c1, &status1 ) != 0 )

              puts( "join reader 1 failed");

       if( pthread_join( c2, &status2 ) != 0 )

              puts( "join reader 2 failed");

 

       printf("reader 1 returns: %ld\n", (long)status1);

       printf("reader 2 returns: %ld\n", (long)status2);

       printf("producer returns: %ld\n", (long)pstatus);

 

       pthread_rwlock_destroy(&m);

       puts( "all sub threads exited" );

}

Code 3: synchronizing 1 producer and 1 reader with condition variables

/*1 producer 1 customer, using condition variable , */

 

#include <stdio.h>

#include <stdlib.h>

#include <stdbool.h>

#include <unistd.h>

#include <pthread.h>

 

#define WORK_CNT 10

 

int haha1 = 100;

int haha2 = 200;

bool quit = false;

bool producerQuited = false;

 

char queue[WORK_CNT] = {0};

int work_cnt = 0;

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t canp = PTHREAD_COND_INITIALIZER;

pthread_cond_t canr = PTHREAD_COND_INITIALIZER;

 

void produceWork( char cont )

{

       //add cont to the queue

       pthread_mutex_lock( &m );

 

       //if the queue is full, wait; loop because a wakeup can be spurious

       while( work_cnt >= WORK_CNT )

              pthread_cond_wait(&canp, &m);

      

       //after the wakeup we hold the mutex again and the queue has room

       if( work_cnt < WORK_CNT )

       {

              //now we have the lock and we can produce

              queue[work_cnt] = cont;

              work_cnt ++;

              printf("producer added \'%c\' to work queue\n", cont );

       }

       pthread_mutex_unlock( &m );

 

       //we need to notify the reader

       pthread_cond_signal( &canr );

}

 

char readerWork()

{

       //custom the work

       char c ;

       pthread_mutex_lock(&m);

 

       //the queue is empty, so wait until the producer adds something or quits.

       //producerQuited is part of the condition: the producer sets it and then

       //signals canr, so a reader blocked on an empty queue is woken at shutdown.

       //loop because a wakeup can be spurious

       while( work_cnt<=0 && !producerQuited )

              pthread_cond_wait( &canr, &m );

      

       //now we can read, and we have got the mutex lock

       if( work_cnt>0 )

       {

              printf("reader %lu : get %c \n", (unsigned long)pthread_self(), queue[work_cnt-1] );

              c = queue[work_cnt-1];

              work_cnt--;

       }

       else

              c = -1;

      

       pthread_mutex_unlock(&m);

       pthread_cond_signal(&canp);

      

       return c;

}

void* producer( void* para )

{

       printf("producer %lu started...\n", (unsigned long)pthread_self() );

 

       //producer put work to queue

       while( !quit )

       {

              char c = (char) (rand() % 26)+'a';

              produceWork(c);

              sleep(1);

       }

       puts("producer end");

 

       //set producerQuited while holding the mutex. otherwise the reader could test

       //the flag, see false, and block in pthread_cond_wait only after our signal

       //has already gone by, and then it would wait forever. using

       //pthread_cond_timedwait in the reader is another way to bound the wait

       pthread_mutex_lock(&m);

       producerQuited = true;

       pthread_mutex_unlock(&m);

       pthread_cond_signal(&canr);

       return (void*)0;

 

}

 

void* reader( void * para )

{

       printf("reader %lu runs\n", (unsigned long)pthread_self() );

       //fetch work and process

       while(!quit || !producerQuited )

       {

              readerWork();

              sleep(1);

       }

      

       //the producer has quit; consume whatever is left in the queue

       while( readerWork()>0 )

              ;   //each call already removes one item, so the body stays empty

 

       printf("reader %lu end\n", (unsigned long)pthread_self() );

       return ((void*)0);

}

 

 

 

int main()

{

       /*

       struct      sigaction act, oldact;

       act.sa_handler = handler;

       sigemptyset( &act.sa_mask );

       act.sa_flags = 0;

       if( sigaction( SIGUSR1, &act, &oldact )<0 )

       {

              puts("set handler error");

              exit(1);

       }

       */

       pthread_t p, c1,c2;

       void *pstatus, *status1, *status2;

 

       int err = pthread_create( &p, NULL, producer, NULL );

       if( err != 0 )

              puts("producer create error");

 

       err = pthread_create( &c1, NULL, reader, NULL );

       if( err != 0 )

              puts( "reader 1 create error" );

 

//     err = pthread_create( &c2, NULL, reader, NULL );

//     if( err != 0 )

//            puts( "reader 2 create error" );

      

       sleep( 10 );

       quit = true;

       if( pthread_join( p, &pstatus ) != 0 )

              puts("join producer failed");

       if( pthread_join( c1, &status1 ) != 0 )

              puts( "join reader 1 failed");

//     if( pthread_join( c2, &status2 ) != 0 )

//            puts( "join reader 2 failed");

 

       printf("reader 1 returns: %ld\n", (long)status1);

//     printf("reader 2 returns: %ld\n", (long)status2);

       printf("producer returns: %ld\n", (long)pstatus);

 

       pthread_mutex_destroy(&m);

       pthread_cond_destroy(&canr);

       pthread_cond_destroy(&canp);

       puts( "all sub threads exited" );

}

 

Code 4: synchronizing 1 producer and 2 readers with condition variables

/*1 producer 2 customer, using condition variable , */

 

#include <stdio.h>

#include <stdlib.h>

#include <stdbool.h>

#include <unistd.h>

#include <pthread.h>

 

#define WORK_CNT 10

 

int haha1 = 100;

int haha2 = 200;

bool quit = false;

bool producerQuited = false;

 

char queue[WORK_CNT] = {0};

int work_cnt = 0;

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t canp = PTHREAD_COND_INITIALIZER;

pthread_cond_t canr = PTHREAD_COND_INITIALIZER;

 

void produceWork( char cont )

{

       //add cont to the queue

       pthread_mutex_lock( &m );

 

       //if the queue is full, wait; loop because a wakeup can be spurious

       while( work_cnt >= WORK_CNT )

              pthread_cond_wait(&canp, &m);

      

       //after the wakeup we hold the mutex again and the queue has room

       if( work_cnt < WORK_CNT )

       {

              //now we have the lock and we can produce

              queue[work_cnt] = cont;

              work_cnt ++;

              printf("producer added \'%c\' to work queue\n", cont );

       }

       pthread_mutex_unlock( &m );

 

       //we need to notify the readers

       pthread_cond_broadcast( &canr );

}

 

char readerWork()

{

       //custom the work

       char c ;

       pthread_mutex_lock(&m);

 

       //the queue is empty, so wait until the producer adds something or quits.

       //producerQuited is part of the condition: the producer sets it and then

       //wakes the readers on canr, so a reader blocked on an empty queue is

       //released at shutdown. loop because the other reader may have taken the

       //item first, and because a wakeup can be spurious

       while( work_cnt<=0 && !producerQuited )

              pthread_cond_wait( &canr, &m );

      

       //now we can read, and we have got the mutex lock

       if( work_cnt>0 )

       {

              printf("reader %lu : get %c \n", (unsigned long)pthread_self(), queue[work_cnt-1] );

              c = queue[work_cnt-1];

              work_cnt--;

       }

       else

              c = -1;

      

       pthread_mutex_unlock(&m);

       pthread_cond_broadcast(&canp);

      

       return c;

}

void* producer( void* para )

{

       printf("producer %lu started...\n", (unsigned long)pthread_self() );

 

       //producer put work to queue

       while( !quit )

       {

              char c = (char) (rand() % 26)+'a';

              produceWork(c);

       }

       puts("producer end");

 

       //set producerQuited while holding the mutex. otherwise a reader could test

       //the flag, see false, and block in pthread_cond_wait only after our wakeup

       //has already gone by, and then it would wait forever. there are two readers,

       //so wake them with broadcast: a single signal might release only one of them

       pthread_mutex_lock(&m);

       producerQuited = true;

       pthread_mutex_unlock(&m);

       pthread_cond_broadcast(&canr);

       return (void*)0;

 

}

 

void* reader( void * para )

{

       printf("reader %lu runs\n", (unsigned long)pthread_self() );

       //fetch work and process

       while(!quit || !producerQuited )

       {

              readerWork();

       }

      

       //the producer has quit; consume whatever is left in the queue

       while( readerWork()>0 )

              ;   //each call already removes one item, so the body stays empty

 

       printf("reader %lu end\n", (unsigned long)pthread_self() );

       return ((void*)0);

}

 

 

 

int main()

{

       /*

       struct      sigaction act, oldact;

       act.sa_handler = handler;

       sigemptyset( &act.sa_mask );

       act.sa_flags = 0;

       if( sigaction( SIGUSR1, &act, &oldact )<0 )

       {

              puts("set handler error");

              exit(1);

       }

       */

       pthread_t p, c1,c2;

       void *pstatus, *status1, *status2;

 

       int err = pthread_create( &p, NULL, producer, NULL );

       if( err != 0 )

              puts("producer create error");

 

       err = pthread_create( &c1, NULL, reader, NULL );

       if( err != 0 )

              puts( "reader 1 create error" );

 

       err = pthread_create( &c2, NULL, reader, NULL );

       if( err != 0 )

              puts( "reader 2 create error" );

      

       sleep( 1 );

       quit = true;

       if( pthread_join( p, &pstatus ) != 0 )

              puts("join producer failed");

       if( pthread_join( c1, &status1 ) != 0 )

              puts( "join reader 1 failed");

       if( pthread_join( c2, &status2 ) != 0 )

              puts( "join reader 2 failed");

 

       printf("reader 1 returns: %ld\n", (long)status1);

       printf("reader 2 returns: %ld\n", (long)status2);

       printf("producer returns: %ld\n", (long)pstatus);

 

       pthread_mutex_destroy(&m);

       pthread_cond_destroy(&canr);

       pthread_cond_destroy(&canp);

       puts( "all sub threads exited" );

}
