Chinaunix首页 | 论坛 | 博客
  • 博客访问: 43232
  • 博文数量: 17
  • 博客积分: 26
  • 博客等级: 民兵
  • 技术积分: 100
  • 用 户 组: 普通用户
  • 注册时间: 2011-05-09 18:04
文章分类

全部博文(17)

文章存档

2016年(1)

2012年(8)

2011年(8)

分类:

2011-09-19 16:31:43

原文地址:一个Linux下C线程池的实现 作者:ebayboy

什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。

   下面是Linux系统下用C语言创建的一个线程池。线程池会维护一个任务链表(每个CThread_worker结构就是一个任务)。
   pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函数。该函数中
  1. while (pool->cur_queue_size == 0)
  2. {
  3.        pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock));
  4. }
表示如果任务链表中没有任务,则该线程出于阻塞等待状态。否则从队列中取出任务并执行。
  
   pool_add_worker()函数向线程池的任务链表中加入一个任务,加入后通过调用pthread_cond_signal (&(pool->queue_ready))唤醒一个出于阻塞状态的线程(如果有的话)。
  
   pool_destroy ()函数用于销毁线程池,线程池任务链表中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出。

下面贴出完整代码
#include
#include
#include
#include
#include
#include
#include
#include

/*
线程池里所有运行和等待的任务都是一个worker_t
由于所有任务都在任务链李,所以是一个链表结构
*/
typedef struct worker
{
    /* 回调函数,任务运行时会调用此函数,注意也可声明成其它形式 */
    void * (*process)(void * arg);
    void * arg; /* 回调函数参数 */
    struct worker * next;
}worker_t;

/* 线程池结构 */
typedef struct pool
{
    pthread_mutex_t queue_lock;
    pthread_cond_t queue_ready;
   
    /*链表结构,线程池中所有等待任务 */
    worker_t * queue_head;
   
    /* 是否销毁线程池 */
    int shutdown;
    pthread_t * threadid;
    /* 线程池中允许的活动线程数目 */
    int max_thread_num;
    /* 当前等待队列的任务数目 */
    int cur_queue_size;

}pool_t;

int pool_add_worker(void *(*process)(void *arg),void * arg);
void * thread_routine(void * arg);

static pool_t * pool = NULL;

void  pool_init(int max_thread_num)
{
    pool = (pool_t *)malloc(sizeof(pool_t));
   
    pthread_mutex_init(&(pool->queue_lock),NULL);
    pthread_cond_init(&(pool->queue_ready),NULL);

    pool->queue_head = NULL;
   
    pool->max_thread_num = max_thread_num;
    pool->cur_queue_size = 0;

    pool->shutdown = 0;
   
    pool->threadid = (pthread_t *)malloc(max_thread_num * sizeof(pthread_t));
    int i=0;
    for(i=0;i    {
        pthread_create(&(pool->threadid[i]),NULL,thread_routine,NULL);
    }
}
   
/* 向线程池中加入任务 */
int pool_add_worker(void * (*process)(void * arg),void *arg)
{
    /* 构造一个新任务 */
    worker_t * newworker = (worker_t *)malloc(sizeof(worker_t));
    newworker->process = process;
    newworker->arg = arg;
    newworker->next = NULL;  /* 别忘置空 */
   
    pthread_mutex_lock(&(pool->queue_lock));
    /* 将任务加入等待队列中 */
    worker_t * member = pool->queue_head;
    if(member!=NULL)
    {
        while(member->next!=NULL)
            member = member->next;
        member->next = newworker;
    }
    else
    {
        pool->queue_head = newworker;
    }
   
    assert(pool->queue_head!=NULL);

    pool->cur_queue_size++;
    pthread_mutex_unlock(&(pool->queue_lock));
   
    /* 等待队列中有任务了,唤醒一个等待线程;
       如果所有线程都在忙碌,这句没有任何作用
    */
    pthread_cond_signal(&(pool->queue_ready));
    return 0;
}

/*
销毁线程池,等待队列中的任务不会再执行,但是正在运行的线程会一直把任务运行完后再退出
*/
int pool_destroy()
{
    if(pool->shutdown)
        return -1;/* 防止两次调用 */
    pool->shutdown = 1;
   
    /* 唤醒所有等待线程,线程池要销毁了 */
    pthread_cond_broadcast(&(pool->queue_ready));
   
    /*阻塞等待线程退出,否则就成僵死线程了 */
    int i;
    for(i=0;imax_thread_num;i++)
        pthread_join(pool->threadid[i],NULL);
    free(pool->threadid);

    /* 销毁等待队列 */
    worker_t * head = NULL;
    while(pool->queue_head!=NULL)
    {
        head = pool->queue_head;
        pool->queue_head = pool->queue_head->next;
        free(head);
    }
    /* 条件变量和互斥量销毁 */
    pthread_mutex_destroy(&(pool->queue_lock));
    pthread_cond_destroy(&(pool->queue_ready));
   
    free(pool);   
    /* 销毁后指针置空是好习惯 */
    pool = NULL;
    return 0;
}

void * thread_routine(void * arg)
{
    printf("starting thread 0x%x\n",pthread_self());
    while(1)
    {
        pthread_mutex_lock(&(pool->queue_lock));
        /*如果等待队列为0并且不销毁线程池,则处于阻塞状态;
        注意pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁
        */
        while(pool->cur_queue_size==0 && !pool->shutdown)
        {
            printf("thread 0x%x is waiting \n",pthread_self());
            pthread_cond_wait(&(pool->queue_ready),&(pool->queue_lock));
        }
       
        /* 线程池要销毁了 */
        if(pool->shutdown)
        {
            /* 遇到break,continue,return等跳转语句,千万不要忘记先解锁 */
            pthread_mutex_unlock(&(pool->queue_lock));
            printf("thread 0x%x will exit \n",pthread_self());
            pthread_exit(NULL);
        }   
        printf("thread 0x%x is starting to work\n",pthread_self());
       
        /* assert是调试的好帮手 */
        assert(pool->cur_queue_size !=0);
        assert(pool->queue_head!=NULL);

        /* 等待队列长度减1,并取出链表中的头元素*/
        pool->cur_queue_size--;
        worker_t * worker = pool->queue_head;
        pool->queue_head = worker->next;
        pthread_mutex_unlock(&(pool->queue_lock));

        /*调用回调函数,执行任务*/
        (*(worker->process))(worker->arg);
        free(worker);
        worker = NULL;
    }
    /* 这一句应该是不可达的 */
    pthread_exit(NULL);
}

void * myprocess(void * arg)
{
    printf("threadid is 0x%x ,working on task %d \n",pthread_self(),*(int *)arg);
    sleep(1);
    return NULL;
}
int main(int argc,char ** argv)
{
    pool_init(3);

    int * workingnum = (int *)malloc(sizeof(int)*10);
    int i;
    for(i=0;i<10;i++)
    {
        workingnum[i] = i;
        pool_add_worker(myprocess,&workingnum[i]);
    }
    /*等待所有任务完成*/
    sleep(5);
    pool_destroy();
       
    free(workingnum);
   
    return 0;
}
阅读(433) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~