storage R&D guy.
全部博文(1000)
分类: LINUX
2015-08-17 18:19:09
event-based方法和epoll
epoll是event-based的方法实现异步io/non-blocking io。从Linux kernel 2.5.44之后epoll加入Linux kernel中,代替loop style方法的select和poll,比后者更加高效更适用于高并发多client的应用。loop style方法的时间复杂度为O(n)(因为需要线性地检测指定的file descriptor),而epoll等event-based方法的时间复杂度为O(1)。event-based通过为不同的events设置callback函数,在该event发生的时候自动执行相应函数(epoll uses callbacks in the kernel file structure)。
和epoll类似的其他event-based的方法还有:kqueue(FreeBSD/NetBSD/OpenBSD/Darwin), /dev/poll(Solaris/HPUX), pollset(AIX), Event Completion(Solaris 10), I/O Completion Ports(Windows)等。因此如果程序运行目标平台是Linux(Kernel > 2.5.44)可以使用即可,如果其他Unix平台可以考虑相应的方法(如在FreeBSD等UNIX上使用kqueue)。如果希望使用跨平台可移植的event-based可以使用libevent库,它支持多种方法(select/poll, epoll, kqueue, /dev/poll等)。
epoll提供的系统调用
epoll包含了3个系统调用:epoll_create, epoll_ctl和epoll_wait(需#include
epoll_create
int epoll_create(int size); //创建epoll
其中size告知kernel需要为之后添加的IO file descriptor准备的event backing store的大小。Open an epoll file descriptor by requesting the kernel allocate an event backing store dimensioned for size descriptors. The size is not the maximum size of the backing store but just a hint to the kernel about how to dimension internal structures.
从Linux 2.6.8开始size值不再被使用,但是其赋值需要> 0。参考:
使用epoll_create创建的epoll file descriptor在程序结束的时候需要使用close()将其关闭,例如:
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // add/update/del IO file descriptor to be watched on the epoll instance
//epfd: 即是如上epoll_create创建的epoll file descriptor
//op: 指定要对指定的fd进行何种操作,支持的操作包括:
EPOLL_CTL_ADD 将指定的file descriptor添加到epoll中
EPOLL_CTL_MOD 修改指定file descriptor的event,相当于update
EPOLL_CTL_DEL 将指定的file descriptor从epoll中清除,the event is ignored and can be NULL
//fd: 要操作的IO file descriptor
//event: 表示the event linked to this file descriptor
其中struct epoll_event的定义如下:
struct epoll_event {
__uint32_t events; //epoll events
epoll_data_t data; //user data variable
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
其中的struct epoll_event的events域是一个bit set(可以通过|操作符进行多赋值),支持的event type有:
EPOLLIN //ready for read
EPOLLOUT //ready for write
EPOLLPRI //urgent data available for read
EPOLLERR //error condition happened, epoll_wait会默认检测该event不需要设置
EPOLLHUP //hang up happended on the fd, epoll_wait会默认检测该event不需要设置
EPOLLET //设置使用Edge Triggered模式,epoll模式使用Level Triggered
更多参看:
epoll_ctl的返回值:成功返回0, 发生错误返回-1
epoll_wait
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
//epfd 为如上定义的epoll file descriptor
//events 返回发生改变的events
//maxevents 最多返回events的个数,必须 > 0
//timeout 等待的milliseconds;和poll类似,如果timeout设置为-1则epoll_wait将持续等待下去;如果timeout设置为0,则epoll_wait将立即返回
该函数类似select和poll函数,执行的时候会等待直到epfd定义的指定IO fd的events发生变化或timeout参数指定的milliseconds时间到期才返回。wait and block until events on the watched set happens or timeout expires。
返回值:成功返回number of fd ready for requested io; 0表示在timeout以后没有ready的fd; -1表示发生错误。
epoll检测events改变的两种模式:edge-triggered和level-triggered
调用epoll_wait会返回events发生变化的IO fd,epoll支持两种模式:
§ level triggered:
只要发生的events没有结束,每次调用epoll_wait都显示该events存在。例如:当一个IO fd的状态变为available for reading的时候,调用epoll_wait会将该event返回;如果下次调用epoll_wait的时候该read过程还没有完成,则epoll_wait仍旧会返回该event。
§ edge triggered(EPOLLET )
和level triggered不同,它只在event产生的时候发出event信息,之后即使event没有结束不再发送此信息。例如:当一个IO fd状态变为available for reading的时候,调用epoll_wait会将该event返回;如果下次调用epoll_wait的时候该read过程还没有完成,epoll_wait不会立即返回而是需要等待新的events或直到timetout的时间。Edge Triggered event distribution delivers events only when events happens on the monitored file.
epoll默认采用Level Triggered模式,如果需要对某个IO fd采用Edge Triggered模式,在调用epoll_ctl的时候指定其struct epoll_event的events的时候添加EPOLLET。
epoll实现non-blocking socket实例:
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define DEFAULT_PORT 1984 //默认端口
#define BUFF_SIZE 1024 //buffer大小
#define EPOLL_MAXEVENTS 64 //epoll_wait的最多返回的events个数
#define EPOLL_TIMEOUT 5000 //epoll_wait的timeout milliseconds
//函数:设置sock为non-blocking mode
void setSockNonBlock(int sock) {
int flags;
flags = fcntl(sock, F_GETFL, 0);
if (flags < 0) {
perror("fcntl(F_GETFL) failed");
exit(EXIT_FAILURE);
}
if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
perror("fcntl(F_SETFL) failed");
exit(EXIT_FAILURE);
}
}
int main(int argc, char *argv[]) {
//获取自定义端口
unsigned short int port;
if (argc == 2) {
port = atoi(argv[1]);
} else if (argc < 2) {
port = DEFAULT_PORT;
} else {
fprintf(stderr, "USAGE: %s [port]\n", argv[0]);
exit(EXIT_FAILURE);
}
//创建socket
int sock;
if ( (sock = socket(PF_INET, SOCK_STREAM, 0)) == -1 ) {
perror("socket failed");
exit(EXIT_FAILURE);
}
printf("socket done\n");
//in case of 'address already in use' error message
int yes = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int))) {
perror("setsockopt failed");
exit(EXIT_FAILURE);
}
//设置sock为non-blocking
setSockNonBlock(sock);
//创建要bind的socket address
struct sockaddr_in bind_addr;
memset(&bind_addr, 0, sizeof(bind_addr));
bind_addr.sin_family = AF_INET;
bind_addr.sin_addr.s_addr = htonl(INADDR_ANY); //设置接受任意地址
bind_addr.sin_port = htons(port); //将host byte order转换为network byte order
//bind sock到创建的socket address上
if ( bind(sock, (struct sockaddr *) &bind_addr, sizeof(bind_addr)) == -1 ) {
perror("bind failed");
exit(EXIT_FAILURE);
}
printf("bind done\n");
//listen
if ( listen(sock, 5) == -1) {
perror("listen failed");
exit(EXIT_FAILURE);
}
printf("listen done\n");
//创建epoll (epoll file descriptor)
int epfd;
if ( (epfd = epoll_create(1)) == -1 ) {
perror("epoll_create failed");
exit(EXIT_FAILURE);
}
//将sock添加到epoll中
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = sock;
if ( epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &event) == -1 ) {
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
//初始化epoll_wait的参数
struct epoll_event events[EPOLL_MAXEVENTS];
memset(events, 0, sizeof(events));
//循环侦听
int conn_sock;
struct sockaddr_in client_addr;
socklen_t client_addr_len;
char client_ip_str[INET_ADDRSTRLEN];
int res;
int i;
char buffer[BUFF_SIZE];
int recv_size;
while (1) {
//每次循环调用依次epoll_wait侦听
res = epoll_wait(epfd, events, EPOLL_MAXEVENTS, EPOLL_TIMEOUT);
if (res < 0) {
perror("epoll_wait failed");
exit(EXIT_FAILURE);
} else if (res == 0) {
fprintf(stderr, "no socket ready for read within %d secs\n", EPOLL_TIMEOUT / 1000);
continue;
}
//检测到res个IO file descriptor的events,loop各个fd进行响应
for (i = 0; i < res; i++) {
//events[i]即为检测到的event,域events[i].events表示具体哪些events,域events[i].data.fd即对应的IO fd
if ( (events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN)) ) {
//由于events[i].events使用每个bit表示event,因此判断是否包含某个具体事件可以使用`&`操作符
//这里判断是否存在EPOLLERR, EPOLLHUP等event
fprintf (stderr, "epoll error\n");
close (events[i].data.fd);
continue;
}
//对检测到event的各IO fd进行响应
if (events[i].data.fd == sock) {
//当前fd是server的socket,不进行读而是accept所有client连接请求
while (1) {
client_addr_len = sizeof(client_addr);
conn_sock = accept(sock, (struct sockaddr *) &client_addr, &client_addr_len);
if (conn_sock == -1) {
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK) ) {
//non-blocking模式下无新connection请求,跳出while (1)
break;
} else {
perror("accept failed");
exit(EXIT_FAILURE);
}
}
if (!inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip_str, sizeof(client_ip_str))) {
perror("inet_ntop failed");
exit(EXIT_FAILURE);
}
printf("accept a client from: %s\n", client_ip_str);
//设置conn_sock为non-blocking
setSockNonBlock(conn_sock);
//把conn_sock添加到epoll的侦听中
event.events = EPOLLIN;
event.data.fd = conn_sock;
if ( epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock, &event) == -1 ) {
perror("epoll_ctl(EPOLL_CTL_ADD) failed");
exit(EXIT_FAILURE);
}
}
} else {
//当前fd是client连接的socket,可以读(read from client)
conn_sock = events[i].data.fd;
memset(buffer, 0, sizeof(buffer));
if ( (recv_size = recv(conn_sock, buffer, sizeof(buffer), 0)) == -1 && (errno != EAGAIN) ) {
//recv在non-blocking模式下,返回-1且errno为EAGAIN表示当前无可读数据,并不表示错误
perror("recv failed");
exit(EXIT_FAILURE);
}
printf("recved from conn_sock=%d : %s(%d length string)\n", conn_sock, buffer, recv_size);
//立即将收到的内容写回去
if ( send(conn_sock, buffer, recv_size, 0) == -1 && (errno != EAGAIN) && (errno != EWOULDBLOCK) ) {
//send在non-blocking模式下,返回-1且errno为EAGAIN或EWOULDBLOCK表示当前无可写数据,并不表示错误
perror("send failed");
exit(EXIT_FAILURE);
}
printf("send to conn_sock=%d done\n", conn_sock);
//将当前socket从epoll的侦听中移除(有文章说:关闭con_sock之后,其会自动从epoll中删除,因此此段代码可以省略)
if ( epoll_ctl(epfd, EPOLL_CTL_DEL, conn_sock, NULL) == -1 ) {
perror("epoll_ctl(EPOLL_CTL_DEL) failed");
exit(EXIT_FAILURE);
}
//关闭连接
if ( close(conn_sock) == -1 ) {
perror("close failed");
exit(EXIT_FAILURE);
}
printf("close conn_sock=%d done\n", conn_sock);
}
}
}
close(sock); //关闭server的listening socket
close(epfd); //关闭epoll file descriptor
return 0;
}
测试:编译并运行程序;然后尝试运行多个telnet localhost 1984和server进行通信。
注意:epoll在使用epoll_ctl为file descriptor指定events的时候,默认采用Level Triggered,即如果events未完成调用epoll_wait的话每次都会返回该事件;通过如下方式:
struct epoll_event event;
event.data.fd = sock;
event.events = EPOLLIN | EPOLLET;
if ( epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &event) == -1 ) {
perror("epoll_ctl(EPOLL_CTL_ADD) failed");
exit(EXIT_FAILURE);
}
可以指定该fd的event采用Edge Triggered模型,如果采用该模型,epoll_wait检测到每次事件变化只通知一次,因此在epoll_wait之后的处理的时候需要注意(例如有可读的event的时候,注意数据读取完整)。。
小结:
epoll这种event-based的方法比较1。select/poll等loop style方法;2。多进程(forking)/多线程(threading)方法(每个进程或线程对应一个connection socket),在多client高并发下性能更优越。因此推荐在实际中应用。例如:Nginx, Lighttpd, Memcached等都采用有该event-based的异步IO模型。
另外,event-handling库也支持epoll方法(还支持kqueue(FreeBSD/NetBSD/OpenBSD/Darwin), /dev/poll(Solaris/HPUX), select, poll等方法)在实际中也可使用该库编写高性能的Server,方便实现跨平台可移植