分类: LINUX
2012-12-11 13:59:26
在 上一篇文章结 束时,我描述了一个比较特殊的难题:如果线程正在等待某个特定条件发生,它应该如何处理这种情况?它可以重复对互斥对象锁定和解锁,每次都会检查共享数据 结构,以查找某个值。但这是在浪费时间和资源,而且这种繁忙查询的效率非常低。解决这个问题的最佳方法是使用 pthread_cond_wait() 调用来等待特殊条件发生。
了解 pthread_cond_wait() 的作用非常重要 -- 它是 POSIX 线程信号发送系统的核心,也是最难以理解的部分。
首先,让我们考虑以下情况:线程为查看已链接列表而锁定了互斥对象,然而该列表恰巧是空的。这一特定线程什么也干不了 -- 其设计意图是从列表中除去节点,但是现在却没有节点。因此,它只能:
锁定互斥对象时,线程将调用 pthread_cond_wait(&mycond,&mymutex)。pthread_cond_wait() 调用相当复杂,因此我们每次只执行它的一个操作。
pthread_cond_wait() 所做的第一件事就是同时对互斥对象解锁(于是其它线程可以修改已链接列表),并等待条件 mycond 发生(这样当 pthread_cond_wait() 接收到另一个线程的“信号”时,它将苏醒)。现在互斥对象已被解锁,其它线程可以访问和修改已链接列表,可能还会添加项。
此时,pthread_cond_wait() 调用还未返回。对互斥对象解锁会立即发生,但等待条件 mycond 通常是一个阻塞操作,这意味着线程将睡眠,在它苏醒之前不会消耗 CPU 周期。这正是我们期待发生的情况。线程将一直睡眠,直到特定条件发生,在这期间不会发生任何浪费 CPU 时间的繁忙查询。从线程的角度来看,它只是在等待 pthread_cond_wait() 调用返回。
现在继续说明,假设另一个线程(称作“2 号线程”)锁定了 mymutex 并对已链接列表添加了一项。在对互斥对象解锁之后,2 号线程会立即调用函数 pthread_cond_broadcast(&mycond)。此操作之后,2 号线程将使所有等待 mycond 条件变量的线程立即苏醒。这意味着第一个线程(仍处于 pthread_cond_wait() 调用中)现在将苏醒。
现在,看一下第一个线程发生了什么。您可能会认为在 2 号线程调用 pthread_cond_broadcast(&mymutex) 之后,1 号线程的 pthread_cond_wait() 会立即返回。不是那样!实际上,pthread_cond_wait() 将执行最后一个操作:重新锁定 mymutex。一旦 pthread_cond_wait() 锁定了互斥对象,那么它将返回并允许 1 号线程继续执行。那时,它可以马上检查列表,查看它所感兴趣的更改。
那个过程非常复杂,因此让我们先来回顾一下。第一个线程首先调用:
pthread_mutex_lock(&mymutex); |
然后,它检查了列表。没有找到感兴趣的东西,于是它调用:
pthread_cond_wait(&mycond, &mymutex); |
然后,pthread_cond_wait() 调用在返回前执行许多操作:
pthread_mutex_unlock(&mymutex); |
它对 mymutex 解锁,然后进入睡眠状态,等待 mycond 以接收 POSIX 线程“信号”。一旦接收到“信号”(加引号是因为我们并不是在讨论传统的 UNIX 信号,而是来自 pthread_cond_signal() 或 pthread_cond_broadcast() 调用的信号),它就会苏醒。但 pthread_cond_wait() 没有立即返回 -- 它还要做一件事:重新锁定 mutex:
pthread_mutex_lock(&mymutex); |
pthread_cond_wait() 知道我们在查找 mymutex “背后”的变化,因此它继续操作,为我们锁定互斥对象,然后才返回。
现在已回顾了 pthread_cond_wait() 调用,您应该了解了它的工作方式。应该能够叙述 pthread_cond_wait() 依次执行的所有操作。尝试一下。如果理解了 pthread_cond_wait(),其余部分就相当容易,因此请重新阅读以上部分,直到记住为止。好,读完之后,能否告诉我在调用 pthread_cond_wait() 之 前,互斥对象必须处于什么状态?pthread_cond_wait() 调用返回之后,互斥对象处于什么状态?这两个问题的答案都是“锁定”。既然已经完全理解了 pthread_cond_wait() 调用,现在来继续研究更简单的东西 -- 初始化和真正的发送信号和广播进程。到那时,我们将会对包含了多线程工作队列的 C 代码了如指掌。
条件变量是一个需要初始化的真实数据结构。以下就初始化的方法。首先,定义或分配一个条件变量,如下所示:
pthread_cond_t mycond; |
然后,调用以下函数进行初始化:
pthread_cond_init(&mycond,NULL); |
瞧,初始化完成了!在释放或废弃条件变量之前,需要毁坏它,如下所示:
pthread_cond_destroy(&mycond); |
很简单吧。接着讨论 pthread_cond_wait() 调用。
一旦初始化了互斥对象和条件变量,就可以等待某个条件,如下所示:
pthread_cond_wait(&mycond, &mymutex); |
请注意,代码在逻辑上应该包含 mycond 和 mymutex。一个特定条件只能有一个互斥对象,而且条件变量应该表示互斥数据“内部”的一种特殊的条件更改。一个互斥对象可以用许多条件变量(例 如,cond_empty、cond_full、cond_cleanup),但每个条件变量只能有一个互斥对象。
对于发送信号和广播,需要注意一点。如果线程更改某些共享数据,而且它想要唤醒所有正在等待的线程,则应使用 pthread_cond_broadcast 调用,如下所示:
pthread_cond_broadcast(&mycond); |
在某些情况下,活动线程只需要唤醒第一个正在睡眠的线程。假设您只对队列添加了一个工作作业。那么只需要唤醒一个工作程序线程(再唤醒其它线程是不礼貌的!):
pthread_cond_signal(&mycond); |
此函数只唤醒一个线程。如果 POSIX 线程标准允许指定一个整数,可以让您唤醒一定数量的正在睡眠的线程,那就更完美了。但是很可惜,我没有被邀请参加会议。
我将演示如何创建多线程工作组。在这个方案中,我们创建了许多工作程序线程。每个线程都会检查 wq(“工作队列”),查看是否有需要完成的工作。如果有需要完成的工作,那么线程将从队列中除去一个节点,执行这些特定工作,然后等待新的工作到达。
与此同时,主线程负责创建这些工作程序线程、将工作添加到队列,然后在它退出时收集所有工作程序线程。您将会遇到许多 C 代码,好好准备吧!
需要队列是出于两个原因。首先,需要队列来保存工作作业。还需要可用于跟踪已终止线程的数据结构。还记得前几篇文章(请参阅本文结尾处的 参考资料)中,我曾提到过需要使用带有特定进程标识的 pthread_join 吗?使用“清除队列”(称作 "cq")可以解决无法等待 任何已终止线程的问题(稍后将详细讨论这个问题)。以下是标准队列代码。将此代码保存到文件 queue.h 和 queue.c:
/* queue.h ** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc. ** Author: Daniel Robbins ** Date: 16 Jun 2000 */ typedef struct node { struct node *next; } node; typedef struct queue { node *head, *tail; } queue; void queue_init(queue *myroot); void queue_put(queue *myroot, node *mynode); node *queue_get(queue *myroot); |
/* queue.c
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
**
** This set of queue functions was originally thread-aware. I
** redesigned the code to make this set of queue routines
** thread-ignorant (just a generic, boring yet very fast set of queue
** routines). Why the change? Because it makes more sense to have
** the thread support as an optional add-on. Consider a situation
** where you want to add 5 nodes to the queue. With the
** thread-enabled version, each call to queue_put() would
** automatically lock and unlock the queue mutex 5 times -- that's a
** lot of unnecessary overhead. However, by moving the thread stuff
** out of the queue routines, the caller can lock the mutex once at
** the beginning, then insert 5 items, and then unlock at the end.
** Moving the lock/unlock code out of the queue functions allows for
** optimizations that aren't possible otherwise. It also makes this
** code useful for non-threaded applications.
**
** We can easily thread-enable this data structure by using the
** data_control type defined in control.c and control.h. */
#include |
我编写的并不是线程安全的队列例程,事实上我创建了一个“数据包装”或“控制”结构,它可以是任何线程支持的数据结构。看一下 control.h:
#include typedef struct data_control { pthread_mutex_t mutex; pthread_cond_t cond; int active; } data_control; |
现在您看到了 data_control 结构定义,以下是它的视觉表示:
图像中的锁代表互斥对象,它允许对数据结构进行互斥访问。黄色的星代表条件变量,它可以睡眠,直到所讨论的数据结构改变为止。 on/off 开关表示整数 "active",它告诉线程此数据是否是活动的。在代码中,我使用整数 active 作为标志,告诉工作队列何时应该关闭。以下是 control.c:
/* control.c ** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc. ** Author: Daniel Robbins ** Date: 16 Jun 2000 ** ** These routines provide an easy way to make any type of ** data-structure thread-aware. Simply associate a data_control ** structure with the data structure (by creating a new struct, for ** example). Then, simply lock and unlock the mutex, or ** wait/signal/broadcast on the condition variable in the data_control ** structure as needed. ** ** data_control structs contain an int called "active". This int is ** intended to be used for a specific kind of multithreaded design, ** where each thread checks the state of "active" every time it locks ** the mutex. If active is 0, the thread knows that instead of doing ** its normal routine, it should stop itself. If active is 1, it ** should continue as normal. So, by setting active to 0, a ** controlling thread can easily inform a thread work crew to shut ** down instead of processing new jobs. Use the control_activate() ** and control_deactivate() functions, which will also broadcast on ** the data_control struct's condition variable, so that all threads ** stuck in pthread_cond_wait() will wake up, have an opportunity to ** notice the change, and then terminate. */ #include "control.h" int control_init(data_control *mycontrol) { int mystatus; if (pthread_mutex_init(&(mycontrol->mutex),NULL)) return 1; if (pthread_cond_init(&(mycontrol->cond),NULL)) return 1; mycontrol->active=0; return 0; } int control_destroy(data_control *mycontrol) { int mystatus; if (pthread_cond_destroy(&(mycontrol->cond))) return 1; if (pthread_cond_destroy(&(mycontrol->cond))) return 1; mycontrol->active=0; return 0; } int control_activate(data_control *mycontrol) { int mystatus; if (pthread_mutex_lock(&(mycontrol->mutex))) return 0; mycontrol->active=1; pthread_mutex_unlock(&(mycontrol->mutex)); pthread_cond_broadcast(&(mycontrol->cond)); return 1; } int control_deactivate(data_control *mycontrol) { int mystatus; if (pthread_mutex_lock(&(mycontrol->mutex))) return 0; mycontrol->active=0; pthread_mutex_unlock(&(mycontrol->mutex)); pthread_cond_broadcast(&(mycontrol->cond)); return 1; } |
在开始调试之前,还需要一个文件。以下是 dbug.h:
#define dabort() \ { printf("Aborting at line %d in source file %s\n",__LINE__,__FILE__); abort(); } |
此代码用于处理工作组代码中的不可纠正错误。
说到工作组代码,以下就是:
#include |
现在来快速初排代码。定义的第一个结构称作 "wq",它包含了 data_control 和队列头。data_control 结构用于仲裁对整个队列的访问,包括队列中的节点。下一步工作是定义实际的工作节点。要使代码符合本文中的示例,此处所包含的都是作业号。
接着,创建清除队列。注释说明了它的工作方式。好,现在让我们跳过 threadfunc()、join_threads()、create_threads() 和 initialize_structs() 调用,直接跳到 main()。所做的第一件事就是初始化结构 -- 这包括初始化 data_controls 和队列,以及激活工作队列。
现在初始化线程。如果看一下 create_threads() 调用,似乎一切正常 -- 除了一件事。请注意,我们正在分配清除节点,以及初始化它的线程号和 TID 组件。我们还将清除节点作为初始自变量传递给每一个新的工作程序线程。为什么这样做?
因为当某个工作程序线程退出时,它会将其清除节点连接到清除队列,然后终止。那时,主线程会在清除队列中检测到这个节点(利用条件变 量),并将这个节点移出队列。因为 TID(线程标识)存储在清除节点中,所以主线程可以确切知道哪个线程已终止了。然后,主线程将调用 pthread_join(tid),并联接适当的工作程序线程。如果没有做记录,那么主线程就需要按任意顺序联接工作程序线程,可能是按它们的创建顺 序。由于线程不一定按此顺序终止,那么主线程可能会在已经联接了十个线程时,等待联接另一个线程。您能理解这种设计决策是如何使关闭代码加速的吗(尤其在 使用几百个工作程序线程的情况下)?
我们已启动了工作程序线程(它们已经完成了执行 threadfunc(),稍后将讨论此函数),现在主线程开始将工作节点插入工作队列。首先,它锁定 wq 的控制互斥对象,然后分配 16000 个工作包,将它们逐个插入队列。完成之后,将调用 pthread_cond_broadcast(),于是所有正在睡眠的线程会被唤醒,并开始执行工作。此时,主线程将睡眠两秒钟,然后释放工作队列,并 通知工作程序线程终止活动。接着,主线程会调用 join_threads() 函数来清除所有工作程序线程。
现在来讨论 threadfunc(),这是所有工作程序线程都要执行的代码。当工作程序线程启动时,它会立即锁定工作队列互斥对象,获取一个工作节点(如果有的 话),然后对它进行处理。如果没有工作,则调用 pthread_cond_wait()。您会注意到这个调用在一个非常紧凑的 while() 循环中,这是非常重要的。当从 pthread_cond_wait() 调用中苏醒时,决不能认为条件肯定发生了 -- 它 可能发生了,也可能没有发生。如果发生了这种情况,即错误地唤醒了线程,而列表是空的,那么 while 循环将再次调用 pthread_cond_wait()。
如果有一个工作节点,那么我们只打印它的作业号,释放它并退出。然而,实际代码会执行一些更实质性的操作。在 while() 循环结尾,我们锁定了互斥对象,以便检查 active 变量,以及在循环顶部检查新的工作节点。如果执行完此代码,就会发现如果 wq.control.active 是 0,while 循环就会终止,并会执行 threadfunc() 结尾处的清除代码。
工作程序线程的清除代码部件非常有趣。首先,由于 pthread_cond_wait() 返回了锁定的互斥对象,它会对 work_queue 解锁。然后,它锁定清除队列,添加清除代码(包含了 TID,主线程将使用此 TID 来调用 pthread_join()),然后再对清除队列解锁。此后,它发信号给所有 cq 等待者 (pthread_cond_signal(&cq.control.cond)),于是主线程就知道有一个待处理的新节点。我们不使用 pthread_cond_broadcast(),因为没有这个必要 -- 只有一个线程(主线程)在等待清除队列中的新节点。当它调用 join_threads() 时,工作程序线程将打印关闭消息,然后终止,等待主线程发出的 pthread_join() 调用。
如果要查看关于如何使用条件变量的简单示例,请参考 join_threads() 函数。如果还有工作程序线程,join_threads() 会一直执行,等待清除队列中新的清除节点。如果有新节点,我们会将此节点移出队列、对清除队列解锁(从而使工作程序可以添加清除节点)、联接新的工作程序 线程(使用存储在清除节点中的 TID)、释放清除节点、减少“现有”线程的数量,然后继续。