Chinaunix首页 | 论坛 | 博客
  • 博客访问: 5493653
  • 博文数量: 922
  • 博客积分: 19333
  • 博客等级: 上将
  • 技术积分: 11226
  • 用 户 组: 普通用户
  • 注册时间: 2007-03-27 14:33
文章分类

全部博文(922)

文章存档

2023年(1)

2020年(2)

2019年(1)

2017年(1)

2016年(3)

2015年(10)

2014年(17)

2013年(49)

2012年(291)

2011年(266)

2010年(95)

2009年(54)

2008年(132)

分类: LINUX

2012-01-31 22:06:44

++++++APUE读书笔记-11线程(3)++++++


6、线程同步
================================================
 当多个线程共享同一片内存的时候,我们需要保证每个线程看到的数据是一致的。如果线程使用的变量没有被其他线程使用,那么不会存在一致性的问题。类似,如果一个变量是只读,那么多个线程同时访问也不会出现一致性的问题。然而当有一个线程可以修改这个变量,而这个变量同时也可以被其他的线程修改和读取的时候,我们需要在线程之间进行同步,来保证它们访问变量内存的内容的时候的数据是合法的。
 当一个线程修改变量的时候,别的读取这个变量的线程会潜在地遭遇不一致的情况。在修改操作占用多于一个存储周期的处理器架构上面,这个情况在两次写周期之间进行读内存的时候很容易发生。虽然这个取决于处理器架构,但是一个可移植的程序不能对使用的处理器的架构做任何的假设。
 文中先给出了一个简单的情况:
 线程A:读-写-写
 线程B:-----读----
 当B的读发生在A的两个写周期之间的时候,A,B就存在不一致性的问题了。
 图中先给了一个解决方案:规定在访问变量之前,先对变量进行加锁。这样当一进程持有锁的时候,其它申请锁将被阻塞。
 然后又给出了一些其它导致不一致的情况的例子,具体参见参考资料以及其中的图示。

 (1)mutex(互斥信号量)
 通过使用pthreads中的互斥信号量接口,我们可以保护我们的数据,保证同一个时间,只有一个线程访问我们的数据。实际,mutex就是我们访问共享资源设置的以及使用完共享资源时释放的锁。如果我们解锁mutex的时候有多余一个线程处于阻塞状态,那么所有在这个锁上面阻塞的线程都变成可执行,然后第一个运行的将会设置锁,其他的看到锁被设置了就继续返回阻塞等待锁的下一回释放了。这样,在一个时间里面,只有一个线程在执行。
 要想使用互斥机制,我们需要自己设计数据访问规则。操作系统不会将我们的数据访问串行化。如果我们的一个线程访问数据的时候没有获取锁那么即使其他的线程加锁,也会出现不一致的情况。
 mutex变量用数据类型pthread_mutex_t数据类型替代,在我们使用mutex变量之前,我们必须首先将它用常量PTHREAD_MUTEX_INITIALIZER初始化(只用于静态分配的mutex)或者用pthread_mutex_init初始化。如果我们动态分配mutex(例如通过malloc),我们需要在释放内存之前调用pthread_mutex_destroy。

 #include
 int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
 int pthread_mutex_destroy(pthread_mutex_t *mutex);
 返回:如果成功,两者返回0;如果失败,返回错误码。
 我们可以把参数attr设置为NULL这样,就会使用默认的初始值。以后讨论非默认的mutex属性。

 下面的函数用来对mutex进行加锁或者解锁。
 #include
 int pthread_mutex_lock(pthread_mutex_t *mutex);
 int pthread_mutex_trylock(pthread_mutex_t *mutex);
 int pthread_mutex_unlock(pthread_mutex_t *mutex);
 如果一个线程无法接受被阻塞,那么可以使用pthread_mutex_trylock有条件地添加锁。这样,如果调用pthread_mutex_trylock的时候mutex没有被上锁,那么将会正常一样上锁并且返回0;如果之前mutex被上了锁,那么pthread_mutex_trylock将会失败并且立即返回EBUSY。

 举例:
 #include
 #include
 struct foo {
     int             f_count;
     pthread_mutex_t f_lock;
     /* ... more stuff here ... */
 };

 struct foo *foo_alloc(void) /* allocate the object */
 {
     struct foo *fp;

     if ((fp = malloc(sizeof(struct foo))) != NULL) {
         fp->f_count = 1;
         if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
             free(fp);
             return(NULL);
         }
         /* ... continue initialization ... */
     }
     return(fp);
 }

 void foo_hold(struct foo *fp) /* add a reference to the object */
 {
     pthread_mutex_lock(&fp->f_lock);
     fp->f_count++;
     pthread_mutex_unlock(&fp->f_lock);
 }

 void foo_rele(struct foo *fp) /* release a reference to the object */
 {
     pthread_mutex_lock(&fp->f_lock);
     if (--fp->f_count == 0) { /* last reference */
         pthread_mutex_unlock(&fp->f_lock);
         pthread_mutex_destroy(&fp->f_lock);
         free(fp);
     } else {
         pthread_mutex_unlock(&fp->f_lock);
     }
 }
 这个例子使用mutex来保护一个数据结构,当有多个线程访问一个动态分配的对象的时候,我们可以给这个对象内嵌一个引用计数保护对象不会在线程被访问的时候被释放。
 在增加,减少,以及检查引用计数是否为0的时候,我们都会锁住mutex来保护它,最开始foo_alloc初始化的时设置引用计数为1的时候,不用设置这个锁保护,因为此时只有分配空间的那个线程引用它。如果这时候我们把这个结构放到一个链表中,那么它可以被其他线程找到,我们需要先为它加锁。
 在使用这个对象之前,线程要增加这个结构对象的引用计数;用完之后要减少引用计数;当引用计数为0的时候,要释放结构对象的内存空间。

 (2)死锁的避免
 当线程将要尝试对同一个信号两次加锁的时候,它会产生死锁但是实际上,由于mutex而产生死锁这个现象发生的很不明显。例如:我们在程序中使用了一个以上的互斥信号量,如果第一个线程在持有第一个互斥信号量的时候再申请第二个互斥信号量,而第二个互斥信号量被第二个线程持有并且第二个线程想要加锁第一个互斥信号量;这样两个线程都无法继续了,它们都互相等待对方持有的资源,这时发生的现象就叫做死锁。
 死锁可以通过仔细控制信号量加锁的次序来避免。例如:假设你有两个互斥信号量A和B。如果所有的线程都首先给A加锁然后才给B加锁,那么对于这两个互斥信号量之间将不会发生死锁的现象(当然你有可能在其它的信号量上面发生死锁),只有当存在其它的线程对A,B加锁的次序相反的时候,才有可能会产生死锁。
 有时一个应用程序的体系使得很难将一个特定顺序的加锁应用在它的身上。如果包含了足够的锁和数据结构,而你的函数还是无法用一个简单的方法来实现,那么应该换一个思路。这个时候,你兴许可以把你的锁释放,然后在稍后的一个时间尝试。你可以使用pthread_mutex_trylock来避免死锁。如果你已经成功的持有了pthread_mutex_trylock,那么你可以继续。如果没有,你可以释放你已经持有的资源,并且清理其它的工作,一会再尝试。
 举例
 具体的例子不多说了,参见参考资料的源代码。这里主要是给了两个例子,都使用两个信号量。为了避免死锁,在添加信号量的时候都按照相同的次序加锁。第一个例子锁的粒度比较细,导致程序代码结构有点复杂,但是性能应该更好;第二个例子锁的粒度比较粗,性能相对差一些,但是代码结构很简单。


 (3)读写锁
 读写锁和互斥信号量类似,但是读写锁允许更高程度的并行。使用互斥信号量的状态只能是锁和非锁两种状态,并且在一个时间只有一个线程可以拥有锁。读写锁有三种可能的状态:读锁,写锁,和解锁。同一时刻只能有一个线程可以有写锁的状态,但是可以有多个线程处于读锁的状态。
 当读写锁被处于写锁的时候,所有尝试加锁(无论是写锁还是读锁)的线程都会阻塞直到写锁释放;当处于读锁状态的时候,所有尝试加读锁的线程都会允许加锁,但任何尝试加读锁的线程都会被阻塞直到所有线程的读锁被释放。(有一句不太确定的原句,没有翻译,如下: Although implementations vary, readerwriter locks usually block additional readers if a lock is already held in read mode and a thread is blocked trying to acquire the lock in write mode. This prevents a constant stream of readers from starving waiting writers.)
 读写锁适合读取操作比修改操作频繁的情况。读写锁也叫共享互斥锁。当一个读写所处于读锁状态的时候,它处于共享模式;当处于写锁状态的时候,它处于互斥模式。
 和互斥信号量类似,读写锁也需要初始化之后才能使用。
 #include
 int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
 int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
 函数成功返回0,失败返回错误号码。
 读写锁通过调用pthread_rwlock_init来进行初始化,如果使用默认的属性,我们可以给attr传递一个空指针,我们后面会讨论读写锁的属性。
 在释放读写锁占用的内存之前,我们需要调用pthread_rwlock_destroy来清除它。如果pthread_rwlock_init为读写锁分配了任何的内存,那么pthread_rwlock_destroy就会释放这些资源。如果我们没有调用pthread_rwlock_destroy就直接释放读写锁的内存,那那么读写锁之前占用的那些额外的资源就会丢失。
 为了让一个读写锁处于读模式,我们调用pthread_rwlock_rdlock函数;使它处于写模式,我们需要调用pthread_rwlock_wrlock。无论我们处于什么锁模式,我们都使用pthread_rwlock_unlock来释放读写锁。

 #include
 int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
 int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
 int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
 函数成功返回0,失败返回错误号码。
 系统实现,可能会对读写锁的共享模式数量有所限制,所以我们需要检查pthread_rwlock_rdlock的返回。尽管pthread_rwlock_wrlock和pthread_rwlock_unlock有错误的返回码,如果我们设计妥当,我们就不许要检查其返回,只有我们不正确地使用它们的时候才会返回定义的错误码,例如使用一个没有初始化的锁,或者当我们请求了一个我们已经持有的锁导致死锁的时候。

 Single UNIX Specification也定义了有条件的读写锁。
 #include
 int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
 int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
 正确返回0,失败返回错误号码。
 如果能够获取到锁,这两个函数就会返回0,如果不能获取到锁,这两个函数就会返回错误码EBUSY。这些函数使用的情况和前面的类似。
 举例:
 例子参见相应的参考资料。这个例子是通过一个读写锁来保护一系列的工作请求队列。当有作业被插入,删除到队列中的时候,加写锁;如果只是查询队列中的作业,那么只需要读锁。

 (4)条件变量
 条件变量是另外一个用于线程的同步机制。条件变量提供一个线程同步的点,当使用互斥信号量的时候,条件变量允许线程以一种无竞争的方式等待任何条件的发生。
 条件本身被互斥信号量保护,线程改变条件状态的时候必须先锁住这个信号。其它线程在请求信号量之前,不会注意到条件的变化,因为锁住互斥信号量才能对条件进行检测。
 使用条件变量之前,必须首先对这个条件变量进行初始化。条件变量使用数据结构pthread_cond_t来进行表示。我们可以把常量PTHREAD_COND_INITIALIZER分配给静态分配的条件变量,但是如果我们采用动态的方式分配条件变量那么我们使用pthread_cond_init函数对它进行初始化。
 在释放条件变量所占用的内存空间的之前我们可以使用函数pthread_mutex_destroy对这个条件变量进行反初始化。
 #include
 int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr);
 int pthread_cond_destroy(pthread_cond_t *cond);
 两者在成功的时候都返回0,如果失败会返回错误码。
 这里如果想要创建一个使用默认的属性的条件变量,那么我们就给pthread_cond_init函数的attr参数传递NULL指针。

 我们使用pthread_cond_wait来等待条件为true,如果在一定的时间之内条件没有被满足,那么会返回一个错误号码到一个指定的变量中。
 #include
 int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
 int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex,
      const struct timespec *restrict timeout);
 两个函数如果成功都返回0,如果失败则返回一个错误号码。
 传递给函数pthread_cond_wait的互斥信号量mutex会保护这个条件。调用者把已经锁住的信号量传递给函数,这个函数原子性地把调用线程放到等待这个条件变量的线程等待队列上面,然后解锁这个互斥信号量。这样就把检测条件变量和线程为了等待条件变化而进入睡眠之间的时间窗口关闭了,这样线程不会错过条件的变化(因为检测到条件不行,才会解锁让其它线程有机会修改条件使之满足)。当pthread_cond_wait返回的时候,mutex会再次被锁住(因为条件满足了,所以再次锁住,继续后面的操作)。(这里可能比较难理解,总之是在这个函数的内部先在检查完条件并且等待之后做了一步解锁操作,收到满足条件的通知之后继续执行准备返回但是返回前又加锁了,看后面的例子会比较容易明白)
 函数pthread_cond_timedwait和pthread_cond_wait 的功能类似,但是它设置了一个超时的机制,指定我们等待的时间。这个时间通过timespec结构来表示,
 struct timespec {
  time_t tv_sec;   /* seconds */
  long   tv_nsec;  /* nanoseconds */
 };
 使用这个结构,我们需要使用绝对时间值来指定我们将要等待多久,而不是一个相对的时间值。例如,我们想要等待3分钟,我们不是给这个结构赋值为3分钟,而是把now+3这个时间赋值给它。
 我们可以使用gettimeofday来获取使用timeval结构表示的当前时间,然后把这个结构转化成timespec结构,来获取绝对的时间值。函数如下:
 void maketimeout(struct timespec *tsp, long minutes)
 {
  struct timeval now;

  /* 获取当前时间 */
  gettimeofday(&now);

  /*把timeval表示的时间转换成timespec结构表示的时间*/
  tsp->tv_sec = now.tv_sec;
  tsp->tv_nsec = now.tv_usec * 1000; /* 微秒转换成纳秒 */

  /* 为当前时间增加超时等待时长*/
  tsp->tv_sec += minutes * 60;
 }
 如果超时了条件也没有满足,那么pthread_cond_timewait将会重新请求互斥信号量并且返回ETIMEDOUT。当pthread_cond_wait和pthread_cond_timedwait成功返回的时候,需要一个线程重新估计条件值,因为可能另外有线程已经运行并且改变了条件。
 有两个函数用来通知线程一个条件已经被满足了。pthread_cond_signal函数将会唤醒一个等待在一个条件上面的线程;pthread_cond_broadcast函数将会唤醒所有的线程等待一个条件。
 POSIX标准允许pthread_cond_signal的实现唤醒不止一个线程,这样会使得实现更为简单。
 #include
 int pthread_cond_signal(pthread_cond_t *cond);
 int pthread_cond_broadcast(pthread_cond_t *cond);
 两者如果成功返回0,如果失败返回错误号码。
 当我们调用pthread_cond_signal或者pthread_cond_broadcast的时候,也就是说我们将会给线程或者条件发送信号。我们需要足够地仔细,只能在修改了条件状态的时候才给线程发送信号。

 举例:
 条件变量的使用方法如下:
 #include
 struct msg {
     struct msg *m_next;
     /* ... more stuff here ... */
 };
 struct msg *workq;
 pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
 pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

 void process_msg(void) {
     struct msg *mp;

     for (;;) {
         pthread_mutex_lock(&qlock);/*这里是互斥相关,因为需要访问工作队列,所以进行操作之前首先上锁,保证其他线程不能再修改了*/
         while (workq == NULL)
             pthread_cond_wait(&qready, &qlock);/*这里是同步相关,发现队列为空,所以在相应的条件变量上面等待,等待函数的内部实际做的操作是检测并且将线程置于等待队列之后再解开锁便于其它线程修改工作队列使条件满足*/
         mp = workq;/*到这里表示刚才解锁等待的时候有线程修改了工作队列并且通知本线程条件满足了,于是从前面的等待函数中返回,并且返回之前再将刚才解开的锁重新加上,防止之后的修改期间又有其他线程干扰*/
         workq = mp->m_next;
         pthread_mutex_unlock(&qlock);/*修改之后真正地解开锁*/
         /* now process the message mp */
     }
 }

 void enqueue_msg(struct msg *mp) {
     pthread_mutex_lock(&qlock);/*这里是互斥相关,准备修改工作队列,所以加锁*/
     mp->m_next = workq;
     workq = mp;
     pthread_mutex_unlock(&qlock);
     pthread_cond_signal(&qready);/*这里是同步相关,通知队列状态的变化给等待的线程*/
 }

 上面的例子,展示了如何使用条件变量和互斥信号量一起来实现线程之间的同步。
 条件用来表示工作队列(work queue)的状态。我们通过互斥信号量来保护条件并且通过一个while循环来对条件进行检测。当我们把一个消息放到工作队列上(work queue)的时候,我们需要持有这个互斥信号量,但是我们再条件满足通知等待线程的时候不需要持有这个互斥信号量。只要线程在我们调用cond_signal之前将消息推送至工作队列,我们就可以释放互斥信号量。因为我们是在一个while循环中检查这个条件,所以不会导致问题:线程将会醒来,发现队列还是空的,然后又继续进入等待状态了。如果代码无法忍受这个竞争(比如没有那个while循环???),那么我们将需要在发送信号给线程的时候也持有这个锁。

参考:


7、总结
================================================
 这一章里面我们介绍了线程相关的内容,讨论了创建和销毁一个线程的POSIX相关函数。我们也介绍了线程的同步。我们讨论了三个基本的同步机制,互斥信号量,读写锁,以及条件变量,同时我们也看到了我们是如何利用它们来保护共享资源的。

参考:

 

 

阅读(1714) | 评论(3) | 转发(0) |
给主人留下些什么吧!~~

vaqeteart2013-10-14 10:27:40

注意:
while (workq == NULL)
             pthread_cond_wait(&qready, &qlock);
这里用while而不用if来循环判断是否等待,据我了解,是因为使用if,那从 pthread_cond_wait返回后,可能workq不满足条件了,因为可能有多个线程使用这段代码,这段代码后续操作都是基于"workq==NULL"这个条件,第一个执行这段代码的线程返回的当然正常操作,后面的就不行了。

vaqeteart2013-05-23 11:17:59

一个不错的应用:
线程池,实现很多,可自行搜索threadpool相关代码,或参见:http://www.oschina.net/p/threadpool
关键:
1、有一个队列,长度为M,每个队列元素是待执行的任务对应的函数。
2、启动N个线程,这N个线程执行添加的任务(即队列中的函数),每个线程是一个死循环,大致做的事情是:
    每当队列中有元素则其中的某个空闲线程读取队列元素,并执行相应的函数;当队列为空则等待队列非空。

vaqeteart2012-08-31 17:57:34

关于pthread_conf_signal函数应当在unlock之前还是之后,有这个文章:
http://blog.csdn.net/xjtuse_mal/article/details/5413101
其中主要部分:
1)线程1获取mutex,在进行数据处理的时候,线程2也想获取mutex,但是此时被线程1所占用,线程2进入休眠,等待mutex被释放。

2)线程1做完数据处理后,调用pthread_cond_signal()唤醒等待队列中某个线程,在本例中也就是线程2。线程1在调用 pthread_mutex_unlock()前,因为系统调度的原因,线程2获取使用CPU的权利,那么它就想要开始处理数据,但是在开始处理之 前,mutex必须被获取,很遗憾,线程1正在使用mutex,所以线程2被迫再次进入休眠。

3)然