Chinaunix首页 | 论坛 | 博客
  • 博客访问: 835797
  • 博文数量: 91
  • 博客积分: 2544
  • 博客等级: 少校
  • 技术积分: 1885
  • 用 户 组: 普通用户
  • 注册时间: 2006-12-12 09:08
文章存档

2016年(10)

2014年(2)

2013年(4)

2012年(23)

2011年(23)

2010年(13)

2009年(14)

2007年(2)

分类: LINUX

2012-08-28 07:15:23

主流程
1,概述:

tracker服务和storage的处理方式相似。
(1) tracker_service_init函数为每个tracker工作线程创建一个管道并启动线程;在每个工作线程的work_thread_entrance函数中,把该管道的读端pipe_fds[0]描述符添加到事件的读监控队列中,并设置消息处理函数recv_notify_read。
(2) 而管道的写端pipe_fds[1]是由主监控循环tracker_accept_loop来写的:当tcp连接完成后,连接监控主线程会把连接成功的socket描述符通过pipe_fds[1]写入管道,tracker的work_thread_entrance线程的recv_notify_read函数从读端pipe_fds[0]接收到连接好的socket描述符,再把它加入到libevent事件的读和写队列中。
(3) 而最终来处理任务消息的其实是client_sock_read和client_sock_write函数,分别处理连接端的读和写请求。


2, 基本数据结构

/* Connection details for a single tracker server. */
typedef struct
{
int sock;  // socket descriptor
int port;  // port number
char ip_addr[IP_ADDRESS_SIZE]; // IP address
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];  // group name associated with the tracker
} TrackerServerInfo;


/* Running status of one tracker server; the weights mentioned below are
 * presumably used when choosing a leader -- confirm against the caller. */
typedef struct {
TrackerServerInfo *pTrackerServer; // the tracker server this status describes
int running_time;     // running time in seconds; a larger value means a higher weight
int restart_interval; // restart interval; a smaller value means a higher weight
bool if_leader;       // whether this tracker is the leader
} TrackerRunningStatus;


3, 主函数

int main(int argc, char *argv[])
{
char *conf_filename;
int result;
int sock;
pthread_t schedule_tid;
struct sigaction act;
//默认3个tracker调度进程
ScheduleEntry scheduleEntries[SCHEDULE_ENTRIES_COUNT];
ScheduleArray scheduleArray;

//配置文件
if (argc < 2)
{
printf("Usage: %s \n", argv[0]);
return 1;
}

//记录启动时间
g_up_time = time(NULL);
srand(g_up_time);

log_init();

#if defined(DEBUG_FLAG) && defined(OS_LINUX)
if (getExeAbsoluteFilename(argv[0], g_exe_name, \
sizeof(g_exe_name)) == NULL)
{
log_destroy();
return errno != 0 ? errno : ENOENT;
}
#endif

conf_filename = argv[1];
memset(bind_addr, 0, sizeof(bind_addr));
//读取配置文件选项
if ((result=tracker_load_from_conf_file(conf_filename, \
bind_addr, sizeof(bind_addr))) != 0)
{
log_destroy();
return result;
}

if ((result=tracker_load_status_from_file(&g_tracker_last_status)) != 0)
{
log_destroy();
return result;
}

base64_init_ex(&g_base64_context, 0, '-', '_', '.');
if ((result=set_rand_seed()) != 0)
{
logCrit("file: "__FILE__", line: %d, " \
"set_rand_seed fail, program exit!", __LINE__);
return result;
}

if ((result=tracker_mem_init()) != 0)
{
log_destroy();
return result;
}

//创建tracker的server socket
sock = socketServer(bind_addr, g_server_port, &result);
if (sock < 0)
{
log_destroy();
return result;
}

//设置tracker socket选项
if ((result=tcpsetserveropt(sock, g_fdfs_network_timeout)) != 0)
{
log_destroy();
return result;
}
//后台执行
daemon_init(true);
umask(0);
if (dup2(g_log_context.log_fd, STDOUT_FILENO) < 0 || \
dup2(g_log_context.log_fd, STDERR_FILENO) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call dup2 fail, errno: %d, error info: %s, " \
"program exit!", __LINE__, errno, STRERROR(errno));
g_continue_flag = false;
return errno;
}
//初始化tracker服务进程
if ((result=tracker_service_init()) != 0)
{
log_destroy();
return result;
}
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
        //设置信号处理函数
act.sa_handler = sigUsrHandler;
if(sigaction(SIGUSR1, &act, NULL) < 0 || \
sigaction(SIGUSR2, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}

act.sa_handler = sigHupHandler;
if(sigaction(SIGHUP, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}
act.sa_handler = SIG_IGN;
if(sigaction(SIGPIPE, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}

act.sa_handler = sigQuitHandler;
if(sigaction(SIGINT, &act, NULL) < 0 || \
sigaction(SIGTERM, &act, NULL) < 0 || \
sigaction(SIGQUIT, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}

#if defined(DEBUG_FLAG)
/*
#if defined(OS_LINUX)
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
        act.sa_sigaction = sigSegvHandler;
        act.sa_flags = SA_SIGINFO;
        if (sigaction(SIGSEGV, &act, NULL) < 0 || \
         sigaction(SIGABRT, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}
#endif
*/

memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
act.sa_handler = sigDumpHandler;
if(sigaction(SIGUSR1, &act, NULL) < 0 || \
sigaction(SIGUSR2, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}
#endif

#ifdef WITH_HTTPD
if (!g_http_params.disabled)
{
if ((result=tracker_httpd_start(bind_addr)) != 0)
{
logCrit("file: "__FILE__", line: %d, " \
"tracker_httpd_start fail, program exit!", \
__LINE__);
return result;
}

}

if ((result=tracker_http_check_start()) != 0)
{
logCrit("file: "__FILE__", line: %d, " \
"tracker_http_check_start fail, " \
"program exit!", __LINE__);
return result;
}
#endif
//设置运行权限
if ((result=set_run_by(g_run_by_group, g_run_by_user)) != 0)
{
log_destroy();
return result;
}

scheduleArray.entries = scheduleEntries;
scheduleArray.count = SCHEDULE_ENTRIES_COUNT;
//设置调度实体处理函数
memset(scheduleEntries, 0, sizeof(scheduleEntries));
scheduleEntries[0].id = 1;
scheduleEntries[0].time_base.hour = TIME_NONE;
scheduleEntries[0].time_base.minute = TIME_NONE;
scheduleEntries[0].interval = g_sync_log_buff_interval;
scheduleEntries[0].task_func = log_sync_func;
scheduleEntries[0].func_args = &g_log_context;

scheduleEntries[1].id = 2;
scheduleEntries[1].time_base.hour = TIME_NONE;
scheduleEntries[1].time_base.minute = TIME_NONE;
scheduleEntries[1].interval = g_check_active_interval;
scheduleEntries[1].task_func = tracker_mem_check_alive;
scheduleEntries[1].func_args = NULL;

scheduleEntries[2].id = 3;
scheduleEntries[2].time_base.hour = 0;
scheduleEntries[2].time_base.minute = 0;
scheduleEntries[2].interval = TRACKER_SYNC_STATUS_FILE_INTERVAL;
scheduleEntries[2].task_func = tracker_write_status_to_file;
scheduleEntries[2].func_args = NULL;
if ((result=sched_start(&scheduleArray, &schedule_tid, \
g_thread_stack_size, &g_continue_flag)) != 0)
{
log_destroy();
return result;
}
//选主
if ((result=tracker_relationship_init()) != 0)
{
log_destroy();
return result;
}

log_set_cache(true);

bTerminateFlag = false;
bAcceptEndFlag = false;

//接受连接主函数
tracker_accept_loop(sock);
bAcceptEndFlag = true;
if (g_schedule_flag)
{
pthread_kill(schedule_tid, SIGINT);
}
//终止线程
tracker_terminate_threads();

#ifdef WITH_HTTPD
if (g_http_check_flag)
{
tracker_http_check_stop();
}

while (g_http_check_flag)
{
usleep(50000);
}
#endif

while ((g_tracker_thread_count != 0) || g_schedule_flag)
{

/*
#if defined(DEBUG_FLAG) && defined(OS_LINUX)
if (bSegmentFault)
{
sleep(5);
break;
}
#endif
*/

usleep(50000);
}
tracker_mem_destroy();
tracker_service_destroy();
tracker_relationship_destroy();
logInfo("exit nomally.\n");
log_destroy();
return 0;
}

//tracker服务初始化
//tracker服务初始化在tracker_service_init函数中完成,其实就是开启默认是4个工作服务线程。
int tracker_service_init()
{
int result;
struct tracker_thread_data *pThreadData;
struct tracker_thread_data *pDataEnd;
pthread_t tid;
pthread_attr_t thread_attr;
    
        //初始化线程锁
if ((result=init_pthread_lock(&tracker_thread_lock)) != 0)
{
return result;
}
        //初始化
if ((result=init_pthread_lock(&lb_thread_lock)) != 0)
{
return result;
}
        //设置线程属性
if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"init_pthread_attr fail, program exit!", __LINE__);
return result;
}
        //队列初始化
if ((result=free_queue_init(g_max_connections, TRACKER_MAX_PACKAGE_SIZE,\
                TRACKER_MAX_PACKAGE_SIZE, sizeof(TrackerClientInfo))) != 0)
{
return result;
}
        //为每个线程分配一个线程结构
g_thread_data = (struct tracker_thread_data *)malloc(sizeof( \
struct tracker_thread_data) * g_work_threads);
if (g_thread_data == NULL)
{
logError("file: "__FILE__", line: %d, " \
"malloc %d bytes fail, errno: %d, error info: %s", \
__LINE__, (int)sizeof(struct tracker_thread_data) * \
g_work_threads, errno, STRERROR(errno));
return errno != 0 ? errno : ENOMEM;
}
    
        //遍历N个工作线程结构,创建管道
g_tracker_thread_count = 0;
pDataEnd = g_thread_data + g_work_threads;
for (pThreadData=g_thread_data; pThreadData
{
pThreadData->ev_base = event_base_new();
if (pThreadData->ev_base == NULL)
{
result = errno != 0 ? errno : ENOMEM;
logError("file: "__FILE__", line: %d, " \
"event_base_new fail.", __LINE__);
return result;
}
                //创建管道
if (pipe(pThreadData->pipe_fds) != 0)
{
result = errno != 0 ? errno : EPERM;
logError("file: "__FILE__", line: %d, " \
"call pipe fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
break;
}
                //把管道的读端设置成非阻塞
if ((result=set_nonblock(pThreadData->pipe_fds[0])) != 0)
{
break;
}
                //创建tracker工作线程
if ((result=pthread_create(&tid, &thread_attr, \
work_thread_entrance, pThreadData)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"create thread failed, startup threads: %d, " \
"errno: %d, error info: %s", \
__LINE__, g_tracker_thread_count, \
result, STRERROR(result));
break;
}
else
{
                        //创建线程成功,用全局变量记录tracker线程的数量
if ((result=pthread_mutex_lock(&tracker_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
g_tracker_thread_count++;
if ((result=pthread_mutex_unlock(&tracker_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
}
}

pthread_attr_destroy(&thread_attr);
return 0;
}


/**
 * Tracker worker thread function.
 *
 * Registers recv_notify_read as the persistent read handler for the read
 * end of this worker's pipe, then runs the worker's event loop for as long
 * as g_continue_flag is set. On exit it frees the event base and
 * decrements g_tracker_thread_count under tracker_thread_lock.
 *
 * @param arg  pointer to this worker's struct tracker_thread_data
 * @return always NULL
 */
static void *work_thread_entrance(void* arg)
{
	int result;
	struct tracker_thread_data *pThreadData;
	struct event ev_notify;

	pThreadData = (struct tracker_thread_data *)arg;
	do
	{
		/* install the handler for the read end of the pipe */
		event_set(&ev_notify, pThreadData->pipe_fds[0],
			EV_READ | EV_PERSIST, recv_notify_read, NULL);
		if ((result=event_base_set(pThreadData->ev_base, &ev_notify)) != 0)
		{
			logCrit("file: "__FILE__", line: %d, "
				"event_base_set fail.", __LINE__);
			break;
		}
		if ((result=event_add(&ev_notify, NULL)) != 0)
		{
			logCrit("file: "__FILE__", line: %d, "
				"event_add fail.", __LINE__);
			break;
		}

		/* re-enter the event loop until shutdown is requested */
		while (g_continue_flag)
		{
			event_base_loop(pThreadData->ev_base, 0);
		}
	} while (0);

	event_base_free(pThreadData->ev_base);

	/* this worker is done: remove it from the global worker counter */
	if ((result=pthread_mutex_lock(&tracker_thread_lock)) != 0)
	{
		logError("file: "__FILE__", line: %d, "
			"call pthread_mutex_lock fail, "
			"errno: %d, error info: %s",
			__LINE__, result, STRERROR(result));
	}
	g_tracker_thread_count--;
	if ((result=pthread_mutex_unlock(&tracker_thread_lock)) != 0)
	{
		logError("file: "__FILE__", line: %d, "
			"call pthread_mutex_unlock fail, "
			"errno: %d, error info: %s",
			__LINE__, result, STRERROR(result));
	}

	return NULL;
}



阅读(6975) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~