#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
<br />
/* Number of worker threads the demo in main() asks tpool_create() for. */
#define POOL_THREAD_NUM 10
<br />
/*
 * One pending job in the pool's singly linked work queue.
 */
typedef struct tpool_work {
    void *(*routine)(void *);   /* task entry point, invoked by a worker thread */
    void *arg;                  /* opaque argument handed to routine */
    struct tpool_work *next;    /* next queued job, NULL for the last one */
} tpool_work_t;
<br />
/*
 * State of the thread pool: the worker threads plus the queue of jobs
 * waiting to be run.
 */
typedef struct tpool {
    int shutdown;                   /* non-zero once teardown has been requested */
    int max_thr_num;                /* number of worker threads (entries in thr_id) */
    pthread_t *thr_id;              /* heap array holding the worker thread ids */
    struct tpool_work *queue_head;  /* first pending job, NULL when the queue is empty */
    pthread_mutex_t queue_lock;     /* held while workers inspect or modify the queue */
    pthread_cond_t queue_ready;     /* signalled when a job is queued, broadcast on shutdown */
} tpool_t;
<br />
/* Thread pool API. */
void tpool_destroy(void);
int tpool_create(int max_thr_num);
int tpool_add_work(void *(*routine)(void *), void *arg);

/* The single pool instance shared by every function in this file. */
static struct tpool *tpool = NULL;
<br />
/*
 * Demo: a sample task and the main() that feeds it to the pool.
 */
/*
 * Sample task for the demo: prints the integer carried inside the void*
 * argument, then sleeps for two seconds.
 *
 * arg: an integer value stored in a pointer (never dereferenced).
 * Returns NULL.
 */
void *func(void *arg)
{
    /* Convert through intptr_t so the pointer-to-int narrowing is explicit
     * instead of a direct cast between types of different width. */
    printf("thread %d\n", (int)(intptr_t)arg);
    sleep(2);
    return NULL;
}
<br />
int main(int argc, char *argv[])<br />
{<br />
int i;<br />
<br />
if (tpool_create(POOL_THREAD_NUM) != 0) {<br />
printf("tpool_create failed\n");<br />
exit(1);<br />
}<br />
<br />
for (i = 0; i < 20; ++i) {<br />
tpool_add_work(func, (void*)i);<br />
}<br />
<br />
sleep(5);<br />
tpool_destroy();<br />
return 0;<br />
}<br />
<br />
/*
 * Thread pool implementation.
 */
static void *thread_routine(void *arg)<br />
{<br />
tpool_work_t *work;<br />
<br />
while(1) {<br />
pthread_mutex_lock(&tpool->queue_lock);<br />
while(!tpool->queue_head && !tpool->shutdown) {<br />
pthread_cond_wait(&tpool->queue_ready, &tpool->queue_lock); //释放锁,所有新建的线程阻塞<br />
}<br />
if (tpool->shutdown) {<br />
pthread_mutex_unlock(&tpool->queue_lock);<br />
pthread_exit(NULL);<br />
}<br />
work = tpool->queue_head;<br />
tpool->queue_head = tpool->queue_head->next;<br />
pthread_mutex_unlock(&tpool->queue_lock);<br />
<br />
work->routine(work->arg);<br />
free(work);<br />
}<br />
return NULL; <br />
}<br />
<br />
int tpool_create(int max_thr_num)<br />
{<br />
int i;<br />
<br />
tpool = calloc(1, sizeof(tpool_t));<br />
if (!tpool) {<br />
printf("%s: calloc failed\n", __FUNCTION__);<br />
exit(1);<br />
}<br />
<br />
tpool->max_thr_num = max_thr_num; //最大线程数<br />
tpool->shutdown = 0; //线程池enable<br />
tpool->queue_head = NULL; //线程链表 <br />
if (pthread_mutex_init(&tpool->queue_lock, NULL) !=0) { //创建锁<br />
printf("%s: pthread_mutex_init failed, errno:%d, error:%s\n",<br />
__FUNCTION__, errno, strerror(errno));<br />
exit(1);<br />
}<br />
if (pthread_cond_init(&tpool->queue_ready, NULL) !=0 ) { //创建条件变量<br />
printf("%s: pthread_cond_init failed, errno:%d, error:%s\n", <br />
__FUNCTION__, errno, strerror(errno));<br />
exit(1);<br />
}<br />
<br />
tpool->thr_id = calloc(max_thr_num, sizeof(pthread_t)); //线程id<br />
if (!tpool->thr_id) {<br />
printf("%s: calloc failed\n", __FUNCTION__);<br />
exit(1);<br />
}<br />
for (i = 0; i < max_thr_num; ++i) { //创建线程<br />
if (pthread_create(&tpool->thr_id[i], NULL, thread_routine, NULL) != 0){<br />
printf("%s:pthread_create failed, errno:%d, error:%s\n", __FUNCTION__, <br />
errno, strerror(errno));<br />
exit(1);<br />
}<br />
} <br />
return 0;<br />
}<br />
<br />
void tpool_destroy()<br />
{<br />
int i;<br />
tpool_work_t *member;<br />
<br />
if (tpool->shutdown) {<br />
return;<br />
}<br />
tpool->shutdown = 1; <br />
<br />
pthread_mutex_lock(&tpool->queue_lock);<br />
pthread_cond_broadcast(&tpool->queue_ready); //唤醒所有线程<br />
pthread_mutex_unlock(&tpool->queue_lock);<br />
for (i = 0; i < tpool->max_thr_num; ++i) {<br />
pthread_join(tpool->thr_id[i], NULL); //等待线程结束<br />
}<br />
free(tpool->thr_id); <br />
<br />
while(tpool->queue_head) {<br />
member = tpool->queue_head;<br />
tpool->queue_head = tpool->queue_head->next;<br />
free(member);<br />
}<br />
<br />
pthread_mutex_destroy(&tpool->queue_lock); //销毁锁<br />
pthread_cond_destroy(&tpool->queue_ready); //销毁条件变量<br />
<br />
free(tpool); <br />
}<br />
<br />
int tpool_add_work(void *(*routine)(void*), void *arg)<br />
{<br />
tpool_work_t *work, *member;<br />
<br />
if (!routine){<br />
printf("%s:Invalid argument\n", __FUNCTION__);<br />
return -1;<br />
}<br />
<br />
work = malloc(sizeof(tpool_work_t));<br />
if (!work) {<br />
printf("%s:malloc failed\n", __FUNCTION__);<br />
return -1;<br />
}<br />
work->routine = routine;<br />
work->arg = arg;<br />
work->next = NULL;<br />
<br />
pthread_mutex_lock(&tpool->queue_lock); <br />
member = tpool->queue_head;<br />
if (!member) {<br />
tpool->queue_head = work;<br />
} else {<br />
while(member->next) {<br />
member = member->next;<br />
}<br />
member->next = work;<br />
}<br />
pthread_cond_signal(&tpool->queue_ready);<br />
pthread_mutex_unlock(&tpool->queue_lock);<br />
<br />
return 0; <br />
}<br />
/* Blog page footer (not part of the program): views (869) | comments (0) | reposts (0) */