池化技术
在系统开发过程中,我们经常会用到池化技术。通俗的讲,池化技术就是:把一些资源预先分配好,组织到对象池中,之后的业务使用资源从对象池中获取,使用完后放回到对象池中。这样做带来几个明显的好处:
1),资源重复使用, 减少了资源分配和释放过程中的系统消耗。比如,在IO密集型的服务器上,并发处理过程中的子线程或子进程的创建和销毁过程,带来的系统开销将是难以接受的。所以在业务实现上,通常把一些资源预先分配好,如线程池,数据库连接池,Redis连接池,HTTP连接池等,来减少系统消耗,提升系统性能。
2),可以对资源的整体使用做限制。这个好理解,相关资源预分配且只在预分配是生成,后续不再动态添加,从而限制了整个系统对资源的使用上限。类似一个令牌桶的功能。
3),池化技术分配对象池,通常会集中分配,这样有效避免了碎片化的问题。
线程池
有了上面的池化技术铺垫,理解线程池就很容易了:
1)先启动若干数量的线程,并让这些线程都处于睡眠状态,并以一定的方式组织起来,形成“线程池”;
2)当客户端有一个新请求时,就会从线程池中选取一个线程,并唤醒线程,让它来处理客户端的这个请求;
3)当处理完这个请求后,线程又处于睡眠状态,并且放回到线程池中,等待任务到来调度。
线程池设计
如上图,是基于线程条件变量唤醒的简单线程池模型,主要包括了:任务,任务队列,工作线程几个组件。说明如下:
任务(job)
任务是对一次可执行过的描述,通常包括执行的方法(执行函数)和数据。比如,一个报文接收过程就是一个任务,包括:报文处理函数和报文内容。本例中,对job的定义如下:
-
typedef struct thread_job {
-
struct list_head list;
-
struct thread_pool* pool;
-
job_process_func jfunc; // 任务执行函数
-
void* data; // 任务数据,由调用者分配,框架释放
-
} job_t
任务队列(job_queue)
任务队列缓存着所有待执行的任务,供各个工作线程调度。当工作线程空闲或被唤醒时,会尝试从任务队列中摘取一个任务,然后在线程空间内执行。任务队列用线程互斥锁进行保护。为了方便理解整个任务执行转换的过程,本例中,把空闲任务队列单独出来了。 所以,任务的状态有四种:
1)在空闲对列中等待业务;
2)在业务流程中装配;
3)在工作队列中等待调度;
4)在工作线程中执行。
工作线程
工作线程等待在条件变量上(job_dequeue_valid,新任务需要处理),当新任务到达时,线程被唤醒。线程会尝试从任务队列上获取任务,并执行它;执行完毕后重新等待在条件变量上。如此往复:
-
while (1)
-
{
-
if (thrdp_stopping(pool))
-
{
-
pthread_exit(NULL);
-
}
-
-
pthread_mutex_lock(&(pool->qlock));
-
while (list_empty(&pool->job_queue))
-
{
-
// nothing to process, wait until job come in
-
printf("thread<%u> get none job, hangup...\n", (unsigned int)pthread_self() );
-
pthread_cond_wait(&(pool->job_dequeue_valid), &(pool->qlock));
-
-
if (thrdp_stopping(pool))
-
{
-
pthread_exit(NULL);
-
}
-
-
}
-
job = list_first_entry(&pool->job_queue, struct thread_job, list);
-
list_del(&job->list);
-
pthread_mutex_unlock(&(pool->qlock));
-
job->jfunc(job);
-
}
如上代码片段,特别提醒一下,当工作线程从等待条件中被唤醒时,一定要对当前的执行条件再检查,原因是当前线程可能是被“虚假唤醒”。
比如,当前队列是空的,有A,B,C三个工作线程都在等待任务;当一个任务R0到来时,A,B,C三个线程均可能被唤醒;由于任务队列锁的关系,其中只有一个(假如A)取得的实际任务,而B和C均未取得任务,因此B和C称为被“虚假唤醒”。此时,B和C必须对任务获取状态进行再次判断,确保正常获取到任务,方能往下执行调度,否则,应该重新休眠等待任务。
代码实现
附完整代码和测试代码threadpool.h
-
#ifndef __THREAD_POOL_H__
-
#define __THREAD_POOL_H__
-
#include <linux/types.h>
-
#include <pthread.h>
-
#include <syslog.h>
-
#include <sys/stat.h>
-
#include <unistd.h>
-
#include <stdio.h>
-
#include <stdlib.h>
-
#include <sys/socket.h>
-
#include <sys/types.h>
-
#include <string.h>
-
#include <asm/types.h>
-
#include <linux/netlink.h>
-
#include <linux/socket.h>
-
#include <linux/types.h>
-
#include <sys/socket.h>
-
#include <netinet/in.h>
-
#include <arpa/inet.h>
-
#include <errno.h>
-
#include <unistd.h>
-
-
#include "list.h"
-
-
-
#define THRDP_STATUS_STOPPING 0x01 // 线程池是否已经关闭
-
-
typedef void (*job_process_func)(void*);
-
-
struct thread_pool
-
{
-
int thread_num; // 线程池中开启线程的个数
-
int max_job_num; // 队列中最大job的个数
-
struct list_head job_queue; // 待处理的job队列
-
struct list_head free_job_queue; // job空闲池
-
-
-
pthread_t *pthreads; //线程池中所有线程的pthread_t
-
pthread_mutex_t qlock; //互斥信号量,保护job_queue
-
pthread_mutex_t fqlock; //互斥信号量,保护free_job_queue
-
pthread_cond_t job_dequeue_valid; // 隊列不為空,需要喚醒線程處理
-
pthread_cond_t job_enqueue_valid; // 隊列沒有滿,可以繼續添加job
-
-
int flags;
-
};
-
-
typedef struct thread_job {
-
struct list_head list;
-
struct thread_pool* pool;
-
job_process_func jfunc; // 任务执行函数
-
void* data; // 任务数据,由调用者分配,框架释放
-
} job_t;
-
-
struct thread_pool* THRDP_new(unsigned int thread_num, unsigned int max_job_num);
-
void THRDP_destroy(struct thread_pool* pool);
-
struct thread_job* THRDP_job_get(struct thread_pool* pool);
-
void THRDP_job_put(struct thread_pool* pool, struct thread_job* job);
-
void THRDP_job_add(struct thread_pool* pool, struct thread_job* job);
-
-
#endif
注意,上面引用了list.h,请查看我之前发的用户态list的头文件。
threadpool.c
-
#include "threadpool.h"
-
-
-
static inline int thrdp_stopping (struct thread_pool* pool)
-
{
-
return pool->flags & THRDP_STATUS_STOPPING;
-
}
-
-
static void* thrdp_process_func(void* arg)
-
{
-
struct thread_pool* pool = (struct thread_pool*)arg;
-
struct thread_job* job;
-
-
while (1)
-
{
-
if (thrdp_stopping(pool))
-
{
-
pthread_exit(NULL);
-
}
-
-
pthread_mutex_lock(&(pool->qlock));
-
while (list_empty(&pool->job_queue))
-
{
-
// nothing to process, wait until job come in
-
printf("thread<%u> get none job, hangup...\n", (unsigned int)pthread_self() );
-
pthread_cond_wait(&(pool->job_dequeue_valid), &(pool->qlock));
-
-
if (thrdp_stopping(pool))
-
{
-
pthread_exit(NULL);
-
}
-
-
}
-
job = list_first_entry(&pool->job_queue, struct thread_job, list);
-
list_del(&job->list);
-
pthread_mutex_unlock(&(pool->qlock));
-
job->jfunc(job);
-
}
-
}
-
-
-
struct thread_pool* THRDP_new(unsigned int thread_num, unsigned int max_job_num)
-
{
-
struct thread_pool* pool = NULL;
-
struct thread_job* job;
-
struct thread_job* tmp;
-
int i;
-
-
if(thread_num < 2)
-
{
-
thread_num = 2;
-
}
-
-
if (max_job_num < thread_num)
-
{
-
max_job_num = thread_num;
-
}
-
-
pool = malloc(sizeof(struct thread_pool));
-
if (NULL == pool)
-
{
-
printf("failed to malloc thread_pool!\n");
-
return NULL;
-
}
-
memset(pool, 0, sizeof(struct thread_pool) );
-
pool->thread_num = thread_num;
-
pool->max_job_num = max_job_num;
-
-
INIT_LIST_HEAD(&pool->job_queue);
-
INIT_LIST_HEAD(&pool->free_job_queue);
-
-
if (pthread_mutex_init(&(pool->qlock), NULL))
-
{
-
printf("failed to init job queue mutex lock!\n");
-
goto ERROR_OUT;
-
}
-
if (pthread_mutex_init(&(pool->fqlock), NULL))
-
{
-
printf("failed to init free job queue mutex lock!\n");
-
goto ERROR_OUT1;
-
}
-
if (pthread_cond_init(&(pool->job_dequeue_valid), NULL))
-
{
-
printf("failed to init cond job_dequeue_valid!\n");
-
goto ERROR_OUT2;
-
}
-
if (pthread_cond_init(&(pool->job_enqueue_valid), NULL))
-
{
-
printf("failed to init cond job_enqueue_valid!\n");
-
goto ERROR_OUT3;
-
}
-
-
for (i = 0; i < max_job_num; i++)
-
{
-
job = (struct thread_job*)malloc(sizeof(struct thread_job));
-
if (NULL == job)
-
{
-
goto ERROR_OUT4;
-
}
-
memset(job, 0, sizeof(struct thread_job));
-
list_add(&job->list, &pool->free_job_queue);
-
}
-
-
pool->pthreads = malloc(sizeof(pthread_t) * thread_num);
-
if (NULL == pool->pthreads)
-
{
-
printf("failed to malloc pthreads!\n");
-
goto ERROR_OUT4;
-
}
-
memset(pool->pthreads, 0, sizeof(pthread_t) * thread_num);
-
-
-
for (i = 0; i < pool->thread_num; ++i)
-
{
-
pthread_create(&(pool->pthreads[i]), NULL, thrdp_process_func, (void*)pool);
-
}
-
-
printf("create pool done\n");
-
return pool;
-
-
ERROR_OUT4:
-
if (!list_empty(&pool->free_job_queue))
-
{
-
list_for_each_entry_safe(job, tmp, &pool->free_job_queue, list)
-
{
-
list_del(&job->list);
-
free(job);
-
}
-
}
-
pthread_cond_destroy(&(pool->job_enqueue_valid));
-
ERROR_OUT3:
-
pthread_cond_destroy(&(pool->job_dequeue_valid));
-
ERROR_OUT2:
-
pthread_mutex_destroy(&(pool->fqlock));
-
ERROR_OUT1:
-
pthread_mutex_destroy(&(pool->qlock));
-
ERROR_OUT:
-
free(pool);
-
return NULL;
-
-
}
-
-
void THRDP_destroy(struct thread_pool* pool)
-
{
-
struct thread_job* job;
-
struct thread_job* tmp;
-
int i;
-
-
if (!pool)
-
{
-
return;
-
}
-
pthread_mutex_lock(&(pool->qlock));
-
if (thrdp_stopping(pool)) //线程池已经退出了,就直接返回
-
{
-
pthread_mutex_unlock(&(pool->qlock));
-
return;
-
}
-
-
// 設置stop標記,喚醒阻塞的線程,使其退出
-
pool->flags |= THRDP_STATUS_STOPPING;
-
pthread_mutex_unlock(&(pool->qlock));
-
-
pthread_cond_broadcast(&(pool->job_dequeue_valid)); //唤醒线程池中正在阻塞的线程
-
pthread_cond_broadcast(&(pool->job_enqueue_valid)); //唤醒添加任务的threadpool_add_job函数
-
-
for (i = 0; i < pool->thread_num; ++i)
-
{
-
pthread_join(pool->pthreads[i], NULL); //等待线程池的所有线程执行完毕
-
}
-
free(pool->pthreads);
-
-
// all threads had done, no one using jobs, just free them
-
if (!list_empty(&pool->job_queue))
-
{
-
list_for_each_entry_safe(job, tmp, &pool->job_queue, list)
-
{
-
list_del(&job->list);
-
free(job);
-
}
-
}
-
-
if (!list_empty(&pool->free_job_queue))
-
{
-
list_for_each_entry_safe(job, tmp, &pool->free_job_queue, list)
-
{
-
list_del(&job->list);
-
free(job);
-
}
-
}
-
-
pthread_mutex_destroy(&(pool->qlock));
-
pthread_mutex_destroy(&(pool->fqlock));
-
pthread_cond_destroy(&(pool->job_dequeue_valid));
-
pthread_cond_destroy(&(pool->job_enqueue_valid));
-
}
-
-
/**
-
* get a free JOB
-
* if all jobs busy, wait util someone be free
-
*/
-
struct thread_job* THRDP_job_get(struct thread_pool* pool)
-
{
-
struct thread_job* job;
-
-
if (!pool || thrdp_stopping(pool))
-
{
-
return NULL;
-
}
-
-
pthread_mutex_lock(&(pool->fqlock));
-
while (list_empty(&pool->free_job_queue))
-
{
-
pthread_cond_wait(&(pool->job_enqueue_valid), &(pool->fqlock));
-
if (thrdp_stopping(pool))
-
{
-
pthread_mutex_unlock(&(pool->fqlock));
-
return NULL;
-
}
-
-
}
-
job = list_first_entry(&pool->free_job_queue, struct thread_job, list);
-
list_del(&job->list);
-
pthread_mutex_unlock(&(pool->fqlock));
-
job->pool = pool;
-
return job;
-
}
-
-
-
/**
-
* put back a used job back to free_job_queue
-
*
-
* this will free the job data which MUST allocated dynamically
-
*/
-
void THRDP_job_put(struct thread_pool* pool, struct thread_job* job)
-
{
-
int notify = 0;
-
if (!pool || !job )
-
{
-
return;
-
}
-
if (job->data)
-
{
-
free(job->data);
-
job->data = NULL;
-
}
-
-
memset(job, 0, sizeof(struct thread_job));
-
-
pthread_mutex_lock(&(pool->fqlock));
-
notify = list_empty(&pool->free_job_queue);
-
list_add_tail(&job->list, &pool->free_job_queue);
-
pthread_mutex_unlock(&(pool->fqlock));
-
-
if (notify)
-
{
-
printf("put back %p to free queue, notify wait jos tasks\n", job);
-
pthread_cond_broadcast(&(pool->job_enqueue_valid));
-
}
-
}
-
-
/**
-
* add a job to thread-pool for schedule
-
*/
-
void THRDP_job_add(struct thread_pool* pool, struct thread_job* job)
-
{
-
int notify = 0;
-
-
if (!pool || !job )
-
{
-
return;
-
}
-
-
pthread_mutex_lock(&(pool->qlock));
-
notify = list_empty(&pool->job_queue);
-
list_add_tail(&job->list, &pool->job_queue);
-
pthread_mutex_unlock(&(pool->qlock));
-
-
if (notify)
-
{
-
// notify somebody to handle the new job
-
printf("add new job, notify someone to handle it\n");
-
pthread_cond_broadcast(&(pool->job_dequeue_valid));
-
}
-
}
测试文件thrdp_test.c
-
#include <stdio.h>
-
-
#include <string.h>
-
#include <stdlib.h>
-
#include <syslog.h>
-
#include <sys/socket.h>
-
#include <sys/types.h>
-
#include <signal.h>
-
#include <getopt.h>
-
#include <netinet/in.h>
-
#include <arpa/inet.h>
-
#include <string.h>
-
#include <sys/select.h>
-
#include <sys/un.h>
-
#include <stddef.h>
-
#include <unistd.h>
-
#include <errno.h>
-
#include <sys/stat.h>
-
#include <fcntl.h>
-
#include <sys/wait.h>
-
#include <unistd.h>
-
#include <uuid/uuid.h>
-
#include <sys/time.h>
-
-
#include "threadpool.h"
-
-
#define SRV_PORT 9966
-
#define UDP_PKG_MAX_LEN 4096
-
-
struct pkg_desc
-
{
-
int srvfd;
-
struct sockaddr_in cliaddr;
-
char data[UDP_PKG_MAX_LEN];
-
};
-
-
-
void pkg_process(void* job)
-
{
-
struct thread_job* pjob = (struct thread_job*) job;
-
struct pkg_desc* ppkg = (struct pkg_desc*) pjob->data;
-
-
printf("************* process in %u ******************\n", (unsigned int)pthread_self());
-
printf("content:: %s\n", ppkg->data);
-
sendto(ppkg->srvfd, ppkg->data, strlen(ppkg->data), 0, (struct sockaddr*)&ppkg->cliaddr, sizeof(ppkg->cliaddr));
-
THRDP_job_put(pjob->pool, pjob);
-
}
-
-
-
void start_server(void)
-
{
-
struct sockaddr_in srvin, cliin;
-
socklen_t addrlen;
-
int cmd_mgmt_fd;
-
struct thread_pool* pool = NULL;
-
-
-
cmd_mgmt_fd = socket(AF_INET, SOCK_DGRAM, 0);
-
if(cmd_mgmt_fd < 0)
-
{
-
printf("start_cmd_mgmt_process socket fd failed.\n");
-
return;
-
}
-
-
memset(&srvin, 0, sizeof(struct sockaddr_in));
-
srvin.sin_family = AF_INET;
-
srvin.sin_addr.s_addr = inet_addr("127.0.0.1");
-
srvin.sin_port = htons(SRV_PORT);
-
-
if(bind(cmd_mgmt_fd, (struct sockaddr*)&srvin, sizeof(struct sockaddr_in)) < 0)
-
{
-
printf("bind error");
-
}
-
-
-
if ((pool = THRDP_new(2, 8)) == NULL)
-
{
-
printf("create thread pool error\n");
-
return;
-
}
-
-
while(1)
-
{
-
struct pkg_desc* ppkg = NULL;
-
struct thread_job* job = THRDP_job_get(pool);
-
int count;
-
-
if (NULL == job)
-
{
-
printf("get thread process task error\n");
-
return;
-
}
-
ppkg = (struct pkg_desc*) malloc(sizeof(struct pkg_desc));
-
-
if (NULL == ppkg)
-
{
-
printf("malloc pkg error\n");
-
return;
-
}
-
memset(ppkg, 0, sizeof(struct pkg_desc));
-
-
-
addrlen = sizeof(struct sockaddr_in);
-
count = recvfrom(cmd_mgmt_fd, ppkg->data, UDP_PKG_MAX_LEN, 0, (struct sockaddr*)&ppkg->cliaddr, &addrlen);
-
if(count <= 0)
-
{
-
printf("recvform error\n");
-
continue;
-
}
-
-
ppkg->srvfd = cmd_mgmt_fd;
-
job->data = ppkg;
-
job->jfunc = pkg_process;
-
THRDP_job_add(pool, job);
-
}
-
}
-
-
-
void start_client(int client_id)
-
{
-
struct sockaddr_in srvaddr;
-
struct sockaddr_in peeraddr;
-
int len = sizeof(peeraddr);
-
int fd;
-
char ctnbuf[512] = {0};
-
char rcvbuf[512] = {0};
-
unsigned int i = 0;
-
fd = socket(AF_INET, SOCK_DGRAM, 0);
-
if(fd < 0)
-
{
-
printf("start_client socket fd failed.\n");
-
return;
-
}
-
-
memset(&srvaddr, 0, sizeof(struct sockaddr_in));
-
srvaddr.sin_family = AF_INET;
-
srvaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
-
srvaddr.sin_port = htons(SRV_PORT);
-
-
-
while(1)
-
{
-
-
memset(ctnbuf, 0, 512);
-
sprintf(ctnbuf, "%d say just test:%u", client_id, i++);
-
-
sendto(fd, ctnbuf, strlen(ctnbuf), 0, (struct sockaddr*)&srvaddr, sizeof(srvaddr));
-
memset(rcvbuf, 0, 512);
-
len = sizeof(peeraddr);
-
recvfrom(fd, rcvbuf, 512, 0, (struct sockaddr*)&peeraddr, &len);
-
printf("recv content:%s\n", rcvbuf);
-
}
-
}
-
-
int main(int argc, char** argv)
-
{
-
if (argc < 2)
-
{
-
printf("usage : %s [-s/-c] [cid]", argv[0]);
-
return 0;
-
}
-
if (!strcmp(argv[1], "-s"))
-
{
-
start_server();
-
}
-
else
-
{
-
start_client(atoi(argv[2]));
-
}
-
return 0;
-
-
}
测试是一个简单的udp回射服务器模型,用线程池的方式做服务端并发。
编译: gcc -g -o thrdp *.c
启动server端: ./thrdp -s
多终端启动client端: ./thrdp -c 100,./thrdp -c 102, ...
运行截图如下
阅读(2461) | 评论(0) | 转发(0) |