Chinaunix首页 | 论坛 | 博客
  • 博客访问: 661143
  • 博文数量: 171
  • 博客积分: 2246
  • 博客等级: 大尉
  • 技术积分: 1574
  • 用 户 组: 普通用户
  • 注册时间: 2012-05-31 11:45
文章分类

全部博文(171)

文章存档

2018年(3)

2017年(4)

2015年(1)

2014年(20)

2013年(57)

2012年(86)

分类: LINUX

2014-12-24 14:56:58

本文也许对大部分人没有什么用处,但对我来说确实认识epoll的启发式文章,所以记下来,以方便以后逐步完善。

这段时间在看和做服务器程序,使用的epoll模型,但是对epoll的理解总是一片模糊:看了很多文章,上面总是说那三个函数怎么怎么用,却始终没有对epoll的原理做简单明了的阐述,也许本人是那种吹毛求疵的人,也许是自己对于不理解的东西根本就是选择性遗忘,也许是为了尽善尽美,我不断的搜寻一些我能理解的文章,但是很失望;只是最终好像有点“众里寻他”和“蓦然回首”的味道,直到无意之间找到了这下面的几篇文章,才有些拨云见日的感觉,不敢独享,所以粘贴复制顺便做了个图,希望更多像我这样的“独行者”和“徘徊者”能够看到,不胜荣幸!
还有,对于原理的理解和对于函数的使用都有需要注意的地方,只理解原理而不知道如何使用就好像学习国术只知道架子不知道打法一样,所以原理我贴一些,使用和使用中的问题我也贴一些,陆陆续续的都会再继续完善。
最大的心得就是,使用之前你最好先man一下,看看man page上的例子!


FROM:http://blog.csdn.net/russell_tao/article/details/7160071


只有在使用epoll ET(Edge Trigger)模式的时候,才需要关注数据是否读取完毕了。使用select或者epollLT模式,其实根本不用关注数据是否读完了,select/epoll检测到有数据可读去读就OK了。


linux
epoll如何实现高效处理百万句柄的


开发高性能网络程序时,windows开发者们言必称iocplinux开发者们则言必称epoll。大家都明白epoll是一种IO多路复用技 术,可以非常高效的处理数以百万计的socket句柄,比起以前的selectpoll效率高大发了。我们用起epoll来都感觉挺爽,确实快,那么, 它到底为什么可以高速处理这么多并发连接呢?


先简单回顾下如何使用C库封装的3epoll系统调用吧。

int epoll_create(int size);                                                            //打开一个文件描述符

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  //对epfd(epoll_create的返回值)引用的epoll实例进行操作:对fd进行op操作。

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);  //等待epfd引用的epoll实例上的事件发生。

使用起来很清晰,首先要调用epoll_create建立一个epoll对象,并返回指向此epoll对象的文件描述符epfd。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。

epoll_ctl可以操作上面epoll_create建立的epoll实例(epfd指向此epoll):将刚建立的fd及其操作op加入到epfd中让其监控,或者把 epfd正在监控的某个socket句柄fd移出epfd,不再监控它等等。

epoll_wait在调用时,在给定的timeout时间内,当在监控的所有句柄中有事件发生时,就返回用户态的进程。

从上面的调用方式就可以看到epollselect/poll的优越之处:因为后者每次调用时都要传递你所要监控的所有socketselect/poll系统调用,这意味着需要将用户态的socket列表copy到内核态,如果以万计的句柄会导致每次都要copy几十几百KB的内存 到内核态,非常低效。而我们调用epoll_wait时就相当于以往调用select/poll,但是这时却不用传递socket句柄给内核,因为内核已 经在epoll_ctl中拿到了要监控的句柄列表。

所以,实际上在你调用epoll_create后,内核就已经在内核态开始准备帮你存储要监控的句柄了,每次调用epoll_ctl只是在往内核的数据结构里塞入新的socket句柄。

在内核里,一切皆文件。所以,epoll向内核注册了一个文件系统,用于存储上述的被监控socket。当你调用epoll_create时,就会在这个虚拟的epoll文件系统里创建一个file结点。当然这个file不是普通文件,它只服务于epollsocketfdepoll_create的返回值)。

epoll在被内核初始化时(操作系统启动),同时会开辟出epoll自己的内核高速cache区,用于安置每一个我们想监控的socket,这些 socket会以红黑树的形式保存在内核cache里,以支持快速的查找、插入、删除。这个内核高速cache区,就是建立连续的物理内存页,然后在之上 建立slab层,简单的说,就是物理上分配好你想要的size的内存对象,每次使用时都是使用空闲的已分配好的对象。








  1. static int __init eventpoll_init(void) {

  2.     ... ...

  3.     /* Allocates slab cache used to allocate "struct epitem" items */

  4.     epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem), 0,SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL);

  5.     /* Allocates slab cache used to allocate "struct eppoll_entry" */

  6.     pwq_cache = kmem_cache_create("eventpoll_pwq", sizeof(struct eppoll_entry), 0, EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL);

  7.    ... ...

  8. }



epoll的高效就在于,当我们调用epoll_ctl往里塞入百万个句柄时,epoll_wait仍然可以飞快的返回,并有效的将发生事件的句柄 给我们用户。这是由于我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。


而且,通常情况下即使我们要监控百万计的句柄,大多一次也只返回很少量的准备就绪句柄而已,所以,epoll_wait仅需要从内核态copy少量的句柄到用户态而已,如何能不高效?!

那么,这个准备就绪list链表是怎么维护的呢?当我们执行epoll_ctl时,除了把socket放到epoll文件系统里file对象对应的 红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个 socket上有数据到了,内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里了。

如此,一颗红黑树,一张准备就绪句柄链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题。执行epoll_create 时,创建了红黑树和就绪链表,执行epoll_ctl时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据。执行epoll_wait时立刻返回准备就绪链表里的数据即可。

最后看看epoll独有的两种模式LTET。无论是LTET模式,都适用于以上所说的流程。区别是,LT模式下,只要一个句柄上的事件一次没有处理完,会在以后调用epoll_wait时次次返回这个句柄,而ET模式仅在第一次返回。

这件事怎么做到的呢?当一个socket句柄上有事件时,内核会把该句柄插入上面所说的准备就绪list链表,这时我们调用epoll_wait, 会把准备就绪的socket拷贝到用户态内存,然后清空准备就绪list链表,最后,epoll_wait干了件事,就是检查这些socket,如果不是 ET模式(就是LT模式的句柄了),并且这些socket上确实有未处理的事件时,又把该句柄放回到刚刚清空的准备就绪链表了。所以,非ET的句柄,只要 它上面还有事件,epoll_wait每次都会返回。而ET模式的句柄,除非有新中断到,即使socket上的事件没有处理完,也是不会次次从 epoll_wait返回的。


如何使用epoll

相比于selectepoll最大的好处在于它不会随着监听fd数目的增长而降低效率。因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。并且,在linux/posix_types.h头文件有这样的声明:

#define __FD_SETSIZE    1024

表示select最多同时监听1024fd,当然,可以通过修改头文件再重编译内核来扩大这个数目,但这似乎并不治本。

epoll的接口非常简单,一共就三个函数:

1. int epoll_create(int size);

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

第一个参数是epoll_create()的返回值,

第二个参数表示动作,用三个宏来表示:

EPOLL_CTL_ADD:注册新的fdepfd中;

EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

EPOLL_CTL_DEL:从epfd中删除一个fd

第三个参数是需要监听的fd

第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

struct epoll_event {

  __uint32_t events;  /* Epoll events */

  epoll_data_t data;  /* User data variable */

};

typedef union epoll_data

{

      void *ptr;

      int fd;

      __uint32_t u32;

      __uint64_t u64;

} epoll_data_t;

 events可以是以下几个宏的集合:

EPOLLIN      表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

EPOLLOUT    表示对应的文件描述符可以写;

EPOLLPRI      表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

EPOLLERR     表示对应的文件描述符发生错误;

EPOLLHUP     表示对应的文件描述符被挂断;

EPOLLET      EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

EPOLLONESHOT: 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

 3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

 再来看看ETLT模式:

man手册中,得到ETLT的具体描述如下

EPOLL事件有两种模型:

Edge Triggered (ET)  边缘触发 只有数据到来,才触发,不管缓存区中是否还有数据。

Level Triggered (LT)  水平触发 只要有数据都会触发。

 假如有这样一个例子:

1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符

2. 这个时候从管道的另一端被写入了2KB的数据

3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作

4. 然后我们读取了1KB的数据

5. 调用epoll_wait(2)......

 

Edge Triggered 工作模式:

如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。在上面的例子中,会有一个事件产生在RFD句柄上,因为在第2步执行了一个写操作,然后,事件将会在第3步被销毁。因为第4步的读取操作没有读空文件输入缓冲区内的数据,因此我们在第5步调用 epoll_wait(2)完成后,是否挂起是不确定的。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。

   i    基于非阻塞文件句柄

   ii   只有当read(2)或者write(2)返回EAGAIN时才需要挂起,等待。但这并不是说每次read()时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,
        当
read()返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此时读事件已处理完成。

Level Triggered 工作模式

相反的,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll(2),并且无论后面的数据是否被使用,因此他们具有同样的职能。因为即使使用ET模式的epoll,在收到多个chunk的数据的时候仍然会产生多个事件。调用者可以设定EPOLLONESHOT标志,在 epoll_wait(2)收到事件后epoll会与事件关联的文件句柄从epoll描述符中禁止掉。因此当EPOLLONESHOT设定后,使用带有 EPOLL_CTL_MOD标志的epoll_ctl(2)处理文件句柄就成为调用者必须作的事情。

然后详细解释ET, LT:

LT(level triggered)是缺省的工作方式,并且同时支持blockno-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表.

ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fdIO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认(这句话不理解)。

在许多测试中我们会看到如果没有大量的idle -connection或者dead-connectionepoll的效率并不会比select/poll高很多,但是当我们遇到大量的idle- connection(例如WAN环境中存在大量的慢速连接),就会发现epoll的效率大大高于select/poll。(未测试)

另外,当使用epollET模型来工作时,当产生了一个EPOLLIN事件后,读数据的时候需要考虑的是当recv()返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取:


  1. while(rs){
  2.   buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0);
  3.   if(buflen < 0) {
  4.     // 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读
  5.     // 在这里就当作是该次事件已处理处.

  6.     if(errno == EAGAIN)
  7.        break;
  8.     else
  9.        return;
  10.    } else if(buflen == 0) {
  11.      // 这里表示对端的socket已正常关闭.
  12.    }

  13.    if(buflen == sizeof(buf)
  14.      rs = 1; // 需要再次读取
  15.    else
  16.      rs = 0;
  17. }

还有,假如发送端流量大于接收端的流量(意思是epoll所在的程序读比转发的socket要快),由于是非阻塞的socket,那么send()函数虽然返回,但实际缓冲区的数据并未真正发给接收端,这样不断的读和发,当缓冲区满后会产生EAGAIN错误(参考man send),同时,不理会这次请求发送的数据.所以,需要封装socket_send()的函数用来处理这种情况,该函数会尽量将数据写完再返回,返回-1表示出错。在socket_send()内部,当写缓冲已满(send()返回-1,errnoEAGAIN),那么会等待后再重试.这种方式并不很完美,在理论上可能会长时间的阻塞在socket_send()内部,但暂没有更好的办法.


  1. ssize_t socket_send(int sockfd, const char* buffer, size_t buflen){
  2.   ssize_t tmp;
  3.   size_t total = buflen;
  4.   const char *p = buffer;

  5.   while(1) {
  6.     tmp = send(sockfd, p, total, 0);
  7.     if(tmp < 0) {
  8.       // 当send收到信号时,可以继续写,但这里返回-1.
  9.       if(errno == EINTR)
  10.         return -1;
  11.       // 当socket是非阻塞时,如返回此错误,表示写缓冲队列已满,
  12.       // 在这里做延时后再重试.

  13.       if(errno == EAGAIN) {
  14.         usleep(1000);
  15.         continue;
  16.       }
  17.       return -1;
  18.     }

  19.     if((size_t)tmp == total)
  20.       return buflen;
  21.     total -= tmp;
  22.     p += tmp;
  23.   }
  24.    return tmp;
  25. }

 代码1


  1. #include <iostream>
  2. #include <sys/socket.h>
  3. #include <sys/epoll.h>
  4. #include <netinet/in.h>
  5. #include <arpa/inet.h>
  6. #include <fcntl.h>
  7. #include <unistd.h>
  8. #include <stdio.h>
  9. #include <pthread.h>
  10. #include <errno.h>
  11.  #define MAXLINE 10
  12.  #define OPEN_MAX 100
  13.  #define LISTENQ 20
  14.  #define SERV_PORT 5555
  15.  #define INFTIM 1000
  16.  //线程池任务队列结构体
  17.  struct task
  18. {
  19.          int fd; //需要读写的文件描述符
  20.          struct task *next; //下一个任务
  21. };

  22.  //用于读写两个的两个方面传递参数
  23. struct user_data{
  24.          int fd;
  25.          unsigned int n_size;
  26.          char line[MAXLINE];
  27. };

  28.  //线程的任务函数

  29. void * readtask(void *args);
  30. void * writetask(void *args);
  31. //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
  32. struct epoll_event ev, events[20];
  33. int epfd;
  34. pthread_mutex_t mutex;
  35. pthread_cond_t cond1;
  36. struct task *readhead = NULL, *readtail = NULL, *writehead = NULL;

  37. void setnonblocking(int sock){
  38.          int opts;
  39.          opts = fcntl(sock, F_GETFL);
  40.          if (opts < 0) {
  41.                    perror("fcntl(sock,GETFL)");
  42.                    exit(1);
  43.          }

  44.          opts = opts | O_NONBLOCK;
  45.          if (fcntl(sock, F_SETFL, opts) < 0) {
  46.                    perror("fcntl(sock,SETFL,opts)");
  47.                    exit(1);
  48.          }
  49. }

  50. int main(){
  51.          int i, maxi, listenfd, connfd, sockfd, nfds;
  52.          pthread_t tid1, tid2;
  53.           struct task *new_task = NULL;
  54.          struct user_data *rdata = NULL;
  55.          socklen_t clilen;
  56.           pthread_mutex_init(&mutex, NULL);
  57.          pthread_cond_init(&cond1, NULL);
  58.          //初始化用于读线程池的线程
  59.           pthread_create(&tid1, NULL, readtask, NULL);
  60.          pthread_create(&tid2, NULL, readtask, NULL);
  61.           //生成用于处理accept的epoll专用的文件描述符

  62.          epfd = epoll_create(256);
  63.           struct sockaddr_in clientaddr;
  64.          struct sockaddr_in serveraddr;
  65.          listenfd = socket(AF_INET, SOCK_STREAM, 0);
  66.          //把socket设置为非阻塞方式
  67.          setnonblocking(listenfd);
  68.          //设置与要处理的事件相关的文件描述符
  69.          ev.data.fd = listenfd;
  70.          //设置要处理的事件类型
  71.          ev.events = EPOLLIN | EPOLLET;
  72.          //注册epoll事件
  73.          epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
  74.          bzero(&serveraddr, sizeof(serveraddr));
  75.          serveraddr.sin_family = AF_INET;
  76.          char *local_addr = "200.200.200.222";
  77.          inet_aton(local_addr, &(serveraddr.sin_addr));//htons(SERV_PORT);
  78.          serveraddr.sin_port = htons(SERV_PORT);
  79.          bind(listenfd, (sockaddr *) &serveraddr, sizeof(serveraddr));
  80.          listen(listenfd, LISTENQ);
  81.          maxi = 0;
  82.          for (;;) {

  83.                    //等待epoll事件的发生

  84.                    nfds = epoll_wait(epfd, events, 20, 500);

  85.                    //处理所发生的所有事件

  86.                    for (i = 0; i < nfds; ++i) {

  87.                             if (events[i].data.fd == listenfd) {

  88.                                      connfd = accept(listenfd, (sockaddr *) &clientaddr, &clilen);

  89.                                      if (connfd < 0) {

  90.                                                perror("connfd<0");

  91.                                                exit(1);

  92.                                      }

  93.                                      setnonblocking(connfd);

  94.                                      char *str = inet_ntoa(clientaddr.sin_addr);

  95.                                      std::cout << "connec_ from >>" << str << std::endl;

  96.                                      //设置用于读操作的文件描述符

  97.                                      ev.data.fd = connfd;

  98.                                      //设置用于注测的读操作事件

  99.                                      ev.events = EPOLLIN | EPOLLET;

  100.                                      //注册ev

  101.                                      epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

  102.                             } else if (events[i].events & EPOLLIN) {

  103.                                      printf("reading!\n");

  104.                                       if ((sockfd = events[i].data.fd) < 0) continue;

  105.                                        new_task = new task();

  106.                                        new_task->fd = sockfd;

  107.                                        new_task->next = NULL;

  108.                                         //添加新的读任务

  109.                                        pthread_mutex_lock(&mutex);

  110.                                        if (readhead == NULL) {

  111.                                               readhead = new_task;

  112.                                               readtail = new_task;

  113.                                        } else {

  114.                                               readtail->next = new_task;

  115.                                               readtail = new_task;

  116.                                         }

  117.                                                //唤醒所有等待cond1条件的线程

  118.                                                pthread_cond_broadcast(&cond1);

  119.                                                pthread_mutex_unlock(&mutex);

  120.                                      } else if (events[i].events & EPOLLOUT) {

  121.                                                rdata = (struct user_data *) events[i].data.ptr;

  122.                                                sockfd = rdata->fd;

  123.                                                write(sockfd, rdata->line, rdata->n_size);

  124.                                                 delete rdata;

  125.                                                //设置用于读操作的文件描述符

  126.                                                 ev.data.fd = sockfd;

  127.                                                //设置用于注测的读操作事件

  128.                                                ev.events = EPOLLIN | EPOLLET;

  129.                                                //修改sockfd上要处理的事件为EPOLIN

  130.                                                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);

  131.                                    }

  132.                    }

  133.          }

  134. }

  135. void * readtask(void *args){

  136.          int fd = -1;

  137.          unsigned int n;

  138.          //用于把读出来的数据传递出去

  139.          struct user_data *data = NULL;

  140.          while (1) {

  141.                    pthread_mutex_lock(&mutex);

  142.                    //等待到任务队列不为空

  143.                    while (readhead == NULL)

  144.                             pthread_cond_wait(&cond1, &mutex);

  145.                    fd = readhead->fd;

  146.                    //从任务队列取出一个读任务

  147.                    struct task *tmp = readhead;

  148.                    readhead = readhead->next;

  149.                    delete tmp;

  150.                    pthread_mutex_unlock(&mutex);

  151.                    data = new user_data();

  152.                    data->fd = fd;

  153.                    if ((n = read(fd, data->line, MAXLINE)) < 0) {

  154.                             if (errno == ECONNRESET) {

  155.                                      close(fd);

  156.                             } else

  157.                             std::cout << "readline error" << std::endl;

  158.                             if (data != NULL) delete data;

  159.                    } else if (n == 0) {

  160.                             close(fd);

  161.                             printf("Client close connect!\n");

  162.                             if (data != NULL)

  163.                             delete data;

  164.                     } else {

  165.                            data->n_size = n;

  166.                               //设置需要传递出去的数据

  167.                            ev.data.ptr = data;

  168.                            //设置用于注测的写操作事件

  169.                             ev.events = EPOLLOUT | EPOLLET;

  170.                             //修改sockfd上要处理的事件为EPOLLOUT

  171.                             epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);

  172.                      }

  173.          }

  174. }

代码2

 

  1. #include <stdio.h>

  2. #include <stdlib.h>

  3. #include <unistd.h>

  4. #include <string.h>

  5. #include <sys/types.h>

  6. #include <sys/socket.h>

  7. #include <sys/epoll.h>

  8. #include <netinet/in.h>

  9. #include <fcntl.h>

  10. #include <errno.h>

  11.  #define MAXLINE 1024

  12.  #define OPEN_MAX 100

  13.  #define EPOLL_SIZE 256

  14.  #define LISTENQ 20

  15.  #define SERV_PORT 5555

  16.  #define SERV_IP "127.0.0.1"

  17.  #define INFIM 1000

  18.  //ev用于注册事件, 数组用于回传要处理的事件

  19. struct epoll_event ev, events[20];

  20.  struct user_data_s{

  21.          int fd;

  22.          char data[MAXLINE];

  23.          int n_size;

  24. };

  25.  typedef struct user_data_s user_data_t;

  26.  int set_nonblock(int fd){

  27.          return fcntl(fd, F_SETFL, fcntl( fd, F_GETFL)|O_NONBLOCK);

  28. }

  29.  user_data_t * rdata;

  30.  int main(int argc, char **argv){

  31.          int i, n;

  32.          int epfd, listenfd, nfds, connfd, sockfd;

  33.          socklen_t clilen;

  34.          char cliip[24];

  35.          char buf[MAXLINE];

  36.           struct sockaddr_in servaddr, cliaddr;

  37.           //socket

  38.          listenfd = socket(AF_INET, SOCK_STREAM, 0);

  39.           //setnonblock

  40.          if(set_nonblock(listenfd)==-1) {

  41.                    perror("set_nonblock");

  42.                    return -1;

  43.          }

  44.          memset(&servaddr, 0, sizeof(servaddr));

  45.          servaddr.sin_family = AF_INET;

  46.          servaddr.sin_port = htons(SERV_PORT);

  47.          inet_pton(AF_INET, SERV_IP, &servaddr.sin_addr);

  48.           //bind

  49.          bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

  50.           //listen

  51.          listen(listenfd, LISTENQ);

  52.           printf("Listening %s %d\n", SERV_IP, SERV_PORT);

  53.           //注册epoll事件

  54.          epfd = epoll_create(EPOLL_SIZE);

  55.          ev.data.fd = listenfd;

  56.          ev.events= EPOLLIN | EPOLLET;

  57.          epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);

  58.           for(;;) {

  59.                    //等待epoll事件的发生

  60.                    nfds = epoll_wait(epfd, events, 20, 500);

  61.                     //处理发生的所有事件

  62.                    for(i=0; i< nfds; i++) {

  63.                             //New connection

  64.                             if(events[i].data.fd == listenfd) {

  65.                                      clilen = sizeof(cliaddr);

  66.                                      if((connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &clilen))==-1) {

  67.                                                perror("accept");

  68.                                                continue;

  69.                                      }

  70.                                      if(set_nonblock(connfd)==-1) {

  71.                                                perror("set_nonblock");

  72.                                                continue;

  73.                                      }

  74.                                      inet_ntop(AF_INET, &cliaddr.sin_addr, cliip, sizeof(cliip));

  75.                                      printf("New connection %s %d\n", cliip, ntohs(cliaddr.sin_port));

  76.  

  77.                                      ev.data.fd = connfd;

  78.                                      ev.events = EPOLLIN | EPOLLET;

  79.                                      epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

  80.                             } else if(events[i].events & EPOLLIN) { //可读事件

  81.                                      printf("reading...\n");

  82.                                      if((sockfd = events[i].data.fd)<0)

  83.                                        continue;

  84.                                      memset(buf, '\0', sizeof(buf));

  85.                                      if((n = read(sockfd, buf, sizeof(buf)))==-1) {

  86.                                                if(errno==ECONNRESET)

  87.                                                  close(sockfd);

  88.                                                else

  89.                                                  perror("read");

  90.                                      } else if( n==0 ) {

  91.                                                close(sockfd);

  92.                                                printf("client close connect!\n");

  93.                                      } else {

  94.                                                printf("read->[%s]\n", buf);

  95.                                                user_data_t udata;

  96.                                                udata.fd = sockfd;

  97.                                                memset(udata.data, '\0', sizeof(udata.data));

  98.                                                sprintf(udata.data,"%d", atoi(buf));

  99.                                                udata.n_size = strlen(udata.data);

  100.                                                 //注册写事件

  101.                                                ev.data.ptr = &udata;

  102.                                                ev.events = EPOLLOUT | EPOLLET;

  103.                                                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);

  104.                                      }

  105.                             } else if(events[i].events & EPOLLOUT) { //可写事件

  106.                                      printf("writing...\n");

  107.                                      if(events[i].events & EPOLLOUT) {

  108.                                                rdata = (user_data_t *)events[i].data.ptr;

  109.                                                sockfd = rdata->fd;

  110.                                                write(sockfd, rdata->data, rdata->n_size);

  111.                                                printf("write->[%s]\n",rdata->data);

  112.                                                 //注册读事件

  113.                                                ev.data.fd = sockfd;

  114.                                                ev.events = EPOLLIN | EPOLLET;

  115.                                                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);

  116.                                      }

  117.                             }

  118.                    }

  119.          }

  120.          return 0;

  121. }
Q&A

(转载者注:看完这个,再回头看看nginx源码,发现它在accept时用的是LT模式,read,write时是ET模式)
不知道是谁第一个犯了错,在网上贴出所谓epoll通用框架的代码。注意看accpet的处理:

 


  1. 1    epfd = epoll_create(10);

  2. 2     

  3. 3    struct sockaddr_in clientaddr;

  4. 4    struct sockaddr_in serveraddr;

  5. 5    listenfd = socket(AF_INET, SOCK_STREAM, 0);

  6. 6     

  7. 7    bool bReuseaddr = 1;

  8. 8    //setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,(const char*)&bReuseaddr,sizeof(bool));

  9. 9    setnonblocking(listenfd);

  10. 10    ev.data.fd = listenfd;

  11. 11    ev.events = EPOLLIN | EPOLLET;

  12. 12    // ev.events=EPOLLIN;

  13. 13    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);

  14. 14    bzero(&serveraddr, sizeof(serveraddr));

  15. 15    serveraddr.sin_family = AF_INET;

  16. 16    // char *local_addr=INADDR_ANY;

  17. 17    // inet_aton(local_addr,&(serveraddr.sin_addr));//htons(SERV_PORT);

  18. 18    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

  19. 19    serveraddr.sin_port = htons(SERV_PORT);

  20. 20    bind(listenfd, (sockaddr *) &serveraddr, sizeof(serveraddr));

  21. 21    listen(listenfd, LISTENQ);

  22. 22    maxi = 0;

  23. 23    int nfds_count = 0, recvcount = 0, recvlen = 0;

  24. 24    for (;;) {

  25. 25        cout << "before epoll_wait! nfds_count=" << nfds_count << endl;

  26. 26        nfds = epoll_wait(epfd, events, 20, 10000000);

  27. 27     

  28. 28        // if(nfds>1) cout<<"nfds="<<nfds<<endl;

  29. 29        cout << "nfds=" << nfds << endl;

  30. 30     

  31. 31        for (i = 0; i < nfds; ++i) {

  32. 32            if (events[i].data.fd == listenfd) {

  33. 33                connfd = accept(listenfd, (sockaddr *) &clientaddr, &clilen);

  34. 34                nfds_count += 1;

  35. 35                if (connfd < 0) {

  36. 36                    perror("connfd<0");

  37. 37                    exit(1);

  38. 38                }

  39. 39                setnonblocking(connfd);

  40. 40                char *str = inet_ntoa(clientaddr.sin_addr);

  41. 41                cout << "connect from " << str << "connfd=" << connfd << endl;

  42. 42                ev.data.fd = connfd;

  43. 43                ev.events = EPOLLIN | EPOLLET;

  44. 44                //ev.events=EPOLLIN;

  45. 45                //ev.events=EPOLLIN|EPOLLOUT|EPOLLET;

  46. 46                epoll_ctl(epfd,EPOLL_CTL_ADD, connfd, &ev);

  47. 47      }

  48. 48          }   

  49. 49        }

代码是从某处(很多地方都是这段,连注释都一样)拷过来的。熟悉epoll的人看了应该很熟系,其实就是将linsten的fd也加入到epoll中,当有新连接加入时可以epoll_wait到,随后再用accpet处理。
事实上这段代码是有问题的——高并发的情况下,accept到的fd的数量跟client端的发起请求的数量并不相等。我测了一下,100个并发(其实也不算高了)往往少几个到几十个不等。

相信很多人被这段代码误导了,因为我在遇到问题的时候搜到不少帖子,用的都是这样的代码。只是奇怪的是没见几个人说遇到我提到的这个问题。不过有人说这段代码效率不高,提出用阻塞式IO把accpet提到一个单独的线程里做。撇开这样做性能上的优劣不说,倒是能解决我遇到的问题,但这治标不治本——我要用 epoll+nonblockio

下面说说这段误人子弟的代码,其实 man epoll 就能找到这段代码的出处。相信它是某位同志从里面帖出来然后玩弄了许久后好心放到网上的,不然不会有这么多的//注释。而问题就是出在注释上,注意到 man-page 里的代码是这样的:

1

ev.events = EPOLLIN;

2

ev.data.fd = listen_sock;

3

if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {

4

    perror("epoll_ctl: listen_sock");

5

    exit(EXIT_FAILURE);

6

}

没错,man里并没有将listenfd以ET的方式加入epoll(不指定EPOLLET默认就是Level Triggered),事实也证明不设ET就没有问题了。难道说ET模式下就不能以非阻塞模式来accpet?答案是否定的。其实只要把accpet方式改一下就可以了,简单说就是if改while:

1

while ((confd = accept(listenfd, (struct sockaddr*) sa, &clientlen)) > 0) {

3

    //add connection to epoll

4

    peer = (struct sockaddr*sa;

5

    printf("%s:%d connect at %d.\n", inet_ntoa(peer->sin_addr), peer->sin_port, confd);

6

          

7

    setnonblocking(confd);

9           ev.data.fd = confd;

10

    ev.events = EPOLLIN | EPOLLET ;

12          epoll_ctl(myfd, EPOLL_CTL_ADD, confd, &ev);

14

}

  

嗯,确实是这样的,我刚开始学的时候就发生过这种错,我看过man文档,负责监听 accpet的fd是LT模式的,并不是在网上搜到的ET模式。我之前测过的时候就是遇到客户端量一多的时候,就出现监听到的客户端数量不一的问题,当时 查得我那个心烦呀~当时就奇怪别人的代码贴出来之后也没反应有啥问题,但通过在多个版本的LINUX上测试之后,可以肯定就是不用ET模式就没问题。

  呵呵,我今天也找到问题了,才看到你的文章。网上的代码,帮助人也害人哪。

 



阅读(2519) | 评论(0) | 转发(0) |
1

上一篇:vim使用技巧

下一篇:服务器模型

给主人留下些什么吧!~~