Chinaunix首页 | 论坛 | 博客
  • 博客访问: 411493
  • 博文数量: 96
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 415
  • 用 户 组: 普通用户
  • 注册时间: 2015-05-22 09:08
个人简介

最近的研究方向:Nginx

文章分类
文章存档

2017年(2)

2016年(59)

2015年(35)

我的朋友

分类: LINUX

2016-01-24 16:29:20

最近看到的一篇不错的博文,让我初步的了解了线程池的机制,本文转自:http://blog.csdn.net/zouxinfox/article/details/3560891
什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。


    下面是Linux系统下用C语言创建的一个线程池。线程池会维护一个任务链表(每个CThread_worker结构就是一个任务)。
    pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函数。该函数中
  1. while (pool->cur_queue_size == 0)
  2. {
  3.       pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock));
  4. }
表示如果任务链表中没有任务,则该线程处于阻塞等待状态。否则从队列中取出任务并执行。
    
    pool_add_worker()函数向线程池的任务链表中加入一个任务,加入后通过调用pthread_cond_signal (&(pool->queue_ready))唤醒一个处于阻塞状态的线程(如果有的话)。
    
    pool_destroy ()函数用于销毁线程池,线程池任务链表中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出。
    
    下面贴出完整代码
  1. #include 
  2. #include 
  3. #include 
  4. #include 
  5. #include 
  6. #include 
  7. /*
  8. *线程池里所有运行和等待的任务都是一个CThread_worker
  9. *由于所有任务都在链表里,所以是一个链表结构
  10. */
  11. typedef struct worker
  12. {
  13.     /*回调函数,任务运行时会调用此函数,注意也可声明成其它形式*/
  14.     void *(*process) (void *arg);
  15.     void *arg;/*回调函数的参数*/
  16.     struct worker *next;
  17. } CThread_worker;

  18. /*线程池结构*/
  19. typedef struct
  20. {
  21.     pthread_mutex_t queue_lock;
  22.     pthread_cond_t queue_ready;
  23.     /*链表结构,线程池中所有等待任务*/
  24.     CThread_worker *queue_head;
  25.     /*是否销毁线程池*/
  26.     int shutdown;
  27.     pthread_t *threadid;
  28.     /*线程池中允许的活动线程数目*/
  29.     int max_thread_num;
  30.     /*当前等待队列的任务数目*/
  31.     int cur_queue_size;
  32. } CThread_pool;

  33. int pool_add_worker (void *(*process) (void *arg), void *arg);
  34. void *thread_routine (void *arg);

  35. static CThread_pool *pool = NULL;
  36. void
  37. pool_init (int max_thread_num)
  38. {
  39.     pool = (CThread_pool *) malloc (sizeof (CThread_pool));
  40.     pthread_mutex_init (&(pool->queue_lock), NULL);
  41.     pthread_cond_init (&(pool->queue_ready), NULL);
  42.     pool->queue_head = NULL;
  43.     pool->max_thread_num = max_thread_num;
  44.     pool->cur_queue_size = 0;
  45.     pool->shutdown = 0;
  46.     pool->threadid =
  47.         (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
  48.     int i = 0;
  49.     for (i = 0; i < max_thread_num; i++)
  50.     { 
  51.         pthread_create (&(pool->threadid[i]), NULL, thread_routine,
  52.                 NULL);
  53.     }
  54. }

  55. /*向线程池中加入任务*/
  56. int
  57. pool_add_worker (void *(*process) (void *arg), void *arg)
  58. {
  59.     /*构造一个新任务*/
  60.     CThread_worker *newworker =
  61.         (CThread_worker *) malloc (sizeof (CThread_worker));
  62.     newworker->process = process;
  63.     newworker->arg = arg;
  64.     newworker->next = NULL;/*别忘置空*/
  65.     pthread_mutex_lock (&(pool->queue_lock));
  66.     /*将任务加入到等待队列中*/
  67.     CThread_worker *member = pool->queue_head;
  68.     if (member != NULL)
  69.     {
  70.         while (member->next != NULL)
  71.             member = member->next;
  72.         member->next = newworker;
  73.     }
  74.     else
  75.     {
  76.         pool->queue_head = newworker;
  77.     }
  78.     assert (pool->queue_head != NULL);
  79.     pool->cur_queue_size++;
  80.     pthread_mutex_unlock (&(pool->queue_lock));
  81.     /*好了,等待队列中有任务了,唤醒一个等待线程;
  82.     注意如果所有线程都在忙碌,这句没有任何作用*/
  83.     pthread_cond_signal (&(pool->queue_ready));
  84.     return 0;
  85. }

  86. /*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直
  87. 把任务运行完后再退出*/
  88. int
  89. pool_destroy ()
  90. {
  91.     if (pool->shutdown)
  92.         return -1;/*防止两次调用*/
  93.     pool->shutdown = 1;
  94.     /*唤醒所有等待线程,线程池要销毁了*/
  95.     pthread_cond_broadcast (&(pool->queue_ready));
  96.     /*阻塞等待线程退出,否则就成僵尸了*/
  97.     int i;
  98.     for (i = 0; i < pool->max_thread_num; i++)
  99.         pthread_join (pool->threadid[i], NULL);
  100.     free (pool->threadid);
  101.     /*销毁等待队列*/
  102.     CThread_worker *head = NULL;
  103.     while (pool->queue_head != NULL)
  104.     {
  105.         head = pool->queue_head;
  106.         pool->queue_head = pool->queue_head->next;
  107.         free (head);
  108.     }
  109.     /*条件变量和互斥量也别忘了销毁*/
  110.     pthread_mutex_destroy(&(pool->queue_lock));
  111.     pthread_cond_destroy(&(pool->queue_ready));
  112.     
  113.     free (pool);
  114.     /*销毁后指针置空是个好习惯*/
  115.     pool=NULL;
  116.     return 0;
  117. }

  118. void *
  119. thread_routine (void *arg)
  120. {
  121.     printf ("starting thread 0x%x/n", pthread_self ());
  122.     while (1)
  123.     {
  124.         pthread_mutex_lock (&(pool->queue_lock));
  125.         /*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意
  126.         pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/
  127.         while (pool->cur_queue_size == 0 && !pool->shutdown)
  128.         {
  129.             printf ("thread 0x%x is waiting/n", pthread_self ());
  130.             pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));
  131.         }
  132.         /*线程池要销毁了*/
  133.         if (pool->shutdown)
  134.         {
  135.             /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/
  136.             pthread_mutex_unlock (&(pool->queue_lock));
  137.             printf ("thread 0x%x will exit/n", pthread_self ());
  138.             pthread_exit (NULL);
  139.         }
  140.         printf ("thread 0x%x is starting to work/n", pthread_self ());
  141.         /*assert是调试的好帮手*/
  142.         assert (pool->cur_queue_size != 0);
  143.         assert (pool->queue_head != NULL);
  144.         
  145.         /*等待队列长度减去1,并取出链表中的头元素*/
  146.         pool->cur_queue_size--;
  147.         CThread_worker *worker = pool->queue_head;
  148.         pool->queue_head = worker->next;
  149.         pthread_mutex_unlock (&(pool->queue_lock));
  150.         /*调用回调函数,执行任务*/
  151.         (*(worker->process)) (worker->arg);
  152.         free (worker);
  153.         worker = NULL;
  154.     }
  155.     /*这一句应该是不可达的*/
  156.     pthread_exit (NULL);
  157. }
    下面是测试代码
  1. void *
  2. myprocess (void *arg)
  3. {
  4.     printf ("threadid is 0x%x, working on task %d/n", pthread_self (),*(int *) arg);
  5.     sleep (1);/*休息一秒,延长任务的执行时间*/
  6.     return NULL;
  7. }
  8. int
  9. main (int argc, char **argv)
  10. {
  11.     pool_init (3);/*线程池中最多三个活动线程*/
  12.     
  13.     /*连续向池中投入10个任务*/
  14.     int *workingnum = (int *) malloc (sizeof (int) * 10);
  15.     int i;
  16.     for (i = 0; i < 10; i++)
  17.     {
  18.         workingnum[i] = i;
  19.         pool_add_worker (myprocess, &workingnum[i]);
  20.     }
  21.     /*等待所有任务完成*/
  22.     sleep (5);
  23.     /*销毁线程池*/
  24.     pool_destroy ();
  25.     free (workingnum);
  26.     return 0;
  27. }
将上述所有代码放入threadpool.c文件中,
在Linux输入编译命令
$ gcc -o threadpool threadpool.c -lpthread

以下是运行结果

阅读(1872) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~