Chinaunix首页 | 论坛 | 博客
  • 博客访问: 58006
  • 博文数量: 7
  • 博客积分: 171
  • 博客等级: 入伍新兵
  • 技术积分: 121
  • 用 户 组: 普通用户
  • 注册时间: 2012-03-16 18:48
文章分类

全部博文(7)

文章存档

2012年(7)

我的朋友

分类: 云计算

2012-03-23 23:17:37


FastDFS v3.0 版本以后,使用到了libevent库来处理网络连接请求。 其实,FastDFS 中 tracker、storage 的网络架构基本一样。下面以tracker 为例子介绍。

fdfs_trackerd.c 是 tracker 中main 函数的所在文件。
fdfs_trackerd.c  137行:

  1.     sock = socketServer(bind_addr, g_server_port, &result);
  2.     if (sock < 0)
  3.     {
  4.         log_destroy();
  5.         return result;
  6.     }

  7.     if ((result=tcpsetserveropt(sock, g_fdfs_network_timeout)) != 0)
  8.     {
  9.         log_destroy();
  10.         return result;
  11.     }
代码分析:
1. 调用sockopt.c 中socketServer函数,打开监听端口,返回socket句柄;
2. 通过 tcpsetserveropt 函数,内部是在调用setsockopt 实现 SO_SNDTIMEO、SO_RCVTIMEO、TCP_NODELAY、tcpsetkeepalive等参数的设置。
接着往下走,是tracker 的一些初始化代码tracker_service_init,我们暂时先跳过,只分析网络部分。详细的入口分析,放在下一篇文章完成。
fdfs_trackerd.c  309行:

  1.     bTerminateFlag = false;
  2.     bAcceptEndFlag = false;

  3.     tracker_accept_loop(sock);
  4.     bAcceptEndFlag = true;
  5.     if (g_schedule_flag)
  6.     {
  7.         pthread_kill(schedule_tid, SIGINT);
  8.     }
  9.     tracker_terminate_threads();
这里,通过调用 tracker_accept_loop 函数 进入 socket 的 accept 循环。

我们进入这个函数,
tracker_service.c  202行:

  1. void tracker_accept_loop(int server_sock)
  2. {
  3.     int incomesock;
  4.     struct sockaddr_in inaddr;
  5.     socklen_t sockaddr_len;
  6.     struct tracker_thread_data *pThreadData;

  7.     while (g_continue_flag)
  8.     {
  9.         sockaddr_len = sizeof(inaddr);
  10.         incomesock = accept(server_sock, (struct sockaddr*)&inaddr, &sockaddr_len);
  11.         if (incomesock < 0) //error
  12.         {
  13.             if (!(errno == EINTR || errno == EAGAIN))
  14.             {
  15.                 logError("file: "__FILE__", line: %d, " \
  16.                     "accept failed, " \
  17.                     "errno: %d, error info: %s", \
  18.                     __LINE__, errno, STRERROR(errno));
  19.             }

  20.             continue;
  21.         }

  22.         pThreadData = g_thread_data + incomesock % g_work_threads;
  23.         if (write(pThreadData->pipe_fds[1], &incomesock, \
  24.             sizeof(incomesock)) != sizeof(incomesock))
  25.         {
  26.             close(incomesock);
  27.             logError("file: "__FILE__", line: %d, " \
  28.                 "call write failed, " \
  29.                 "errno: %d, error info: %s", \
  30.                 __LINE__, errno, STRERROR(errno));
  31.         }
  32.     }
  33. }
注意收到网络connect请求后,依据接收来的socket句柄 incomesock,找到执行线程对象;

  1. pThreadData = g_thread_data + incomesock % g_work_threads;
接着,使用管道通信的方式通知网络连接incomesock的到来。
全局变量g_thread_data的类型 是 struct tracker_thread_data *

  1. struct tracker_thread_data *g_thread_data = NULL;
而线程结构的声明在tracker_service.h 25行:

  1. struct tracker_thread_data
  2. {
  3.         struct event_base *ev_base;
  4.         int pipe_fds[2];
  5. };
libevent实现了网络IO,timer,signal的事件触发机制. 可以很方便的应用于event-driven服务器中,作为其底层事件处理模块. 比较成功的案例有 memcache(分布式缓存).  event_base是整个libevent的核心部分. 
而对g_thread_data的初始化动作,是在tracker_service_init 初始化函数完成的。

tracker_service_init 中 tracker_service.c 83行:
 
  1.     g_thread_data = (struct tracker_thread_data *)malloc(sizeof( \
  2.                 struct tracker_thread_data) * g_work_threads);
  3.     if (g_thread_data == NULL)
  4.     {
  5.         logError("file: "__FILE__", line: %d, " \
  6.             "malloc %d bytes fail, errno: %d, error info: %s", \
  7.             __LINE__, (int)sizeof(struct tracker_thread_data) * \
  8.             g_work_threads, errno, STRERROR(errno));
  9.         return errno != 0 ? errno : ENOMEM;
  10.     }

  11.     g_tracker_thread_count = 0;
  12.     pDataEnd = g_thread_data + g_work_threads;
  13.     for (pThreadData=g_thread_data; pThreadData<pDataEnd; pThreadData++)
  14.     {
  15.         pThreadData->ev_base = event_base_new();
  16.         if (pThreadData->ev_base == NULL)
  17.         {
  18.             result = errno != 0 ? errno : ENOMEM;
  19.             logError("file: "__FILE__", line: %d, " \
  20.                 "event_base_new fail.", __LINE__);
  21.             return result;
  22.         }

  23.         if (pipe(pThreadData->pipe_fds) != 0)
  24.         {
  25.             result = errno != 0 ? errno : EPERM;
  26.             logError("file: "__FILE__", line: %d, " \
  27.                 "call pipe fail, " \
  28.                 "errno: %d, error info: %s", \
  29.                 __LINE__, result, STRERROR(result));
  30.             break;
  31.         }

  32.         if ((result=set_nonblock(pThreadData->pipe_fds[0])) != 0)
  33.         {
  34.             break;
  35.         }

  36.         if ((result=pthread_create(&tid, &thread_attr, \
  37.             work_thread_entrance, pThreadData)) != 0)
  38.         {
  39.             logError("file: "__FILE__", line: %d, " \
  40.                 "create thread failed, startup threads: %d, " \
  41.                 "errno: %d, error info: %s", \
  42.                 __LINE__, g_tracker_thread_count, \
  43.                 result, STRERROR(result));
  44.             break;
  45.         }
  46.         else
  47.         {
  48.             if ((result=pthread_mutex_lock(&tracker_thread_lock)) != 0)
  49.             {
  50.                 logError("file: "__FILE__", line: %d, " \
  51.                     "call pthread_mutex_lock fail, " \
  52.                     "errno: %d, error info: %s", \
  53.                     __LINE__, result, STRERROR(result));
  54.             }
  55.             g_tracker_thread_count++;
  56.             if ((result=pthread_mutex_unlock(&tracker_thread_lock)) != 0)
  57.             {
  58.                 logError("file: "__FILE__", line: %d, " \
  59.                     "call pthread_mutex_lock fail, " \
  60.                     "errno: %d, error info: %s", \
  61.                     __LINE__, result, STRERROR(result));
  62.             }
  63.         }
  64.     }
代码分析:
1. 首先根据配置g_work_threads,创建工作者线程对象g_thread_data;
2. 循环初始化libevent  event_base结构体 pThreadData->ev_base = event_base_new();
3. 创建管道 if (pipe(pThreadData->pipe_fds) != 0)
4. 创建工作者线程,线程执行函数是 work_thread_entrance


上面分析到,当accept 收到socket连接后,会通过pipe 通知给工作者线程。下面分析线程执行函数work_thread_entrance。

我们进入这个函数,
tracker_service.c  239行:

  1. static void *work_thread_entrance(void* arg)
  2. {
  3.     int result;
  4.     struct tracker_thread_data *pThreadData;
  5.     struct event ev_notify;

  6.     pThreadData = (struct tracker_thread_data *)arg;
  7.     do
  8.     {
  9.         event_set(&ev_notify, pThreadData->pipe_fds[0], \
  10.             EV_READ | EV_PERSIST, recv_notify_read, NULL);
  11.         if ((result=event_base_set(pThreadData->ev_base, &ev_notify)) != 0)
  12.         {
  13.             logCrit("file: "__FILE__", line: %d, " \
  14.                 "event_base_set fail.", __LINE__);
  15.             break;
  16.         }
  17.         if ((result=event_add(&ev_notify, NULL)) != 0)
  18.         {
  19.             logCrit("file: "__FILE__", line: %d, " \
  20.                 "event_add fail.", __LINE__);
  21.             break;
  22.         }

  23.         while (g_continue_flag)
  24.         {
  25.             event_base_loop(pThreadData->ev_base, 0);
  26.         }
  27.     } while (0);

  28.     event_base_free(pThreadData->ev_base);

  29.     if ((result=pthread_mutex_lock(&tracker_thread_lock)) != 0)
  30.     {
  31.         logError("file: "__FILE__", line: %d, " \
  32.             "call pthread_mutex_lock fail, " \
  33.             "errno: %d, error info: %s", \
  34.             __LINE__, result, STRERROR(result));
  35.     }
  36.     g_tracker_thread_count--;
  37.     if ((result=pthread_mutex_unlock(&tracker_thread_lock)) != 0)
  38.     {
  39.         logError("file: "__FILE__", line: %d, " \
  40.             "call pthread_mutex_lock fail, " \
  41.             "errno: %d, error info: %s", \
  42.             __LINE__, result, STRERROR(result));
  43.     }

  44.     return NULL;
  45. }

代码分析:
1. event_set 设置事件回调函数 recv_notify_read
2. while 循环,event_base_loop
3. event_base_loop 等待事件被触发,然后调用事件的回调函数。

通俗点讲,就是,pipe 上面有事件到来,那么就去执行 recv_notify_read 。好,我们接着分析 recv_notify_read 函数。

tracker_nio.c 63行:

  1. void recv_notify_read(int sock, short event, void *arg)
  2. {
  3.     int bytes;
  4.     int incomesock;
  5.     int result;
  6.     struct tracker_thread_data *pThreadData;
  7.     struct fast_task_info *pTask;
  8.     char szClientIp[IP_ADDRESS_SIZE];
  9.     in_addr_t client_addr;

  10.     while (1)
  11.     {
  12.         if ((bytes=read(sock, &incomesock, sizeof(incomesock))) < 0)
  13.         {
  14.             if (!(errno == EAGAIN || errno == EWOULDBLOCK))
  15.             {
  16.                 logError("file: "__FILE__", line: %d, " \
  17.                     "call read failed, " \
  18.                     "errno: %d, error info: %s", \
  19.                     __LINE__, errno, STRERROR(errno));
  20.             }

  21.             break;
  22.         }
  23.         else if (bytes == 0)
  24.         {
  25.             break;
  26.         }

  27.         if (incomesock < 0)
  28.         {
  29.             struct timeval tv;
  30.                         tv.tv_sec = 1;
  31.                         tv.tv_usec = 0;
  32.             pThreadData = g_thread_data + (-1 * incomesock - 1) % \
  33.                     g_work_threads;
  34.             event_base_loopexit(pThreadData->ev_base, &tv);
  35.             return;
  36.         }

  37.         client_addr = getPeerIpaddr(incomesock, \
  38.                 szClientIp, IP_ADDRESS_SIZE);
  39.         if (g_allow_ip_count >= 0)
  40.         {
  41.             if (bsearch(&client_addr, g_allow_ip_addrs, \
  42.                     g_allow_ip_count, sizeof(in_addr_t), \
  43.                     cmp_by_ip_addr_t) == NULL)
  44.             {
  45.                 logError("file: "__FILE__", line: %d, " \
  46.                     "ip addr %s is not allowed to access", \
  47.                     __LINE__, szClientIp);

  48.                 close(incomesock);
  49.                 continue;
  50.             }
  51.         }

  52.         if (tcpsetnonblockopt(incomesock) != 0)
  53.         {
  54.             close(incomesock);
  55.             continue;
  56.         }

  57.         pTask = free_queue_pop();
  58.         if (pTask == NULL)
  59.         {
  60.             logError("file: "__FILE__", line: %d, " \
  61.                 "malloc task buff failed, you should " \
  62.                 "increase the parameter: max_connections", \
  63.                 __LINE__);
  64.             close(incomesock);
  65.             continue;
  66.         }

  67.         pThreadData = g_thread_data + incomesock % g_work_threads;

  68.         strcpy(pTask->client_ip, szClientIp);
  69.     
  70.         event_set(&pTask->ev_read, incomesock, EV_READ, \
  71.                 client_sock_read, pTask);
  72.         if (event_base_set(pThreadData->ev_base, &pTask->ev_read) != 0)
  73.         {
  74.             task_finish_clean_up(pTask);
  75.             close(incomesock);

  76.             logError("file: "__FILE__", line: %d, " \
  77.                 "event_base_set fail.", __LINE__);
  78.             continue;
  79.         }

  80.         event_set(&pTask->ev_write, incomesock, EV_WRITE, \
  81.                 client_sock_write, pTask);
  82.         if ((result=event_base_set(pThreadData->ev_base, \
  83.                 &pTask->ev_write)) != 0)
  84.         {
  85.             task_finish_clean_up(pTask);
  86.             close(incomesock);

  87.             logError("file: "__FILE__", line: %d, " \
  88.                     "event_base_set fail.", __LINE__);
  89.             continue;
  90.         }

  91.         if (event_add(&pTask->ev_read, &g_network_tv) != 0)
  92.         {
  93.             task_finish_clean_up(pTask);
  94.             close(incomesock);

  95.             logError("file: "__FILE__", line: %d, " \
  96.                 "event_add fail.", __LINE__);
  97.             continue;
  98.         }
  99.     }
  100. }
代码分析:
1.首先从管道里面读取 socket 句柄 incomesock;
2.  126行,pTask = free_queue_pop(); 取出 task 结构;
3.  接下来设置libevent中对 incomesock 的读、写事件的回调函数:client_sock_read、client_sock_write
4. 通过event_set ,这样incomesock 也进入libevent 事件模型里面,等待着网络事件的发生。

当socket 网络可读事件到来的时候,会执行client_sock_read,当读取足够的数据后,
tracker_nio.c 325 行:
 
  1. pTask->offset += bytes;
  2.         if (pTask->offset >= pTask->length) //recv done
  3.         {
  4.             pTask->req_count++;
  5.             tracker_deal_task(pTask);
  6.             return;
  7.         }
tracker_deal_task 函数功能 是 tracker 根据协议处理不同的任务:
tracker_service.c 3137行:

  1. int tracker_deal_task(struct fast_task_info *pTask)
  2. {
  3.     TrackerHeader *pHeader;
  4.     int result;

  5.     pHeader = (TrackerHeader *)pTask->data;
  6.     switch(pHeader->cmd)
  7.     {
  8.         case TRACKER_PROTO_CMD_STORAGE_BEAT:
  9.             TRACKER_CHECK_LOGINED(pTask)
  10.             result = tracker_deal_storage_beat(pTask);
  11.             break;

  12.         ...
  13. ...
  14. ...

  15.         case TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER:
  16.             result = tracker_deal_commit_next_leader(pTask);
  17.             break;
  18.         default:
  19.             logError("file: "__FILE__", line: %d, " \
  20.                 "client ip: %s, unkown cmd: %d", \
  21.                 __LINE__, pTask->client_ip, \
  22.                 pHeader->cmd);
  23.             result = EINVAL;
  24.             break;
  25.     }

  26.     pHeader = (TrackerHeader *)pTask->data;
  27.     pHeader->status = result;
  28.     pHeader->cmd = TRACKER_PROTO_CMD_RESP;
  29.     long2buff(pTask->length - sizeof(TrackerHeader), pHeader->pkg_len);

  30.     send_add_event(pTask);

  31.     return 0;
  32. }
代码分析:
1. 依据协议pHeader->cmd,处理具体业务;
2. 业务处理完成后,调用send_add_event(pTask); 

接着跟踪send_add_event,会进入client_sock_write,将数据通过socket发送给客户端。
tracker_nio.c 353行:

  1. bytes = send(sock, pTask->data + pTask->offset, \
  2.                 pTask->length - pTask->offset, 0);
至此,tracker 的网络部分,分析完成了。storage 和 tracker 分析过程一样,不再重复。

欢迎感兴趣的朋友一起交流研究,提出意见。
FastDFS技术交流群:164684842

阅读(5725) | 评论(0) | 转发(1) |
给主人留下些什么吧!~~