Chinaunix首页 | 论坛 | 博客
  • 博客访问: 268135
  • 博文数量: 113
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1044
  • 用 户 组: 普通用户
  • 注册时间: 2015-02-15 16:09
文章分类

全部博文(113)

文章存档

2016年(5)

2015年(108)

我的朋友

分类: C/C++

2016-01-21 00:32:13

ibevent多线程使用事项

    在linux平台上使用c开发网络程序的同志们一般情况下都对鼎鼎大名的libevent非常的熟悉了。但是一些新进入此领域的new new people们对此都是一头雾水。原本的迷茫再加上开源软件一贯的“帮助文件”缺失作风,让我们这些新手们显的非常的无助。幸好有一些热心的朋友们帮忙,才化险为夷啊!

    前几天一直在开发一个locker server,虽然公司现有的locker server能很好的运转,但是毕竟是net的,通用性不广,当我们要在linux上开发多集群系统的时候现有的locker server就未免显得有点捉襟见肘了。正是在开发locker server的过程中使用到了libevent。

    总体上,libevent是很好用的。一两个函数就能搞定你复杂的网络通讯工作了。当然了,这句话得用在你使用的是“单线程”的情况下。虽然在linux系统中,进程的资源和window系统中进程的资源相比轻量级很多,代价也相当的没有那么昂贵,所以很多的软件都是使用“多进程”方式实现的,比如大名鼎鼎的apache。但是在我们的系统中,我们使用了“单进程多线程”的方式,这样,我们就能在单机上启动多个进程,以达到“伪分布式”的效果来达到测试的目的。

      那么这个时候就要注意libevent的使用了,因为对于event_base来说,不是线程安全的。也就是说多线程不能share同一个event_base,就算是加锁操作也不行。那么这个时候就只能采取“单线程单event_base”的策略了。我的做法是做一个task pool(任务对象池),每个任务会被一个thread执行,当然了,thread肯定也是从thread pool拿出来的,而在task pool初始化的时候,我就给每个task中的event_base初始化了对象,这样,万事大吉了。

      这个地方注意了以后,就开始说网络通讯了。在使用libevent的时候,触发事件是在接收到网络连接(或者timeout事件超时)的时候。所以你需要在事件处理函数中判断时间源,其次libevent接收网络通讯的字节流时是使用了libevnet中自带的缓冲的,所以当你接收的时候一定要注意累加,并且多次loop或者注册 event_event中的事件。所以在我的task中,会有接收的data。当然了如果你的协议是分为header和body的,通常header比较短,body比较长,而且在client,header和body通常是连续发送的,这样,在使用libevent的时候,header和body是同时被接收到的,这点一定要注意,所以提醒你在接收数据的函数中,需要区分接收header部分还是body部分;当body非常长,超过libevent的缓冲时,是需要多次多次触发接收函数的,这点也要注意,就是让你需要在接收的时候除了区分header和body以外,还要注意一次接收不完全的情况下,对于数据需要累加。

      当你在使用libevent时,event_set事件时,只要不是使用EV_PERSIST注册的事件是不需要在接收完一次数据后多次event_add的,只有当你不使用EV_PERSIST时,你的事件才需要多次event_add到event_base中;当然了,使用了EV_PERSIST注册的函数在event_base被task pool回收时是要显式的event_del该注册事件的,没有使用EV_PERSIST注册的事件是不需要显式的使用event_del删除该事件的。


点击(此处)折叠或打开

  1. static void read_buffer(int client_socket_fd,short event_type,void *arg)
  2. {
  3.     if(NULL == arg)
  4.     {
  5.         log_error("File:"__FILE__",Line:%d.event base arg is NULL.",__LINE__);
  6.         return;
  7.     }
  8.     task_info_t *task_info = (task_info_t *) arg;

  9.     if(event_type == EV_TIMEOUT)
  10.     /*
  11.     这个地方注意需要判断是否超时
  12.     因为我event_add事件的时候没有使用ev_persist
  13.     所以当超时时需要再add一次事件到event_base的loop中
  14.     */
  15.     {
  16.         if(0 != event_add(&task_info->on_read,&task_info->timeout))
  17.         {
  18.             log_error("File:"__FILE__",Line:%d.repeart add read header event to event_base is error.");
  19.             close(task_info->on_read.ev_fd);
  20.             task_pool_push(task_info);

  21.         }
  22.         return;
  23.     }

  24.     int bytes;
  25.     /*
  26.     这个地方就是开始接收头部
  27.     接收头部时,可能分为好几次从缓冲中取得,所以需要一个while累加
  28.     */
  29.     while(header == task_info->read_type)//recv header
  30.     {
  31.         bytes = recv(client_socket_fd,task_info->header_buffer+task_info->offset,REQUEST_LENGTH -task_info->offset,0);
  32.         if(0 > bytes )
  33.         {
  34.             if (errno == EAGAIN || errno == EWOULDBLOCK)
  35.             {
  36.                 if(0 != event_add(&task_info->on_read, &task_info->timeout))
  37.                 {
  38.                     close(task_info->on_read.ev_fd);
  39.                     task_pool_push(task_info);

  40.                     log_error("File: "__FILE__", line: %d, "\
  41.                         "event_add fail.", __LINE__);
  42.                     return;
  43.                 }
  44.             }
  45.             else
  46.             {
  47.                 log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s",
  48.                     __LINE__, errno, strerror(errno));

  49.                 close(task_info->on_read.ev_fd);
  50.                 task_pool_push(task_info);
  51.             }
  52.             return;
  53.         }
  54.         else if(0 == bytes)
  55.         {
  56.             log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.",
  57.                     __LINE__);
  58.             close(task_info->on_read.ev_fd);
  59.             task_pool_push(task_info);
  60.             return;
  61.         }

  62.         if(REQUEST_LENGTH > bytes+task_info->offset)
  63.         {
  64.             log_warning("File:"__FILE__",Line:%d.recv header is not over.",__LINE__);
  65.             task_info->offset += bytes;
  66.             if(0 != event_add(&task_info->on_read, &task_info->timeout))
  67.             {
  68.                 close(task_info->on_read.ev_fd);
  69.                 task_pool_push(task_info);
  70.                 log_error("File: "__FILE__", line: %d, "\
  71.                     "event_add fail.", __LINE__);
  72.                 return;
  73.             }
  74.         }
  75.         else
  76.         {
  77.             task_info->read_type = body;
  78.             deal_request_header(task_info);
  79.             task_info->body_buffer = (char *) malloc(task_info->request_info.length);
  80.             if(NULL == task_info->body_buffer)
  81.             {
  82.                 log_error("File:"__FILE__",Line:%d.alloc mem to task_info data is error.",__LINE__);
  83.                 close(client_socket_fd);
  84.                 task_pool_push(task_info);
  85.                 return;
  86.             }
  87.             memset(task_info->body_buffer,0,task_info->request_info.length);
  88.             task_info->offset = 0;//set recv body buffer offset to 0
  89.             break;
  90.         }
  91.     }

  92.     /*
  93.     这个地方就是开始接收body,
  94.     和header一样,也要考虑body多次接收累加的情况。
  95.     */
  96.     while(body == task_info->read_type)
  97.     {
  98.         bytes = recv(client_socket_fd,task_info->body_buffer+task_info->offset,task_info->request_info.length-task_info->offset,0);
  99.         if(0 > bytes )
  100.         {
  101.             if (errno == EAGAIN || errno == EWOULDBLOCK)
  102.             {
  103.                 if(0 != event_add(&task_info->on_read, &task_info->timeout))
  104.                 {
  105.                     close(task_info->on_read.ev_fd);
  106.                     task_pool_push(task_info);

  107.                     log_error("File: "__FILE__", line: %d, "\
  108.                         "event_add fail.", __LINE__);
  109.                     return;
  110.                 }
  111.             }
  112.             else
  113.             {
  114.                 log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s",
  115.                     __LINE__, errno, strerror(errno));

  116.                 close(task_info->on_read.ev_fd);
  117.                 task_pool_push(task_info);
  118.             }
  119.             return;
  120.         }
  121.         else if(0 == bytes)
  122.         {
  123.             log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.",
  124.                     __LINE__);
  125.             close(task_info->on_read.ev_fd);
  126.             task_pool_push(task_info);
  127.             return;
  128.         }

  129.         if(task_info->request_info.length-task_info->offset > bytes)
  130.         {
  131.             log_warning("File:"__FILE__",Line:%d.recv body is not over.",__LINE__);
  132.             task_info->offset += bytes;
  133.             if(0 != event_add(&task_info->on_read, &task_info->timeout))
  134.             {
  135.                 close(task_info->on_read.ev_fd);
  136.                 task_pool_push(task_info);
  137.                 log_error("File: "__FILE__", line: %d, "\
  138.                     "event_add fail.", __LINE__);
  139.                 return;
  140.             }
  141.         }
  142.         else
  143.         {
  144.             task_info->read_type = unspecified;
  145.             break;
  146.         }
  147.     }
  148.     deal_request_body(client_socket_fd,task_info);
  149.     return;
  150. }

  151.  

  152. void deal_working_thread(void *arg)
  153. {
  154.     log_info("debug to this.");
  155.     int client_socket_fd = (int) arg;
  156.     if(0 > client_socket_fd)
  157.     {
  158.         log_error("File:"__FILE__",Line:%d.the arg means client socket filedesc is less 0!",__LINE__);
  159.         return;
  160.     }
  161.     /*
  162.     设置网络为非阻塞,libevent必须的
  163.     */
  164.     if(!set_nonblocking(client_socket_fd))
  165.     {
  166.         log_error("File:"__FILE__",Line:%d.set client socket filedesc is error.error info is %s!",
  167.                 __LINE__,strerror(errno));
  168.         close(client_socket_fd);
  169.         return;
  170.     }

  171.     task_info_t *task_info;
  172.     task_info = task_pool_pop();
  173.     /*
  174.     对event_base注册事件回调函数,
  175.     注意没有使用EV_PERSIST
  176.     */
  177.     do
  178.     {
  179.         task_info->read_type = header;
  180.         event_set(&task_info->on_read,client_socket_fd,EV_READ,read_buffer,(void *) task_info);
  181.         if(0 != event_base_set(task_info->event_base,&task_info->on_read))
  182.         {
  183.             log_error("File:"__FILE__",Line:%d.Associate the read header event to event_base is error.",__LINE__);
  184.             task_info->read_type = unspecified;
  185.             close(client_socket_fd);
  186.             task_pool_push(task_info);
  187.             break;
  188.         }

  189.         event_set(&task_info->on_write,client_socket_fd,EV_WRITE,response_handle,(void *) task_info);
  190.         if(0 != event_base_set(task_info->event_base,&task_info->on_write))
  191.         {
  192.             log_error("File:"__FILE__",Line:%d.Associate the write hander to event_base is error.",__LINE__);
  193.             task_info->read_type = unspecified;
  194.             close(client_socket_fd);
  195.             task_pool_push(task_info);
  196.             break;
  197.         }


  198.         if(0 != event_add(&task_info->on_read,&task_info->timeout))
  199.         {
  200.             log_error("File:"__FILE__",Line:%d.add the read header event to event_base is error.",__LINE__);
  201.             task_info->read_type = unspecified;
  202.             close(client_socket_fd);
  203.             task_pool_push(task_info);
  204.             break;
  205.         }

  206.         event_base_loop(task_info->event_base,EVLOOP_NONBLOCK);
  207.     }while(false);
  208.     return;
  209. }

阅读(2501) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~