Chinaunix首页 | 论坛 | 博客
  • 博客访问: 402579
  • 博文数量: 58
  • 博客积分: 1775
  • 博客等级: 上尉
  • 技术积分: 755
  • 用 户 组: 普通用户
  • 注册时间: 2010-12-12 15:03
文章分类

全部博文(58)

文章存档

2012年(5)

2011年(43)

2010年(10)

分类: LINUX

2011-12-09 20:15:36

  1. #include   
  2. #include   
  3. #include   
  4. #include   
  5. #include   
  6. #include   
  7. /* 
  8. *线程池里所有运行和等待的任务都是一个CThread_worker 
  9. *由于所有任务都在链表里,所以是一个链表结构 
  10. */  
  11. typedef struct worker  
  12. {  
  13.     /*回调函数,任务运行时会调用此函数,注意也可声明成其它形式*/  
  14.     void *(*process) (void *arg);  
  15.     void *arg;/*回调函数的参数*/  
  16.     struct worker *next;  
  17.   
  18.   
  19. } CThread_worker;  
  20.   
  21. /*线程池结构*/  
  22. typedef struct  
  23. {  
  24.      pthread_mutex_t queue_lock;  
  25.      pthread_cond_t queue_ready;  
  26.   
  27.   
  28.     /*链表结构,线程池中所有等待任务*/  
  29.      CThread_worker *queue_head;  
  30.   
  31.     /*是否销毁线程池*/  
  32.     int shutdown;  
  33.      pthread_t *threadid;  
  34.     /*线程池中允许的活动线程数目*/  
  35.     int max_thread_num;  
  36.     /*当前等待队列的任务数目*/  
  37.     int cur_queue_size;  
  38. } CThread_pool;  
  39.   
  40. int pool_add_worker (void *(*process) (void *arg), void *arg);  
  41. void *thread_routine (void *arg);  
  42.   
  43. static CThread_pool *pool = NULL;  
  44. void pool_init (int max_thread_num)//初始化线程池  
  45. {  
  46.      pool = (CThread_pool *) malloc (sizeof (CThread_pool));  
  47.   
  48.      pthread_mutex_init (&(pool->queue_lock), NULL);  
  49.      pthread_cond_init (&(pool->queue_ready), NULL);//?  
  50.   
  51.      pool->queue_head = NULL;  
  52.   
  53.      pool->max_thread_num = max_thread_num;  
  54.      pool->cur_queue_size = 0;  
  55.   
  56.      pool->shutdown = 0;  
  57.   
  58.      pool->threadid =(pthread_t *) malloc (max_thread_num * sizeof (pthread_t));  
  59.      int i = 0;  
  60.      for (i = 0; i < max_thread_num; i++)  
  61.      {  
  62.          pthread_create (&(pool->threadid[i]), NULL, thread_routine,NULL);  
  63.      }  
  64. }  
  65.   
  66. /*向线程池中加入任务*/  
  67. int pool_add_worker (void *(*process) (void *arg), void *arg)  
  68. {  
  69.     /*构造一个新任务*/  
  70.      CThread_worker *newworker =(CThread_worker *) malloc (sizeof (CThread_worker));  
  71.      newworker->process = process;  
  72.      newworker->arg = arg;  
  73.      newworker->next = NULL;/*别忘置空*/  
  74.      pthread_mutex_lock (&(pool->queue_lock));  
  75.     /*将任务加入到等待队列中*/  
  76.      CThread_worker *member = pool->queue_head;  
  77.     if (member != NULL)  
  78.      {  
  79.         while (member->next != NULL)  
  80.              member = member->next;  
  81.          member->next = newworker;  
  82.      }  
  83.     else  
  84.      {  
  85.          pool->queue_head = newworker;  
  86.      }  
  87.   
  88.   
  89.      assert (pool->queue_head != NULL);  
  90.   
  91.   
  92.      pool->cur_queue_size++;  
  93.      pthread_mutex_unlock (&(pool->queue_lock));  
  94.     /*好了,等待队列中有任务了,唤醒一个等待线程; 
  95.      注意如果所有线程都在忙碌,这句没有任何作用*/  
  96.      pthread_cond_signal (&(pool->queue_ready));  
  97.     return 0;  
  98. }  
  99.   
  100. /*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直 
  101. 把任务运行完后再退出*/  
  102. int pool_destroy ()  
  103. {  
  104.     if (pool->shutdown)  
  105.         return -1;/*防止两次调用*/  
  106.      pool->shutdown = 1;  
  107.   
  108.   
  109.     /*唤醒所有等待线程,线程池要销毁了*/  
  110.      pthread_cond_broadcast (&(pool->queue_ready));  
  111.   
  112.   
  113.     /*阻塞等待线程退出,否则就成僵尸了*/  
  114.     int i;  
  115.     for (i = 0; i < pool->max_thread_num; i++)  
  116.          pthread_join (pool->threadid[i], NULL);  
  117.      free (pool->threadid);  
  118.   
  119.   
  120.     /*销毁等待队列*/  
  121.      CThread_worker *head = NULL;  
  122.      while (pool->queue_head != NULL)  
  123.      {  
  124.          head = pool->queue_head;  
  125.          pool->queue_head = pool->queue_head->next;  
  126.          free (head);  
  127.      }  
  128.     /*条件变量和互斥量也别忘了销毁*/  
  129.      pthread_mutex_destroy(&(pool->queue_lock));  
  130.      pthread_cond_destroy(&(pool->queue_ready));  
  131.        
  132.      free (pool);  
  133.     /*销毁后指针置空是个好习惯*/  
  134.      pool=NULL;  
  135.     return 0;  
  136. }  
  137. void * thread_routine (void *arg)  
  138. {  
  139.      printf ("starting thread 0x%x\n", pthread_self ());  
  140.      while (1)  
  141.      {  
  142.          pthread_mutex_lock (&(pool->queue_lock));  
  143.         /*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/  
  144.         while (pool->cur_queue_size == 0 && !pool->shutdown)  
  145.          {  
  146.              printf ("thread 0x%x is waiting\n", pthread_self ());  
  147.              pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));  
  148.          }  
  149.   
  150.   
  151.         /*线程池要销毁了*/  
  152.         if (pool->shutdown)  
  153.          {  
  154.             /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/  
  155.              pthread_mutex_unlock (&(pool->queue_lock));  
  156.              printf ("thread 0x%x will exit\n", pthread_self ());  
  157.              pthread_exit (NULL);  
  158.          }  
  159.   
  160.   
  161.          printf ("thread 0x%x is starting to work\n", pthread_self ());  
  162.   
  163.   
  164.         /*assert是调试的好帮手*/  
  165.          assert (pool->cur_queue_size != 0);  
  166.          assert (pool->queue_head != NULL);  
  167.            
  168.         /*等待队列长度减去1,并取出链表中的头元素*/  
  169.          pool->cur_queue_size--;  
  170.          CThread_worker *worker = pool->queue_head;  
  171.          pool->queue_head = worker->next;  
  172.          pthread_mutex_unlock (&(pool->queue_lock));  
  173.   
  174.   
  175.         /*调用回调函数,执行任务*/  
  176.          (*(worker->process)) (worker->arg);  
  177.          free (worker);  
  178.          worker = NULL;  
  179.      }  
  180.     /*这一句应该是不可达的*/  
  181.      pthread_exit (NULL);  
  182. }  
  183.   
  184.  //   下面是测试代码  
  185.   
  186. void *myprocess (void *arg)  
  187. {  
  188.      printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);  
  189.      sleep (1);/*休息一秒,延长任务的执行时间*/  
  190.     return NULL;  
  191. }  
  192. int main (int argc, char **argv)  
  193. {  
  194.     pool_init (3);/*线程池中最多三个活动线程*/  
  195.        
  196.     /*连续向池中投入10个任务*/  
  197.     int *workingnum = (int *) malloc (sizeof (int) * 10);  
  198.     int i;  
  199.     for (i = 0; i < 10; i++)  
  200.      {  
  201.          workingnum[i] = i;  
  202.          pool_add_worker (myprocess, &workingnum[i]);  
  203.      }  
  204.     /*等待所有任务完成*/  
  205.      sleep (5);  
  206.     /*销毁线程池*/  
  207.         pool_destroy ();  
  208.         free (workingnum);  
  209.         return 0;  
  210. }
阅读(5298) | 评论(0) | 转发(2) |
给主人留下些什么吧!~~