Category: LINUX

2015-08-17 18:19:09

Event-based methods and epoll

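epoll is an event-based method for implementing asynchronous I/O / non-blocking I/O. It was added to the Linux kernel in version 2.5.44 as a replacement for the loop-style select and poll; it is more efficient than they are and better suited to high-concurrency applications with many clients. The loop-style methods have O(n) time complexity, because the watched file descriptors must be checked linearly, whereas the event-based epoll is usually described as O(1) in the number of watched descriptors: the work done by a wait call depends on how many descriptors are ready, not on how many are registered. The event-based approach attaches a callback to each event and runs it automatically when the event occurs (epoll uses callbacks in the kernel file structure).

Similar event-based methods on other platforms include kqueue (FreeBSD/NetBSD/OpenBSD/Darwin), /dev/poll (Solaris/HPUX), pollset (AIX), Event Completion (Solaris 10), and I/O Completion Ports (Windows). So if the target platform is Linux (kernel 2.5.44 or later), epoll can be used directly; on other Unix platforms, consider the corresponding method (for example kqueue on FreeBSD). For a cross-platform, portable event-based solution, use the libevent library, which supports several backends (select/poll, epoll, kqueue, /dev/poll, and others).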

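System calls provided by epoll

epoll consists of three system calls: epoll_create, epoll_ctl, and epoll_wait (#include <sys/epoll.h>). The steps are: first create an epoll file descriptor with epoll_create; then use epoll_ctl to add the I/O file descriptors to be watched to the epoll instance; finally call epoll_wait in a loop to detect event changes on those I/O fds and respond accordingly.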
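The three steps can be sketched on a plain pipe, which avoids any network setup. This is a minimal illustration, not a server: the function name wait_for_pipe_readable and the 1000 ms timeout are assumptions made for the example.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Walks through create -> ctl -> wait on a pipe.
 * Returns the value of epoll_wait (the number of ready descriptors),
 * or -1 if any earlier step fails. */
int wait_for_pipe_readable(void) {
    int pipefd[2];
    if (pipe(pipefd) == -1)
        return -1;

    /* Step 1: create the epoll file descriptor. */
    int epfd = epoll_create(1);
    if (epfd == -1) {
        close(pipefd[0]);
        close(pipefd[1]);
        return -1;
    }

    /* Step 2: register the read end of the pipe for EPOLLIN. */
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = pipefd[0];

    int ready = -1;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) == 0 &&
        write(pipefd[1], "x", 1) == 1) {
        /* Step 3: wait (up to 1000 ms) for a registered descriptor. */
        struct epoll_event out[1];
        ready = epoll_wait(epfd, out, 1, 1000);
    }

    close(epfd);
    close(pipefd[0]);
    close(pipefd[1]);
    return ready;
}
```

Because one byte is written before waiting, the call should report exactly one ready descriptor. A real server repeats step 3 in a loop, as in the full example later in this post.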

epoll_create

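    int epoll_create(int size); // create an epoll instance

The size argument tells the kernel how large an event backing store to prepare for the I/O file descriptors that will be added later. The call opens an epoll file descriptor by requesting that the kernel allocate an event backing store dimensioned for size descriptors. The size is not the maximum size of the backing store but just a hint to the kernel about how to dimension internal structures.

Since Linux 2.6.8 the size value is no longer used, but it must still be greater than 0.

An epoll file descriptor created with epoll_create should be closed with close() when it is no longer needed (at the latest when the program exits), for example: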

 

    int epfd;
    if ( (epfd = epoll_create(1)) == -1 ) {
        perror("epoll_create failed");
        exit(EXIT_FAILURE);
    }
    ...
    close(epfd);
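The rule that size must be greater than 0 even though its value is ignored can be checked directly. The following is a small sketch; the helper name size_must_be_positive is made up for this illustration.

```c
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Returns 1 if epoll_create(0) is rejected with EINVAL while
 * epoll_create(1) succeeds, and 0 otherwise. */
int size_must_be_positive(void) {
    errno = 0;
    int bad = epoll_create(0);              /* non-positive size */
    int rejected = (bad == -1 && errno == EINVAL);
    if (bad != -1)
        close(bad);

    int good = epoll_create(1);             /* any positive size works */
    if (good == -1)
        return 0;
    close(good);                            /* always release the epoll fd */
    return rejected;
}
```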

 

epoll_ctl

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // add/update/del IO file descriptor to be watched on the epoll instance

 

    //epfd:  the epoll file descriptor created by epoll_create above
    //op:    the operation to perform on fd; the supported operations are:
                EPOLL_CTL_ADD   add the file descriptor to the epoll instance
                EPOLL_CTL_MOD   change the events registered for the file descriptor (an update)
                EPOLL_CTL_DEL   remove the file descriptor from the epoll instance; the event argument is ignored and can be NULL
    //fd:    the I/O file descriptor to operate on
    //event: the event linked to this file descriptor

struct epoll_event is defined as follows:

    struct epoll_event {
        uint32_t        events; //epoll events
        epoll_data_t    data;   //user data variable
    };

    typedef union epoll_data {
        void        *ptr;
        int         fd;
        uint32_t    u32;
        uint64_t    u64;
    } epoll_data_t;
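Because epoll_data_t is a union, only one of its members carries meaning at a time. Storing just the fd is enough for simple programs; storing a pointer lets each descriptor carry its own context. The sketch below shows the pointer variant on a pipe. The struct conn_ctx and the function ctx_roundtrip are hypothetical names introduced for this example and are not part of the epoll API.

```c
#include <stddef.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical per-descriptor context; not part of the epoll API. */
struct conn_ctx {
    int fd;
    const char *label;
};

/* Registers the read end of a pipe with a conn_ctx attached through
 * data.ptr, makes it readable, and checks that the reported event hands
 * the same pointer back. Returns 1 on success, 0 otherwise. */
int ctx_roundtrip(void) {
    static struct conn_ctx ctx;     /* must outlive the registration */
    int pipefd[2];
    int ok = 0;

    if (pipe(pipefd) == -1)
        return 0;
    int epfd = epoll_create(1);
    if (epfd != -1) {
        ctx.fd = pipefd[0];
        ctx.label = "pipe-reader";

        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.ptr = &ctx;         /* union: set ptr OR fd, never both */

        struct epoll_event out;
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) == 0 &&
            write(pipefd[1], "x", 1) == 1 &&
            epoll_wait(epfd, &out, 1, 1000) == 1) {
            struct conn_ctx *got = out.data.ptr;
            ok = (got == &ctx && got->fd == pipefd[0]);
        }
        close(epfd);
    }
    close(pipefd[0]);
    close(pipefd[1]);
    return ok;
}
```

The kernel never interprets data; it simply returns whatever was stored at registration time, so the fd has to be kept inside the context structure when ptr is used.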

The events field of struct epoll_event is a bit set (several values can be combined with the | operator). The supported event types include:

    EPOLLIN     //ready for read
    EPOLLOUT    //ready for write
    EPOLLPRI    //urgent data available for read

    EPOLLERR    //error condition happened; epoll_wait always reports this event, so it does not need to be set
    EPOLLHUP    //hang up happened on the fd; epoll_wait always reports this event, so it does not need to be set

    EPOLLET     //use Edge Triggered mode; epoll uses Level Triggered by default

Return value of epoll_ctl: 0 on success; -1 on error, with errno set to indicate the cause.
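The three operations and their most common failure modes can be exercised on a single descriptor. This is a sketch under the assumption of a reasonably recent kernel (passing NULL to EPOLL_CTL_DEL requires Linux 2.6.9 or later); the function name ctl_lifecycle is invented for the example.

```c
#include <errno.h>
#include <stddef.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Runs ADD, duplicate ADD, MOD, DEL and a second DEL on one descriptor.
 * Returns 0 if every call behaves as the comments describe, otherwise
 * the number (1-5) of the first step that did not, or -1 on setup failure. */
int ctl_lifecycle(void) {
    int pipefd[2];
    if (pipe(pipefd) == -1)
        return -1;
    int epfd = epoll_create(1);
    if (epfd == -1) {
        close(pipefd[0]);
        close(pipefd[1]);
        return -1;
    }

    int fd = pipefd[0];
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    int failed = 0;

    /* 1. ADD registers the descriptor. */
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) != 0)
        failed = 1;
    /* 2. Adding the same descriptor again fails with EEXIST. */
    if (!failed &&
        !(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1 && errno == EEXIST))
        failed = 2;
    /* 3. MOD replaces the event mask of a registered descriptor. */
    ev.events = EPOLLIN | EPOLLPRI;
    if (!failed && epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) != 0)
        failed = 3;
    /* 4. DEL removes it; the event argument may be NULL. */
    if (!failed && epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL) != 0)
        failed = 4;
    /* 5. Deleting again fails with ENOENT: fd is no longer registered. */
    if (!failed &&
        !(epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL) == -1 && errno == ENOENT))
        failed = 5;

    close(epfd);
    close(pipefd[0]);
    close(pipefd[1]);
    return failed;
}
```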

epoll_wait

    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

 

    //epfd      the epoll file descriptor created above
    //events    output array that receives the events that occurred
    //maxevents maximum number of events to return; must be > 0
    //timeout   time to wait in milliseconds; as with poll, -1 makes epoll_wait wait indefinitely and 0 makes it return immediately

Like select and poll, this function waits and blocks until events happen on the I/O fds watched through epfd, or until the number of milliseconds given by timeout has elapsed.

Return value: on success, the number of fds ready for the requested I/O; 0 means no fd became ready before the timeout expired; -1 means an error occurred.
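The effect of the timeout argument and the meaning of the return value can be observed with a pipe. The following sketch uses an invented helper, wait_result, whose two parameters are assumptions of the example: whether a byte is written before waiting, and the timeout to pass on.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Registers the read end of a pipe, optionally writes one byte to it,
 * then calls epoll_wait with the given timeout (milliseconds).
 * Returns whatever epoll_wait returned, or -1 on setup failure. */
int wait_result(int write_first, int timeout_ms) {
    int pipefd[2];
    if (pipe(pipefd) == -1)
        return -1;
    int epfd = epoll_create(1);
    if (epfd == -1) {
        close(pipefd[0]);
        close(pipefd[1]);
        return -1;
    }

    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = pipefd[0];

    int res = -1;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) == 0 &&
        (!write_first || write(pipefd[1], "x", 1) == 1)) {
        struct epoll_event out[4];          /* maxevents must be > 0 */
        res = epoll_wait(epfd, out, 4, timeout_ms);
    }

    close(epfd);
    close(pipefd[0]);
    close(pipefd[1]);
    return res;
}
```

With nothing written, a timeout of 0 returns 0 at once and a timeout of 50 returns 0 after roughly 50 ms; once a byte is pending, the call returns 1 regardless of the timeout.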

The two modes in which epoll detects event changes: edge-triggered and level-triggered

A call to epoll_wait returns the I/O fds whose events have changed. epoll supports two modes:

§  level triggered

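As long as the condition behind an event persists, every call to epoll_wait reports that event. For example: when an I/O fd becomes available for reading, epoll_wait returns the event; if the data has still not been read completely by the next call to epoll_wait, that call returns the event again.

§  edge triggered (EPOLLET)

Unlike level triggered, this mode reports an event only when it arises and does not report it again afterwards, even if the condition still holds. For example: when an I/O fd becomes available for reading, epoll_wait returns the event; if the data has still not been read completely by the next call, epoll_wait does not return immediately but waits for new events or for the timeout to expire. Edge Triggered event distribution delivers events only when events happen on the monitored file.

epoll uses Level Triggered mode by default. To use Edge Triggered mode for a particular I/O fd, add EPOLLET to the events field of the struct epoll_event passed to epoll_ctl.

Example: a non-blocking socket server implemented with epoll: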
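The difference between the two modes shows up when epoll_wait is called twice without consuming the data in between. The sketch below does this on a pipe; second_wait_result is a name invented for the example, and its parameter is OR-ed into the event mask so the same code covers both modes.

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Registers the read end of a pipe with EPOLLIN | extra_flags, writes one
 * byte, and calls epoll_wait twice WITHOUT reading the byte in between.
 * Returns the result of the second epoll_wait call, or -1 on failure. */
int second_wait_result(uint32_t extra_flags) {
    int pipefd[2];
    if (pipe(pipefd) == -1)
        return -1;
    int epfd = epoll_create(1);
    if (epfd == -1) {
        close(pipefd[0]);
        close(pipefd[1]);
        return -1;
    }

    struct epoll_event ev;
    ev.events = EPOLLIN | extra_flags;      /* pass EPOLLET for edge-triggered */
    ev.data.fd = pipefd[0];

    int second = -1;
    struct epoll_event out;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) == 0 &&
        write(pipefd[1], "x", 1) == 1 &&
        epoll_wait(epfd, &out, 1, 0) == 1) {    /* first report, both modes */
        /* The byte is still unread. Level-triggered reports it again;
         * edge-triggered stays silent until new data arrives. */
        second = epoll_wait(epfd, &out, 1, 0);
    }

    close(epfd);
    close(pipefd[0]);
    close(pipefd[1]);
    return second;
}
```

Calling second_wait_result(0) yields 1 (level-triggered repeats the event), while second_wait_result(EPOLLET) yields 0.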

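    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <errno.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <sys/epoll.h>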

 

    #define DEFAULT_PORT    1984    // default port
    #define BUFF_SIZE       1024    // buffer size

    #define EPOLL_MAXEVENTS 64      // maximum number of events returned by one epoll_wait call
    #define EPOLL_TIMEOUT   5000    // epoll_wait timeout in milliseconds

 

    // Helper: put sock into non-blocking mode
    void setSockNonBlock(int sock) {
        int flags;
        flags = fcntl(sock, F_GETFL, 0);
        if (flags < 0) {
            perror("fcntl(F_GETFL) failed");
            exit(EXIT_FAILURE);
        }
        if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
            perror("fcntl(F_SETFL) failed");
            exit(EXIT_FAILURE);
        }
    }

 

    int main(int argc, char *argv[]) {

        // Read an optional custom port from the command line
        unsigned short int port;
        if (argc == 2) {
            port = atoi(argv[1]);
        } else if (argc < 2) {
            port = DEFAULT_PORT;
        } else {
            fprintf(stderr, "USAGE: %s [port]\n", argv[0]);
            exit(EXIT_FAILURE);
        }

 

        // Create the socket
        int sock;
        if ( (sock = socket(PF_INET, SOCK_STREAM, 0)) == -1 ) {
            perror("socket failed");
            exit(EXIT_FAILURE);
        }
        printf("socket done\n");

        // Avoid the 'address already in use' error when restarting
        int yes = 1;
        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int))) {
            perror("setsockopt failed");
            exit(EXIT_FAILURE);
        }

 

        // Put sock into non-blocking mode
        setSockNonBlock(sock);

        // Build the socket address to bind to
        struct sockaddr_in bind_addr;
        memset(&bind_addr, 0, sizeof(bind_addr));
        bind_addr.sin_family = AF_INET;
        bind_addr.sin_addr.s_addr = htonl(INADDR_ANY);  // accept connections on any local address
        bind_addr.sin_port = htons(port);               // convert host byte order to network byte order

 

        // Bind sock to the socket address built above
        if ( bind(sock, (struct sockaddr *) &bind_addr, sizeof(bind_addr)) == -1 ) {
            perror("bind failed");
            exit(EXIT_FAILURE);
        }
        printf("bind done\n");

        // Listen
        if ( listen(sock, 5) == -1) {
            perror("listen failed");
            exit(EXIT_FAILURE);
        }
        printf("listen done\n");

 

        // Create the epoll instance (epoll file descriptor)
        int epfd;
        if ( (epfd = epoll_create(1)) == -1 ) {
            perror("epoll_create failed");
            exit(EXIT_FAILURE);
        }
        // Add sock to the epoll instance
        struct epoll_event event;
        event.events = EPOLLIN;
        event.data.fd = sock;
        if ( epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &event) == -1 ) {
            perror("epoll_ctl");
            exit(EXIT_FAILURE);
        }

        // Prepare the output array for epoll_wait
        struct epoll_event events[EPOLL_MAXEVENTS];
        memset(events, 0, sizeof(events));

 

        // Event loop
        int conn_sock;
        struct sockaddr_in client_addr;
        socklen_t client_addr_len;
        char client_ip_str[INET_ADDRSTRLEN];
        int res;
        int i;
        char buffer[BUFF_SIZE];
        int recv_size;

        while (1) {

            // Call epoll_wait once per loop iteration
            res = epoll_wait(epfd, events, EPOLL_MAXEVENTS, EPOLL_TIMEOUT);
            if (res < 0) {
                if (errno == EINTR) {
                    continue;   // interrupted by a signal; just retry
                }
                perror("epoll_wait failed");
                exit(EXIT_FAILURE);
            } else if (res == 0) {
                fprintf(stderr, "no socket ready for read within %d secs\n", EPOLL_TIMEOUT / 1000);
                continue;
            }

 

            // Events were detected on res I/O file descriptors; loop over them and respond to each
            for (i = 0; i < res; i++) {
                // events[i] is one detected event: events[i].events says which events occurred,
                // and events[i].data.fd is the corresponding I/O fd

                if ( (events[i].events & EPOLLERR) ||
                     (events[i].events & EPOLLHUP) ||
                     (!(events[i].events & EPOLLIN)) ) {
                    // events[i].events uses one bit per event type, so the `&` operator tests for a specific event.
                    // Here we check for EPOLLERR or EPOLLHUP, or for an event that does not include EPOLLIN
                    fprintf (stderr, "epoll error\n");
                    close (events[i].data.fd);
                    continue;
                }

 

                // Respond to the I/O fd on which the event was detected
                if (events[i].data.fd == sock) {

                    // The fd is the server's listening socket: do not read from it, accept all pending client connections instead
                    while (1) {
                        client_addr_len = sizeof(client_addr);
                        conn_sock = accept(sock, (struct sockaddr *) &client_addr, &client_addr_len);
                        if (conn_sock == -1) {
                            if ( (errno == EAGAIN) || (errno == EWOULDBLOCK) ) {
                                // In non-blocking mode this means no more pending connections; leave the while (1) loop
                                break;
                            } else {
                                perror("accept failed");
                                exit(EXIT_FAILURE);
                            }
                        }
                        if (!inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip_str, sizeof(client_ip_str))) {
                            perror("inet_ntop failed");
                            exit(EXIT_FAILURE);
                        }
                        printf("accept a client from: %s\n", client_ip_str);
                        // Put conn_sock into non-blocking mode
                        setSockNonBlock(conn_sock);
                        // Add conn_sock to the set watched by epoll
                        event.events = EPOLLIN;
                        event.data.fd = conn_sock;
                        if ( epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock, &event) == -1 ) {
                            perror("epoll_ctl(EPOLL_CTL_ADD) failed");
                            exit(EXIT_FAILURE);
                        }
                    }

 

                } else {

                    // The fd is a connected client socket that is ready for reading
                    conn_sock = events[i].data.fd;
                    memset(buffer, 0, sizeof(buffer));
                    // Read at most sizeof(buffer) - 1 bytes so that buffer stays NUL-terminated for printf
                    recv_size = recv(conn_sock, buffer, sizeof(buffer) - 1, 0);
                    if (recv_size == -1) {
                        if ( (errno == EAGAIN) || (errno == EWOULDBLOCK) ) {
                            // In non-blocking mode this only means no data is available right now; it is not an error
                            continue;
                        }
                        perror("recv failed");
                        exit(EXIT_FAILURE);
                    }
                    printf("recved from conn_sock=%d : %s(%d length string)\n", conn_sock, buffer, recv_size);

                    // Echo the received data back immediately (recv_size == 0 means the peer closed; nothing to echo)
                    if ( recv_size > 0 && send(conn_sock, buffer, recv_size, 0) == -1 && (errno != EAGAIN) && (errno != EWOULDBLOCK) ) {
                        // In non-blocking mode, -1 with EAGAIN or EWOULDBLOCK means the socket cannot accept data right now; it is not an error
                        perror("send failed");
                        exit(EXIT_FAILURE);
                    }
                    printf("send to conn_sock=%d done\n", conn_sock);

                    // Remove the socket from the epoll watch set (closing the last descriptor that refers to
                    // conn_sock also removes it from epoll automatically, so this call is optional here)
                    if ( epoll_ctl(epfd, EPOLL_CTL_DEL, conn_sock, NULL) == -1 ) {
                        perror("epoll_ctl(EPOLL_CTL_DEL) failed");
                        exit(EXIT_FAILURE);
                    }

                    // Close the connection
                    if ( close(conn_sock) == -1 ) {
                        perror("close failed");
                        exit(EXIT_FAILURE);
                    }
                    printf("close conn_sock=%d done\n", conn_sock);
                }

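            }

        }

        // Not reached in this example because the loop above never exits; kept to show the cleanup
        close(sock);    // close the server's listening socket
        close(epfd);    // close the epoll file descriptor

        return 0;
    }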

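Testing: compile and run the program, then start several telnet localhost 1984 sessions to communicate with the server.

Note: when events are registered for a file descriptor with epoll_ctl, epoll uses Level Triggered mode by default, meaning that every call to epoll_wait reports the event again for as long as it has not been fully handled. Registering the descriptor as follows: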

    struct epoll_event event;
    event.data.fd   =   sock;
    event.events    =   EPOLLIN | EPOLLET;
    if ( epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &event) == -1 ) {
        perror("epoll_ctl(EPOLL_CTL_ADD) failed");
        exit(EXIT_FAILURE);
    }

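makes the events on that fd use the Edge Triggered model. In this model epoll_wait reports each event change only once, so the handling after epoll_wait needs care (for example, when a readable event arrives, make sure all of the available data is read).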
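One common way to read the data completely in Edge Triggered mode is to keep reading a non-blocking descriptor until the call reports EAGAIN. The sketch below shows such a loop; drain_fd and the demo helper drain_demo are hypothetical names for this illustration, and a pipe stands in for a client socket.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Reads from a non-blocking fd until read() reports EAGAIN/EWOULDBLOCK
 * (nothing left for now) or end-of-file. Returns the total number of
 * bytes read, or -1 on a real error. The data itself is discarded. */
ssize_t drain_fd(int fd) {
    char buf[512];
    ssize_t total = 0;
    for (;;) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            total += n;             /* more may be pending: keep going */
        } else if (n == 0) {
            return total;           /* peer closed the connection */
        } else if (errno == EINTR) {
            continue;               /* interrupted by a signal: retry */
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return total;           /* fully drained for now */
        } else {
            return -1;
        }
    }
}

/* Demo helper: queues len bytes (at most 4096) in a pipe whose read end
 * is non-blocking, then drains them. Returns the drained byte count. */
ssize_t drain_demo(size_t len) {
    static const char payload[4096];    /* zero-filled test data */
    int pipefd[2];
    ssize_t drained = -1;

    if (len > sizeof(payload) || pipe(pipefd) == -1)
        return -1;
    int flags = fcntl(pipefd[0], F_GETFL, 0);
    if (flags != -1 &&
        fcntl(pipefd[0], F_SETFL, flags | O_NONBLOCK) != -1 &&
        write(pipefd[1], payload, len) == (ssize_t)len) {
        drained = drain_fd(pipefd[0]);
    }
    close(pipefd[0]);
    close(pipefd[1]);
    return drained;
}
```

In a server, a loop like drain_fd would be called once per EPOLLIN notification for a descriptor registered with EPOLLET, with the buffer contents processed instead of discarded.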

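Summary:

Compared with (1) the loop-style select/poll methods and (2) the multi-process (forking) / multi-threaded (threading) methods, in which each process or thread serves one connection socket, the event-based epoll approach performs better under high concurrency with many clients, so it is recommended in practice. For example, Nginx, Lighttpd, and Memcached all use this event-based asynchronous I/O model.

In addition, the event-handling library libevent also supports epoll (as well as kqueue (FreeBSD/NetBSD/OpenBSD/Darwin), /dev/poll (Solaris/HPUX), select, poll, and others); it can be used in practice to write high-performance servers that are portable across platforms.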

 

 
