Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1744439
  • 博文数量: 438
  • 博客积分: 9799
  • 博客等级: 中将
  • 技术积分: 6092
  • 用 户 组: 普通用户
  • 注册时间: 2012-03-25 17:25
文章分类

全部博文(438)

文章存档

2019年(1)

2013年(8)

2012年(429)

分类: 系统运维

2012-03-31 22:27:05

当多个线程控制共享相同的内存时,我们需要确保每个线程看到它数据的一致的外观。如果每个线程使用其它线程不读或修改的变量,就不会有一致性的问 题。相似地,如果一个变量是只读的,当多个线程同时读它时也是不会有一致性问题的。然而,当一个线程可以修改一个变量而其它线程可以读或修改,那么我们需 要同步这些线程来确保在访问变量内存内容时它们不会使用一个无效的值。


当一个线程修改一个变量时,其它线程在读这个变量的值时可能潜在地看到不一致性。在修改需要多于一个内存周期的处理器架构上,这可能在内存的读与内存写周期交叉时发生。当然,这个行为取决于架构,但是可移植的程序不能对所使用的处理器架构作任何假设。


下面一个两个线程同时读写同一个变量的假定的例子:线程A读取变量然后写入一个新值,但是写操作耗费两个内存周期。如果线程B在这两个写周期之间读取相同的值,那么它将看到一个不一致的值。


为了解决这个问题,线程必须使用一个锁,它将允许只有一个线程在同一时刻访问这个变量。上面的例子里,如果线程B想读取这个变量,它需要一个锁。相似地,当线程A更新变量时,它需要相同的锁。因此,线程B将不能读取变量,直到线程A释放这个锁。


你也需要同步两个或多个同时尝试修改相同变量的线程。考虑你增加一个变量的情况。增加操作通常分解为三个步骤:


1、把内存地址读入寄存器;


2、增加寄存器里的值;


3、把新值写到内存地址。


如 果两个线程尝试几乎同时增加相同的变量,而不与对方同步,那么结果可能是不一致的。你最后可能得到一个比之前大一或二的值,取决于当第二个线程开始操作时 所观察到的值。如果第二个线程在第一个线程执行步骤3时执行步骤1,那么第二个线程将读取到和第一个线程相同的初始值,增加它,再写回,而没有干净的结 果。


如果修改是原子的,那么就不会有竞争。在前一个例子里,如果增加只花费一个内存周期,那么不会有竞争。如果我们的数据总是在相继一致, 那么我们不需要额外的同步。我们的操作是相继一致的,当多个进程不能观察到我们数据里的不一致性时。在当代计算机系统里,内存访问花费多个总线周期,而多 处理器通常在多个处理器之间交叉总线周期,所以我们不被保证我们的数据是相继一致的。


在一个相继一致的环境,我们可以把对我们数据的修改解 释为一列由运行线程执行的操作步骤。我们可以说诸如“线程A增加变量,然后线程B增加这个变量,所以它的值比原来大二”或“线程B增加变量,然后线程A增 加这个变量,所以它的值比原来大二”之类的话。两个线程可能的顺序不会导致变量任何其它的值。


除了计算机架构,竞争也会根据我们程序使用变量的可能看到不一致性的方式发生。例如,我们可能增加一个变量,然后基于这个值作决定。增加步骤和作决定的步骤不是原子的,所以这给不一致性的发生打开了一个间隙。


互斥体(Mutexes)

我 们可以保护我们的数据并确保某时刻只有一个线程访问,通过使用pthreads的mutual-exclusion接口。一个mutex基本上是一个锁, 我们在访问一个共享资源前设置它(lock)并在完成时释放它(unlock)。当它被设置时,任何其它尝试设置它的线程都会阻塞,直到我们释放了它。如 果多于一个线程在我们解锁互斥体时处于阻塞状态,那么所有阻塞在该锁上的线程都变得可运行,第一个运行的线程能够设置这个锁。其它的将看到这个互斥体仍然 被锁而回去等待它变得重新可用。这种方式,某一时刻只有一个线程进程。


这个彼此互斥机制只在我们把所有线程设计为遵守相同数据访问规则时才工作。操作系统并不为我们序列化数据的访问。如果我们允许一个线程不先申请锁而访问一个共享资源,那么即使其它线程都这么做不一致性仍然会发生。


一 人互斥体变量由pthread_mutex_t数据类型表示。在我们使用一个mutex变量之前,我们必须首先初始化它,通过把它设置为常量 PTHREAD_MUTEX_INITIALIZER(只针对于静态分配的互斥体)或调用pthread_mutex_init。如果我们动态地分配互斥 体(例如通过调用malloc),那么我们需要在释放内存前调用pthread_mutex_destroy。


  1. #include <pthread.h>

  2. int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

  3. int pthread_mutex_destroy(pthread_mutex_t *mutex);

  4. 成功返回0,失败返回错误码。


为了使用默认属性初始化一个互斥体,我们把attr设置为NULL。我们将在12.4节讨论非默认的互斥体属性。


为了锁住一个互斥体,我们调用pthread_mutex_lock。如果互斥体已经被锁了,那么调用线程将阻塞,直到互斥体被解锁。要解锁一个互斥体,我们调用pthread_mutex_unlock。



  1. #include <pthread.h>

  2. int pthread_mutex_lock(pthread_mutex_t *mutex);

  3. int pthread_mutex_trylock(pthread_mutex_t *mutex);

  4. int pthread_mutex_unlock(pthread_mutex_t *mutex);

  5. 成功返回0,失败返回错误号。


如 果一个线程不能容许阻塞,那么它可以使用pthread_mutex_trylock来有条件地锁这个互斥体。如果互斥体在 pthread_mutex_trylock被调用时已经解锁,那么pthread_mutex_trylock会不阻塞地锁住这个互斥体并返回0。否 则,pthread_mutex_trylock将会失败,返回EBUSY而不锁住这个互斥体。


下面的代码演示了一个用来保护一个数据结构体的互斥体。当多个线程需要访问一个动态分配的对象时,我们可以在对象里嵌入一个引用计数来保证我们在所有线程使用它完毕前不会释放它的内存。



  1. #include <stdlib.h>
  2. #include <pthread.h>

  3. struct foo {
  4.     int f_count;
  5.     pthread_mutex_t f_lock;
  6.     /* ... more stuff here ... */
  7. };

  8. struct foo*
  9. foo_alloc(void) /* allocate the object */
  10. {
  11.     struct foo *fp;

  12.     if ((fp = malloc(sizeof(struct foo))) != NULL) {
  13.         fp->f_count = 1;
  14.         if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
  15.             free(fp);
  16.             return(NULL);
  17.         }
  18.         /* ... continue initialization ... */
  19.     }
  20.     return(fp);
  21. }

  22. void
  23. foo_hold(struct foo *fp) /* add a reference to the object */
  24. {
  25.     pthread_mutex_lock(&fp->f_lock);
  26.     fp->f_count++;
  27.     pthread_mutex_unlock(&fp->f_lock);
  28. }

  29. void
  30. foo_rele(struct foo *fp) /* release a reference to the object */
  31. {
  32.     pthread_mutex_lock(&fp->f_lock);
  33.     if (--fp->f_count == 0) { /* last reference */
  34.         pthread_mutex_unlock(&fp->f_lock);
  35.         pthread_mutex_destroy(&fp->f_lock);
  36.         free(fp);
  37.     } else {
  38.         pthread_mutex_unlock(&fp->f_lock);
  39.     }
  40. }

我们在增加引用计数前锁住互斥体,减去引用计数,并核查引用计数是否达到0。当我们在foo_alloc函数里把引用计数初始化为1时是不需 要锁的,因为分配线程是它目前的唯一的引用。如果我们这时把这个结构体放到一个列表上,那么它可能被其它线程发现,所以我们需要先锁住它。

在使用这个对象前,线程被期望为它加上一个引用计数。当它们完成时,它们必须释放这个引用。当最后的引用被释放时,对象的内存被释放。


死锁的避免(Deadlock Avoidance)

一 个线程如果尝试两次锁住同一个互斥体那么它将把自己死锁,但是有更不明显的方式来创建互斥体的死锁。例如,当我们在程序里使用多于一个的互斥体,如果我们 允许一个线程得到一个互斥体而在尝试锁住第二个互斥体时阻塞,同时另一个线程得到第二个互斥体而尝试锁住第一个互斥体,那么一个死锁会发生。没有任何一个 线程可以进行,因为两者都需要对方得到的资源,所以我们有了一个死锁。


死锁可以通过小心地控制互斥体锁的顺序来避免。例如,假设你有两个互 斥体,A和B,你需要同时锁住它们。如果所有的线程总是在互斥体B之前锁住互斥体A,则在这两个互斥体上不会有死锁发生(但是你仍可以死锁在别的资源 上)。相似地,如果所有线程总是在互斥体A之前锁住互斥体B,也没有死锁发生。你只当尝试以别的线程的相反顺序锁住互斥体时才会有潜在的死锁。


有 时,一个应用的架构让你很难应用一个锁的顺序。如果有足够的锁和数据结构,你可用的函数仍不能被建模为适合一个简单的层次结构,那么你将必须尝试一些其它 的方法。在这种情况下,你可能可以释放你的锁然后在稍后再次尝试。你可以使用pthread_mutex_trylock接口来避免这种情况下的死锁。如 果你已经得到了锁,而pthread_mutex_trylock成功了,那么你可以继续进行。然而,如果它不能得到这个锁,你可以释放你已经有的锁,清 理,稍后再次尝试。


在这个例子里,我们更新上面的代码来展示两个互斥体的使用。我们通过确保当需要同时获取两个锁时,我们总是以相同的顺序 锁住它们,来避免死锁。第二个互斥体保护了一个我们用来跟踪foo数据结构体的哈希列表。在foo结构体里的f_lock互斥体保护了foo结构体里其它 域的访问。



  1. #include <stdlib.h>
  2. #include <pthread.h>

  3. #define NHASH 29
  4. #define HASH(fp) (((unsigned long)fp)%NHASH)
  5. struct foo *fh[NHASH];

  6. pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

  7. struct foo {
  8.     int f_count;
  9.     pthread_mutex_t f_lock;
  10.     struct foo *f_next; /* protected by hashlock */
  11.     int f_id;
  12.     /* ... more stuff here ... */
  13. };

  14. struct foo*
  15. foo_alloc(void) /* allocate the object */
  16. {
  17.     struct foo *fp;

  18.     if ((fp = malloc(sizeof(struct foo))) != NULL) {
  19.         fp->f_count = 1;
  20.         if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
  21.             free(fp);
  22.             return(NULL);
  23.         }
  24.         idx = HASH(fp);
  25.         pthread_mutex_lock(&hashlock);
  26.         fp->f_next = fh[idx];
  27.         fh[idx] = fp->f_next;
  28.         pthread_mutex_lock(&fp->f_lock);
  29.         pthread_mutex_unlock(&hashlock);
  30.         /* ... continue initialization ... */
  31.         pthread_mutex_unlock(&fp->f_lock);
  32.     }
  33.     return(fp);
  34. }

  35. void
  36. foo_hold(struct foo *fp) /* add a reference to the object */
  37. {
  38.     pthread_mutex_lock(&fp->f_lock);
  39.     fp->f_count++;
  40.     pthread_mutex_unlock(&fp->f_lock);
  41. }

  42. struct foo *
  43. foo_find(int id) /* find an existing object */
  44. {
  45.     struct foo *fp;
  46.     int idx;

  47.     idx = HASH(fp);

  48.     pthread_mutex_lock(&hashlock);
  49.     for (fp = fh[idx]; fp != NULL; fp = fp->f_next) {
  50.         if (fp->f_id == id) {
  51.             foo_hold(fp);
  52.             break;
  53.         }
  54.     }
  55.     pthread_mutex_unlock(&hashlock);
  56.     return(fp);
  57. }

  58. void
  59. foo_rele(struct foo *fp) /* release a reference to the object */
  60. {
  61.     struct foo *tfp;
  62.     int idx;

  63.     pthread_mutex_lock(&fp->f_lock);
  64.     if (fp->f_count == 1) { /* last reference */
  65.         pthread_mutex_unlock(&fp->f_lock);
  66.         pthread_mutex_lock(&hashlock);
  67.         pthread_mutex_lock(&fp->f_lock);
  68.         /* need to recheck the condition */
  69.         if (fp->f_count != 1) {
  70.             fp->f_count--;
  71.             pthread_mutex_unlock(&fp->f_lock);
  72.             pthread_mutex_unlock(&hashlock);
  73.             return;
  74.         }
  75.         /* remove from list */
  76.         idx = HASH(fp);
  77.         tfp = fh[idx];
  78.         if (tfp == fp) {
  79.             fh[idx] = fp->f_next;
  80.         } else {
  81.             while (tfp->f_next != fp)
  82.                 tfp = tfp->f_next;
  83.             tfp->f_next = fp->f_next;
  84.         }
  85.         pthread_mutex_unlock(&hashlock);
  86.         pthread_mutex_unlock(&fp->f_lcok);
  87.         pthread_mutex_destroy(&fp->f_lock);
  88.         free(fp);
  89.     } else {
  90.         fp->f_count--;
  91.         pthread_mutex_unlock(&fp->f_lock);
  92.     }
  93. }

比较前面的两个代码,我们看到我们的分配函数现在锁住哈希列表锁,把新的结构体回到一个哈希桶里,并在解锁哈希列表锁前锁住新结构体的互斥 体。因为这个新的结构体被放到一个全局列表里,所以其它线程可以找到它,因此我们需要在它们尝试访问这个新结构体时阻塞它们,直到我们完成了初始化。

foo_find函数锁住哈希列表锁并查找请求的结构体。如果它被找到,那么我们增加引用计数并返回该结构体的指针。注意我们遵守锁的顺序,通过在foo_hold锁住foo结构体的f_lock互斥体之前,在foo_find里锁住哈希列表锁。


现 在随着两个锁,foo_rele函数更复杂了。如果这是最后的引用,那么我们需要解锁结构体互斥体以便我们可以请求哈希列表锁,因为我们必须从哈希列表中 删除这个结构体。然后我们申请结构体的互斥体。因为我们可能已经阻塞在上次得到结构体互斥体时,所以我们需要重新检查条件来看我们是否仍需要释放这个结构 体。如果另一个线程发现这个结构体并给它加上一个引用,当我们为了锁的顺序而阻塞的时候,那么我们简单地减少引用计数,解锁所有的东西,然后返回。


这个锁是复杂的,所以我们需要重新审视我们的设计。我们也可以仔细简化事物,通过使用哈希列表锁来保护结构体引用计数。结构体互斥体也以用来保护foo结构体里的其它所有东西。下面的代码反应了这个改变。



  1. #include <stdlib.h>
  2. #include <pthread.h>

  3. #define NHASH 29
  4. #define HASH(fp) (((unsigned long)fp)%NHASH)
  5. struct foo *fh[NHASH];

  6. pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

  7. struct foo {
  8.     int f_count;
  9.     pthread_mutex_t f_lock;
  10.     struct foo *f_next; /* protected by hashlock */
  11.     int f_id;
  12.     /* ... more stuff here ... */
  13. };

  14. struct foo*
  15. foo_alloc(void) /* allocate the object */
  16. {
  17.     struct foo *fp;
  18.     int idx;

  19.     if ((fp = malloc(sizeof(struct foo))) != NULL) {
  20.         fp->f_count = 1;
  21.         if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
  22.             free(fp);
  23.             return(NULL);
  24.         }
  25.         idx = HASH(fp);
  26.         pthread_mutex_lock(&hashlock);
  27.         fp->f_next = fh[idx];
  28.         fh[idx] = fp->f_next;
  29.         pthread_mutex_lock(&fp->f_lock);
  30.         pthread_mutex_unlock(&hashlock);
  31.         /* ... continue initialization ... */
  32.         pthread_mutex_unlock(&fp->f_lock);
  33.     }
  34.     return(fp);
  35. }

  36. void
  37. foo_hold(struct foo *fp) /* add a reference to the object */
  38. {
  39.     pthread_mutex_lock(&hashlock);
  40.     fp->f_count++;
  41.     pthread_mutex_unlock(&hashlock);
  42. }

  43. struct foo *
  44. foo_find(int id) /* find an existing object */
  45. {
  46.     struct foo *fp;
  47.     int idx;

  48.     idx = HASH(fp);

  49.     pthread_mutex_lock(&hashlock);
  50.     for (fp = fh[idx]; fp != NULL; fp = fp->f_next) {
  51.         if (fp->f_id == id) {
  52.             fp->f_count++;
  53.             break;
  54.         }
  55.     }
  56.     pthread_mutex_unlock(&hashlock);
  57.     return(fp);
  58. }

  59. void
  60. foo_rele(struct foo *fp) /* release a reference to the object */
  61. {
  62.     struct foo *tfp;
  63.     int idx;

  64.     pthread_mutex_lock(&hashlock);
  65.     if (--fp->f_count == 0) { /* last reference, remove from list */
  66.         idx = HASH(fp);
  67.         tfp = fh[idx];
  68.         if (tfp == fp) {
  69.             fh[idx] = fp->f_next;
  70.         } else {
  71.             while (tfp->f_next != fp)
  72.                 tfp = tfp->f_next;
  73.             tfp->f_next = fp->f_next;
  74.         }
  75.         pthread_mutex_unlock(&hashlock);
  76.         pthread_mutex_destroy(&fp->f_lock);
  77.         free(fp);
  78.     } else {
  79.         pthread_mutex_unlock(&hashlock);
  80.     }
  81. }

注意代码比之前简单了多少。当我们为两个目的使用相同的锁时,关于哈希列表和引用计数的锁顺序的问题已经没有了。多线程软件设计涉及这些类型 的折衷。如果你的锁粒度太粗,那么你可能会让太多的线程阻塞在相同的锁上,这样只从并发基本得不到什么提升的可能。如果你的锁粒度太细,那么你可能从额外 的锁的过度开销丧失性能,而且还造就了复杂的代码。作为一个程序员,你必须在代码复杂度和性能间找到正确的平衡点,并仍然满足你的锁的需求。

读写锁(Reader-Writer Locks)

读写锁和互斥体类似,除了它允许更高程度的并行化。使用互斥体,状态只有锁或无锁,且一次只有一个线程可以锁。一个读写锁有三个状态:在读模式上锁,在写模式上锁,和无锁。一次只有一个线程可以拿到写模式的读写锁,但多个线程可以同时拿到一个读模式的读写锁。


当 一个读写锁被写锁时,所有尝试锁它的线程都被阻塞,直到它被解锁。当一个读写锁被读锁时,所有尝试以读模式锁它的线程都可以得到访问,但任何尝试以写模式 锁它的线程都会阻塞,直到所有线程释放它们的读锁。尽管实现不同,读写锁通常阻塞额外的读者,如果一个锁以读模式被得到,而一个尝试以写模式申请锁的线程 正被阻塞。这避免了恒定的读者流饿死了等待的写者。


读写锁非常适合这种情况:数据结构被读比被修改更经常。当一个读写锁以写模式被得到,它保护的数据结构可以安全地被修改,因为一次只有一个线程可以得到写模式的锁。当读写锁以读模式被得到,那么它保护的数据结构可以被多个线程读,只要线程首先得到这个读模式的锁。


读写锁也被称为共享互斥(shared-exclusive)锁。当一个读写所被读锁时,它可以说是以共享模式被锁。当它被写锁时,它可以说是以互斥模式被锁。


和互斥体一样,读写锁并须在使用前初始化,并在释放它们底层内存时被销毁。



  1. #include <pthread.h>

  2. int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);

  3. int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

  4. 两者成功返回0,失败返回错误号。


一个读写锁通过调用phthread_rwlock_init初始化。我们可以传递一个空指针作为attr,如果我们想这个读写锁有默认的属性。我们在12.4节讨论读写锁的属性。


在 释放读写锁后面的内存之前,我们必须调用pthread_rwlock_destroy来清理它。如果pthread_rwlock_init为读写锁分 配了任何资源,那么pthread_rwlock_destroy会释放这些资源。如果我们不首先调用pthread_rwlock_destroy再释 放一个读写锁后面的内存,那么任何分配给这个锁的资源都会被丢失。


为了以读模式锁住一个读写锁,我们调用pthread_rwlock_rdlock。为了写锁一个读写锁,我们调用pthread_rwlock_wrlock。不管我们如何锁一个读写锁,我们可以调用pthread_rwlock_unlock来解锁它。



  1. #include <pthread.h>

  2. int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

  3. int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

  4. int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

  5. 三者成功返回0,失败返回错误号。


实 现可以在一个读写锁可以以共享模式被锁的次数上加个限制,所以我们需要检查pthread_rwlock_rdlock的返回值。即使 pthread_rwlock_wrlock和pthread_rwlock_unlock有错误返回,如果我们恰当地设计了我们的锁那么我们仍不需要检 查它们。唯一返回的错误定义在当我们不恰当使用它们时,比如使用一个未初始化的锁,或当我们尝试申请已经拥有的锁而可能死锁时。


SUS也定义了读写锁原始例程的条件版本。



  1. #include <pthread.h>

  2. int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);

  3. int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

  4. 两者成功返回0,失败返回错误号。


当锁被得到时,这些函数返回0。否则,它们返回错误EBUSY。这些函数可以用在遵守一个锁层次也不足以避免一个死锁的情况,如我们之前讨论的。


下面的代码演示了读写锁的使用。一个工作请求队列被单个读写锁保护。这个例子展示了一个可能的实现:多个工作线程获取由单个主线程分配给它们的工作。



  1. #include <pthread.h>
  2. #include <stdlib.h>

  3. struct job {
  4.     struct job *j_next;
  5.     struct job *j_prev;
  6.     pthread_t j_id; /* tells which thread handlers this job */
  7. };

  8. struct queue {
  9.     struct job *q_head;
  10.     struct job *q_tail;
  11.     pthread_rwlock_t q_lock;
  12. };

  13. /*
  14.  * Initialize a queue
  15.  */
  16. int
  17. queue_init(struct queue *qp)
  18. {
  19.     int err;
  20.     
  21.     qp->q_head = NULL;
  22.     qp->q_tail = NULL;
  23.     err = pthread_rwlock_init(&qp->q_lock, NULL);
  24.     if (err != 0)
  25.         return(err);

  26.     /* ... continue initialization ... */

  27.     return(0);
  28. }

  29. /*
  30.  * Insert a job at the head of the queue.
  31.  */
  32. void
  33. job_insert(struct queue *qp, struct job *jp)
  34. {
  35.     pthread_rwlock_wrlock(&qp->q_lock);
  36.     jp->j_next = qp->q_head;
  37.     jp->j_prev = NULL;
  38.     if (qp->q_head != NULL)
  39.         qp->q_head->j_prev = jp;
  40.     else
  41.         qp->q_tail = jp; /* list was empty */
  42.     qp->q_head = jp;
  43.     pthread_rwlock_unlock(&qp->q_lock);
  44. }

  45. /*
  46.  * Append a job on the tail of the queue.
  47.  */
  48. void
  49. job_append(struct queue *qp, struct job *jp)
  50. {
  51.     pthread_rwlock_wrlock(&qp->q_lock);
  52.     jp->j_next = NULL;
  53.     jp->j_prev = qp->q_tail;
  54.     if (qp->q_tail != NULL)
  55.         qp->q_tail->j_next = jp;
  56.     else
  57.         qp->q_head = jp; /* list was empty */
  58.     qp->q_tail = jp;
  59.     pthread_rwlock_unlock(&qp->q_lock);
  60. }

  61. /*
  62.  * Remove the given job from a queue.
  63.  */
  64. void
  65. job_remove(struct queue *qp, struct job *jp)
  66. {
  67.     pthread_rwlock_wrlock(&qp->q_lock);
  68.     if (jp == qp->q_head) {
  69.         qp->q_head = jp->j_next;
  70.         if (qp->q_tail == jp)
  71.             qp->q_tail = NULL;
  72.     } else if (jp == qp->q_tail) {
  73.         qp->q_tail = jp->j_prev;
  74.         if (qp->q_head == jp)
  75.             qp->q_head = NULL;
  76.     } else {
  77.         jp->j_prev->j_next = jp->j_next;
  78.         jp->j_next->j_prev = jp->j_prev;
  79.     }
  80.     pthread_rwlock_unlock(&qp->q_lock);
  81. }

  82. /*
  83.  * Find a job for the given thread ID.
  84.  */
  85. struct job *
  86. job_find(struct queue *qp, pthread_t id)
  87. {
  88.     struct job *jp;

  89.     if (pthread_rwlock_rdlock(&qp->q_lock) != 0)
  90.         return(NULL);

  91.     for (jp = qp->q_head; jp != NULL; jp = jp->j_next)
  92.         if (pthread_equal(jp->j_id, id))
  93.             break;

  94.     pthread_rwlock_unlock(&qp->q_lock);
  95.     return(jp);
  96. }

在这个例子里,我们每当需要加一个工作给队列或从队列删除一个工作时都以写模式锁住队列的读写锁。每当我们查找队列时,我们以读模式得到锁,允许所有的工作线程业并发地查找队列。使用一个读写锁只在线程查找队列比增删工作频繁得多的时候才会提升性能。

工作线程只把那些匹配它们线程ID的工作从队列中移除。因为工作结构体一次只被一个线程使用,所以它们不用额外的锁。


条件变量(Condition Variables)

条件变量是线程可用的另一个同步机制。条件变量提供了为线程集结的位置。当和互斥体使用时,条件变量允许线程以无竞争的方式等待任意条件的发生。


条件本身被一个互斥体保护。一个线程必须首先锁住互斥体来改变条件状态。其它线程将不会注意到改变,直到它们申请这个互斥体时,因为互斥体必须被锁住才能得到条件的值。


在 条件变量被使用前,它必须首先被初始化。一个条件变量,由pthread_cond_t数据类型表示,可以用两种方式初始化。我们可以把常量 PTHREAD_COND_INITIALIZER赋给一个静态分配的变量,但是如果条件变量是动态分配的,我们可以使用 pthread_cond_init函数来初始化它。


我们可以使用pthread_cond_destroy函数来反初始化一个条件变量,在释放它底下的内存前。



  1. #include <pthread.h>

  2. int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr);

  3. int pthread_cond_destroy(pthread_cond_t *cond);

  4. 两者成功返回0,失败返回错误号。


除非你必须用非默认属性创建一个条件变量,否则pthread_cond_init的attr参数可以被设置为NULL。我们将在12.4节讨论条件变量的属性。


我们使用pthread_cond_wait来等待一个条件为真。一个变体被提供来返回一个错误号,如果条件在指定的时间内没有被满足。



  1. #include <pthread.h>

  2. int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

  3. int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict timeout);

  4. 成功返回0,失败返回错误号。


传 递给pthread_cond_wait的互斥体保护这个条件。调用者锁住它传递给这个函数,它然后被调用线程自动放在等待这个条件的线程列表上,并解锁 这个互斥体。这关闭了条件被检查和线程等待条件改变时睡眠之间的时间间隙,以便线程不会错过条件的某个改变。当pthread_cond_wait返回 时,互斥体会再次被锁住。


pthread_cond_timedwait函数和pthread_cond_wait函数一样工作,只是有额外的超时。这个timeout值指定我们将等待多久。它由timespec结构体指定,这里一个时间值由秒数和部分秒数示。部分秒数以纳秒为单位表示:
struct timespec {
  time_t tv_sec;  /* seconds */
  long tv_nsec;  /* nanoseconds */
};


使用这个结构体,我们需要指定我们愿意等待多久,作为一个绝对时间而不是相对时间。例如,如果我们愿意等待3分钟,我们需要把现在+3分钟翻译到timespec结构体里,而不是把3分钟翻译进去。


我们可以使用gettimeofday(6.10节)来得到用timeval结构体表示的当前时间,并把它翻译到timespec结构体里。为了得到timeout值的绝对时间,我们可以使用下面的函数:


void
makdtimeout(struct timespec *tsp, long minutes)
{
  struct timeval now;
 
  /* get the current time */
  gettimeofday(&now);
  tsp->tv_sec = now.tv_sec;
  tsp->tv_nsec = now.tv_usec * 1000; /* usec to nsec */
  /* add the offset to get timeout value */
  tsp->tv_sec += minutes * 60;
}


如 果timeout超时而条件没有发生,pthread_cond_timedwait将重新申请互斥体并返回错误ETIMEDOUT。当从一个 pthread_cond_wait或pthread_cond_timedwait的成功调用里返回时,一个线程需要重新评估这个条件,因为另一个线程 可能已经运行并已改变了这个条件。


有两个函数来通知线程一个条件已经被满足。pthread_cond_signal函数将唤醒等待在一个条件上的某个线程,而pthread_cond_broadcast函数将唤醒等待在一个条件上的所有线程。


POSIX规定允许pthread_cond_signal的实现唤醒不只一个线程,来让实现更加简单。



  1. #include <pthread.h>

  2. int pthread_cond_signal(pthread_cond_t *cond);

  3. int pthread_cond_broadcast(pthread_cond_t *cond);

  4. 成功返回0,失败返回错误号。


当我们调用pthread_cond_signal或pthread_cond_broadcast时,我们被告知是向线程还是条件发送信号。我们必须小心只在改变条件状态之后才发送信号给线程。


下面的代码展示了如何一起使用条件变量和互斥体来同步线程的一个例子:



  1. #include <pthread.h>

  2. struct msg {
  3.     struct msg *m_next;
  4.     /* ... more stuff here ... */
  5. };
  6. struct msg *workq;
  7. pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
  8. pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

  9. void
  10. process_msg(void)
  11. {
  12.     struct msg *mp;

  13.     for (;;) {
  14.         pthread_mutex_lock(&qlock);
  15.         while (workq == NULL)
  16.             pthread_cond_wait(&qready, &qlock);
  17.         mp = workq;
  18.         workq = mp->m_next;
  19.         pthread_mutext_unlock(&qlock);
  20.         /* now process the message mp */
  21.     }
  22. }

  23. void
  24. enqueue_msg(struct msg *mp)
  25. {
  26.     pthread_mutex_lock(&qlock);
  27.     mp->m_next = workq;
  28.     workq = mp;
  29.     pthread_mutex_unlock(&qlock);
  30.     pthread_cond_signal(&qready);
  31. }

条件是工作队列的状态。我们用一个互斥体保护这个条件,并在一个while循环里评估这个条件。当我们把一个消息放到工作队列上时,我们需要 得到这个互斥体,但是我们在向等待着的线程发送信号时不必握住这个互斥体。只要一个线程在我们调用cond_signal之前从队列中拉出消息是无害的, 我们就可以在释放互斥体之后做这件事。因为我们在while循环里检查这个条件,所以这里没有体现一个问题:一个线程将会醒来,发现队列仍为空,然后再次 回去等待。如果代码不能容忍这个竞争,那么我们应该在发送信号给线程时握住这个互斥体。
阅读(1159) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~