Chinaunix首页 | 论坛 | 博客
  • 博客访问: 15273029
  • 博文数量: 2005
  • 博客积分: 11986
  • 博客等级: 上将
  • 技术积分: 22535
  • 用 户 组: 普通用户
  • 注册时间: 2007-05-17 13:56
文章分类

全部博文(2005)

文章存档

2014年(2)

2013年(2)

2012年(16)

2011年(66)

2010年(368)

2009年(743)

2008年(491)

2007年(317)

分类: LINUX

2011-02-27 22:54:46

头文件thrmgr.h thrmgr.h.rar
  1. /*
  2.  * Copyright (C) 2004 Trog
  3.  *
  4.  * This program is free software; you can redistribute it and/or modify
  5.  * it under the terms of the GNU General Public License as published by
  6.  * the Free Software Foundation; either version 2 of the License, or
  7.  * (at your option) any later version.
  8.  *
  9.  * This program is distributed in the hope that it will be useful,
  10.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12.  * GNU General Public License for more details.
  13.  *
  14.  * You should have received a copy of the GNU General Public License
  15.  * along with this program; if not, write to the Free Software
  16.  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  17.  */

  18. #ifndef __THRMGR_H__
  19. #define __THRMGR_H__ 1

  20. #ifdef __cplusplus
  21. extern "C" {
  22. #endif

  23. #include <pthread.h>
  24. #include <sys/time.h>
  25. #include <utils/Log.h>
  26. #include <stdlib.h>

  27. #define mmalloc malloc
  28. typedef struct work_item_tag {
  29.     struct work_item_tag *next;
  30.     void *data;
  31.     struct timeval time_queued;
  32. } work_item_t;
  33.     
  34. typedef struct work_queue_tag {
  35.     work_item_t *head;
  36.     work_item_t *tail;
  37.     int item_count;
  38. } work_queue_t;

  39. typedef enum {
  40.     POOL_INVALID,
  41.     POOL_VALID,
  42.     POOL_EXIT,
  43. } pool_state_t;

  44. typedef struct threadpool_tag {
  45.     pthread_mutex_t pool_mutex;
  46.     pthread_cond_t pool_cond;
  47.     pthread_attr_t pool_attr;
  48.     
  49.     pool_state_t state;
  50.     int thr_max;
  51.     int thr_alive;
  52.     int thr_idle;
  53.     int idle_timeout;
  54.     
  55.     void (*handler)(void *);
  56.     
  57.     work_queue_t *queue;
  58. } threadpool_t;

  59. threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *));
  60. void thrmgr_destroy(threadpool_t *threadpool);
  61. int thrmgr_dispatch(threadpool_t *threadpool, void *user_data);

  62. #ifdef __cplusplus
  63. }
  64. #endif

  65. #endif
源文件thrmgr.cpp thrmgr.cpp.rar
  1. /*
  2.  * Copyright (C) 2004 Trog
  3.  *
  4.  * This program is free software; you can redistribute it and/or modify
  5.  * it under the terms of the GNU General Public License as published by
  6.  * the Free Software Foundation; either version 2 of the License, or
  7.  * (at your option) any later version.
  8.  *
  9.  * This program is distributed in the hope that it will be useful,
  10.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12.  * GNU General Public License for more details.
  13.  *
  14.  * You should have received a copy of the GNU General Public License
  15.  * along with this program; if not, write to the Free Software
  16.  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  17.  */

  18. #include <pthread.h>
  19. #include <time.h>
  20. #include <errno.h>

  21. #include "thrmgr.h"

  22. #define FALSE (0)
  23. #define TRUE (1)

  24. work_queue_t *work_queue_new()
  25. {
  26.     work_queue_t *work_q;

  27.     work_q = (work_queue_t *) mmalloc(sizeof(work_queue_t));

  28.     work_q->head = work_q->tail = NULL;
  29.     work_q->item_count = 0;
  30.     return work_q;
  31. }

  32. void work_queue_add(work_queue_t *work_q, void *data)
  33. {
  34.     work_item_t *work_item;

  35.     if (!work_q) {
  36.         return;
  37.     }
  38.     work_item = (work_item_t *) mmalloc(sizeof(work_item_t));
  39.     work_item->next = NULL;
  40.     work_item->data = data;
  41.     gettimeofday(&(work_item->time_queued), NULL);

  42.     if (work_q->head == NULL) {
  43.         work_q->head = work_q->tail = work_item;
  44.         work_q->item_count = 1;
  45.     } else {
  46.         work_q->tail->next = work_item;
  47.         work_q->tail = work_item;
  48.         work_q->item_count++;
  49.     }
  50.     return;
  51. }

  52. void *work_queue_pop(work_queue_t *work_q)
  53. {
  54.     work_item_t *work_item;
  55.     void *data;

  56.     if (!work_q || !work_q->head) {
  57.         return NULL;
  58.     }
  59.     work_item = work_q->head;
  60.     data = work_item->data;
  61.     work_q->head = work_item->next;
  62.     if (work_q->head == NULL) {
  63.         work_q->tail = NULL;
  64.     }
  65.     free(work_item);
  66.     return data;
  67. }

  68. void thrmgr_destroy(threadpool_t *threadpool)
  69. {
  70.     if (!threadpool || (threadpool->state != POOL_VALID)) {
  71.         return;
  72.     }
  73.       if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
  74.            LOGE("!Mutex lock failed\n");
  75.         exit(-1);
  76.     }
  77.     threadpool->state = POOL_EXIT;

  78.     /* wait for threads to exit */
  79.     if (threadpool->thr_alive > 0) {
  80.         // 广播告知大家全部退出各自的工作,老子要死了
  81.         if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
  82.             pthread_mutex_unlock(&threadpool->pool_mutex);
  83.             return;
  84.         }
  85.     }
  86.     while (threadpool->thr_alive > 0) {
  87.         // 等着最后一个worker退出时
  88.         // 执行pthread_cond_broadcast(&threadpool->pool_cond);来唤醒这里[luther.gliethttp]
  89.         if (pthread_cond_wait (&threadpool->pool_cond, &threadpool->pool_mutex) != 0) {
  90.             pthread_mutex_unlock(&threadpool->pool_mutex);
  91.             return;
  92.         }
  93.     }
  94.       if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
  95.         LOGE("!Mutex unlock failed\n");
  96.         exit(-1);
  97.       }

  98.     pthread_mutex_destroy(&(threadpool->pool_mutex));
  99.     pthread_cond_destroy(&(threadpool->pool_cond));
  100.     pthread_attr_destroy(&(threadpool->pool_attr));
  101.     free(threadpool);
  102.     return;
  103. }

  104. // 初始化线程池参数
  105. // 1. max_threads -- 最大线程数
  106. // 2. idle_timeout -- 线程空闲等待时间,如idle_timeout秒还没有人使用本worker线程,那么本线程将自行销毁[luther.gliethttp]
  107. threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
  108. {
  109.     threadpool_t *threadpool;

  110.     if (max_threads <= 0) {
  111.         return NULL;
  112.     }

  113.     threadpool = (threadpool_t *) mmalloc(sizeof(threadpool_t));

  114.     threadpool->queue = work_queue_new();
  115.     if (!threadpool->queue) {
  116.         free(threadpool);
  117.         return NULL;
  118.     }
  119.     threadpool->thr_max = max_threads;
  120.     threadpool->thr_alive = 0; // 没有一个创建的线程
  121.     threadpool->thr_idle = 0; // 没有一个空闲的线程
  122.     threadpool->idle_timeout = idle_timeout;
  123.     threadpool->handler = handler; // 线程池消化threadpool->queue任务池上数据时使用到的统一消化函数[luther.gliethttp]

  124.     pthread_mutex_init(&(threadpool->pool_mutex), NULL);
  125.     if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
  126.         free(threadpool);
  127.         return NULL;
  128.     }

  129.     if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
  130.         free(threadpool);
  131.         return NULL;
  132.     }

  133.     if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
  134.         free(threadpool);
  135.         return NULL;
  136.     }
  137.     threadpool->state = POOL_VALID;

  138.     return threadpool;
  139. }

  140. // 线程池中一个线程,每次向线程池中追加一个线程就会使得统计变量threadpool->thr_alive++
  141. // 对统计变量进行++加加操作是在用户程序使用thrmgr_dispatch派发新数据到threadpool->queue任务池时
  142. // 自动完成的
  143. void *thrmgr_worker(void *arg)
  144. {
  145.     threadpool_t *threadpool = (threadpool_t *) arg;
  146.     void *job_data;
  147.     int retval, must_exit = FALSE;
  148.     struct timespec timeout;
  149.     /* loop looking for work */
  150.     for (;;) {
  151.         if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) { // 获取锁
  152.             /* Fatal error */
  153.             LOGE("!Fatal: mutex lock failed\n");
  154.             exit(-2);
  155.         }
  156.         timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
  157.         timeout.tv_nsec = 0;
  158.         threadpool->thr_idle++;
  159.         // 开始饿狗抢食啦[luther.gliethttp]
  160.         while (((job_data=work_queue_pop(threadpool->queue)) == NULL) // 从任务池中争抢一个任务来干[luther.gliethttp]
  161.                 && (threadpool->state != POOL_EXIT)) { // 因为如果线程池中有很多任务的话可能有多个线程
  162.             /* Sleep, awaiting wakeup */ // 来争抢来做这些任务.
  163.             retval = pthread_cond_timedwait(&(threadpool->pool_cond),
  164.                 &(threadpool->pool_mutex), &timeout); // 如果没有任务可做,那么将等待thrmgr_new线程池默认的idle_timeout秒
  165.             if (retval == ETIMEDOUT) { // 如果idle_timeout秒之内,仍然没有人唤醒我做事
  166.                 must_exit = TRUE; // (或者因为线程池中线程太多被其他线程池线程都抢走了)那么我将强行把自己退出
  167.                 break;
  168.             }
  169.         }
  170.         threadpool->thr_idle--; // 表示线程池中的空闲线程少了一个
  171.         if (threadpool->state == POOL_EXIT) {
  172.             must_exit = TRUE;
  173.         }

  174.         if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
  175.             /* Fatal error */
  176.             LOGE("!Fatal: mutex unlock failed\n");
  177.             exit(-2);
  178.         }
  179.         if (job_data) {
  180.             threadpool->handler(job_data); // 本线程消化threadpool->queue任务池上该job_data数据时使用到的统一消化函数
  181.         } else if (must_exit) {
  182.             break; // 自己已经idle_timeout秒没有事做了,自行强行销毁
  183.         }
  184.     }

  185.     if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
  186.         /* Fatal error */
  187.         LOGE("!Fatal: mutex lock failed\n");
  188.         exit(-2);
  189.     }
  190.     threadpool->thr_alive--; // 线程池中可用线程少了一个,可能是idle_timeout秒没有事做,
  191.     if (threadpool->thr_alive == 0) { // 也可能是thrmgr_destroy强行退出
  192.         /* signal that all threads are finished */
  193.         pthread_cond_broadcast(&threadpool->pool_cond); // 如果是thrmgr_destroy强行退出,就必须执行该broadcast
  194.     } // 否则thrmgr_destroy将因为pthread_cond_wait而一直等待下去[luther.gliethttp]
  195.     if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
  196.         /* Fatal error */
  197.         LOGE("!Fatal: mutex unlock failed\n");
  198.         exit(-2);
  199.     }
  200.     return NULL;
  201. }

  202. // 用户程序使用thrmgr_dispatch派发新数据到threadpool->queue任务池
  203. int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
  204. {
  205.     pthread_t thr_id;

  206.     if (!threadpool) {
  207.         return FALSE;
  208.     }

  209.     /* Lock the threadpool */
  210.     if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
  211.         LOGE("!Mutex lock failed\n");
  212.         return FALSE;
  213.     }

  214.     if (threadpool->state != POOL_VALID) {
  215.         if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
  216.             LOGE("!Mutex unlock failed\n");
  217.             return FALSE;
  218.         }
  219.         return FALSE;
  220.     }
  221.     work_queue_add(threadpool->queue, user_data); // 向threadpool->queue任务池推入一个新任务

  222.     if ((threadpool->thr_idle == 0) && // 当前没有一个idle空闲的线程(因为他们都在做事或者本来就还一个都没有创建)
  223.             (threadpool->thr_alive < threadpool->thr_max)) { // 同时已经创建的线程数目没有超过默认最大数量
  224.         /* Start a new thread */
  225.         if (pthread_create(&thr_id, &(threadpool->pool_attr),// ok, 那么我们就再创建一个新线程到线程池中
  226.                 thrmgr_worker, threadpool) != 0) {
  227.             LOGE("!pthread_create failed\n");
  228.         } else {
  229.             threadpool->thr_alive++; // 线程池中又多了一个可用的做事线程
  230.         }
  231.     }

  232.     pthread_cond_signal(&(threadpool->pool_cond)); // 唤醒刚刚做完事的线程让他继续做事,如果没有,那么上面创建的线程将会做该事

  233.     if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
  234.         LOGE("!Mutex unlock failed\n");
  235.         return FALSE;
  236.     }
  237.     return TRUE;
  238. }
阅读(6406) | 评论(2) | 转发(1) |
给主人留下些什么吧!~~

2011-05-22 15:19:36

学习了,多谢楼主分享哦!也欢迎广大linux爱好者来我的论坛一起讨论arm哦!www.lt-net.cn