全部博文(695)
分类: 网络与安全
2013-12-30 22:16:04
这篇文章主要介绍linux下的epoll(7)方法,其有着良好的就绪事件通知机制。我们将会使用C来展现一个完整的TCP服务器实现代码。Epoll 是被linux2.6开始引进的,但是不被其他的类UNIX系统支持,它提供了一种类似select或poll函数的机制:
1.Select(2)只能够同时管理FD_SETSIZE数目的文件描述符
2. poll(2)没有固定的描述符上限这一限制,但是每次必须遍历所有的描述符来检查就绪的描述符,这个过程的时间复杂度为O(N)。
epoll没有select这样对文件描述符上限的限制,也不会像poll那样进行线性的遍历。因此epoll处理大并发连接有着更高的性能。
Epoll相关操作函数介绍:
1. epoll_create(2) or epoll_create1(2)(有着不同的参数值)用来创建epoll实例。
/usr/include/sys/epoll.h extern int epoll_create (int __size) ; RETURN:>0, 成功;-1, 出错
函数描述:
(1) epoll_create返回的是一个文件描述符,也就是说epoll是以特殊文件的方式体现给用户
(2) __size提示操作系统,用户可能要使用多少个文件描述符,该参数已经废弃,填写一个大于0的正整数
2. epoll_ctl(2)用来增加或移除被epoll所监听的文件描述符。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); RETURN:0,成功;-1,出错
函数描述:
(1) epfd为epoll_create创建的epoll描述符
(2) epoll_ctl函数对epoll进行op类型的操作,op选项为
EPOLL_CTL_ADD,对fd描述符注册event事件
EPOLL_CTL_MOD,对fd描述符的event事件进行修改
EPOLL_CTL_DEL,删除已注册的event事件
3. epoll_wait(2)用来等待发生在监听描述符上的事件。它会一直阻塞直到事件发生。
#includeint epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); RETURN:>0,发生事件个数;=0,时间到;-1,出错
函数描述:
epoll_wait与select函数类似,同步地等待事件发生
(1) epfd,标识epoll的文件描述符
(2) events,指向传入操作系统的一个epoll_event数组
(3) maxevents,表示传入数组的大小,必须大于0
当有事件发生,Linux会填写events结构,返回给应用程序。由于epoll_wait同步等待,有可能被信号中断,返回EINTR错误
更多的函数介绍请参照man。
Epoll的两种模式:
1. 水平触发(LT):使用此种模式,当数据可读的时候,epoll_wait()将会一直返回就绪事件。如果你没有处理完全部数据,并且再次在该 epoll实例上调用epoll_wait()才监听描述符的时候,它将会再次返回就绪事件,因为有数据可读。ET只支持非阻塞socket。
2. 边缘触发(ET):使用此种模式,只能获取一次就绪通知,如果没有处理完全部数据,并且再次调用epoll_wait()的时候,它将会阻塞,因为就绪事件已经释放出来了。
ET的效能更高,但是对程序员的要求也更高。在ET模式下,我们必须一次干净而彻底地处理完所有事件。LT两种模式的socket都支持。
传递给epoll_ctl(2)的Epoll事件结构体如下所示:
typedefunionepoll_data { void*ptr; intfd; __uint32_t u32; __uint64_t u64; }epoll_data_t; structepoll_event { __uint32_t events;/* Epoll events */ epoll_data_t data;/* User data variable */ };
对于每一个监听的描述符,能够关联一个整形数据或指向用户数据的指针。
epoll的事件类型:
enum EPOLL_EVENTS { EPOLLIN = 0x001, #define EPOLLIN EPOLLIN EPOLLPRI = 0x002, #define EPOLLPRI EPOLLPRI EPOLLOUT = 0x004, #define EPOLLOUT EPOLLOUT EPOLLRDNORM = 0x040, #define EPOLLRDNORM EPOLLRDNORM EPOLLRDBAND = 0x080, #define EPOLLRDBAND EPOLLRDBAND EPOLLWRNORM = 0x100, #define EPOLLWRNORM EPOLLWRNORM EPOLLWRBAND = 0x200, #define EPOLLWRBAND EPOLLWRBAND EPOLLMSG = 0x400, #define EPOLLMSG EPOLLMSG EPOLLERR = 0x008, #define EPOLLERR EPOLLERR EPOLLHUP = 0x010, #define EPOLLHUP EPOLLHUP EPOLLRDHUP = 0x2000, #define EPOLLRDHUP EPOLLRDHUP EPOLLONESHOT = (1 << 30), #define EPOLLONESHOT EPOLLONESHOT EPOLLET = (1 << 31) #define EPOLLET EPOLLET };
– EPOLLIN,读事件
– EPOLLOUT,写事件
– EPOLLPRI,带外数据,与select的异常事件集合对应
– EPOLLRDHUP,TCP连接对端至少写写半关闭
– EPOLLERR,错误事件
– EPOLLET,设置事件为边沿触发
– EPOLLONESHOT,只触发一次,事件自动被删除
epoll在一个文件描述符上只能有一个事件,在一个描述符上添加多个事件,会产生EEXIST的错误。同样,删除epoll的事件,只需描述符就够了
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
这里有一个比较重要的问题:从epoll_wait返回的events中,该如何知道是哪个描述符上的事件:在注册epoll事件的时候,一定要填写epoll_data,否则我们将分不清触发的是哪个描述符上的事件。
下面我们将实现一个轻型TCP服务器,功能是在标准输出中打印发送给套接字的一切数据。
/* * ===================================================================================== * * Filename: EpollServer.c * * Description: this is a epoll server example * * Version: 1.0 * Created: 2012年03月15日 20时24分26秒 * Revision: none * Compiler: gcc * * Author: LGP (), lgp8819@gmail.com * Company: * * ===================================================================================== */ #include#include #include #include #include #include #include #include #include #include /*struct addrinfo { int ai_flags; int ai_family; int ai_socktype; int ai_protocol; size_t ai_addrlen; struct sockaddr *ai_addr; char *ai_canonname; struct addrinfo *ai_next; }; */ static int create_and_bind(char* port) { struct addrinfo hints; struct addrinfo*result,*rp; int s,sfd; memset(&hints,0,sizeof(struct addrinfo)); hints.ai_family= AF_UNSPEC;/* Return IPv4 and IPv6 */ hints.ai_socktype= SOCK_STREAM;/* TCP socket */ hints.ai_flags= AI_PASSIVE;/* All interfaces */ s = getaddrinfo(NULL, port,&hints,&result); //more info about getaddrinfo() please see:man getaddrinfo! if(s != 0) { fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(s)); return -1; } for(rp= result;rp!= NULL;rp=rp->ai_next) { sfd = socket(rp->ai_family,rp->ai_socktype,rp->ai_protocol); if(sfd==-1) continue; s =bind(sfd,rp->ai_addr,rp->ai_addrlen); if(s ==0) { /* We managed to bind successfully! */ break; } close(sfd); } if(rp== NULL) { fprintf(stderr,"Could not bind\n"); return-1; } freeaddrinfo(result); return sfd; } static int make_socket_non_blocking(int sfd) { int flags, s; flags = fcntl(sfd, F_GETFL,0); if(flags == -1) { perror("fcntl"); return-1; } flags|= O_NONBLOCK; s =fcntl(sfd, F_SETFL, flags); if(s ==-1) { perror("fcntl"); return-1; } return 0; } #define MAXEVENTS 64 int main(int argc,char*argv[]) { int sfd, s; int efd; struct epoll_event event; struct epoll_event* events; if(argc!=2) { fprintf(stderr,"Usage: %s [port]\n",argv[0]); exit(EXIT_FAILURE); } sfd = create_and_bind(argv[1]); if( sfd == -1 ) abort(); s = make_socket_non_blocking(sfd); if(s ==-1) abort(); s = listen(sfd, SOMAXCONN); if(s ==-1) { perror("listen"); abort(); } efd = epoll_create1(0); if(efd==-1) { perror("epoll_create"); abort(); } event.data.fd=sfd; event.events= EPOLLIN | EPOLLET; s =epoll_ctl(efd, EPOLL_CTL_ADD,sfd,&event); if(s ==-1) { perror("epoll_ctl"); abort(); } /* Buffer where events are returned */ events=calloc(MAXEVENTS,sizeof event); /* The event loop */ while(1) { int n,i; n =epoll_wait(efd, events, MAXEVENTS,-1); for(i=0;i< n;i++) { if((events[i].events & EPOLLERR)|| (events[i].events & EPOLLHUP)|| (!(events[i].events & EPOLLIN))) { /* An error has occured on this fd, or the socket is not ready for reading (why were we notified then?) */ fprintf(stderr,"epoll error\n"); close(events[i].data.fd); continue; } else if(sfd == events[i].data.fd) { /* We have a notification on the listening socket, which means one or more incoming connections. */ while(1) { struct sockaddr in_addr; socklen_t in_len; int infd; char hbuf[NI_MAXHOST],sbuf[NI_MAXSERV]; in_len = sizeof in_addr; infd = accept(sfd,&in_addr,&in_len); if(infd==-1) { if((errno== EAGAIN)|| (errno== EWOULDBLOCK)) { /* We have processed all incoming connections. */ break; } else { perror("accept"); break; } } s =getnameinfo(&in_addr,in_len, hbuf,sizeof hbuf, sbuf,sizeof sbuf, NI_NUMERICHOST | NI_NUMERICSERV); if(s ==0) { printf("Accepted connection on descriptor %d " "(host=%s, port=%s)\n",infd,hbuf,sbuf); } /* Make the incoming socket non-blocking and add it to the list of fds to monitor. */ s = make_socket_non_blocking(infd); if(s ==-1) abort(); event.data.fd=infd; event.events= EPOLLIN | EPOLLET; s = epoll_ctl(efd, EPOLL_CTL_ADD,infd,&event); if(s ==-1) { perror("epoll_ctl"); abort(); } } continue; } else { /* We have data on the fd waiting to be read. Read and display it. We must read whatever data is available completely, as we are running in edge-triggered mode and won't get a notification again for the same data. */ int done =0; while(1) { ssize_t count; char buf[512]; count = read(events[i].data.fd,buf,sizeof buf); if(count == -1) { /* If errno == EAGAIN, that means we have read all data. So go back to the main loop. */ if(errno!= EAGAIN) { perror("read"); done=1; } break; } else if(count ==0) { /* End of file. The remote has closed the connection. */ done=1; break; } /* Write the buffer to standard output */ s = write(1,buf, count); if(s ==-1) { perror("write"); abort(); } } if(done) { printf("Closed connection on descriptor %d\n",events[i].data.fd); /* Closing the descriptor will make epoll remove it from the set of descriptors which are monitored. */ close(events[i].data.fd); } } } } free(events); close(sfd); return EXIT_SUCCESS; }
以下是使用c++对epoll简单的封装类:
/** * @file file.h * @comment * wrap of file descriptor * * @author niexw */ #ifndef _XCOM_FILE_H_ #define _XCOM_FILE_H_ #include#include #include #include #include "exception.h" #include "buffer.h" namespace unp { /** * @class File * @comment * wrap of file descriptor */ class File { protected: int fd_; public: // // construtor and destructor // File() : fd_(-1) {} explicit File(FILE *stream) : fd_(fileno(stream)) {} ~File() { close(); } int getFd() { return fd_; } int getFd() const { return fd_; } size_t read(char *buf, size_t count) const { int ret; RETRY: if ((ret = ::read(fd_, buf, count)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } size_t write(char *buf, size_t count) const { int ret; RETRY: if ((ret = ::write(fd_, buf, count)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } void close() { if (fd_ != -1) { ::close(fd_); fd_ = -1; } } void setNonblock() { int flags = fcntl(fd_, F_GETFL); if (flags == -1) throw EXCEPTION(); flags |= O_NONBLOCK; flags = fcntl(fd_, F_SETFL, flags); if (flags == -1) throw EXCEPTION(); } void clrNonblock() { int flags = fcntl(fd_, F_GETFL); if (flags == -1) throw EXCEPTION(); flags &= ~O_NONBLOCK; flags = fcntl(fd_, F_SETFL, flags); if (flags == -1) throw EXCEPTION(); } size_t readv(CircleBuffer &buf) { int ret; RETRY: if ((ret = ::readv(fd_, buf.idle_, buf.idlenum_)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } buf.afterRead(ret); return ret; } size_t writev(CircleBuffer &buf) { int ret; RETRY: if ((ret = ::writev(fd_, buf.data_, buf.datanum_)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } buf.afterWrite(ret); return ret; } void setFlag(int option) { int flags; RETRY: flags = fcntl(fd_, F_GETFL); if (flags == -1) { if (errno == EINTR) goto RETRY; else throw EXCEPTION(); } flags |= option; RETRY1: int ret = fcntl(fd_, F_SETFL, flags); if (ret == -1) { if (errno == EINTR) goto RETRY1; else throw EXCEPTION(); } } void clrFlag(int option) { int flags; RETRY: flags = fcntl(fd_, F_GETFL); if (flags == -1) { if (errno == EINTR) goto RETRY; else throw EXCEPTION(); } flags &= ~option; RETRY1: int ret = fcntl(fd_, F_SETFL, flags); if (ret == -1) { if (errno == EINTR) goto RETRY1; else throw EXCEPTION(); } } }; /** * @class File2 * @comment * wrap of file descriptor */ class File2 { protected: int descriptor_; public: File2() : descriptor_(-1) { } explicit File2(FILE *stream) : descriptor_(fileno(stream)) { } explicit File2(File2 &f) : descriptor_(f.descriptor_) { f.descriptor_ = -1; } ~File2() { close(); } int descriptor() { return descriptor_; } size_t read(char *buf, size_t count) { int ret; RETRY: if ((ret = ::read(descriptor_, buf, count)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } size_t write(char *buf, size_t count) const { int ret; RETRY: if ((ret = ::write(descriptor_, buf, count)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } void close() { if (descriptor_ != -1) { ::close(descriptor_); descriptor_ = -1; } } size_t readv(const struct iovec *iov, int cnt) { int ret; RETRY: if ((ret = ::readv(descriptor_, iov, cnt)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } size_t writev(const struct iovec *iov, int cnt) { int ret; RETRY: if ((ret = ::writev(descriptor_, iov, cnt)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } void setControlOption(int option) { int flags; RETRY: flags = fcntl(descriptor_, F_GETFL); if (flags == -1) { if (errno == EINTR) goto RETRY; else throw EXCEPTION(); } flags |= option; RETRY1: int ret = fcntl(descriptor_, F_SETFL, flags); if (ret == -1) { if (errno == EINTR) goto RETRY1; else throw EXCEPTION(); } } void clearControlOption(int option) { int flags; RETRY: flags = fcntl(descriptor_, F_GETFL); if (flags == -1) { if (errno == EINTR) goto RETRY; else throw EXCEPTION(); } flags &= ~option; RETRY1: int ret = fcntl(descriptor_, F_SETFL, flags); if (ret == -1) { if (errno == EINTR) goto RETRY1; else throw EXCEPTION(); } } void setNonblock() { int flags = fcntl(descriptor_, F_GETFL); if (flags == -1) throw EXCEPTION(); flags |= O_NONBLOCK; flags = fcntl(descriptor_, F_SETFL, flags); if (flags == -1) throw EXCEPTION(); } void clrNonblock() { int flags = fcntl(descriptor_, F_GETFL); if (flags == -1) throw EXCEPTION(); flags &= ~O_NONBLOCK; flags = fcntl(descriptor_, F_SETFL, flags); if (flags == -1) throw EXCEPTION(); } }; }; // namespace unp #endif // _XCOM_FILE_H_
/** * @file epoll.h * @comment * wrap of epoll * * @author niexw */ #ifndef _UNP_EPOLL_H_ #define _UNP_EPOLL_H_ #include#include #include