Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1725792
  • 博文数量: 98
  • 博客积分: 667
  • 博客等级: 上士
  • 技术积分: 1631
  • 用 户 组: 普通用户
  • 注册时间: 2009-04-27 15:59
个人简介

一沙一世界 一树一菩提

文章分类

全部博文(98)

文章存档

2021年(8)

2020年(16)

2019年(8)

2017年(1)

2016年(11)

2015年(17)

2014年(9)

2013年(4)

2012年(19)

2011年(1)

2009年(4)

分类: C/C++

2014-11-28 17:38:08

这一星期看了一本书:posix 多线程程序设计。在这本书的最后是一个关于工作队列的程序,感觉写的不错。在此做一些记号,以便以后方便自己,也方便大家。
这个工作队列一共分2部分,一部分是是工作队列部分,另外一部分是调用队列部分。我们先说工作队列部分。
这个工作队列使用回调函数实现上层的具体功能,并且参数也是上层传递的参数。感觉是经典的回调方式,把上层的功能和队列层的实现完全分开。所以这个队列很有实际使用意义。下面看代码,听小弟一一道来,说的不对的地方大家多多提醒。
typedef struct workq_ele_tag {
    struct workq_ele_tag        *next;
    void                        *data;
} workq_ele_t;
这是工作队列里的元素,每一项具体工作就用这个表示的,其中data是调用层传递的参数地址。
typedef struct workq_tag {
    pthread_mutex_t     mutex;
    pthread_cond_t      cv;             /* wait for work */
    pthread_attr_t      attr;           /* create detached threads */
    workq_ele_t         *first, *last;  /* work queue */
    int                 valid;          /* set when valid */
    int                 quit;           /* set when workq should quit */
    int                 parallelism;    /* number of threads required */
    int                 counter;        /* current number of threads */
    int                 idle;           /* number of idle threads */
    void                (*engine)(void *arg);   /* user engine */
} workq_t;
这是整个工作队列的表示。
其中的first和last是工作队列的头和 尾,在具体执行的时候就是把first取出来然后执行特定的操作。
pthread_attr_t是为了创建线程的时候方便,它用于设置线程是detach状态,以便线程退出的时候,我们就不用再去考虑它的回收问题。最后一行的engine函数,就是调用层传递过来的具体函数。quit表示是否这个工作队列是否退出, parallelism是这个队列最大可以启动的线程数,counter是目前运行的线程数,idle是目前休眠在条件变量上的线程数。最上面的2个是条件变量和互斥量。
/*
 * Initialize a work queue.
* 这个函数使调用层调用来初始化工作队列的, 第二个参数表示这个队列的线程最大数,第三个参数是具体执行时候的回调函数
 */
int workq_init (workq_t *wq, int threads, void (*engine)(void *arg))
{
    int status;


    status = pthread_attr_init (&wq->attr);
    if (status != 0)
        return status;
    status = pthread_attr_setdetachstate (
        &wq->attr, PTHREAD_CREATE_DETACHED);//设置线程自己处理结束以后的回收问题
    if (status != 0) {
        pthread_attr_destroy (&wq->attr);
        return status;
    }
    status = pthread_mutex_init (&wq->mutex, NULL);
    if (status != 0) {
        pthread_attr_destroy (&wq->attr);
        return status;
    }
    status = pthread_cond_init (&wq->cv, NULL);
    if (status != 0) {
        pthread_mutex_destroy (&wq->mutex);
        pthread_attr_destroy (&wq->attr);
        return status;
    }
    wq->quit = 0;                       /* not time to quit */
    wq->first = wq->last = NULL;        /* no queue entries */
    wq->parallelism = threads;          /* max servers */
    wq->counter = 0;                    /* no server threads yet */
    wq->idle = 0;                       /* no idle servers */
    wq->engine = engine;
    wq->valid = WORKQ_VALID;  //设置队列有效
    return 0;
}
经过上面初始化,这个队列等待具体队列元素的插入。
/*
 * Add an item to a work queue.
*这个也是上层调用的函数,第一个参数和初始化的时候传递的第一个参数一样,第二个参数是void类型,也就是说我们可以传入任何类型的参数,多个参数可以使用结构体。
 */
int workq_add (workq_t *wq, void *element)
{
    workq_ele_t *item;
    pthread_t id;
    int status;


    if (wq->valid != WORKQ_VALID)//检查队列是否有效初始化
        return EINVAL;


    /*
     * Create and initialize a request structure.
     */
    item = (workq_ele_t *)malloc (sizeof (workq_ele_t));
    if (item == NULL)
        return ENOMEM;
    item->data = element;//上层传递过来的第二个参数
    item->next = NULL;
    status = pthread_mutex_lock (&wq->mutex);//操作工作队列,要先锁住,然后添加队列项
    if (status != 0)
{
        free (item);
        return status;
    }


    /*
     * Add the request to the end of the queue, updating the
     * first and last pointers.
     */
    if (wq->first == NULL) //空队列
        wq->first = item;
    else
        wq->last->next = item;
    wq->last = item;


    /*
     * if any threads are idling, wake one.
     */
    if (wq->idle > 0) //如果当前有空闲线程,唤醒即可
{
        status = pthread_cond_signal (&wq->cv);
        if (status != 0) 
{
            pthread_mutex_unlock (&wq->mutex);
            return status;
        }
    } 
else if (wq->counter < wq->parallelism) //没有空闲队列,当前已经存在的线程数小于最大线程数,接下来就是建立一个线程,把队列作为参数传给她
    {
        /*
         * If there were no idling threads, and we're allowed to
         * create a new thread, do so.
         */
        DPRINTF (("Creating new worker\n"));
        status = pthread_create(&id, &wq->attr, workq_server, (void*)wq);
        if (status != 0) 
{
            pthread_mutex_unlock (&wq->mutex);
            return status;
        }
        wq->counter++;  //新建了每一个线程,那当前存在的线程数自然+1
    }
    pthread_mutex_unlock (&wq->mutex);
    return 0;
}
下面我们来看看具体的新建线程:


/*
 * Thread start routine to serve the work queue.
 */
static void *workq_server (void *arg)
{
    struct timespec timeout;
    workq_t *wq = (workq_t *)arg;
    workq_ele_t *we;
    int status, timedout;


    /*
     * We don't need to validate the workq_t here... we don't
     * create server threads until requests are queued (the
     * queue has been initialized by then!) and we wait for all
     * server threads to terminate before destroying a work
     * queue.
     */
    DPRINTF (("A worker is starting\n"));
    status = pthread_mutex_lock (&wq->mutex);
    if (status != 0)
        return NULL;


    while (1) //如果队列项多,可以循环执行
{
        timedout = 0;
        DPRINTF (("Worker waiting for work\n"));
        clock_gettime (CLOCK_REALTIME, &timeout);
        timeout.tv_sec += 2; //在条件变量上等待的时候,加超时


        while (wq->first == NULL && !wq->quit) //没有具体队列项,并且不退出,这里等待的就是上层程序往队列里添加工作项
{
            /*
             * Server threads time out after spending 2 seconds
             * waiting for new work, and exit.
             */
            status = pthread_cond_timedwait(&wq->cv, &wq->mutex, &timeout);//在条件变量上等待,2s的超时
            if (status == ETIMEDOUT) 
{
                DPRINTF (("Worker wait timed out\n"));//这里表超时,置超时标志
                timedout = 1;
                break;/* 这里注意,这里使用break,是为了检测是否超时,也就是说timedwait返回以后,没有继续检测条件,所以这里有空闲窗口,理想的形式应该是:while(假)
                                                    wait()     
*/                                            
            } 
else if (status != 0) 
{
                /*
                 * 原文说:这里出错的概率很微小,但是有并发的概率
                 *如果出错,线程会返回,允许其它线程来处理。
                *但是注意:如果只有一个线程的话,退出以后就不会执行
                *直到另外一个队列项插入以后,才会执行                
                 */    DPRINTF (("Worker wait failed, %d (%s)\n", status, strerror (status)));
                wq->counter--;
                pthread_mutex_unlock (&wq->mutex);
                return NULL;
            }
        }
        DPRINTF (("Work queue: %#lx, quit: %d\n", wq->first, wq->quit));
        we = wq->first;//再次检查是否有队列项
        if (we != NULL) //从wait返回,再次获得锁,才能操作队列,把最前面的一项取出来,执行回调函数,这里要注意回调函数必须是可重入的。
{
            wq->first = we->next;
            if (wq->last == we)
                wq->last = NULL;
            status = pthread_mutex_unlock (&wq->mutex);
            if (status != 0)
                return NULL;
            DPRINTF (("Worker calling engine\n"));
            wq->engine (we->data);
            free (we);
            status = pthread_mutex_lock (&wq->mutex);//执行完了再次获得锁,往下进行
            if (status != 0)
                return NULL;
        }
        /*
         * If there are no more work requests, and the servers
         * have been asked to quit, then shut down.
         */
        if (wq->first == NULL && wq->quit)//检查是否退出,并且队列没有工作项
{
            DPRINTF (("Worker shutting down\n"));
            wq->counter--;


            /*
             * NOTE: Just to prove that every rule has an
             * exception, I'm using the "cv" condition for two
             * separate predicates here.  That's OK, since the
             * case used here applies only once during the life
             * of a work queue -- during rundown. The overhead
             * is minimal and it's not worth creating a separate
             * condition variable that would be waited and
             * signaled exactly once!
             */
            if (wq->counter == 0)//当前线程是处理最后一个队列项的线程,并且destroy正在wait等待,所以唤醒它,destroy下面说
                pthread_cond_broadcast (&wq->cv);
            pthread_mutex_unlock (&wq->mutex);
            return NULL;
        }
        /*
         * If there's no more work, and we wait for as long as
         * we're allowed, then terminate this server thread.
         */
        if (wq->first == NULL && timedout) //队列上没有工作项,并且上面wait是超时返回的,线程退出循环,存在的线程数-1,线程返回。线程返回不能使用exit,否则整个进程会退出
{
            DPRINTF (("engine terminating due to timeout.\n"));
            wq->counter--;
            break;
        }
    }


    pthread_mutex_unlock (&wq->mutex);
    DPRINTF (("Worker exiting\n"));
    return NULL;
}

/*销毁队列*/
int workq_destroy (workq_t *wq)
{
    int status, status1, status2;


    if (wq->valid != WORKQ_VALID)
        return EINVAL;
    status = pthread_mutex_lock (&wq->mutex);//获得队列锁,
    if (status != 0)
        return status;
    wq->valid = 0;                 /* prevent any other operations设置退出控制位 */


    /*
     * Check whether any threads are active, and run them down:
     *
     * 1.       set the quit flag
     * 2.       broadcast to wake any servers that may be asleep
     * 4.       wait for all threads to quit (counter goes to 0)
     *          Because we don't use join, we don't need to worry
     *          about tracking thread IDs.
     */
    if (wq->counter > 0) 
{
        wq->quit = 1;
        /* if any threads are idling, wake them. */
        if (wq->idle > 0) //有空闲线程在wait,在条件变量上广播,
{
            status = pthread_cond_broadcast (&wq->cv);
            if (status != 0) 
{
                pthread_mutex_unlock (&wq->mutex);
                return status;
            }
        }


        /*
         * Just to prove that every rule has an exception, I'm
         * using the "cv" condition for two separate predicates
         * here. That's OK, since the case used here applies
         * only once during the life of a work queue -- during
         * rundown. The overhead is minimal and it's not worth
         * creating a separate condition variable that would be
         * waited and signalled exactly once!
         */
        while (wq->counter > 0) //等待还在执行的线程结束
{
            status = pthread_cond_wait (&wq->cv, &wq->mutex);
            if (status != 0) 
{
                pthread_mutex_unlock (&wq->mutex);
                return status;
            }
        }       
    }



    status = pthread_mutex_unlock (&wq->mutex);
    if (status != 0)
        return status;
    status = pthread_mutex_destroy (&wq->mutex);
    status1 = pthread_cond_destroy (&wq->cv);
    status2 = pthread_attr_destroy (&wq->attr);
    return (status ? status : (status1 ? status1 : status2));
}

下面是上层的几个函数


int main (int argc, char *argv[])
{
    pthread_t thread_id;
    engine_t *engine;
    int count = 0, calls = 0;
    int status;


    status = pthread_key_create (&engine_key, destructor);//舍子线程私有数据的键值,然后各个线程都可以使用,并且是线程自己所有,同名,内容不同,destructor函数使线程结束时调用的函数
    if (status != 0)
        err_abort (status, "Create key");
    status = workq_init (&workq, 1, engine_routine);//这里中间的参数本来是4,我为了看下流程,顺便检测下线程私有数据二修改成1的,第三个参数是传递给工作队列的回调函数。也就是当我们在队列里添加工作项的饿时候,会触发这个函数的执行
    if (status != 0)
        err_abort (status, "Init work queue");
    status = pthread_create (&thread_id, NULL, thread_routine, NULL);//创建线程
    if (status != 0)
        err_abort (status, "Create thread");
//    (void)thread_routine (NULL);
    status = pthread_join (thread_id, NULL);//等待线程结束,专用于等待上面刚刚创建的线程
    if (status != 0)
        err_abort (status, "Join thread");
    status = workq_destroy (&workq);
    if (status != 0)
        err_abort (status, "Destroy work queue");


    /*
     * By now, all of the engine_t structures have been placed
     * on the list (by the engine thread destructors), so we
     * can count and summarize them.
     */
    engine = engine_list_head;
    while (engine != NULL) //这里是为了查看每个工作队列项处理了几个请求项
{
        count++;
        calls += engine->calls;
        printf ("engine %d: %d calls\n", count, engine->calls);
        engine = engine->link;
    }
    printf ("%d engine threads processed %d calls\n", count, calls);
    return 0;
}
先看看主函数创建的函数:
/*
 * Thread start routine that issues work queue requests.
 */
void *thread_routine (void *arg)//这个线程和工作队列的线程没有一点关系,主要就一句:
{
    power_t *element;
    int count;
    unsigned int seed = (unsigned int)time (NULL);
    int status;


    /*
     * Loop, making requests.
     */
    for (count = 0; count < ITERATIONS; count++) 
{
        element = (power_t*)malloc (sizeof (power_t));
        if (element == NULL)
            errno_abort ("Allocate element");

        element->value = rand_r (&seed) % 20;
        element->power = rand_r (&seed) % 7;

        DPRINTF (("Request: %d^%d\n", element->value, element->power));
        status = workq_add (&workq, (void*)element);// 就是这里,循环往队列添加几个工作项,工作项添加以后,就会触发队列上的回调函数
        if (status != 0)
            err_abort (status, "Add to work queue");

//        sleep (rand_r (&seed) % 5);
sleep(1);
    }
    return NULL;
}
接下来,我们看看回调函数:
回调函数也很简单,首先不要考虑pthread_getspecific和pthread_setspecific,还有engine结构有关的操作,这三部分和这个工作队列本身没有关系。所以有效回调函数就是这个函数最下面的计算乘积的部分。这部分应该是我们使用的时候要求改的部分,把我们的代码替换计算这部分,那么程序执行起来就是执行我们自己的代码了啊。另外如果我们传递的具体工作项里面有自己的线程函数,那么每个工作项就可以执行单独的函数了。
void engine_routine (void *arg)
{
    engine_t *engine;
    power_t *power = (power_t*)arg;
    int result, count;
    int status;


    engine = pthread_getspecific (engine_key);

    if (engine == NULL) 
{
        engine = (engine_t*)malloc (sizeof (engine_t));
        status = pthread_setspecific(engine_key, (void*)engine);
        if (status != 0)
            err_abort (status, "Set tsd");

        engine->thread_id = pthread_self ();
        engine->calls = 1;
    } 
else
        engine->calls++;

    result = 1;
    printf ("Engine: computing %d^%d\n", power->value, power->power);
    for (count = 1; count <= power->power; count++)
        result *= power->value;
    free (arg);
}

最后,我们说说线程私有数据。就像上面那样用的话,我们是在回调函数里set和get,回调函数退出,这个是不受影响的。因为开始建立的key是全局的,我们所使用的数据,不管是set还是get都是和这个key有关的,。结合上面代码,所以只有线程退出时候,才执行destructor函数,用于把有计数的结构存放到全局表里,然后进程退出的时候,把各个线程执行的次数打印出来:

上面截图是我设置的一个工作队列线程,4个工作项的执行情况,只要一个线程不退,线程私有数据就会一直有效,而和回调函数执行完没有关系,也就是说线程私有数据属于线程级别的。
阅读(3150) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~