这一星期看了一本书:《POSIX多线程程序设计》(Programming with POSIX Threads)。这本书的最后有一个关于工作队列的程序,感觉写得不错。在此做一些笔记,既方便自己以后查阅,也方便大家。
这个程序一共分两部分:一部分是工作队列本身,另一部分是调用工作队列的上层程序。我们先说工作队列部分。
这个工作队列通过回调函数来实现上层的具体功能,回调函数的参数也由上层传入。这是经典的回调方式,把上层的功能和队列层的实现完全分开,所以这个队列很有实际使用价值。下面看代码,听小弟一一道来,说得不对的地方请大家多多指正。
/*
 * One pending work item. Items form a singly linked FIFO list that is
 * allocated by workq_add and freed by the server thread after use;
 * `data` is the opaque argument supplied to workq_add and later handed
 * to the queue's engine callback.
 */
typedef struct workq_ele_tag workq_ele_t;

struct workq_ele_tag {
    workq_ele_t *next;  /* next item in the queue, NULL at the tail */
    void        *data;  /* caller-supplied argument for the engine */
};
这是工作队列里的元素,每一项具体工作都用它来表示,其中data是调用层传递的参数地址,next指向队列中的下一项。
/*
 * A work queue: a FIFO list of pending items served by at most
 * `parallelism` detached worker threads, each of which passes item data
 * to the user-supplied `engine` callback.
 *
 * The list pointers and the quit/counter/idle bookkeeping are accessed
 * with `mutex` held; `valid` is also checked without it by workq_add.
 */
struct workq_tag {
    pthread_mutex_t mutex;       /* guards the queue and its counters */
    pthread_cond_t  cv;          /* wait for work */
    pthread_attr_t  attr;        /* create detached threads */
    workq_ele_t    *first;       /* head of the work queue */
    workq_ele_t    *last;        /* tail of the work queue */
    int             valid;       /* set when valid */
    int             quit;        /* set when workq should quit */
    int             parallelism; /* number of threads required */
    int             counter;     /* current number of threads */
    int             idle;        /* number of idle threads */
    void          (*engine)(void *arg); /* user engine */
};

typedef struct workq_tag workq_t;
这是整个工作队列的表示。
其中的first和last是工作队列的头和尾,执行时就是把first指向的元素取出来,然后执行特定的操作。
pthread_attr_t是为了创建线程时方便,它用于把线程设置为detach(分离)状态,这样线程退出的时候,我们就不用再考虑它的回收问题。最后一行的engine函数,就是调用层传递过来的具体函数。valid标记队列是否已经初始化,quit表示这个工作队列是否要退出,parallelism是这个队列最多可以启动的线程数,counter是目前运行的线程数,idle是目前休眠在条件变量上等待工作的线程数。最上面的两个是互斥量和条件变量。
/*
 * Initialize a work queue.
 *
 * wq:      queue object to initialize (storage owned by the caller).
 * threads: maximum number of worker threads the queue may run at once;
 *          must be at least 1, otherwise no queued item could ever be
 *          served.
 * engine:  callback invoked by a worker thread with each item's data;
 *          must not be NULL.
 *
 * No worker threads are created here; workq_add starts them on demand.
 * Returns 0 on success, EINVAL for invalid arguments, or the error code
 * of the failing pthread initialization call (everything initialized
 * before the failure is destroyed again).
 */
int workq_init (workq_t *wq, int threads, void (*engine)(void *arg))
{
    int status;

    if (wq == NULL || threads < 1 || engine == NULL)
        return EINVAL;

    status = pthread_attr_init (&wq->attr);
    if (status != 0)
        return status;
    /*
     * Workers are created detached so nobody has to join them;
     * workq_destroy instead waits for `counter` to drop to zero.
     */
    status = pthread_attr_setdetachstate (&wq->attr, PTHREAD_CREATE_DETACHED);
    if (status != 0)
        goto err_attr;
    status = pthread_mutex_init (&wq->mutex, NULL);
    if (status != 0)
        goto err_attr;
    status = pthread_cond_init (&wq->cv, NULL);
    if (status != 0)
        goto err_mutex;

    wq->quit = 0;                   /* not time to quit */
    wq->first = wq->last = NULL;    /* no queue entries */
    wq->parallelism = threads;      /* max servers */
    wq->counter = 0;                /* no server threads yet */
    wq->idle = 0;                   /* no idle servers */
    wq->engine = engine;
    wq->valid = WORKQ_VALID;        /* checked by workq_add/workq_destroy */
    return 0;

err_mutex:
    pthread_mutex_destroy (&wq->mutex);
err_attr:
    pthread_attr_destroy (&wq->attr);
    return status;
}
经过上面的初始化,这个队列就等待具体工作项的插入了。注意初始化时并不创建线程,线程是在添加工作项时按需创建的。
/*
 * Add an item to a work queue.
 *
 * wq:      a queue previously set up by workq_init.
 * element: opaque argument later passed to the queue's engine callback;
 *          any type can be passed (use a struct for several values).
 *
 * If a server thread is idle it is woken; otherwise, if fewer than
 * `parallelism` servers exist, a new detached server is started.
 * Returns 0 on success, EINVAL if the queue is not valid, ENOMEM if the
 * item cannot be allocated, or a pthread error code. On any error the
 * item is NOT queued, so the engine will never see `element` and the
 * caller still owns it.
 */
int workq_add (workq_t *wq, void *element)
{
    workq_ele_t *item;
    pthread_t id;
    int status;

    if (wq->valid != WORKQ_VALID)   /* queue not (or no longer) initialized */
        return EINVAL;

    /* Create and initialize a request structure. */
    item = malloc (sizeof *item);
    if (item == NULL)
        return ENOMEM;
    item->data = element;
    item->next = NULL;

    status = pthread_mutex_lock (&wq->mutex);
    if (status != 0) {
        free (item);
        return status;
    }

    /*
     * Wake or start a server BEFORE linking the item in, so that a
     * failure leaves the queue untouched. This is safe because we hold
     * the mutex: no server can examine the queue until we unlock, and
     * by then the item is in place.
     */
    if (wq->idle > 0) {
        status = pthread_cond_signal (&wq->cv);
        if (status != 0) {
            pthread_mutex_unlock (&wq->mutex);
            free (item);
            return status;
        }
    } else if (wq->counter < wq->parallelism) {
        /* No idle server and we are allowed another one: create it. */
        DPRINTF (("Creating new worker\n"));
        status = pthread_create (&id, &wq->attr, workq_server, (void *)wq);
        if (status != 0) {
            pthread_mutex_unlock (&wq->mutex);
            free (item);
            return status;
        }
        wq->counter++;
    }

    /* Append the request to the tail, updating first and last. */
    if (wq->first == NULL)
        wq->first = item;
    else
        wq->last->next = item;
    wq->last = item;

    pthread_mutex_unlock (&wq->mutex);
    return 0;
}
下面我们来看看新建的工作线程所执行的函数:
/*
* Thread start routine to serve the work queue.
*/
static void *workq_server (void *arg)
{
struct timespec timeout;
workq_t *wq = (workq_t *)arg;
workq_ele_t *we;
int status, timedout;
/*
* We don't need to validate the workq_t here... we don't
* create server threads until requests are queued (the
* queue has been initialized by then!) and we wait for all
* server threads to terminate before destroying a work
* queue.
*/
DPRINTF (("A worker is starting\n"));
status = pthread_mutex_lock (&wq->mutex);
if (status != 0)
return NULL;
while (1) //如果队列项多,可以循环执行
{
timedout = 0;
DPRINTF (("Worker waiting for work\n"));
clock_gettime (CLOCK_REALTIME, &timeout);
timeout.tv_sec += 2; //在条件变量上等待的时候,加超时
while (wq->first == NULL && !wq->quit) //没有具体队列项,并且不退出,这里等待的就是上层程序往队列里添加工作项
{
/*
* Server threads time out after spending 2 seconds
* waiting for new work, and exit.
*/
status = pthread_cond_timedwait(&wq->cv, &wq->mutex, &timeout);//在条件变量上等待,2s的超时
if (status == ETIMEDOUT)
{
DPRINTF (("Worker wait timed out\n"));//这里表超时,置超时标志
timedout = 1;
break;/* 这里注意,这里使用break,是为了检测是否超时,也就是说timedwait返回以后,没有继续检测条件,所以这里有空闲窗口,理想的形式应该是:while(假)
wait()
*/
}
else if (status != 0)
{
/*
* 原文说:这里出错的概率很微小,但是有并发的概率
*如果出错,线程会返回,允许其它线程来处理。
*但是注意:如果只有一个线程的话,退出以后就不会执行
*直到另外一个队列项插入以后,才会执行
*/ DPRINTF (("Worker wait failed, %d (%s)\n", status, strerror (status)));
wq->counter--;
pthread_mutex_unlock (&wq->mutex);
return NULL;
}
}
DPRINTF (("Work queue: %#lx, quit: %d\n", wq->first, wq->quit));
we = wq->first;//再次检查是否有队列项
if (we != NULL) //从wait返回,再次获得锁,才能操作队列,把最前面的一项取出来,执行回调函数,这里要注意回调函数必须是可重入的。
{
wq->first = we->next;
if (wq->last == we)
wq->last = NULL;
status = pthread_mutex_unlock (&wq->mutex);
if (status != 0)
return NULL;
DPRINTF (("Worker calling engine\n"));
wq->engine (we->data);
free (we);
status = pthread_mutex_lock (&wq->mutex);//执行完了再次获得锁,往下进行
if (status != 0)
return NULL;
}
/*
* If there are no more work requests, and the servers
* have been asked to quit, then shut down.
*/
if (wq->first == NULL && wq->quit)//检查是否退出,并且队列没有工作项
{
DPRINTF (("Worker shutting down\n"));
wq->counter--;
/*
* NOTE: Just to prove that every rule has an
* exception, I'm using the "cv" condition for two
* separate predicates here. That's OK, since the
* case used here applies only once during the life
* of a work queue -- during rundown. The overhead
* is minimal and it's not worth creating a separate
* condition variable that would be waited and
* signaled exactly once!
*/
if (wq->counter == 0)//当前线程是处理最后一个队列项的线程,并且destroy正在wait等待,所以唤醒它,destroy下面说
pthread_cond_broadcast (&wq->cv);
pthread_mutex_unlock (&wq->mutex);
return NULL;
}
/*
* If there's no more work, and we wait for as long as
* we're allowed, then terminate this server thread.
*/
if (wq->first == NULL && timedout) //队列上没有工作项,并且上面wait是超时返回的,线程退出循环,存在的线程数-1,线程返回。线程返回不能使用exit,否则整个进程会退出
{
DPRINTF (("engine terminating due to timeout.\n"));
wq->counter--;
break;
}
}
pthread_mutex_unlock (&wq->mutex);
DPRINTF (("Worker exiting\n"));
return NULL;
}
/*
 * Destroy a work queue.
 *
 * Marks the queue invalid. If server threads exist, it sets `quit`,
 * wakes them so they drain the queue and exit, and waits until all of
 * them are gone. It then destroys the mutex, the condition variable and
 * the thread attributes.
 * Returns 0 on success, EINVAL if the queue is not valid, or the first
 * pthread error encountered.
 */
int workq_destroy (workq_t *wq)
{
    int status, status1, status2;

    if (wq->valid != WORKQ_VALID)
        return EINVAL;
    status = pthread_mutex_lock (&wq->mutex);
    if (status != 0)
        return status;
    wq->valid = 0;      /* prevent any other operations */

    /*
     * Check whether any threads are active, and run them down:
     *
     * 1. set the quit flag
     * 2. broadcast to wake any servers that may be asleep
     * 3. wait for all threads to quit (counter goes to 0)
     *
     * Because we don't use join, we don't need to worry
     * about tracking thread IDs.
     */
    if (wq->counter > 0) {
        wq->quit = 1;
        /*
         * Broadcast unconditionally instead of only when wq->idle > 0.
         * An unneeded wakeup is harmless because servers re-test
         * "first == NULL && !quit". Gating on `idle` would leave
         * sleeping servers blocked until their 2-second timeout
         * whenever the idle count is not maintained accurately.
         */
        status = pthread_cond_broadcast (&wq->cv);
        if (status != 0) {
            pthread_mutex_unlock (&wq->mutex);
            return status;
        }
        /*
         * `cv` is reused here for a second predicate (counter == 0);
         * the last exiting server broadcasts it. This happens only once
         * per queue lifetime, during rundown, so a separate condition
         * variable isn't worth it.
         */
        while (wq->counter > 0) {
            status = pthread_cond_wait (&wq->cv, &wq->mutex);
            if (status != 0) {
                pthread_mutex_unlock (&wq->mutex);
                return status;
            }
        }
    }
    status = pthread_mutex_unlock (&wq->mutex);
    if (status != 0)
        return status;
    status  = pthread_mutex_destroy (&wq->mutex);
    status1 = pthread_cond_destroy (&wq->cv);
    status2 = pthread_attr_destroy (&wq->attr);
    return (status ? status : (status1 ? status1 : status2));
}
下面是上层(调用工作队列的程序)的几个函数:
int main (int argc, char *argv[])
{
pthread_t thread_id;
engine_t *engine;
int count = 0, calls = 0;
int status;
status = pthread_key_create (&engine_key, destructor);//舍子线程私有数据的键值,然后各个线程都可以使用,并且是线程自己所有,同名,内容不同,destructor函数使线程结束时调用的函数
if (status != 0)
err_abort (status, "Create key");
status = workq_init (&workq, 1, engine_routine);//这里中间的参数本来是4,我为了看下流程,顺便检测下线程私有数据二修改成1的,第三个参数是传递给工作队列的回调函数。也就是当我们在队列里添加工作项的饿时候,会触发这个函数的执行
if (status != 0)
err_abort (status, "Init work queue");
status = pthread_create (&thread_id, NULL, thread_routine, NULL);//创建线程
if (status != 0)
err_abort (status, "Create thread");
// (void)thread_routine (NULL);
status = pthread_join (thread_id, NULL);//等待线程结束,专用于等待上面刚刚创建的线程
if (status != 0)
err_abort (status, "Join thread");
status = workq_destroy (&workq);
if (status != 0)
err_abort (status, "Destroy work queue");
/*
* By now, all of the engine_t structures have been placed
* on the list (by the engine thread destructors), so we
* can count and summarize them.
*/
engine = engine_list_head;
while (engine != NULL) //这里是为了查看每个工作队列项处理了几个请求项
{
count++;
calls += engine->calls;
printf ("engine %d: %d calls\n", count, engine->calls);
engine = engine->link;
}
printf ("%d engine threads processed %d calls\n", count, calls);
return 0;
}
先看看主函数创建的线程所执行的函数:
/*
 * Producer thread start routine; unrelated to the queue's own server
 * threads. It submits ITERATIONS requests to the work queue, one per
 * second. Each request is a freshly allocated power_t with a
 * pseudo-random base in [0, 19] and exponent in [0, 6]. Ownership of
 * each request passes to the queue; engine_routine frees it.
 */
void *thread_routine (void *arg)
{
    unsigned int seed = (unsigned int)time (NULL);
    int i;

    for (i = 0; i < ITERATIONS; ++i) {
        power_t *req = malloc (sizeof *req);
        int rc;

        if (req == NULL)
            errno_abort ("Allocate element");
        req->value = rand_r (&seed) % 20;
        req->power = rand_r (&seed) % 7;
        DPRINTF (("Request: %d^%d\n", req->value, req->power));

        /* Queue the request; a server thread will run the engine on it. */
        rc = workq_add (&workq, (void *)req);
        if (rc != 0)
            err_abort (rc, "Add to work queue");

        /* The original example slept a random 0-4 seconds here instead. */
        sleep (1);
    }
    return NULL;
}
接下来,我们看看回调函数:
回调函数也很简单。首先可以不考虑pthread_getspecific、pthread_setspecific以及与engine结构有关的操作,这三部分和工作队列本身没有关系(只是用来统计每个线程处理了多少个工作项)。所以回调函数真正有效的部分就是最下面计算乘方的部分。这部分就是我们使用时需要修改的地方:用我们自己的代码替换这段计算,程序执行的就是我们自己的代码了。另外,如果我们传递的工作项里包含一个函数指针,回调函数就可以调用它,这样每个工作项就能执行各自不同的函数。
/*
 * Work-queue engine callback. For one request it computes
 * power->value raised to power->power, then frees the request.
 *
 * It also counts, per server thread, how many requests that thread has
 * handled. The count lives in an engine_t that is stored as
 * thread-specific data under engine_key and is allocated on the
 * thread's first call.
 */
void engine_routine (void *arg)
{
    engine_t *engine;
    power_t *power = (power_t *)arg;
    int result, count;
    int status;

    engine = pthread_getspecific (engine_key);
    if (engine == NULL) {
        /* First request handled by this thread: create its record. */
        engine = malloc (sizeof *engine);
        if (engine == NULL)
            err_abort (ENOMEM, "Allocate engine");
        engine->thread_id = pthread_self ();
        engine->calls = 1;
        status = pthread_setspecific (engine_key, (void *)engine);
        if (status != 0)
            err_abort (status, "Set tsd");
    } else {
        engine->calls++;
    }

    /* The actual work; this is the part to replace with real code. */
    result = 1;
    printf ("Engine: computing %d^%d\n", power->value, power->power);
    for (count = 1; count <= power->power; count++)
        result *= power->value;
    (void)result;   /* NOTE(review): the result is never reported */
    free (arg);
}
最后,我们说说线程私有数据。像上面那样使用时,我们在回调函数里调用set和get,回调函数返回后,已经设置的数据并不受影响。因为开始创建的key是全局的,不管是set还是get,使用的数据都是通过这个key与当前线程关联的。结合上面的代码可以看出,只有在线程退出的时候才会执行destructor函数。它把带有计数的结构存放到全局链表里,然后main函数在最后把各个线程处理的次数打印出来:
上面截图是设置1个工作队列线程、添加4个工作项时的执行情况。只要线程不退出,它的线程私有数据就一直有效,与回调函数是否执行完毕无关。也就是说,线程私有数据是线程级别的。