main
|-----......
|-----socketServer
|-----tcpsetserveropt
|-----daemon_init
|-----
tracker_service_init
|-----.....
tracker_service_init函数对tracker服务进行初始化工作,主要完成的任务有下面几点
(1) 初始化线程锁与线程属性变量
(2) 初始化全局Task链表全局变量g_free_queue
(3) 初始化工作线程全局变量g_thread_data
(4) 创建工作线程
tracker_service_init函数如下
-
int tracker_service_init()
-
{
-
int result;
-
struct tracker_thread_data *pThreadData;
-
struct tracker_thread_data *pDataEnd;
-
pthread_t tid;
-
pthread_attr_t thread_attr;
-
//初始化tracker_thread_lock锁
-
if ((result=init_pthread_lock(&tracker_thread_lock)) != 0)
-
{
-
return result;
-
}
-
//初始化lb_thread_lock锁
-
if ((result=init_pthread_lock(&lb_thread_lock)) != 0)
-
{
-
return result;
-
}
-
//初始化线程属性
-
if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"init_pthread_attr fail, program exit!", __LINE__);
-
return result;
-
}
-
//初始化全局链表
-
if ((result=free_queue_init(g_max_connections, TRACKER_MAX_PACKAGE_SIZE,\
-
TRACKER_MAX_PACKAGE_SIZE, sizeof(TrackerClientInfo))) != 0)
-
{
-
return result;
-
}
-
-
g_thread_data = (struct tracker_thread_data *)malloc(sizeof( \
-
struct tracker_thread_data) * g_work_threads);
-
if (g_thread_data == NULL)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"malloc %d bytes fail, errno: %d, error info: %s", \
-
__LINE__, (int)sizeof(struct tracker_thread_data) * \
-
g_work_threads, errno, STRERROR(errno));
-
return errno != 0 ? errno : ENOMEM;
-
}
-
-
g_tracker_thread_count = 0;
-
pDataEnd = g_thread_data + g_work_threads;
-
for (pThreadData=g_thread_data; pThreadData<pDataEnd; pThreadData++)
-
{
-
pThreadData->ev_base = event_base_new();//创建eventbase
-
if (pThreadData->ev_base == NULL)
-
{
-
result = errno != 0 ? errno : ENOMEM;
-
logError("file: "__FILE__", line: %d, " \
-
"event_base_new fail.", __LINE__);
-
return result;
-
}
-
-
if (pipe(pThreadData->pipe_fds) != 0)//创建管道
-
{
-
result = errno != 0 ? errno : EPERM;
-
logError("file: "__FILE__", line: %d, " \
-
"call pipe fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
break;
-
}
-
-
if ((result=set_nonblock(pThreadData->pipe_fds[0])) != 0)//设置非阻塞
-
{
-
break;
-
}
-
-
if ((result=pthread_create(&tid, &thread_attr, \//创建工作线程
-
work_thread_entrance, pThreadData)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"create thread failed, startup threads: %d, " \
-
"errno: %d, error info: %s", \
-
__LINE__, g_tracker_thread_count, \
-
result, STRERROR(result));
-
break;
-
}
-
else //创建线程成功
-
{
-
if ((result=pthread_mutex_lock(&tracker_thread_lock)) != 0)//加锁
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
g_tracker_thread_count++; //修改全局线程数目,这里加锁是为了防止该变量在其他线程中被修改
-
if ((result=pthread_mutex_unlock(&tracker_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
}
-
}
-
-
pthread_attr_destroy(&thread_attr);//销毁线程属性变量
-
-
return 0;
-
}
在tracker_thread_init函数的开头处初始化了线程锁与线程属性变量,初始化全局g_free_queue队列在函数free_queue_init中
tracker_thread_data数据结构如下
-
struct tracker_thread_data
-
{
-
struct event_base *ev_base;
-
int pipe_fds[2];
-
}
一个tracker_thread_data对象包括一个eventbase指针对象与一个管道pipe对象
free_queue_init函数代码如下:
-
//初始化g_free_queue与g_mpool,g_mpool表示链表首节点,g_free_queue表示链表
-
int free_queue_init(const int max_connections, const int min_buff_size, \
-
const int max_buff_size, const int arg_size)
-
{
-
struct fast_task_info *pTask;
-
char *p;
-
char *pCharEnd;
-
int block_size;
-
int alloc_size;
-
int64_t total_size;
-
int result;
-
-
if ((result=init_pthread_lock(&(g_free_queue.lock))) != 0)//加锁
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"init_pthread_lock fail, errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
return result;
-
}
-
-
block_size = sizeof(struct fast_task_info) + arg_size;//一个块的大小=fast_task_info的大小+arg_size
-
alloc_size = block_size * max_connections;//需分配的字节数=一个块的大小*最大连接数
-
-
if (max_buff_size > min_buff_size)//如果参数中max大于min
-
{
-
total_size = alloc_size; //则总字节数=需分配字节数
-
g_free_queue.malloc_whole_block = false;//这里置为false,表示需要单独分配空间,空间已经包含在totalsize中了
-
}
-
else //当min==max 或者min>max
-
{
-
struct rlimit rlimit_data;
-
rlim_t max_data_size;
-
-
if (getrlimit(RLIMIT_DATA, &rlimit_data) < 0) //获取linux系统资源限制,这里获取的是一次malloc所能得到的
-
{//最大的空间的大小
-
logError("file: "__FILE__", line: %d, " \
-
"call getrlimit fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, errno, STRERROR(errno));
-
return errno != 0 ? errno : EPERM;
-
}
-
if (rlimit_data.rlim_cur == RLIM_INFINITY)
-
{
-
max_data_size = 512 * 1024 * 1024;//512M
-
}
-
else
-
{
-
max_data_size = rlimit_data.rlim_cur;
-
if (max_data_size > 512 * 1024 * 1024)
-
{
-
max_data_size = 512 * 1024 * 1024;
-
}
-
}
-
-
total_size = alloc_size+(int64_t)min_buff_size*max_connections;//这里相当于每个块增加了min_buff_size的大小
-
if (total_size <= max_data_size)//<=512M
-
{
-
g_free_queue.malloc_whole_block = true;//表示不需要单独分配空间,需要的空间已经包含在total_size中了
-
block_size += min_buff_size;//调整一个块的大小
-
}
-
else
-
{
-
g_free_queue.malloc_whole_block = false;//需要单独分配空间
-
total_size = alloc_size;//不包括min_buff_size*max_connections
-
}
-
}
-
-
g_mpool = (struct fast_task_info *)malloc(total_size);//为g_mpool分配空间
-
if (g_mpool == NULL)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"malloc "INT64_PRINTF_FORMAT" bytes fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, total_size, errno, STRERROR(errno));
-
return errno != 0 ? errno : ENOMEM;
-
}
-
memset(g_mpool, 0, total_size);//置0
-
-
pCharEnd = ((char *)g_mpool) + total_size;
-
for (p=(char *)g_mpool; p<pCharEnd; p += block_size)
-
{
-
pTask = (struct fast_task_info *)p;
-
pTask->size = min_buff_size;//表示单独分配的空间大小
-
-
pTask->arg = p + sizeof(struct fast_task_info);//fast_task_info结构后存放参数
-
if (g_free_queue.malloc_whole_block)//不需要单独分配,空间已包含在total_size
-
{
-
pTask->data = (char *)pTask->arg + arg_size;//让指针越过存放参数的空间
-
}
-
else//false,表示需要单独分配空间
-
{
-
pTask->data = (char *)malloc(pTask->size);//单独分配空间
-
if (pTask->data == NULL)//分配失败
-
{
-
free_queue_destroy();
-
-
logError("file: "__FILE__", line: %d, " \
-
"malloc %d bytes fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, pTask->size, \
-
errno, STRERROR(errno));
-
return errno != 0 ? errno : ENOMEM;
-
}
-
}
-
}
-
-
g_free_queue.tail = (struct fast_task_info *)(pCharEnd - block_size);//找到尾部指针
-
for (p=(char *)g_mpool; p<(char *)g_free_queue.tail; p += block_size)//把分配的空间用指针串连起来
-
{
-
pTask = (struct fast_task_info *)p;
-
pTask->next = (struct fast_task_info *)(p + block_size);
-
}
-
-
g_free_queue.max_connections = max_connections;
-
g_free_queue.min_buff_size = min_buff_size;
-
g_free_queue.max_buff_size = max_buff_size;
-
g_free_queue.arg_size = arg_size;
-
g_free_queue.head = g_mpool;//设置队列的头节点
-
g_free_queue.tail->next = NULL;
-
-
return 0;
-
}
fast_task_queue对象
-
struct fast_task_queue
-
{
-
struct fast_task_info *head; //执行链表头节点
-
struct fast_task_info *tail; //指向链表尾节点
-
pthread_mutex_t lock; //互斥锁
-
int max_connections; //最大连接数
-
int min_buff_size; //最小的buffsize
-
int max_buff_size; //最大的buffsize
-
int arg_size; //参数size
-
bool malloc_whole_block; //标识fast_task_info中的数据缓冲区data是否包括在分配总内存中,还是单独进行了分配
-
}
fast_task_info对象
-
struct fast_task_info
-
{
-
char client_ip[IP_ADDRESS_SIZE]; //IP地址
-
struct event ev_read; //read event
-
struct event ev_write; //write event
-
void *arg; //extra argument pointer //指向参数
-
char *data; //buffer for write or recv //指向存放数据的缓冲区
-
int size; //alloc size //为data分配的空间的大小,默认8*1024byte
-
int length; //data length //当前数据缓冲区data的长度
-
int offset; //current offset //当前数据缓冲区data的偏移量
-
int req_count; //request count //request数目
-
struct fast_task_info *next; //下一个fast_task_info对象
-
TaskFinishCallBack finish_callback; //回调函数
-
}
一个fast_task_info对象对应一个请求,请求的数据放入fast_task_info对象的数据缓冲区data中
free_queue_init函数执行完成后,全局变量g_free_queue对象初始化完成,g_free_queue对象指向的是一段连续内存,这段内存按照fast_task_info对象个数进行了划分,并使用fast_task_info对象中的next指针,进行了对象串联,info对象的空间中可能包括数据缓存区data,也有可能没有包括数据缓冲区data,是否包括数据缓冲区需要根据queue对象的malloc_whole_block指定,而malloc_whole_block又是根据连续内存分配的容量是否大于512M来决定。
阅读(1965) | 评论(0) | 转发(0) |