一、适用场景
首先,必须明确一点,线程池不是万能的,它有其特定的使用场景。使用线程池是为了减小线程本身的开销对应用性能所产生的影响,但是其前提是线程本身创建、销毁的开销和线程执行任务的开销相比是不可忽略的。如果线程本身创建、销毁的开销对应用程序的性能可以忽略不计,那么使用/不使用线程池对程序的性能并不会有太大的影响。
线程池通常适合以下几种场景:
①、单位时间内处理的任务频繁,且任务时间较短
②、对实时性要求较高。如果接收到任务之后再创建线程,可能无法满足实时性的要求,此时必须使用线程池。
③、必须经常面对高突发性事件。比如Web服务器。如果有足球转播,则服务器将产生巨大冲击,此时使用传统方法,则必须不停的大量创建、销毁线程。此时采用动态线程池可以避免这种情况的发生。
二、代码实现
注意事项:
之
前使用的是非分离线程,当存在线程异常退出时,操作系统无法及时回收内存空间。为了解决此问题,现使用分离线程。但是必须注意:不要使用
pthread_detach()来使线程成为分离线程,而应该通过线程属性(pthread_attr_t)的参数来设置线程为分离线程。否则,当线程
退出后,再调用pthread_kill()来判断线程是否还存在时,很可能出现段错误。
2.1 头文件
-
#if !defined(__THREAD_POOL_H__)
-
#define __THREAD_POOL_H__
-
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
-
-
typedef int bool;
-
#define false (0)
-
#define true (1)
-
-
-
typedef struct _thread_worker_t
-
{
-
void *(*process)(void *arg);
-
void *arg;
-
struct _thread_worker_t *next;
-
}thread_worker_t;
-
-
-
typedef struct
-
{
-
pthread_mutex_t queue_lock;
-
pthread_cond_t queue_ready;
-
-
thread_worker_t *head;
-
bool isdestroy;
-
pthread_t *threadid;
-
int reqnum;
-
int num;
-
int queue_size;
-
}thread_pool_t;
-
-
-
extern int thread_pool_init(thread_pool_t **pool, int num);
-
extern int thread_pool_add_worker(thread_pool_t *pool, void *(*process)(void *arg), void *arg);
-
extern int thread_pool_keepalive(thread_pool_t *pool);
-
extern int thread_pool_destroy(thread_pool_t *pool);
-
-
#endif /*__THREAD_POOL_H__*/
2.2 函数实现
-
-
-
-
-
-
-
-
int thread_pool_init(thread_pool_t **pool, int num)
-
{
-
int idx = 0;
-
-
-
*pool = (thread_pool_t*)calloc(1, sizeof(thread_pool_t));
-
if(NULL == *pool)
-
{
-
return -1;
-
}
-
-
-
pthread_mutex_init(&((*pool)->queue_lock), NULL);
-
pthread_cond_init(&((*pool)->queue_ready), NULL);
-
(*pool)->head = NULL;
-
(*pool)->reqnum = num;
-
(*pool)->queue_size = 0;
-
(*pool)->isdestroy = false;
-
(*pool)->threadid = (pthread_t*)calloc(1, num*sizeof(pthread_t));
-
if(NULL == (*pool)->threadid)
-
{
-
free(*pool);
-
(*pool) = NULL;
-
-
return -1;
-
}
-
-
-
for(idx=0; idx
-
{
-
ret = thread_create_detach(*pool, idx);
-
if(0 != ret)
-
{
-
return -1;
-
}
-
(*pool)->num++;
-
}
-
-
return 0;
-
}
-
-
-
-
-
-
-
-
-
int thread_pool_add_worker(thread_pool_t *pool, void *(*process)(void *arg), void *arg)
-
{
-
thread_worker_t *worker=NULL, *member=NULL;
-
-
worker = (thread_worker_t*)calloc(1, sizeof(thread_worker_t));
-
if(NULL == worker)
-
{
-
return -1;
-
}
-
-
worker->process = process;
-
worker->arg = arg;
-
worker->next = NULL;
-
-
pthread_mutex_lock(&(pool->queue_lock));
-
-
member = pool->head;
-
if(NULL != member)
-
{
-
while(NULL != member->next) member = member->next;
-
member->next = worker;
-
}
-
else
-
{
-
pool->head = worker;
-
}
-
-
pool->queue_size++;
-
-
pthread_mutex_unlock(&(pool->queue_lock));
-
pthread_cond_signal(&(pool->queue_ready));
-
-
return 0;
-
}
-
-
-
-
-
-
-
-
-
-
-
-
int thread_pool_keepalive(thread_pool_t *pool)
-
{
-
int idx=0, ret=0;
-
-
for(idx=0; idxnum; idx++)
-
{
-
ret = pthread_kill(pool->thread[idx], 0);
-
if(ESRCH == ret)
-
{
-
ret = thread_create_detach(pool, idx);
-
if(ret < 0)
-
{
-
return -1;
-
}
-
}
-
}
-
-
return 0;
-
}
-
-
-
-
-
-
-
int thread_pool_destroy(thread_pool_t *pool)
-
{
-
int idx = 0;
-
thread_worker_t *member = NULL;
-
-
if(false != pool->isdestroy)
-
{
-
return -1;
-
}
-
-
pool->isdestroy = true;
-
-
pthread_cond_broadcast(&(pool->queue_ready));
-
for(idx=0; idxnum; idx++)
-
{
-
ret = pthread_kill(pool->threadid[idx], 0);
-
if(ESRCH == ret)
-
{
-
continue;
-
}
-
else
-
{
-
idx--;
-
sleep(1);
-
}
-
}
-
-
free(pool->threadid);
-
pool->threadid = NULL;
-
-
while(NULL != pool->head)
-
{
-
member = pool->head;
-
pool->head = member->next;
-
free(member);
-
}
-
-
pthread_mutex_destroy(&(pool->queue_lock));
-
pthread_cond_destroy(&(pool->queue_ready));
-
free(pool);
-
-
return 0;
-
}
-
-
-
-
-
-
-
-
-
-
static int thread_create_detach(thread_pool_t *pool, int idx)
-
{
-
int ret = 0;
-
pthread_attr_t attr;
-
-
do
-
{
-
ret = pthread_attr_init(&attr);
-
if(0 != ret)
-
{
-
return -1;
-
}
-
-
ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-
if(0 != ret)
-
{
-
return -1;
-
}
-
-
ret = pthread_create(&((*pool)->threadid[idx]), &attr, thread_routine, *pool);
-
if(0 != ret)
-
{
-
pthread_attr_destroy(&attr);
-
-
if(EINTR == errno)
-
{
-
continue;
-
}
-
return -1;
-
}
-
pthread_attr_destroy(&attr);
-
}while(0);
-
-
return 0;
-
}
-
-
-
-
-
-
-
static void *thread_routine(void *arg)
-
{
-
thread_worker_t *worker = NULL;
-
thread_pool_t *pool = (thread_pool_t*)arg;
-
-
while(1)
-
{
-
pthread_mutex_lock(&(pool->queue_lock));
-
while((false == pool->isdestroy) && (0 == pool->queue_size))
-
{
-
pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
-
}
-
-
if(false != pool->isdestroy)
-
{
-
pthread_mutex_unlock(&(pool->queue_lock));
-
pthread_exit(NULL);
-
}
-
-
pool->queue_size--;
-
worker = pool->head;
-
pool->head = worker->next;
-
pthread_mutex_unlock(&(pool->queue_lock);
-
-
(*(worker->process))(worker->arg);
-
-
free(worker);
-
worker = NULL;
-
}
-
}
-
#define THREAD_MAX_NUM (32)
-
#define SLEEP (10)
-
-
int myprocess(void *arg)
-
{
-
fprintf(stdout, "[%s][%d] threadid:%d arg:%d", __FILE__, __LINE__, pthread_self(), *(int*)arg);
-
return 0;
-
}
-
-
int main(void)
-
{
-
int ret=0, idx=0;
-
thread_pool_t *pool = NULL;
-
int array[THREAD_MAX_NUM] = {0};
-
-
ret = thread_pool_init(&pool, THREAD_MAX_NUM);
-
if(ret < 0)
-
{
-
return -1;
-
}
-
-
for(idx=0; idx
-
{
-
array[idx] = idx;
-
thread_pool_add_worker(pool, myprocess, &array[idx]);
-
}
-
-
thread_pool_destroy(pool);
-
pool = NULL;
-
return 0;
-
}
阅读(926) | 评论(0) | 转发(0) |