FastDFS v3.0 版本以后,使用 libevent 库来处理网络连接请求。其实,FastDFS 中 tracker、storage 的网络架构基本一样,下面以 tracker 为例进行介绍。
fdfs_trackerd.c 是 tracker 中main 函数的所在文件。
fdfs_trackerd.c 137行:
- sock = socketServer(bind_addr, g_server_port, &result);
- if (sock < 0)
- {
- log_destroy();
- return result;
- }
- if ((result=tcpsetserveropt(sock, g_fdfs_network_timeout)) != 0)
- {
- log_destroy();
- return result;
- }
代码分析:
1. 调用sockopt.c 中socketServer函数,打开监听端口,返回socket句柄;
2. tcpsetserveropt 函数内部通过调用 setsockopt 设置 SO_SNDTIMEO、SO_RCVTIMEO、TCP_NODELAY 等选项,并调用 tcpsetkeepalive 设置 keepalive 相关参数。
接着往下走,是tracker 的一些初始化代码tracker_service_init,我们暂时先跳过,只分析网络部分。详细的入口分析,放在下一篇文章完成。
fdfs_trackerd.c 309行:
- bTerminateFlag = false;
- bAcceptEndFlag = false;
- tracker_accept_loop(sock);
- bAcceptEndFlag = true;
- if (g_schedule_flag)
- {
- pthread_kill(schedule_tid, SIGINT);
- }
- tracker_terminate_threads();
这里,通过调用 tracker_accept_loop 函数 进入 socket 的 accept 循环。
我们进入这个函数,
tracker_service.c 202行:
- void tracker_accept_loop(int server_sock)
- {
- int incomesock;
- struct sockaddr_in inaddr;
- socklen_t sockaddr_len;
- struct tracker_thread_data *pThreadData;
- while (g_continue_flag)
- {
- sockaddr_len = sizeof(inaddr);
- incomesock = accept(server_sock, (struct sockaddr*)&inaddr, &sockaddr_len);
- if (incomesock < 0) //error
- {
- if (!(errno == EINTR || errno == EAGAIN))
- {
- logError("file: "__FILE__", line: %d, " \
- "accept failed, " \
- "errno: %d, error info: %s", \
- __LINE__, errno, STRERROR(errno));
- }
- continue;
- }
- pThreadData = g_thread_data + incomesock % g_work_threads;
- if (write(pThreadData->pipe_fds[1], &incomesock, \
- sizeof(incomesock)) != sizeof(incomesock))
- {
- close(incomesock);
- logError("file: "__FILE__", line: %d, " \
- "call write failed, " \
- "errno: %d, error info: %s", \
- __LINE__, errno, STRERROR(errno));
- }
- }
- }
注意:收到网络 connect 请求后,用 accept 返回的 socket 句柄 incomesock 对工作线程数 g_work_threads 取模,找到负责该连接的工作线程对象:
- pThreadData = g_thread_data + incomesock % g_work_threads;
接着,把 incomesock 写入该线程管道的写端 pipe_fds[1],以管道通信的方式通知工作线程有新的网络连接 incomesock 到来。
全局变量g_thread_data的类型 是 struct tracker_thread_data *
- struct tracker_thread_data *g_thread_data = NULL;
而线程结构的声明在tracker_service.h 25行:
- struct tracker_thread_data
- {
- struct event_base *ev_base;
- int pipe_fds[2];
- };
libevent 实现了网络 IO、timer、signal 的事件触发机制,可以很方便地应用于事件驱动(event-driven)的服务器中,作为其底层事件处理模块。比较成功的案例有 memcached(分布式缓存)。
event_base是整个libevent的核心部分. 而对g_thread_data的初始化动作,是在tracker_service_init 初始化函数完成的。
tracker_service_init 中 tracker_service.c 83行:
- g_thread_data = (struct tracker_thread_data *)malloc(sizeof( \
- struct tracker_thread_data) * g_work_threads);
- if (g_thread_data == NULL)
- {
- logError("file: "__FILE__", line: %d, " \
- "malloc %d bytes fail, errno: %d, error info: %s", \
- __LINE__, (int)sizeof(struct tracker_thread_data) * \
- g_work_threads, errno, STRERROR(errno));
- return errno != 0 ? errno : ENOMEM;
- }
- g_tracker_thread_count = 0;
- pDataEnd = g_thread_data + g_work_threads;
- for (pThreadData=g_thread_data; pThreadData<pDataEnd; pThreadData++)
- {
- pThreadData->ev_base = event_base_new();
- if (pThreadData->ev_base == NULL)
- {
- result = errno != 0 ? errno : ENOMEM;
- logError("file: "__FILE__", line: %d, " \
- "event_base_new fail.", __LINE__);
- return result;
- }
- if (pipe(pThreadData->pipe_fds) != 0)
- {
- result = errno != 0 ? errno : EPERM;
- logError("file: "__FILE__", line: %d, " \
- "call pipe fail, " \
- "errno: %d, error info: %s", \
- __LINE__, result, STRERROR(result));
- break;
- }
- if ((result=set_nonblock(pThreadData->pipe_fds[0])) != 0)
- {
- break;
- }
- if ((result=pthread_create(&tid, &thread_attr, \
- work_thread_entrance, pThreadData)) != 0)
- {
- logError("file: "__FILE__", line: %d, " \
- "create thread failed, startup threads: %d, " \
- "errno: %d, error info: %s", \
- __LINE__, g_tracker_thread_count, \
- result, STRERROR(result));
- break;
- }
- else
- {
- if ((result=pthread_mutex_lock(&tracker_thread_lock)) != 0)
- {
- logError("file: "__FILE__", line: %d, " \
- "call pthread_mutex_lock fail, " \
- "errno: %d, error info: %s", \
- __LINE__, result, STRERROR(result));
- }
- g_tracker_thread_count++;
- if ((result=pthread_mutex_unlock(&tracker_thread_lock)) != 0)
- {
- logError("file: "__FILE__", line: %d, " \
- "call pthread_mutex_lock fail, " \
- "errno: %d, error info: %s", \
- __LINE__, result, STRERROR(result));
- }
- }
- }
代码分析:
1. 首先根据配置g_work_threads,创建工作者线程对象g_thread_data;
2. 循环初始化libevent event_base结构体 pThreadData->ev_base = event_base_new();
3. 创建管道 if (pipe(pThreadData->pipe_fds) != 0),并用 set_nonblock 把管道读端 pipe_fds[0] 设置为非阻塞;
4. 创建工作者线程,线程执行函数是 work_thread_entrance
上面分析到,当accept 收到socket连接后,会通过pipe 通知给工作者线程。下面分析线程执行函数work_thread_entrance。
我们进入这个函数,
tracker_service.c 239行:
- static void *work_thread_entrance(void* arg)
- {
- int result;
- struct tracker_thread_data *pThreadData;
- struct event ev_notify;
- pThreadData = (struct tracker_thread_data *)arg;
- do
- {
- event_set(&ev_notify, pThreadData->pipe_fds[0], \
- EV_READ | EV_PERSIST, recv_notify_read, NULL);
- if ((result=event_base_set(pThreadData->ev_base, &ev_notify)) != 0)
- {
- logCrit("file: "__FILE__", line: %d, " \
- "event_base_set fail.", __LINE__);
- break;
- }
- if ((result=event_add(&ev_notify, NULL)) != 0)
- {
- logCrit("file: "__FILE__", line: %d, " \
- "event_add fail.", __LINE__);
- break;
- }
- while (g_continue_flag)
- {
- event_base_loop(pThreadData->ev_base, 0);
- }
- } while (0);
- event_base_free(pThreadData->ev_base);
- if ((result=pthread_mutex_lock(&tracker_thread_lock)) != 0)
- {
- logError("file: "__FILE__", line: %d, " \
- "call pthread_mutex_lock fail, " \
- "errno: %d, error info: %s", \
- __LINE__, result, STRERROR(result));
- }
- g_tracker_thread_count--;
- if ((result=pthread_mutex_unlock(&tracker_thread_lock)) != 0)
- {
- logError("file: "__FILE__", line: %d, " \
- "call pthread_mutex_lock fail, " \
- "errno: %d, error info: %s", \
- __LINE__, result, STRERROR(result));
- }
- return NULL;
- }
代码分析:
1. event_set 为管道读端 pipe_fds[0] 的可读事件(EV_READ | EV_PERSIST)设置回调函数 recv_notify_read,再通过 event_base_set、event_add 把该事件注册到本线程的 ev_base 上;
2. while 循环,event_base_loop
3. event_base_loop 等待事件被触发,然后调用事件的回调函数。
通俗点讲,就是管道上有可读事件到来时,就去执行 recv_notify_read。好,我们接着分析 recv_notify_read 函数。
tracker_nio.c 63行:
- void recv_notify_read(int sock, short event, void *arg)
- {
- int bytes;
- int incomesock;
- int result;
- struct tracker_thread_data *pThreadData;
- struct fast_task_info *pTask;
- char szClientIp[IP_ADDRESS_SIZE];
- in_addr_t client_addr;
- while (1)
- {
- if ((bytes=read(sock, &incomesock, sizeof(incomesock))) < 0)
- {
- if (!(errno == EAGAIN || errno == EWOULDBLOCK))
- {
- logError("file: "__FILE__", line: %d, " \
- "call read failed, " \
- "errno: %d, error info: %s", \
- __LINE__, errno, STRERROR(errno));
- }
- break;
- }
- else if (bytes == 0)
- {
- break;
- }
- if (incomesock < 0)
- {
- struct timeval tv;
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- pThreadData = g_thread_data + (-1 * incomesock - 1) % \
- g_work_threads;
- event_base_loopexit(pThreadData->ev_base, &tv);
- return;
- }
- client_addr = getPeerIpaddr(incomesock, \
- szClientIp, IP_ADDRESS_SIZE);
- if (g_allow_ip_count >= 0)
- {
- if (bsearch(&client_addr, g_allow_ip_addrs, \
- g_allow_ip_count, sizeof(in_addr_t), \
- cmp_by_ip_addr_t) == NULL)
- {
- logError("file: "__FILE__", line: %d, " \
- "ip addr %s is not allowed to access", \
- __LINE__, szClientIp);
- close(incomesock);
- continue;
- }
- }
- if (tcpsetnonblockopt(incomesock) != 0)
- {
- close(incomesock);
- continue;
- }
- pTask = free_queue_pop();
- if (pTask == NULL)
- {
- logError("file: "__FILE__", line: %d, " \
- "malloc task buff failed, you should " \
- "increase the parameter: max_connections", \
- __LINE__);
- close(incomesock);
- continue;
- }
- pThreadData = g_thread_data + incomesock % g_work_threads;
- strcpy(pTask->client_ip, szClientIp);
-
- event_set(&pTask->ev_read, incomesock, EV_READ, \
- client_sock_read, pTask);
- if (event_base_set(pThreadData->ev_base, &pTask->ev_read) != 0)
- {
- task_finish_clean_up(pTask);
- close(incomesock);
- logError("file: "__FILE__", line: %d, " \
- "event_base_set fail.", __LINE__);
- continue;
- }
- event_set(&pTask->ev_write, incomesock, EV_WRITE, \
- client_sock_write, pTask);
- if ((result=event_base_set(pThreadData->ev_base, \
- &pTask->ev_write)) != 0)
- {
- task_finish_clean_up(pTask);
- close(incomesock);
- logError("file: "__FILE__", line: %d, " \
- "event_base_set fail.", __LINE__);
- continue;
- }
- if (event_add(&pTask->ev_read, &g_network_tv) != 0)
- {
- task_finish_clean_up(pTask);
- close(incomesock);
- logError("file: "__FILE__", line: %d, " \
- "event_add fail.", __LINE__);
- continue;
- }
- }
- }
代码分析:
1.首先从管道里面读取 socket 句柄 incomesock;
2. 126行,pTask = free_queue_pop(); 取出 task 结构;
3. 接下来通过 event_set 设置 libevent 中 incomesock 的读、写事件的回调函数:client_sock_read、client_sock_write,并用 event_base_set 把它们绑定到所属线程的 ev_base;
4. 通过 event_add 注册读事件(超时时间为 g_network_tv),这样 incomesock 也进入 libevent 事件模型里面,等待着网络事件的发生。
当 socket 可读事件到来的时候,会执行 client_sock_read;当读取到足够的数据后,调用 tracker_deal_task 处理请求:
tracker_nio.c 325 行:
- pTask->offset += bytes;
- if (pTask->offset >= pTask->length) //recv done
- {
- pTask->req_count++;
- tracker_deal_task(pTask);
- return;
- }
tracker_deal_task 函数功能 是 tracker 根据协议处理不同的任务:
tracker_service.c 3137行:
- int tracker_deal_task(struct fast_task_info *pTask)
- {
- TrackerHeader *pHeader;
- int result;
- pHeader = (TrackerHeader *)pTask->data;
- switch(pHeader->cmd)
- {
- case TRACKER_PROTO_CMD_STORAGE_BEAT:
- TRACKER_CHECK_LOGINED(pTask)
- result = tracker_deal_storage_beat(pTask);
- break;
- ...
- ...
- ...
- case TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER:
- result = tracker_deal_commit_next_leader(pTask);
- break;
- default:
- logError("file: "__FILE__", line: %d, " \
- "client ip: %s, unkown cmd: %d", \
- __LINE__, pTask->client_ip, \
- pHeader->cmd);
- result = EINVAL;
- break;
- }
- pHeader = (TrackerHeader *)pTask->data;
- pHeader->status = result;
- pHeader->cmd = TRACKER_PROTO_CMD_RESP;
- long2buff(pTask->length - sizeof(TrackerHeader), pHeader->pkg_len);
- send_add_event(pTask);
- return 0;
- }
代码分析:
1. 依据协议头中的命令字 pHeader->cmd,处理具体业务;
2. 业务处理完成后,填写响应包头(status 置为处理结果,cmd 置为 TRACKER_PROTO_CMD_RESP,并写入 pkg_len),然后调用 send_add_event(pTask);
接着跟踪send_add_event,会进入client_sock_write,将数据通过socket发送给客户端。
tracker_nio.c 353行:
- bytes = send(sock, pTask->data + pTask->offset, \
- pTask->length - pTask->offset, 0);
至此,tracker 的网络部分,分析完成了。storage 和 tracker 分析过程一样,不再重复。
欢迎感兴趣的朋友一起交流研究,提出意见。
FastDFS技术交流群:164684842
阅读(1574) | 评论(0) | 转发(0) |