Chinaunix首页 | 论坛 | 博客
  • 博客访问: 376397
  • 博文数量: 105
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 826
  • 用 户 组: 普通用户
  • 注册时间: 2013-02-16 13:58
个人简介

记录有意义的东西

文章分类

全部博文(105)

文章存档

2013年(105)

我的朋友

分类: LINUX

2013-04-28 04:11:42

epoll 真正意义在于,解决服务器压力问题,一个进程里面解决:把读写io分开,接收分开,因为io操作耗时是个瓶颈

EPOLLOUT的触发是通过以下触发的,触发完成下一次循环的时候发送:触发的意义在于把写和读分开

ev.events = EPOLLOUT | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, cfd, &ev);


要注意:

read满才返回,或关闭套接口 recv 收到数据就直接返回
客户端关闭链接,服务器收到返回字节为0,这时结束客户端epoll


//这十一个关闭客户端套接口的例子,一般服务器关闭

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

#define PORT 5555
#define MAXFDS 5000
#define EVENTSIZE 100

#define BUFFER "HTTP/1.1 200 OK/r/nContent-Length: 5/r/nConnection: close/r/nContent-Type: text/html/r/n/r/nHello"

int epfd;
void *serv_epoll(void *p);
void setnonblocking(int fd) {
    int opts;
    opts = fcntl(fd, F_GETFL);
    if (opts < 0) {
        fprintf(stderr, "fcntl failed/n");
        return;
    }
    opts = opts | O_NONBLOCK;
    if (fcntl(fd, F_SETFL, opts) < 0) {
        fprintf(stderr, "fcntl failed/n");
        return;
    }
    return;
}

int main(int argc, char *argv[]) {
    int fd, opt = 1;
    struct epoll_event ev;
    struct sockaddr_in sin, cin;
    socklen_t sin_len = sizeof(struct sockaddr_in);

    epfd = epoll_create(MAXFDS);
    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) <= 0) {
        fprintf(stderr, "socket failed/n");
        return -1;
    }
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void*) &opt, sizeof(opt));

    memset(&sin, 0, sizeof(struct sockaddr_in));
    sin.sin_family = AF_INET;
    sin.sin_port = htons((short) (PORT));
    sin.sin_addr.s_addr = INADDR_ANY;
    if (bind(fd, (struct sockaddr *) &sin, sizeof(sin)) != 0) {
        fprintf(stderr, "bind failed/n");
        return -1;
    }
    if (listen(fd, 32) != 0) {
        fprintf(stderr, "listen failed/n");
        return -1;
    }

    int i, ret, cfd, nfds;

    struct epoll_event events[EVENTSIZE];
    char buffer[512];

    ev.data.fd = fd;
//    ev.events = EPOLLIN |EPOLLOUT | EPOLLET;
    ev.events = EPOLLIN | EPOLLET; //设置要处理的事件类型
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

    while (1)
    {
        nfds = epoll_wait(epfd, events, EVENTSIZE, -1);
        printf("nfds ........... %d/n",nfds);
        for (i = 0; i < nfds; i++) {
            if(events[i].data.fd==fd)
            {
                cfd = accept(fd, (struct sockaddr *) &cin, &sin_len);
                setnonblocking(cfd); //把客户端的socket设置为非阻塞方式
                ev.data.fd = cfd;
                ev.events = EPOLLIN | EPOLLET;
                epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
            }else
            {
                if (events[i].events & EPOLLIN) {
                    cfd = events[i].data.fd;
                    ret = recv(cfd, buffer, sizeof(buffer), 0);
                    printf("read ret..........= %d/n",ret);

                    ev.data.fd = cfd;
                    ev.events = EPOLLOUT | EPOLLET;
                    epoll_ctl(epfd, EPOLL_CTL_MOD, cfd, &ev);
                 } else if (events[i].events & EPOLLOUT) {
                    cfd = events[i].data.fd;
//                    while(1)
//                    {
                         ret = send(cfd, BUFFER, strlen(BUFFER), 0);
                                            printf("send ret...........= %d/n", ret);
//                        ret = write (cfd,BUFFER,strlen(BUFFER));
//                         printf("send ret...........= %d/n", ret);
//                    }


                    ev.data.fd = cfd;
                    epoll_ctl(epfd, EPOLL_CTL_DEL, cfd, &ev);
                    close(cfd);

                }
            }
        }
    }


    if (fd > 0)
        close(fd);
    return 0;
}

这十一个循环接收的例子,等待客户端关闭
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

#define MAXLINE 10
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5555
#define INFTIM 1000

void setnonblocking(int sock) {
    int opts;
    opts = fcntl(sock, F_GETFL);

    if (opts < 0) {
        perror("fcntl(sock,GETFL)");
        exit(1);
    }

    opts = opts | O_NONBLOCK;

    if (fcntl(sock, F_SETFL, opts) < 0) {
        perror("fcntl(sock,SETFL,opts)");
        exit(1);
    }
}

int main() {
    int i, maxi, listenfd, connfd, sockfd, epfd, nfds;
    ssize_t n;
    char line[MAXLINE];
    socklen_t clilen;

    struct epoll_event ev, events[20]; //声明epoll_event结构体的变量, ev用于注册事件, events数组用于回传要处理的事件
    epfd = epoll_create(256); //生成用于处理accept的epoll专用的文件描述符, 指定生成描述符的最大范围为256

    struct sockaddr_in clientaddr;
    struct sockaddr_in serveraddr;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);

    setnonblocking(listenfd); //把用于监听的socket设置为非阻塞方式

    ev.data.fd = listenfd; //设置与要处理的事件相关的文件描述符
    ev.events = EPOLLIN | EPOLLET; //设置要处理的事件类型
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); //注册epoll事件

    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    char *local_addr = "127.0.0.1";
    inet_aton(local_addr, &(serveraddr.sin_addr));
    serveraddr.sin_port = htons(SERV_PORT); //或者htons(SERV_PORT);

    bind(listenfd, (sockaddr *) &serveraddr, sizeof(serveraddr));

    listen(listenfd, LISTENQ);

    maxi = 0;

    for (;;) {
        nfds = epoll_wait(epfd, events, 20, -1); //等待epoll事件的发生

        for (i = 0; i < nfds; ++i) //处理所发生的所有事件
        {
            if (events[i].data.fd == listenfd) /**监听事件**/
            {
                connfd = accept(listenfd, (sockaddr *) &clientaddr, &clilen);
                if (connfd < 0) {
                    perror("connfd<0");
                    exit(1);
                }

//                setnonblocking(connfd); //把客户端的socket设置为非阻塞方式

                char *str = inet_ntoa(clientaddr.sin_addr);
                std::cout << "connect from " << str << std::endl;

                ev.data.fd = connfd; //设置用于读操作的文件描述符
                ev.events = EPOLLIN | EPOLLET; //设置用于注测的读操作事件
                epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev); //注册ev事件
            } else if (events[i].events & EPOLLIN) /**读事件**/
            {
                if ((sockfd = events[i].data.fd) < 0)
                    continue;
                if ((n = read(sockfd, line, MAXLINE)) < 0) {
                    if (errno == ECONNRESET) {
                        close(sockfd);
                        events[i].data.fd = -1;
                    } else {
                        std::cout << "readline error" << std::endl;
                    }
                } else if (n == 0) {
                    close(sockfd);
                    events[i].data.fd = -1;
                }

                ev.data.fd = sockfd; //设置用于写操作的文件描述符
                ev.events = EPOLLOUT | EPOLLET; //设置用于注测的写操作事件
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); //修改sockfd上要处理的事件为EPOLLOUT
            } else if (events[i].events & EPOLLOUT) /**写事件**/
            {
                sockfd = events[i].data.fd;
                send(sockfd, line, n,0);
//                int ret = write (sockfd,line,n);

                ev.data.fd = sockfd; //设置用于读操作的文件描述符
                ev.events = EPOLLIN | EPOLLET; //设置用于注册的读操作事件
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); //修改sockfd上要处理的事件为EPOLIN
            }
        }
    }
}

客户端测试代码:

头文件

/*
 * TestVTL.h
 *
 *  Created on: 2011-5-9
 *      Author: root
 */

#ifndef TESTVTL_H_
#define TESTVTL_H_

/* tcp链接connect封装*/

                            /* "../config.h" is generated by configure */

/* If anything changes in the following list of #includes, must change
   acsite.m4 also, for configure's tests. */

#include        /* basic system data types */
#include        /* basic socket definitions */
#include        /* timeval{} for select() */
#include            /* timespec{} for pselect() */
#include        /* includes unsafely */
#include            /* old system? */
#include        /* sockaddr_in{} and other Internet defns */
#include        /* inet(3) functions */
#include    
#include            /* for nonblocking */
#include    
#include    
#include    
#include    
#include    
#include        /* for S_xxx file mode constants */
#include            /* for iovec{} and readv/writev */
#include    
#include    
#include            /* for Unix domain sockets */
# include        /* for convenience */
# include        /* OpenBSD prereq for sysctl.h */
# include    
# include            /* for convenience */
# include            /* for convenience */
# include    
# include    

#define    LISTENQ        1024    /* 2nd argument to listen() */
/* Miscellaneous constants */
#define    MAXLINE        4096    /* max text line length */
#define    BUFFSIZE    8192    /* buffer size for reads and writes */
////////////////////////////////////////////////////////////////////


void *
Calloc(size_t n, size_t size)
{
    void    *ptr;

    if ( (ptr = calloc(n, size)) == NULL)
        perror("calloc error");
    return(ptr);
}

void
Close(int fd)
{
    if (close(fd) == -1)
        perror("close error");
}

void
Dup2(int fd1, int fd2)
{
    if (dup2(fd1, fd2) == -1)
        perror("dup2 error");
}

int
Fcntl(int fd, int cmd, int arg)
{
    int    n;

    if ( (n = fcntl(fd, cmd, arg)) == -1)
        perror("fcntl error");
    return(n);
}


int
Ioctl(int fd, int request, void *arg)
{
    int        n;

    if ( (n = ioctl(fd, request, arg)) == -1)
        perror("ioctl error");
    return(n);    /* streamio of I_LIST returns value */
}

pid_t
Fork(void)
{
    pid_t    pid;

    if ( (pid = fork()) == -1)
        perror("fork error");
    return(pid);
}

void *
Malloc(size_t size)
{
    void    *ptr;

    if ( (ptr = malloc(size)) == NULL)
        perror("malloc error");
    return(ptr);
}



#include    

void *
Mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset)
{
    void    *ptr;

    if ( (ptr = mmap(addr, len, prot, flags, fd, offset)) == ((void *) -1))
        perror("mmap error");
    return(ptr);
}

int
Open(const char *pathname, int oflag, mode_t mode)
{
    int        fd;

    if ( (fd = open(pathname, oflag, mode)) == -1)
        perror("open error for ");
    return(fd);
}

void
Pipe(int *fds)
{
    if (pipe(fds) < 0)
        perror("pipe error");
}

ssize_t
Read(int fd, void *ptr, size_t nbytes)
{
    ssize_t        n;

    if ( (n = read(fd, ptr, nbytes)) == -1)
        perror("read error");
    return(n);
}

void
Sigaddset(sigset_t *set, int signo)
{
    if (sigaddset(set, signo) == -1)
        perror("sigaddset error");
}

void
Sigdelset(sigset_t *set, int signo)
{
    if (sigdelset(set, signo) == -1)
        perror("sigdelset error");
}

void
Sigemptyset(sigset_t *set)
{
    if (sigemptyset(set) == -1)
        perror("sigemptyset error");
}

void
Sigfillset(sigset_t *set)
{
    if (sigfillset(set) == -1)
        perror("sigfillset error");
}

int
Sigismember(const sigset_t *set, int signo)
{
    int        n;

    if ( (n = sigismember(set, signo)) == -1)
        perror("sigismember error");
    return(n);
}

void
Sigpending(sigset_t *set)
{
    if (sigpending(set) == -1)
        perror("sigpending error");
}

void
Sigprocmask(int how, const sigset_t *set, sigset_t *oset)
{
    if (sigprocmask(how, set, oset) == -1)
        perror("sigprocmask error");
}

char *
Strdup(const char *str)
{
    char    *ptr;

    if ( (ptr = strdup(str)) == NULL)
        perror("strdup error");
    return(ptr);
}

long
Sysconf(int name)
{
    long    val;

    errno = 0;        /* in case sysconf() does not change this */
    if ( (val = sysconf(name)) == -1)
        perror("sysconf error");
    return(val);
}


pid_t
Wait(int *iptr)
{
    pid_t    pid;

    if ( (pid = wait(iptr)) == -1)
        perror("wait error");
    return(pid);
}

pid_t
Waitpid(pid_t pid, int *iptr, int options)
{
    pid_t    retpid;

    if ( (retpid = waitpid(pid, iptr, options)) == -1)
        perror("waitpid error");
    return(retpid);
}

void
Write(int fd, void *ptr, size_t nbytes)
{
    if (write(fd, ptr, nbytes) != (ssize_t)nbytes)
        perror("write error");
}


////////////////////////////////////////////////////////////////////


int
tcp_connect(const char *host, const char *serv)
{
    int                sockfd, n;
    struct addrinfo    hints, *res, *ressave;/*这里hints不用分配地址空间,不用指针。hints设置获取地址信息,ressave用于保存释放套接字结构*/

    bzero(&hints, sizeof(struct addrinfo));/*封装信息*/
    hints.ai_family = AF_UNSPEC;
//    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;

    if ( (n = getaddrinfo(host, serv, &hints, &res)) != 0)/*获取本机上或指定主机上所有地址信息ipv4和ipv6*/
        perror("tcp_connect error for");
    ressave = res;/*保存原先地址信息*/

    do {/*循环直到找到一个能够绑定的套接口*/
        sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);/*创建监听套接字*/
        if (sockfd < 0)
            continue;    /* 创建是不处理重新循环 */

        if (connect(sockfd, res->ai_addr, res->ai_addrlen) == 0)
            break;        /* 如果链接成功直接退出 */

        Close(sockfd);    /* 如果链接错误,关闭先前套接口 */
    } while ( (res = res->ai_next) != NULL);

    if (res == NULL)    /* 如果地址结构找到结束部分,就报错结束 */
        perror("tcp_connect error for ");

    freeaddrinfo(ressave);/*释放结构中的所有动态创建的内存地址*/

    return(sockfd);
}

/*
 * tcp链接调用函数
 */

int
Tcp_connect(const char *host, const char *serv)
{
    return(tcp_connect(host, serv));
}
//////////////////////////////////////////////////////////////

ssize_t                        /* Read "n" bytes from a descriptor. */
readn(int fd, void *vptr, size_t n)
{
    size_t    nleft;
    ssize_t    nread;
    char    *ptr;

    ptr = (char    *)vptr;
    nleft = n;
    while (nleft > 0) {
        if ( (nread = read(fd, ptr, nleft)) < 0) {
            int myerr =  errno;
            if (myerr == EINTR)
                nread = 0;        /* and call read() again */
            else
                return(-1);
        } else if (nread == 0)
            break;                /* EOF */

        nleft -= nread;
        ptr   += nread;
    }
    return(n - nleft);        /* return >= 0 */
}
/* end readn */

ssize_t
Readn(int fd, void *ptr, size_t nbytes)
{
    ssize_t        n;

    if ( (n = readn(fd, ptr, nbytes)) < 0)
        perror("readn error");
    return(n);
}

/////////////////////////////////////////////////////

ssize_t                        /* Write "n" bytes to a descriptor. */
writen(int fd, const void *vptr, size_t n)
{
    size_t        nleft;
    ssize_t        nwritten;
    const char    *ptr;

    ptr = (const char    *)vptr;
    nleft = n;
    while (nleft > 0) {
        if ( (nwritten = write(fd, ptr, nleft)) <= 0) {
            if (nwritten < 0 && errno == EINTR)
                nwritten = 0;        /* and call write() again */
            else
                return(-1);            /* error */
        }

        nleft -= nwritten;
        ptr   += nwritten;
    }
    return(n);
}
/* end writen */

void
Writen(int fd, void *ptr, size_t nbytes)
{
    if (writen(fd, ptr, nbytes) != (ssize_t)nbytes)
        perror("writen error");
}

//////////////////////////////////////////////////////////


#endif /* TESTVTL_H_ */

cpp文件

/*并发服务器测试程序-tcp 客户端*/

//
//type=routeplan&x1=418913659&y1=143714599&x2=418919157&
//y2=143699459&fl_mid=0&fl_id=0&tl_mid=0&tl_id=0&dir=-1&flag=33

#include    "TestVTL.h"
/*并发服务器测试程序-tcp 客户端*/

#define    MAXN    16384        /* 定义服务器请求最大字节数 */

int main(int argc, char **argv) {
    int i, j, fd, nchildren, nloops, nbytes;
//    pid_t pid;
    //    ssize_t    n;
    //    char    request[MAXLINE], reply[MAXN];/*数组最大字节MAXLINE*/

    char reply[MAXN];/*数组最大字节MAXLINE*/

//    pid_t pid_ret;
    /*对应参数转换成整形,保存到对应数据*/
    nchildren = 1;
    nloops = 1;
    nbytes = 4096;
//    int nstat;
    while (true) {
        for (i = 0; i < nchildren; i++) {/*循环创建子进程*/
            //            if ( (pid = Fork()) == 0) {        /* 创建子进程 */
            for (j = 0; j < nloops; j++)
            {
                fd = Tcp_connect("127.0.0.1", "5555");/*主机名和端口号请求连接*/
                char buf[MAXLINE];
                sprintf(buf, "i was %d", (int) pthread_self());

                Write(fd, buf, strlen(buf));/*向服务器发送数据*/

//                Readn(fd, reply, nbytes);//字节满或服务器套接口关闭返回
                int ret = recv(fd, reply, nbytes, 0);

                //                    read(fd, reply, nbytes);
                printf("server returned %s bytes\n", reply);

                Close(fd); /* 关闭所有进程 */
                //                }
                //                printf("child %d done\n", i);
                //                exit(0);
            }
            /* parent loops around to fork() again */
        }

        //        while ((pid_ret = waitpid(-1, &nstat, 0)) > 0)
        //        {
        //        }
        //        sleep(3);
    }

    exit(0);
}


阅读(1337) | 评论(0) | 转发(0) |
0

上一篇:X86分页机制

下一篇:telnet源码分析

给主人留下些什么吧!~~