Chinaunix首页 | 论坛 | 博客
  • 博客访问: 885112
  • 博文数量: 254
  • 博客积分: 5350
  • 博客等级: 大校
  • 技术积分: 2045
  • 用 户 组: 普通用户
  • 注册时间: 2008-06-27 13:27
文章分类

全部博文(254)

文章存档

2015年(1)

2014年(9)

2013年(17)

2012年(30)

2011年(150)

2010年(17)

2009年(28)

2008年(2)

分类: C/C++

2011-12-14 16:54:38

Linux I/O多路复用技术在比较多的TCP网络服务器中有使用,即比较多的用到select函数。Linux 2.6内核中有提高网络I/O性能的新方法,即epoll 。

1、为什么select落后
    首先,在Linux内核中,select所用到的FD_SET是有限的,即内核中有个参数__FD_SETSIZE定义了每个FD_SET的句柄个数,在我用的2.6.15-25-386内核中,该值是1024,搜索内核源代码得到:
include/linux/posix_types.h:
#define __FD_SETSIZE         1024
    也就是说,如果想要同时检测1025个句柄的可读状态是不可能用select实现的。或者同时检测1025个句柄的可写状态也是不可能的。其次,内核中实现select是使用轮询方法,即每次检测都会遍历所有FD_SET中的句柄,显然,select函数的执行时间与FD_SET中句柄的个数有一个比例关系,即select要检测的句柄数越多就会越费时。当然,在前文中我并没有提及poll方法,事实上用select的朋友一定也试过poll,我个人觉得select和poll大同小异,个人偏好于用select而已。

2、内核中提高I/O性能的新方法 epoll
    epoll是什么?按照man手册的说法:是为处理大批量句柄而作了改进的poll。要使用epoll只需要以下的三个系统函数调用:epoll_create(2), epoll_ctl(2), epoll_wait(2)。

Linux2.6内核epoll介绍
    先介绍2本书《The Linux Networking Architecture--Design and Implementation of Network Protocols in the Linux Kernel》, 以2.4内核讲解Linux TCP/IP实现,相当不错。作为一个现实世界中的实现,很多时候你必须作很多权衡,这时候参考一个久经考验的系统更有实际意义。举个例子,linux内 核中sk_buff结构为了追求速度和安全,牺牲了部分内存,所以在发送TCP包的时候,无论应用层数据多大,sk_buff最小也有272的字节。其实 对于socket应用层程序来说,另外一本书《UNIX Network Programming Volume 1》意义更大一点。2003年的时候,这本书出了最新的第3版本,不过主要还是修订第2版本。其中第6章《I/O Multiplexing》是最重要的,Stevens给出了网络IO的基本模型。在 这里最重要的莫过于select模型和Asynchronous I/O模型。从理论上说,AIO似乎是最高效的,你的IO操作可以立即返回,然后等待os告诉你IO操作完成。但是一直以来,如何实现就没有一个完美的方 案。最著名的windows完成端口实现的AIO,实际上也只是内部用线程池实现的罢了,最后的结果是IO有个线程池,你的应用程序也需要一个线程 池...... 很多文档其实已经指出了这引发的线程context-switch所带来的代价。在linux 平台上,关于网络AIO一直是改动最多的地方,2.4的年代就有很多AIO内核patch,最著名的应该算是SGI。但是一直到2.6内核发布,网络模块 的AIO一直没有进入稳定内核版本(大部分都是使用用户线程模拟方法,在使用了NPTL的linux上面其实和windows的完成端口基本上差不多 了)。2.6内核所支持的AIO特指磁盘的AIO---支持io_submit(),io_getevents()以及对Direct IO的支持(即:就是绕过VFS系统buffer直接写硬盘,对于流服务器在内存平稳性上有相当的帮助)。
    所以,剩下的select模型基本上就成为我们在linux上面的唯一选择,其实,如果加上no-block socket的配置,可以完成一个"伪"AIO的实现,只不过推动力在于你而不是os而已。不过传统的select/poll函数有着一些无法忍受的缺 点,所以改进一直是2.4-2.5开发版本内核的任务,包括/dev/poll,realtime signal等等。最终,Davide Libenzi开发的epoll进入2.6内核成为正式的解决方案。

3、epoll的优点
<1> 支持一个进程打开大数目的socket描述符(FD)
    select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置。 对于那些需要支持上万连接数目的IM服务器来说显然太少了。这时候你一是可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下 降;二是可以选择多进程的解决方案(传统的Apache方案),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远 比不上线程间同步高效,所以这也不是一种完美的方案。不过epoll 没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于select 所支持的2048。举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

<2> IO效率不随FD数目增加而线性下降
    传统select/poll的另一个致命弱点就是当你拥有一个很大的socket集合,由于网络得延时,使得任一时间只有部分的socket是"活跃" 的,而select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"的socket进 行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。于是,只有"活跃"的socket才会主动去调用 callback函数,其他idle状态的socket则不会,在这点上,epoll实现了一个"伪"AIO,因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上都是活跃的---比如一个高速LAN环境,epoll也不比select/poll低多少效率,但若 过多使用的调用epoll_ctl,效率稍微有些下降。然而一旦使用idle connections模拟WAN环境,那么epoll的效率就远在select/poll之上了。

<3> 使用mmap加速内核与用户空间的消息传递
    这点实际上涉及到epoll的具体实现。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就显 得很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。而如果你像我一样从2.5内核就开始关注epoll的话,一定不会忘记手 工mmap这一步的。

<4> 内核微调
    这一点其实不算epoll的优点,而是整个linux平台的优点。也许你可以怀疑linux平台,但是你无法回避linux平台赋予你微调内核的能力。比 如,内核TCP/IP协议栈使用内存池管理sk_buff结构,可以在运行期间动态地调整这个内存pool(skb_head_pool)的大小---通 过echo XXXX>/proc/sys/net/core/hot_list_length来完成。再比如listen函数的第2个参数(TCP完成3次握 手的数据包队列长度),也可以根据你平台内存大小来动态调整。甚至可以在一个数据包面数目巨大但同时每个数据包本身大小却很小的特殊系统上尝试最新的 NAPI网卡驱动架构。

4、epoll的工作模式
    令人高兴的是,linux2.6内核的epoll比其2.5开发版本的/dev/epoll简洁了许多,所以,大部分情况下,强大的东西往往是简单的。唯一有点麻烦的是epoll有2种工作方式:LT和ET
    LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表
    ET (edge-triggered) 是高速工作方式,只支持no-block socket。 在这种模式下,当描述符从未就绪变为就绪时,内核就通过epoll告诉你,然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的 就绪通知,直到你做了某些操作而导致那个文件描述符不再是就绪状态(比如 你在发送,接收或是接受请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核就不会发送更多的通知(on

ly once)。不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。
    epoll只有epoll_create,epoll_ctl,epoll_wait 3个系统调用,具体用法请参考,在也有一个完整的例子,大家一看就知道如何使用了。

5、 epoll的使用方法

epoll用到的所有函数都是在头文件sys/epoll.h中声明的,下面简要说明所用到的数据结构和函数:
所用到的数据结构:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
结构体epoll_event 被用于注册所感兴趣的事件和回传所发生待处理的事件,而epoll_data 联合体用来保存触发事件的某个文件描述符相关的数据。例如一个client连接到服务器,服务器通过调用accept函数可以得到于这个client对应的socket文件描述符,可以把这文件描述符赋给epoll_data的fd字段,以便后面的读写操作在这个文件描述符上进行。epoll_event 结构体的events字段是表示感兴趣的事件和被触发的事件,可能的取值为:
EPOLLIN:表示对应的文件描述符可以读;
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读;
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:表示对应的文件描述符有事件发生;
EPOLL
ONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
所用到的函数:
1)、epoll_create函数
函数声明:int epoll_create(int size)
该 函数生成一个epoll专用的文件描述符,其中的参数是指定生成描述符的最大范围。这个参数不同于select()中的第一个参数,给出最大监听的 fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个 fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
2)、epoll_ctl函数
函数声明:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
该函数用于控制某个文件描述符上的事件,可以注册事件,修改事件,删除事件。
参数:
epfd:由 epoll_create 生成的epoll专用的文件描述符;
op:对fd进行的操作,可能的取值EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 删除;
fd:关联的文件描述符;
event:指向epoll_event的指针;
如果调用成功则返回0,不成功则返回-1。
3)、epoll_wait函数
函数声明:int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)
该函数用于轮询I/O事件的发生。类似于select()调用。
参数:
epfd:由epoll_create 生成的epoll专用的文件描述符;
events:用来从内核得到事件的集合,用于回传代处理事件的数组;
maxevents:每次能处理的事件数,告之内核这个events有多大;这个maxevents的值不能大于创建epoll_create()时的size;
timeout:等待I/O事件发生的超时值;(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。
nfds:返回发生事件数,如返回0表示已超时。

/* epoll_wait函数:等待epoll事件的发生,并将发生的sokct fd和事件类型放入到events数组中,nfds 为发生的事件的个数。
注:事件发生后,注册在epfd上的socket fd的事件类型会被清空(或手动EPOLL_CTL_DEL删除事件?),所以如果下一个循环你还要关注这个socket fd的话,
则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来重新设置socket fd的事件类型。
*/
    首先通过create_epoll(int maxfds)来创建一个epoll的句柄,其中maxfds为你的epoll所支持的最大句柄数。这个函数会返回一个新的epoll句柄,之后的所有操作都将通过这个句柄来进行操作。在用完之后,记得用close()来关闭这个创建出来的epoll句柄

    之后在你的网络主循环里面调用epoll_wait(int epfd, epoll_event events, int max_events, int timeout)来查询所有的网络接口,看哪一个可以读,哪一个可以写。基本的语法为:
nfds = epoll_wait(kdpfd, events, maxevents, -1);
其中kdpfd为用epoll_create创建之后的句柄,events是一个epoll_event*的指针,当epoll_wait函数操作成功之后,events里面将储存所有的读写事件。max_events是当前需要监听的所有socket句柄数。最后一个timeout参 数指示 epoll_wait的超时条件,为0时表示马上返回;为-1时表示函数会一直等下去直到有事件返回;为任意正整数时表示等这么长的时间,如果一直没有事 件,则会返回。一般情况下如果网络主循环是单线程的话,可以用-1来等待,这样可以保证一些效率,如果是和主循环在同一个线程的话,则可以用0来保证主循 环的效率。epoll_wait返回之后,应该进入一个循环,以便遍历所有的事件。

    对epoll 的操作就这么简单,总共不过4个API:epoll_create, epoll_ctl, epoll_wait和close。以下是man中的一个例子。

struct epoll_event ev, *events;
for(;;) {
 nfds = epoll_wait(kdpfd, events, maxevents, -1); //等待I/O事件
 for(n = 0; n < nfds; ++n) {
  if(events[n].data.fd == listener) { //如果是主socket的事件,则表示有新连接进入,需要进行新连接的处理。
    client = accept(listener, (struct sockaddr *) &local,  &addrlen);
    if(client < 0){
      perror("accept error");
      continue;
    }
    setnonblocking(client); // 将新连接置于非阻塞模式
    ev.events = EPOLLIN | EPOLLET; 
                                   //注意这里的参数EPOLLIN | EPOLLET并没有设置对写socket的监听,
                                   //如果有写操作的话,这个时候epoll是不会返回事件的,
                                   //如果要对写操作也监听的话,应该是EPOLLIN | EPOLLOUT | EPOLLET。
    ev.data.fd = client;    // 并且将新连接也加入EPOLL的监听队列
    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) { 

// 设置好event之后,将这个新的event通过epoll_ctl 加入到epoll的监听队列里,这里用EPOLL_CTL_ADD来加一个新的 epoll事件。可以通过EPOLL_CTL_DEL来减少一个epoll事件,通过EPOLL_CTL_MOD来改变一个事件的监听方式
      fprintf(stderr, "epoll set insertion error: fd=%d", client);
      return -1;
    }
  }  else {// 如果不是主socket的事件的话,则代表这是一个用户的socket的事件(如读事件、写事件之类的),
             // 则用来处理这个用户的socket的事情是,比如说read(fd,xxx)或write(fd,xxx)之类,或者一些其他的处理。
    do_use_fd(events[n].data.fd);
 }
}

6、Linux下epoll编程实例
 epoll_wait范围之后应该是一个循环,遍历所有的事件。
几乎所有的epoll程序都使用下面的框架 转载自:http://blog.csdn.net/ljx0305/article/details/4065058
for( ; ; )
{
nfds = epoll_wait(epfd,events,20,500);
for(i=0;i{
if(events[i].data.fd==listenfd) //有新的连接
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept这个连接
ev.data.fd=connfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的fd添加到epoll的监听队列中
}
else if( events[i].events&EPOLLIN ) //接收到数据,读socket
{
n = read(sockfd, line, MAXLINE,0); //读出数据
ev.data.ptr = md; //md为自定义类型,添加数据
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
}
else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
{
struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取数据
sockfd = md->fd;
send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
}
else
{
//其他的处理
}
}
}

 *****************************************************************************************************************************************




    Epoll模型主要负责对大量并发用户的请求进行及时处理,完成服务器与客户端的数据交互。其具体的实现步骤如下:
(a) 使用epoll_create()函数创建文件描述,设定可管理的最大socket描述符数目。
(b) 创建与epoll关联的接收线程,应用程序可以创建多个接收线程来处理epoll上的读通知事件,线程的数量依赖于程序的具体需要。
(c) 创建一个侦听socket的描述符ListenSock,并将该描述符设定为非阻塞模式,调用Listen()函数在该套接字上侦听有无新的连接请求,在epoll_event结构中设置要处理的事件类型EPOLLIN,工作方式为 epoll_ET,以提高工作效率,同时使用epoll_ctl()来注册事件,最后启动网络监视线程。
(d) 网络监视线程启动循环,epoll_wait()等待epoll事件发生。
(e) 如果epoll事件表明有新的连接请求,则调用accept()函数,将用户socket描述符添加到epoll_data联合体,同时设定该描述符为非阻塞,并在epoll_event结构中设置要处理的事件类型为读和写,工作方式为epoll_ET。
(f) 如果epoll事件表明socket描述符上有数据可读,则将该socket描述符加入可读队列,通知接收线程读入数据,并将接收到的数据放入到接收数据的链表中,经逻辑处理后,将反馈的数据包放入到发送数据链表中,等待由发送线程发送。

例子代码:

#include
#include
#include
#include
#include
#include
#include
#include

#define MAXLINE 10
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5555
#define INFTIM 1000

void setnonblocking(int sock)
{
  int opts;
  opts=fcntl(sock,F_GETFL);

  if(opts<0)
  {
    perror("fcntl(sock,GETFL)");
    exit(1);
  }

  opts = opts | O_NONBLOCK;

  if(fcntl(sock,F_SETFL,opts)<0)
  {
    perror("fcntl(sock,SETFL,opts)");
    exit(1);
  }
}


int main()
{
  int i, maxi, listenfd, connfd, sockfd, epfd, nfds;
  ssize_t n;
  char line[MAXLINE];
  socklen_t clilen;

  struct epoll_event ev,events[20]; //声明epoll_event结构体的变量, ev用于注册事件, events数组用于回传要处理的事件
  epfd=epoll_create(256); //生成用于处理accept的epoll专用的文件描述符, 指定生成描述符的最大范围为256 

  struct sockaddr_in clientaddr;
  struct sockaddr_in serveraddr;

  listenfd = socket(AF_INET, SOCK_STREAM, 0);

  setnonblocking(listenfd); //把用于监听的socket设置为非阻塞方式

  ev.data.fd=listenfd; //设置与要处理的事件相关的文件描述符
  ev.events=EPOLLIN | EPOLLET; //设置要处理的事件类型
  epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev); //注册epoll事件

  bzero(&serveraddr, sizeof(serveraddr));
  serveraddr.sin_family = AF_INET;
  char *local_addr="200.200.200.204";
  inet_aton(local_addr,&(serveraddr.sin_addr));
  serveraddr.sin_port=htons(SERV_PORT);  //或者htons(SERV_PORT);

  bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));

  listen(listenfd, LISTENQ);  //同一时刻能监听的最大连接请求数为LISTENQ,始终受约束?

  maxi = 0;

  for( ; ; ) {
    nfds=epoll_wait(epfd,events,20,500); //等待epoll事件的发生

    for(i=0;i //处理所发生的所有事件
      { 
       if(events[i].data.fd==listenfd)    /**是一个监听事件**/
        { 
           connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); 
           if(connfd<0){ 
             perror("connfd<0"); 
             continue; //处理下一个事件
           }

         setnonblocking(connfd); //把客户端的socket设置为非阻塞方式

         char *str = inet_ntoa(clientaddr.sin_addr);
         std::cout<<"connect from "<_u115 ? tr<

         ev.data.fd=connfd; //设置用于读操作的文件描述符
         ev.events=EPOLLIN | EPOLLET; //设置用于注测的读操作事件
         epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //注册ev事件 
       } 
      else if(events[i].events&EPOLLIN)     /**是一个读事件**/
        {
           if ( (sockfd = events[i].data.fd) < 0) continue;
           if ( (n = read(sockfd, line, MAXLINE)) < 0) { 
              if (errno == ECONNRESET) {

                epoll_ctl(epfd,EPOLL_CL_DEL,sockfd,&ev); //删除sockfd上的事件类型
                close(sockfd);  //并关闭本套接字
                events[i].data.fd = -1; //?

                continue; //处理下一个事件
                } else
                 {  //难道不需要重新修改本sockfd上的事件类型吗?
                    std::cout<<"readline error"<

                    continue; //处理下一个事件
                  }
             } else if (n == 0) { //网络无法连接本sockfd,可能是对方关闭掉了

                epoll_ctl(epfd,EPOLL_CL_DEL,sockfd,&ev); //删除sockfd上的事件类型
                close(sockfd);  //并关闭本套接字
                events[i].data.fd = -1; //?

                continue;  //处理下一个事件
              }

          ev.data.fd=sockfd; //设置用于写操作的文件描述符
          ev.events=EPOLLOUT | EPOLLET; //设置用于注测的写操作事件
          epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改sockfd上要处理的事件为EPOLLOUT 
       } 
      else if(events[i].events&EPOLLOUT)    /**是一个写事件**/
        {

          //处理情况可参照“写事件”,以下是简要描述。
          sockfd = events[i].data.fd;
          write(sockfd, line, n);

          ev.data.fd=sockfd; //设置用于读操作的文件描述符
          ev.events=EPOLLIN | EPOLLET; //设置用于注册的读操作事件
          epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改sockfd上要处理的事件为EPOLIN
        } 
      else  /**其他区情况**/
        { 
           do_use_fd(events[n].data.fd);
        } 
     }
  }
}

*****************************************************************************************************************************************

转载自:http://www.cnblogs.com/OnlyXP/archive/2007/08/10/851222.html

epoll精髓

在linux的网络编程中,很长的时间都在使用select来做事件触发。在linux新的内核中,有了一种替换它的机制,就是epoll。
相比于select,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。并且,在linux/posix_types.h头文件有这样的声明:
#define __FD_SETSIZE 1024
表示select最多同时监听1024个fd,当然,可以通过修改头文件再重编译内核来扩大这个数目,但这似乎并不治本。

epoll的接口非常简单,一共就三个函数:
1. int epoll_create(int size);
创 建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。 需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在 使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的;
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等 待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有 说法说是永久阻塞)。该函数返回需要处理的事件数目,若返回0则表示已超时。
--------------------------------------------
从man手册中,得到ET和LT的具体描述如下

EPOLL事件有两种模型:
Edge Triggered (ET)
Level Triggered (LT)

假如有这样一个例子:
1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符
2. 这个时候从管道的另一端被写入了2KB的数据
3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作
4. 然后我们读取了1KB的数据
5. 调用epoll_wait(2)......

Edge Triggered 工作模式:
如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起: 因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。在上面的例子中,会有一个事件产生在RFD句柄 上,因为在第2步执行了一个写操作,然后,事件将会在第3步被销毁。因为第4步的读取操作没有读空文件输入缓冲区内的数据,因此我们在第5步调用 epoll_wait(2)完成后,是否挂起是不确定的。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷
i 基于非阻塞文件句柄
ii 只有当read(2)或者write(2)返回EAGAIN时才需要挂起、等待。但这并不是说每次read()时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read()返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此时读事件已处理完成。

Level Triggered 工作模式:
相 反的,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll(2),并且无论后面的数据是否被使用,因此他们具有同样的职能。因为即 使使用ET模式的epoll,在收到多个chunk的数据的时候仍然会产生多个事件。调用者可以设定EPOLLONESHOT标志,在 epoll_wait(2)收到事件后epoll会与事件关联的文件句柄从epoll描述符中禁止掉。因此当EPOLLONESHOT设定后,使用带有 EPOLL_CTL_MOD标志的epoll_ctl(2)处理文件句柄就成为调用者必须作的事情。

然后详细解释ET, LT:
LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你 的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

ET(edge-triggered) 是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述 符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致 了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认(这句话不理解)。

可以得出这样的结论:
ET 模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,也就是说,如果要采用ET模式,需要一直 read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多因为这样;而LT模式是只要有数据没有处理 就会一直通知下去的.

在许多测试中我们会看到 如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当我们遇到大量的idle- connection(例如WAN环境中存在大量的慢速连接),就会发现epoll的效率大大高于select/poll。(未测试)
---------------------------------------------------------------
上 面没有说明对于listen socket fd该如何处理,有的时候会使用两个线程,一个用来监听accept,另一个用来监听epoll_wait,如果是这样使用的话,则listen socket fd使用默认的阻塞方式就行了;而如果epoll_wait和accept处于一个线程中,即,全部由epoll_wait进行监听,则,需将 listen socket fd也设置成非阻塞的,这样,对accept也应该使用while包起来(类似于recv的操作),因为epoll_wait返回时只是说有连接到来了, 并没有说有几个连接,而且在ET模式下epoll_wait不会再因为上一次的连接还没读完而返回,epoll_wait只返回了一次这里需要说明的是,每调用一次accept将从内核中的已连接队列中的队头读取一个连接,这是因为在并发访问的环境下,有可能有多个连接“同时”到达,这种情况确实存在。

另外,当使用epoll的ET模型来工作时,当产生了一个EPOLLIN事件后,
读数据的时候需要考虑的是:当recv()返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取
while(rs)
{
buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0);
if(buflen < 0)
{
// 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读
// 在这里就当作是该次事件已处理处完成.
if(errno == EAGAIN)
break;
else
return; //其他情况当特殊处理
}
else if(buflen == 0)
{
// 这里表示对端的socket已正常关闭.
}
if(buflen == sizeof(buf)
rs = 1; // 需要再次读取
else
rs = 0;
}

还 有,假如发送端流量大于接收端的流量(意思是epoll所在的程序读比转发的socket要快),由于是非阻塞的socket,那么send()函数虽然 返回,但实际缓冲区的数据并未真正发给接收端,这样不断的读和发,当缓冲区满后会产生EAGAIN错误(参考man send),同时不理会这次请求发送的数据.所以,需要封装socket_send()的函数用来处理这种情况,该函数会尽量将数据写完再返回,返回-1 表示出错。在socket_send()内部,当写缓冲已满(send()返回-1,且errno为EAGAIN),那么会等待后再重试.这种方式并不很 完美,在理论上可能会长时间的阻塞在socket_send()内部,但暂没有更好的办法.

ssize_t socket_send(int sockfd, const char* buffer, size_t buflen)
{
ssize_t tmp;
size_t total = buflen;
const char *p = buffer;

while(1)
{
tmp = send(sockfd, p, total, 0);
if(tmp < 0)
{
// 当send收到信号时,可以继续写,但这里给它返回-1.
if(errno == EINTR)
return -1;

// 当socket是非阻塞时,如返回此错误,表示写缓冲队列已满,
// 在这里做延时后再重试.(或将数据重新放回输出队列?)
if(errno == EAGAIN)
{
usleep(1000);
continue;
}

return -1;
}

if((size_t)tmp == total)
return buflen;

total -= tmp;
p += tmp;
}

return tmp;
}
-------后解---------
jhorne: --引用--------------------------------------------------
123we: 可以使用buffer来解决socket_send的问题,不用阻塞,只是send的buffer写满后,再检查EPOLLOUT,然后继续写剩余的buffer
--------------------------------------------------------
这话是什么意思呢? 非阻塞socket 调用一次send 然后等待EPOLLOUT事件
等到了即是发送完毕了 是这样吗
--------------------------------------------------------
应该是这样的buffer写满后,当再有EPOLLOUT事件发生,就要等待(阻塞)了,等buffer有空余空间时再处理EPOLLOUT事件
--------------------------------------------------------
EPOLLOUT事件的意思就是 当前这个socket的发送状态是空闲的,此时处理能力很强,告知用户可以发送数据。
所以在正常情况下,基本上socket在epoll_wait后,都会得到一个socket的EPOLLOUT事件。【如果你不是一直在写数据或者你不是在传送一个几百M的数据文件,send一般都处于空闲状态】
而这个特性刚好可以处理楼主所谓的 阻塞问题。
当数据发送不出去的时候,说明网络阻塞或者延迟太厉害了。
那么可将 要发送的数据放在一个buffer中,当下次你发现了EPOLLOUT事件时,说明现在网络处于空闲状态,OK,此时你可以用另外一个线程来发送上次堆积在buffer中的数据了。这样就不会阻塞了。
------so--------------------------------------------------
对于EPOLLOUT事件的处理策略就是 先尝试直接发送. 如果发送不完整,就buffer住等下一轮再发.

下面给出一个完整的服务器端例子:
==============================================

  1. #include <iostream>
  2. #include <sys/socket.h>
  3. #include <sys/epoll.h>
  4. #include <netinet/in.h>
  5. #include <arpa/inet.h>
  6. #include <fcntl.h>
  7. #include <unistd.h>
  8. #include <stdio.h>
  9. #include <errno.h>

  10. using namespace std;

  11. #define    MAXLINE 50
  12. //#define OPEN_MAX 100
  13. #define    LISTENQ 20
  14. //#define    SERV_PORT 5000
  15. //#define    INFTIM 1000

  16. void setnonblocking(int sock)
  17. {
  18.     int opts;
  19.     opts = fcntl(sock, F_GETFL);
  20.     if(opts < 0) {
  21.         perror("fcntl(sock, GETFL)");
  22.         exit(1);
  23.         }
  24.     opts |= O_NONBLOCK;
  25.     if(fcntl(sock, F_SETFL, opts) < 0)
  26.         {
  27.             perror("fcntl set nonblock error");
  28.             exit(1);
  29.         }
  30. }

  31. int SetReuseAddr(unsigned int fd)
  32. {
  33.     unsigned int itemp = 1;
  34.     if (0 > setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&itemp, socklen_t(sizeof(itemp))))
  35.     {
  36.         perror("SetReuseAddr error");
  37.         return -1;
  38.     }
  39.     
  40.     return 1;
  41. }


  42. int main(int argc, char *argv[])
  43. {
  44.     int i, maxi, listenfd, connfd, sockfd, epfd, nfds, portnumber;
  45.     ssize_t n;
  46.     char line[MAXLINE];
  47.     socklen_t clilen;
  48.     //portnumber = SERV_PORT;

  49.     if(2 != argc){
  50.         fprintf(stderr, "Using:%s portnumber\n", argv[0]);
  51.         return 1;
  52.         } else {
  53.             portnumber = atoi(argv[1]);
  54.                 }

  55.     struct epoll_event ev, events[20];

  56.     epfd = epoll_create(256);
  57.     struct sockaddr_in clientaddr, serveraddr;
  58.     listenfd = socket(AF_INET, SOCK_STREAM, 0);

  59.     //setnonblocking(listenfd);
  60.     
  61.     ev.data.fd = listenfd;
  62.     ev.events = EPOLLIN|EPOLLET;
  63.     epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);

  64.     bzero(&serveraddr, sizeof(serveraddr));
  65.     serveraddr.sin_family = AF_INET;
  66.     char * local_addr = "127.0.0.1";
  67.     inet_aton(local_addr, &(serveraddr.sin_addr));
  68.     serveraddr.sin_port = htons(portnumber);

  69.     
  70.     if ( 0 > bind(listenfd, (sockaddr *)&serveraddr, sizeof(serveraddr)) ){
  71.             perror("bind error");
  72.             exit(-1);
  73.         }
  74.     
  75.     if( 0 > listen(listenfd, LISTENQ) ) {
  76.             perror("listen error");
  77.             exit(-1);
  78.         }
  79.     
  80.     SetReuseAddr(listenfd);
  81.     
  82.     maxi = 0;
  83.     for(;;) {
  84.         nfds = epoll_wait(epfd, events, LISTENQ, 500);
  85.         for(i = 0; i < nfds; ++i) {
  86.     //如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,
  87.     //建立新的连接。
  88.             if(events[i].data.fd == listenfd)
  89.                 {
  90.                     connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen);
  91.                     if(connfd < 0) {
  92.                         perror("connfd<0");
  93.                         exit(1);
  94.                         }
  95.                     ///把socket设置为非阻塞方式
  96.                     setnonblocking(listenfd);

  97.                     char *str = inet_ntoa(clientaddr.sin_addr);
  98.                     cout << "accept a connection from" << str << endl;

  99.                     ev.data.fd = connfd;
  100.                     ev.events = EPOLLIN|EPOLLET;

  101.                     epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
  102.                 }

  103.         /*如果是已经连接的用户
  104.              并且收到数据,那么进行连续的读
  105.              */
  106.             else if(events[i].events&EPOLLIN) {
  107.                         cout << "EPOLLIN" << endl;
  108.                         if((sockfd = events[i].data.fd) < 0)
  109.                             continue;
  110.                         if((n = read(sockfd, line, MAXLINE)) < 0 ) {
  111.                             if(errno == EINTR) {
  112.                                 close(sockfd);
  113.                                 events[i].data.fd = -1;
  114.                                 continue;
  115.                                 } else {
  116.                                     std::cout << "readline error 111" << std::endl;
  117.                                     break;
  118.                                     }
  119.                             }else if(n == 0) {
  120.                                 close(sockfd);
  121.                                 events[i].data.fd = -1;
  122.                                 std::cout << "readline error 222" << std::endl;
  123.                                 break;
  124.                                 }
  125.                             //line[n] = '/0';
  126.                             cout << "read" << line << endl;

  127. //            可以修改数据等待下个循环写数据
  128. //                            ev.data.fd = sockfd;
  129. //                            ev.events = EPOLLOUT|EPOLLET;
  130.                     }

  131.             // 如果有数据发送
  132.             else if(events[i].events&EPOLLOUT)    {            
  133.                     sockfd = events[i].data.fd;
  134.                     write(sockfd, line, n);
  135.             //修改数据等待下个循环接收数据        
  136.                     ev.data.fd = sockfd;
  137.                     ev.events = EPOLLIN|EPOLLET;
  138.                     epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
  139.                 }
  140.             }
  141.         }
  142.     return 0;
  143. }
阅读(1333) | 评论(0) | 转发(0) |
0

上一篇:epoll学习1

下一篇:pragma pack 预处理指令

给主人留下些什么吧!~~