Coder Life — chf.blog.chinaunix.net
chf
全部博文(16)
2014年(1)
2012年(2)
2011年(11)
2010年(2)
clifford
格伯纳
douyaqia
asdmusic
scliu
kelvin22
andydiao
flrhece
ytbythes
HIBIKI_L
分类: C/C++
2011-01-11 11:09:14
/*
 * threadpool.h - public interface of a small fixed-size pthread worker pool.
 */
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__

/* Largest worker count accepted by create_thread_pool(). */
#define MAX_THREADS 64

/*
 * Threshold dispatch_work() compares the pending-queue length against.
 * The expansion is parenthesized so the macro behaves as a single value when
 * it appears inside a larger expression (e.g. `x / MAX_QUEUE_SIZE`).
 */
#define MAX_QUEUE_SIZE (MAX_THREADS * 16)

/* Opaque handle to a pool; obtained from create_thread_pool(). */
typedef void *threadpool;

/* A unit of work; it receives the `arg` that was passed to dispatch_work(). */
typedef void (*dispatch_func)(void *);

/*
 * Create a pool with `num_threads` worker threads.
 * Returns NULL when num_threads is outside 1..MAX_THREADS or when any
 * allocation / pthread initialisation step fails.
 */
threadpool create_thread_pool(int num_threads);

/*
 * Queue `work_func(arg)` for execution by one of the pool's workers.
 * Returns 0 on success, 1 when the pending queue is full, -1 when the pool
 * has stopped accepting work, and -2 when memory for the work item cannot
 * be allocated.
 */
int dispatch_work(threadpool thread_pool, dispatch_func work_func, void *arg);

/*
 * Stop accepting work, wait for the pending queue to drain, join every
 * worker thread and release the pool.  The handle is invalid afterwards.
 */
void destroy_thread_pool(threadpool thread_pool);

#endif
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include "threadpool.h" typedef struct __work_t work_t; struct __work_t { void (*func) (void *arg); void *arg; struct __work_t *next; }; typedef struct __thread_pool { pthread_mutex_t queue_lock; pthread_cond_t have_work; pthread_cond_t queue_empty; pthread_t *threads; work_t *queue_head; work_t *queue_tail; int queue_size; int stop_accept; int shutdown; int threads_count; int threads_idle; } thread_pool_t; void * work_func (void *arg) { work_t *cur_work; thread_pool_t *pool = (thread_pool_t *) arg; while (1) { pthread_mutex_lock (&(pool->queue_lock)); pool->threads_idle ++; while (!pool->queue_size && !pool->shutdown) { pthread_cond_wait (&(pool->have_work), &(pool->queue_lock)); } if (pool->shutdown) { pthread_mutex_unlock (&(pool->queue_lock)); break; } cur_work = pool->queue_head; pool->queue_size --; if (!pool->queue_size) { pool->queue_head = NULL; pool->queue_tail = NULL; if (pool->stop_accept) { pthread_cond_signal (&(pool->queue_empty)); } } else { pool->queue_head = cur_work->next; } pool->threads_idle --; pthread_mutex_unlock (&(pool->queue_lock)); (cur_work->func) (cur_work->arg); free (cur_work); } return NULL; } threadpool create_thread_pool(int num_threads) { thread_pool_t *pool; int i; if ((num_threads <= 0) || (num_threads > MAX_THREADS)) return NULL; pool = (thread_pool_t *) malloc(sizeof(thread_pool_t)); if (pool == NULL) { printf("Out of memory creating a new threadpool!\n"); return NULL; } pool->threads = (pthread_t*) malloc (sizeof(pthread_t) * num_threads); if(!pool->threads) { printf("Out of memory creating a new threadpool!\n"); free (pool); return NULL; } pool->threads_count = num_threads; pool->threads_idle = 0; pool->queue_size = 0; pool->queue_head = NULL; pool->queue_tail = NULL; pool->shutdown = 0; pool->stop_accept= 0; if(pthread_mutex_init(&(pool->queue_lock), NULL)) { printf("thread mutex initiation fail!\n"); free (pool->threads); free (pool); return 
NULL; } if(pthread_cond_init(&(pool->have_work), NULL)) { printf("thread cond initiation fail!\n"); free (pool->threads); free (pool); return NULL; } if(pthread_cond_init(&(pool->queue_empty), NULL)) { printf("thread cond initiation fail!\n"); free (pool->threads); free (pool); return NULL; } for (i=0; i<num_threads; i++) { if(pthread_create(&(pool->threads[i]), NULL, work_func, pool)) { printf("Thread[%d] create fail!\n", i); free (pool->threads); free (pool); return NULL; } } return (threadpool *) pool; } int dispatch_work (threadpool thread_pool, dispatch_func work_func, void *arg) { thread_pool_t *pool = (thread_pool_t *)thread_pool; work_t *cur_work; if (pool->queue_size > MAX_QUEUE_SIZE) return 1; if (pool->stop_accept) return -1; cur_work = (work_t *) malloc(sizeof(work_t)); if(cur_work == NULL) { printf("Out of memory creating a work struct!\n"); return -2; } cur_work->func = work_func; cur_work->arg = arg; cur_work->next = NULL; pthread_mutex_lock(&(pool->queue_lock)); if(pool->queue_size == 0) { pool->queue_head = cur_work; pool->queue_tail = cur_work; pthread_cond_signal(&(pool->have_work)); } else { pool->queue_tail->next = cur_work; pool->queue_tail = cur_work; } pool->queue_size++; pthread_mutex_unlock(&(pool->queue_lock)); return 0; } void destroy_thread_pool (threadpool thread_pool) { int i=0; void* nothing; thread_pool_t *pool = (thread_pool_t *)thread_pool; pthread_mutex_lock (&(pool->queue_lock)); pool->stop_accept = 1; while (pool->queue_size) { pthread_cond_wait (&(pool->queue_empty), &(pool->queue_lock)); } pool->shutdown = 1; pthread_mutex_unlock (&(pool->queue_lock)); for (i=0; i<pool->threads_count; i++) { pthread_join(pool->threads[i], ¬hing); } pthread_cond_destroy (&(pool->have_work)); pthread_cond_destroy (&(pool->queue_empty)); pthread_mutex_destroy(&(pool->queue_lock)); free (pool->threads); free (pool); return; }
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <errno.h> #include "threadpool.h" void work_1(void *arg) { pthread_t tid = pthread_self (); int seconds = (int)arg; printf ("work_1 sleep %d --%x\n", seconds, tid); sleep (seconds); printf ("work_1 done %d --%x\n", seconds, tid); } void work_2(void *arg) { pthread_t tid = pthread_self (); int seconds = (int)arg; printf ("work_2 sleep %d --%x\n", seconds, tid); sleep (seconds); printf ("work_2 done %d --%x\n", seconds, tid); } int main (int argc, char **argv) { threadpool tp; int i; tp = create_thread_pool (7); for(i=1; i<16; i++) { dispatch_work( tp, work_1, (void *) i); } for(i=1; i<12; i++) { dispatch_work (tp, work_2, (void *) i); }
destroy_thread_pool (tp); return 0; }
上一篇:多进程和多线程
下一篇:gcc 嵌入汇编实现 memcpy
登录 注册