main
|-----......
|-----socketServer
|-----tcpsetserveropt
|-----daemon_init
|-----tracker_service_init
|-----.....
-
g_thread_data = (struct tracker_thread_data *)malloc(sizeof( \
-
struct tracker_thread_data) * g_work_threads)
为全局g_thread_data分配空间,g_thread_data指向的也是一段连续内存,连续内存按照tracker_thread_data对象的个数进行划分
-
g_tracker_thread_count = 0;
-
pDataEnd = g_thread_data + g_work_threads;
-
for (pThreadData=g_thread_data; pThreadData<pDataEnd; pThreadData++)
-
{
-
pThreadData->ev_base = event_base_new();//创建eventbase
-
if (pThreadData->ev_base == NULL)
-
{
-
result = errno != 0 ? errno : ENOMEM;
-
logError("file: "__FILE__", line: %d, " \
-
"event_base_new fail.", __LINE__);
-
return result;
-
}
-
-
if (pipe(pThreadData->pipe_fds) != 0)//创建管道
-
{
-
result = errno != 0 ? errno : EPERM;
-
logError("file: "__FILE__", line: %d, " \
-
"call pipe fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
break;
-
}
-
-
if ((result=set_nonblock(pThreadData->pipe_fds[0])) != 0)//设置非阻塞
-
{
-
break;
-
}
-
-
if ((result=pthread_create(&tid, &thread_attr, \//创建工作线程
-
work_thread_entrance, pThreadData)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"create thread failed, startup threads: %d, " \
-
"errno: %d, error info: %s", \
-
__LINE__, g_tracker_thread_count, \
-
result, STRERROR(result));
-
break;
-
}
-
else //创建线程成功
-
{
-
if ((result=pthread_mutex_lock(&tracker_thread_lock)) != 0)//加锁
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
g_tracker_thread_count++; //修改全局线程数目,这里加锁是为了防止该变量在其他线程中被修改
-
if ((result=pthread_mutex_unlock(&tracker_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
}
-
}
该段代码在tracke_service_init函数中初始化了全局g_thread_data对象,包括为每个tracker_thread_data对象中的eventbase初始化,为每个tracker_thread_data对象中的pipe初始化,为每个tracker_thread_data对象创建一个工作线程,修改全局g_tracker_thread_count工作线程记录数变量
下面介绍work_thread_entrance函数,该函数是工作线程的入口函数
-
static void *work_thread_entrance(void* arg)
-
{
-
int result;
-
struct tracker_thread_data *pThreadData;
-
struct event ev_notify;//创建一个event
-
-
pThreadData = (struct tracker_thread_data *)arg;
-
do
-
{
-
event_set(&ev_notify, pThreadData->pipe_fds[0], \ //设定event监听管道pipe_fds[0]
-
EV_READ | EV_PERSIST, recv_notify_read, NULL);
-
if ((result=event_base_set(pThreadData->ev_base, &ev_notify)) != 0)//邦定event到event_base
-
{
-
logCrit("file: "__FILE__", line: %d, " \
-
"event_base_set fail.", __LINE__);
-
break;
-
}
-
if ((result=event_add(&ev_notify, NULL)) != 0)
-
{
-
logCrit("file: "__FILE__", line: %d, " \
-
"event_add fail.", __LINE__);
-
break;
-
}
-
-
while (g_continue_flag)
-
{
-
event_base_loop(pThreadData->ev_base, 0);
-
}
-
} while (0);
-
-
event_base_free(pThreadData->ev_base);
-
-
if ((result=pthread_mutex_lock(&tracker_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
g_tracker_thread_count--;
-
if ((result=pthread_mutex_unlock(&tracker_thread_lock)) != 0)
-
{
-
logError("file: "__FILE__", line: %d, " \
-
"call pthread_mutex_lock fail, " \
-
"errno: %d, error info: %s", \
-
__LINE__, result, STRERROR(result));
-
}
-
-
return NULL;
-
}
在该函数中,要完成以下工作
(1) 执行event_set关联event到pipe_fds[0],监听EV_READ与EV_PERSIST事件
(2) 执行event_add,把event绑定到eventbase上
(3) 执行event_add函数
(4) 执行event_base_loop函数,开始event循环
这样,线程开始监听pipe_fds[0]上的EV_READ与EV_PERSIST事件,当事件被触发后,调用事件处理函数recv_notify_read,该函数将在下一节进行分析
阅读(1351) | 评论(0) | 转发(0) |