Chinaunix首页 | 论坛 | 博客
  • 博客访问: 382479
  • 博文数量: 82
  • 博客积分: 1855
  • 博客等级: 上尉
  • 技术积分: 846
  • 用 户 组: 普通用户
  • 注册时间: 2010-09-12 12:28
文章存档

2013年(3)

2012年(8)

2011年(71)

分类: LINUX

2011-10-13 17:31:51

  1. /* 这是一个使用例子。 */

  2. /* 在我的P4双核机器上,可以比单线程版本快20%,但复杂性远高于20%:( */


  3. #include "threadpool.h"

  4. unsigned long long sum(unsigned long long start, unsigned long long end)
  5. {
  6.     unsigned long long sum;

  7.     sum = 0;
  8.     for (; start<=end; ++start)
  9.         sum += start;

  10.     return sum;
  11. }

  12. struct per_sum {
  13.     unsigned long long sum, start, end;
  14.     pthread_mutex_t lock;
  15.     pthread_cond_t cond;
  16. };

  17. void threaded_sum(void *voidp)
  18. {
  19.     struct per_sum *per_sum = voidp;

  20.     printf("thread %p start\n", voidp);

  21.     if (!per_sum) {
  22. // printf("per_sum == NULL\n");
  23.         return;
  24.     }
  25.     per_sum->sum = sum(per_sum->start, per_sum->end);
  26.     per_sum->start = per_sum->end = 0;
  27.     pthread_mutex_lock(&per_sum->lock);
  28.     printf("thread %p exit, end=%lld\n", voidp, per_sum->end);
  29.     pthread_cond_signal(&per_sum->cond);
  30.     pthread_mutex_unlock(&per_sum->lock);
  31. }

  32. int main(void)
  33. {
  34. #define NR_THREADS 2
  35.     struct thread_worker* workers[NR_THREADS];
  36.     struct per_sum per_sums[NR_THREADS];
  37.     struct thread_pool *pool;
  38.     int i;

  39.     unsigned long long start, end;
  40.     unsigned long long result = 0;
  41.     unsigned long long delta = 0x10ffffff;

  42. // printf("mutli threading ... ");

  43.     pool = thread_pool_create(NR_THREADS, NULL);
  44.     if (!pool)
  45.         exit(-1);

  46.     for (i=0; i<NR_THREADS; ++i) {
  47.         if (pthread_mutex_init(&per_sums[i].lock, NULL)) {
  48.             printf("failed init mutex\n");
  49.             exit(3);
  50.         }
  51.         if (pthread_cond_init(&per_sums[i].cond, NULL)) {
  52.             printf("failed init cond\n");
  53.             exit(4);
  54.         }
  55.         if (thread_pool_lend(pool, threaded_sum, (void*)&per_sums[i], &workers[i])) {
  56.             printf("failed to lend thread %d\n", i);
  57.             exit(5);
  58.         }
  59.     }


  60.     start = 0;

  61.     /* activate threads */
  62.     for (i=0; i<NR_THREADS; i++) {
  63.         per_sums[i].start = start;
  64.         per_sums[i].end = per_sums[i].start + delta;
  65.         start = per_sums[i].end + 1;
  66.         thread_pool_activate(workers[i]);
  67.     }

  68.     for (i=0; i<NR_THREADS; i++) {
  69.         pthread_mutex_lock(&per_sums[i].lock);
  70.         while (per_sums[i].end != 0)
  71.             pthread_cond_wait(&per_sums[i].cond, &per_sums[i].lock);
  72.         result += per_sums[i].sum;
  73.         pthread_mutex_unlock(&per_sums[i].lock);
  74.     }

  75.     /* activate threads again */
  76.     for (i=0; i<NR_THREADS; i++) {
  77.         per_sums[i].start = start;
  78.         per_sums[i].end = per_sums[i].start + delta;
  79.         start = per_sums[i].end + 1;
  80.         thread_pool_activate(workers[i]);
  81.     }
  82.     end = per_sums[NR_THREADS-1].end;

  83.     for (i=0; i<NR_THREADS; i++) {
  84.         pthread_mutex_lock(&per_sums[i].lock);
  85.         while (per_sums[i].end != 0)
  86.             pthread_cond_wait(&per_sums[i].cond, &per_sums[i].lock);
  87.         result += per_sums[i].sum;
  88.         pthread_mutex_unlock(&per_sums[i].lock);
  89.     }

  90.     for (i=0; i<NR_THREADS; ++i) {
  91.         if (thread_pool_giveback(pool, workers[i])) {
  92.             printf("failed to giveback thread %d\n", i);
  93.             exit(6);
  94.         }
  95.         pthread_mutex_destroy(&per_sums[i].lock);
  96.         pthread_cond_destroy(&per_sums[i].cond);
  97.     }

  98.     thread_pool_clean(pool);
  99.     printf("sum = %lld\n\n", result);
  100.     return 0;
  101. }
 基本想法是这样的:
      1、预创建的线程通过mutex休眠在线程池中。这样,通过unlock该mutex就可以唤醒该线程了;
      2、出于简单性的目标,一个线程池内的所有线程的属性都是相同的。这个属性在创建线程池可以指定;
      3、一般来讲,线程池内的线程不能被取消、或者调用pthread_exit()退出。这些管理性工作是由线程池本身完成的。即,在使用线程池借出的线程时,函数返回应该只用return。
      4、从线程池“借出”的线程,可以归还给线程池。实际上也必须归还给线程池,这样线程池可以完成最后的清理工作。
      5、如果实在需要取消一个线程,那么好吧,只是别忘了告诉线程池你取消了它的手下
  1. #include "threadpool.h" /* #include了所有必要的系统头文件 */

  2. #define THWK_F_CLEAN 1 /* 设置此标志着threadpool正在进行清理操作,此时线程退出。 */
  3. #define THWK_F_RUNNING 2 /* 设置这个标志主要是为了避免一个race condition,后述。 */

  4. struct thread_worker_arg {
  5.     void (*action)(void*); /* user programmer指定的实际函数 */
  6.     void *what; /* action的参数 */
  7. };

  8. struct thread_worker {
  9.     pthread_t id; /* just as its name */
  10.     struct thread_worker_arg arg; /* 用于给sleepy_wrapper()传送参数,后述。 */
  11.     pthread_mutex_t lock; /* 用于实现线程池内空闲线程的休眠,它实际上并不保护什么临界区。 */
  12.     struct thread_worker *next; /* 用于链表线程池内的其他线程 */
  13.     unsigned long long delay; /* 未用,计划用于测量调度延迟。 */
  14.     unsigned long flags; /* 标志,后述。 */
  15. };

  16. struct thread_pool {
  17.     pthread_mutex_t lock; /* 用于同步对于thread_pool自身的访问操作 */
  18.     struct thread_worker *first; /* 所有线程链接于此 */
  19.     int total; /* 总线程数 */
  20.     int current_nr; /* 池内空闲线程数 */
  21. };

  22. /* 未用,计划用于测量调度延迟。 */
  23. inline unsigned long long get_ticks(void)
  24. {
  25. // __asm__ ("rdtsc");

  26.     return 0ULL;
  27. }

  28. /* 用于支持线程在被取消时的必要清理操作。 */
  29. static void sleepy_wrapper_cleanup(void *voidp)
  30. {
  31.     struct thread_worker *worker = voidp;

  32.     pthread_mutex_unlock(&worker->lock);
  33.     free(worker);
  34. }

  35. /* 这就是线程池内线程的执行函数了。 */
  36. static void* sleepy_wrapper(void *voidp)
  37. {
  38.     struct thread_worker *worker = voidp;

  39.     while (1) {
  40.         pthread_cleanup_push(sleepy_wrapper_cleanup, worker); /* 预设置上一个清理函数,防止线程取消时内存泄漏。 */
  41.         pthread_mutex_lock(&worker->lock); /* 空闲线程应该休眠于此,这个mutex在创建thread pool时就锁住了。或者本循环结束时锁住。 */
  42.         worker->delay = get_ticks() - worker->delay; /* 暂时无用。 */
  43.         if (THWK_F_CLEAN & worker->flags) /* 线程池正在清理本身,所以线程至此就退出了。 */
  44.             goto done; /* 你可能觉得这个goto用得有些多余,但如果不这样编译就会提示句法错误,因为pthread_cleanup_{push,pop}是用宏实现的!你可以参考一下它们的实现。 */
  45.         worker->flags |= THWK_F_RUNNING; /* 后述。 */
  46.         if (worker->arg.action) /* 进行线程实际的工作 */
  47.             worker->arg.action(worker->arg.what);
  48. done:
  49.         pthread_mutex_unlock(&worker->lock); /* 解锁这个mutex,允许这个thread的下一次使用 */
  50.         pthread_cleanup_pop(0);
  51.         if (THWK_F_CLEAN & worker->flags) /* 清理线程池 */
  52.             break;
  53.         pthread_mutex_lock(&worker->lock); /* 先锁住这个锁,以让本循环开头的pthread_mutex_lock()使线程进入休眠。这个调用应该是成功的,否则就会引用deadlock。 */
  54.         worker->flags &= ~THWK_F_RUNNING; /* 设计这个标志的意义在于防止有线程激活操作在以上unlock/lock之间发生,如果这样的话,就会引起deadlock,激活操作的实现后述。 */
  55.     }
  56.     pthread_exit(0);
  57. }

  58. /* 无需废话的函数。 */
  59. pthread_t thread_pool_rawid(struct thread_worker *worker)
  60. {
  61.     return worker->id;
  62. }

  63. /* 如果线程被取消了,通知线程池忘记它,目前的实现很简单。*/
  64. void thread_pool_forget(struct thread_pool *pool, struct thread_worker *worker)
  65. {
  66.     pool->total--;
  67. }

  68. /* 线程激活操作 */
  69. void thread_pool_activate(struct thread_worker *worker)
  70. {
  71.     worker->delay = get_ticks();
  72.     while (thread_pool_is_running(worker)) /* 防止出现deadlock */
  73.         ;
  74.     pthread_mutex_unlock(&worker->lock); /* 使sleepy_wrapper()内循环开头部分的lock()操作返回,即线程得以唤醒执行实际的action(what)*/
  75. }

  76. /* 另一个无须废话的函数 */
  77. int thread_pool_is_running(struct thread_worker *worker)
  78. {
  79.     return (worker->flags & THWK_F_RUNNING);
  80. }

  81. /* 从线程池中借出一个线程,其实就是一个从链表头中摘出thread_worker的简单函数 */
  82. int thread_pool_lend(struct thread_pool *pool, void (*action)(void*), void* what, struct thread_worker **worker)
  83. {
  84.     if (!action || !pool || !worker)
  85.         return -EINVAL;

  86.     pthread_mutex_lock(&pool->lock);
  87.     *worker = pool->first;
  88.     if (worker) {
  89.         (*worker)->arg.action = action;
  90.         (*worker)->arg.what = what;
  91.         pool->first = (*worker)->next;
  92.         (*worker)->next = NULL;
  93.         pool->current_nr--;
  94.     }
  95.     pthread_mutex_unlock(&pool->lock);
  96.     return 0;
  97. }

  98. /* 向线程池里归还一个thread,头插法插入thread_worker链表。 */
  99. int thread_pool_giveback(struct thread_pool *pool, struct thread_worker *worker)
  100. {
  101.     if (!pool || !worker)
  102.         return -EINVAL;

  103.     while (thread_pool_is_running(worker))
  104.         ;

  105.     pthread_mutex_lock(&pool->lock);
  106.     worker->next = pool->first;
  107.     pool->first = worker;
  108.     worker->arg.action = NULL;
  109.     worker->arg.what = NULL;
  110.     pool->current_nr++;
  111.     pthread_mutex_unlock(&pool->lock);

  112.     return 0;
  113. }

  114. /* 虽然有点长,但仍然是无须废话:线程池创建 */
  115. struct thread_pool* thread_pool_create(int nr_to_create, pthread_attr_t *attr)
  116. {
  117.     struct thread_pool *pool;
  118.     struct thread_worker *worker;
  119.     int i, chk;

  120.     if (!nr_to_create)
  121.         return NULL;

  122.     pool = malloc(sizeof(struct thread_pool));
  123.     if (!pool)
  124.         return NULL;

  125.     pool->first = NULL;
  126.     pool->total = 0;
  127.     pthread_mutex_init(&pool->lock, NULL);

  128.     for (i=0; i<nr_to_create; ++i) {
  129.         worker = malloc(sizeof(struct thread_worker));
  130.         if (!worker)
  131.             break;
  132.         memset(worker, 0, sizeof(struct thread_worker));

  133.         pthread_mutex_init(&worker->lock, NULL);
  134.         pthread_mutex_lock(&worker->lock);

  135.         chk = pthread_create(&worker->id, attr, sleepy_wrapper, (void*)worker);
  136.         if (chk) {
  137.             pthread_mutex_unlock(&worker->lock);
  138.             pthread_mutex_destroy(&worker->lock);
  139.             free(worker);
  140.             break;
  141.         }
  142.         worker->next = pool->first;
  143.         pool->first = worker;
  144.     }
  145.     
  146.     pool->total = i;
  147.     pool->current_nr = i;
  148.     if (0 == i) {
  149.         pthread_mutex_destroy(&pool->lock);
  150.         free(pool);
  151.         pool = NULL;
  152.     }
  153.     return pool;
  154. }

  155. /* 清理线程池。 */
  156. int thread_pool_clean(struct thread_pool *pool)
  157. {
  158.     struct thread_worker *worker;

  159.     pthread_mutex_lock(&pool->lock);
  160.     if (pool->total != pool->current_nr) {
  161.         pthread_mutex_unlock(&pool->lock);
  162.         return -EBUSY;
  163.     }

  164.     while (NULL != (worker = pool->first)) {
  165.         worker->flags = THWK_F_CLEAN; /* this is =, rather than |= ! */
  166.         pthread_mutex_unlock(&worker->lock);
  167.         pthread_join(worker->id, NULL);
  168.         pool->first = worker->next;
  169.         pthread_mutex_destroy(&worker->lock);
  170.         free(worker);
  171.     }

  172.     pthread_mutex_unlock(&pool->lock);
  173.     pthread_mutex_destroy(&pool->lock);
  174.     free(pool);
  175.     return 0;
  176. }
阅读(1396) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~