Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2649909
  • 博文数量: 416
  • 博客积分: 10220
  • 博客等级: 上将
  • 技术积分: 4193
  • 用 户 组: 普通用户
  • 注册时间: 2006-12-15 09:47
文章分类

全部博文(416)

文章存档

2022年(1)

2021年(1)

2020年(1)

2019年(5)

2018年(7)

2017年(6)

2016年(7)

2015年(11)

2014年(1)

2012年(5)

2011年(7)

2010年(35)

2009年(64)

2008年(48)

2007年(177)

2006年(40)

我的朋友

分类: C/C++

2016-11-01 13:39:40



相比iocp, epoll的cpu占用率比较高,接收数据的速度要差一点,都能达到108MB/s左右,iocp能达到125MB/s,但iocp cpu只有15%左右,而epoll达到100%。
在Server端无论用单线程还是多线程测试,结果都差不多。

这里是epoll的server程序。
测试环境:
server: 虚拟机下ubuntu + epoll
client: xp
网卡: 1000Mb/s

server.cpp
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <strings.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <time.h>
#include <iostream>


using namespace std; 
// Number of client slots in the event table; one extra slot is reserved for
// the listening socket.
#define MAX_EVENTS 50
// Size in bytes of each connection's private buffer. Parenthesized so the
// macro stays a single value inside larger expressions (e.g. `x % BUF_SIZE`).
#define BUF_SIZE (1024*25)
// Greeting format string; takes one int argument.
#define STR_WELCOME "Hello World Epoll! You ID is: Server #%d"
// Extra flag OR-ed into every epoll registration: 0 keeps the default
// level-triggered mode, EPOLLET would select edge-triggered mode.
//#define EPOLLMODEL EPOLLET
#define EPOLLMODEL 0

// Computes result = end - begin.
// Returns false, leaving result untouched, when begin is later than end;
// otherwise stores the normalized difference in result and returns true.
bool SubTimeval(timeval &result, timeval &begin, timeval &end)
{
    const bool beginIsLater =
        (begin.tv_sec > end.tv_sec) ||
        (begin.tv_sec == end.tv_sec && begin.tv_usec > end.tv_usec);
    if (beginIsLater) {
        return false;
    }

    result.tv_sec = end.tv_sec - begin.tv_sec;
    result.tv_usec = end.tv_usec - begin.tv_usec;

    // Borrow one second when the microsecond part went negative.
    if (result.tv_usec < 0) {
        result.tv_sec -= 1;
        result.tv_usec += 1000000;
    }
    return true;
}



// Bookkeeping record for one descriptor registered with epoll. A pointer to
// the record is stored in epoll_event.data.ptr so the event loop can get back
// to the handler. (The opening brace lost from the listing is restored.)
struct myevent_s
{
    int fd;                                           // descriptor this record tracks
    void (*call_back)(int fd, int events, void *arg); // handler the event loop invokes
    int events;                                       // event mask last passed to EventAdd
    void *arg;                                        // argument handed to call_back
    int status;                                       // 1: in epoll wait list, 0 not in
    char buff[BUF_SIZE];                              // recv data buffer
    int len;                                          // byte count SendData passes to send()
    long last_active;                                 // last active time
    int TotalSize;  // message-size bookkeeping; only the disabled RecvData variant fills it in
    int ReadSize;   // bytes-read bookkeeping paired with TotalSize
};


// Receive statistics shared by all connections.
typedef struct {
    double nDataTotal;   // sum of every recv() length seen by RecvData
    float  fTotalLast;   // total (in MiB) printed by the previous report
    int    nActCount;    // RecvData calls since the previous report
    int    nLastLen;     // length returned by the most recent recv()
    int    TotalSize;    // TotalSize copied from the last completed event record
    int    ReadSize;     // ReadSize copied from the last completed event record
    struct timeval tBegin;   // start of the current measurement interval
    struct timeval tEnd;     // end of the current measurement interval
    struct timeval tDiff;    // tEnd - tBegin
} RecvInfo;


// The single statistics instance.
RecvInfo g_info;


// set event
// Initializes an event record for descriptor `fd`: stores the handler and its
// argument, marks the record as not registered with epoll, clears the buffer
// and counters, and stamps the current time.
//   ev        record to initialize
//   fd        descriptor the record will track
//   call_back handler to invoke when the descriptor is ready
//   arg       argument passed to call_back
void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)
{
    ev->fd = fd;
    ev->call_back = call_back;
    ev->events = 0;
    ev->arg = arg;
    ev->status = 0;
    bzero(ev->buff, sizeof(ev->buff));
    // These three were written with `==` (a comparison whose result was
    // discarded), so a reused slot kept the previous connection's counters.
    ev->len = 0;
    ev->TotalSize = 0;
    ev->ReadSize = 0;
    ev->last_active = time(NULL);
}



// add/mod an event to epoll 
void EventAdd(int epollFd, int events, myevent_s *ev) 

struct epoll_event epv = {0, {0}}; 
int op; 
epv.data.ptr = ev; 
epv.events = ev->events = events; 
if(ev->status == 1){ 
op = EPOLL_CTL_MOD; 

else{ 
op = EPOLL_CTL_ADD; 
ev->status = 1; 

if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0) 
printf("Event Add failed[fd=%d]\n", ev->fd); 
else 
printf("Event Add OK[fd=%d], op=%d\n", ev->fd, op); 

// delete an event from epoll 
void EventDel(int epollFd, myevent_s *ev) 

struct epoll_event epv = {0, {0}}; 
if(ev->status != 1) return; 
epv.data.ptr = ev; 
ev->status = 0; 
epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv); 



// Descriptor of the epoll instance; assigned in main and used by the handlers
// when they call EventAdd/EventDel.
int g_epollFd; 
// Table of event records, one per tracked descriptor.
myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd 
// Handlers defined further down; declared here so AcceptConn can register RecvData.
void RecvData(int fd, int events, void *arg); 
void SendData(int fd, int events, void *arg); 
// accept new connections from clients 
void AcceptConn(int fd, int events, void *arg) 

struct sockaddr_in sin; 
socklen_t len = sizeof(struct sockaddr_in); 
int nfd, i; 
// accept 
if((nfd = accept(fd, (struct sockaddr*)&sin, &len)) == -1) 

if(errno != EAGAIN && errno != EINTR) 

printf("%s: bad accept", __func__); 

return; 

do 

for(i = 0; i < MAX_EVENTS; i++) 

if(g_Events[i].status == 0) 

break; 


if(i == MAX_EVENTS) 

printf("%s:max connection limit[%d]\n", __func__, MAX_EVENTS); 
break; 

// set nonblocking 
if(fcntl(nfd, F_SETFL, O_NONBLOCK) < 0) break; 
// add a read event for receive data 
EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]); 
EventAdd(g_epollFd, EPOLLIN|EPOLLMODEL, &g_Events[i]); 
//printf("new conn[%s:%d], clientId=%d, [time:%d]\n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port), nfd, g_Events[i].last_active); 
}while(0); 

// receive data
// Base unit for the shared receive buffer; main allocates BUFFSIZE * 5 bytes
// and RecvData reads up to that many bytes per recv() call. Parenthesized so
// the macro stays a single value inside larger expressions.
#define BUFFSIZE (1024*20)
FILE *g_hFile;   // log file opened in main (only referenced by disabled code)
char *g_pBuf;    // shared receive buffer allocated in main


void RecvData(int fd, int events, void *arg) 

int len; 
struct myevent_s *ev = (struct myevent_s*)arg; 
//printf("recv start ");
//EventDel(g_epollFd, ev); 
// receive data
while((len = recv(fd, g_pBuf, BUFFSIZE * 5, MSG_DONTWAIT))>0){

g_info.nDataTotal += len;
g_info.nLastLen = len;

                if (len< BUFFSIZE * 5) {
                    break;
                }
        }

if (len == 0){
            EventDel(g_epollFd, ev); 
            close(fd);
        }else if (len == -1 && errno == EAGAIN) {   
                reset_oneshot( g_epollFd, fd );  
                printf("aaa\n");
        }


if (g_info.nActCount++>5000){
g_info.nActCount = 0;
gettimeofday(&g_info.tEnd, 0);   
          SubTimeval(g_info.tDiff, g_info.tBegin, g_info.tEnd);   
          float fTotal = (float)g_info.nDataTotal/(1024*1024);
          float nDiffTime = g_info.tDiff.tv_sec + ((float)g_info.tDiff.tv_usec)/1000000;
          g_info.tBegin = g_info.tEnd;
printf("recv end C[%d], tm=%5.3f, speed=%5.3f, nRecvTotal =%5.2f, TotalSize=%d, ReadSize=%d \n", 
fd, nDiffTime, (fTotal-g_info.fTotalLast)/nDiffTime, fTotal, g_info.TotalSize, g_info.ReadSize); 
g_info.fTotalLast = fTotal;
}
if (ev->TotalSize == ev->ReadSize && ev->TotalSize>100){
//printf("fulled reset ok\n");
g_info.TotalSize=ev->TotalSize;
g_info.ReadSize=ev->ReadSize;
ev->TotalSize = ev->ReadSize = 0;
}

ev->last_active = time(NULL); 
// change to send event 
//EventSet(ev, fd, SendData, ev); 
//EventAdd(g_epollFd, EPOLLOUT|EPOLLMODEL, ev); 



/*
void RecvData(int fd, int events, void *arg) 

int len; 
struct myevent_s *ev = (struct myevent_s*)arg; 
//printf("recv start ");
//EventDel(g_epollFd, ev); 
// receive data
while((len = recv(fd, ev->buff, sizeof(ev->buff), MSG_DONTWAIT))>0){
//fwrite( ev->buff, 1, len, g_hFile );
//fflush( g_hFile );
if (ev->TotalSize==0){
ev->TotalSize = *((int *)ev->buff);
ev->ReadSize = len - 4;
if (ev->TotalSize>1024*200){
//fwrite( g_pBuf, 1, g_lastLen, g_hFile );
sprintf(g_pBuf, "Error TotalSize2 = %d\n", ev->TotalSize);
//fwrite( g_pBuf, 1, strlen(g_pBuf), g_hFile );
//fflush( g_hFile );
printf("Error TotalSize2 = %d\n", ev->TotalSize);
//break;
}
bzero(&ev->buff, BUF_SIZE);
}else{
ev->ReadSize += len;
}
g_info.nDataTotal += len;
g_info.nLastLen = len;
//memcpy(g_pBuf, ev->buff, len);

ev->len = len; 
//ev->buff[len] = '\0';
if (ev->TotalSize == ev->ReadSize && ev->TotalSize>100){
g_info.TotalSize=ev->TotalSize;
g_info.ReadSize=ev->ReadSize;
ev->TotalSize = ev->ReadSize = 0;
//printf("full .... ");
}


                if (len< sizeof(ev->buff)) {
                    break;
                }
        }

if (len == -1 && errno != EAGAIN) {   
   perror("recv read error");   
}

if (g_info.nActCount++>5000){
g_info.nActCount = 0;
gettimeofday(&g_info.tEnd, 0);   
          SubTimeval(g_info.tDiff, g_info.tBegin, g_info.tEnd);   
          float fTotal = (float)g_info.nDataTotal/(1024*1024);
          float nDiffTime = g_info.tDiff.tv_sec + ((float)g_info.tDiff.tv_usec)/1000000;
          g_info.tBegin = g_info.tEnd;
printf("recv end C[%d], tm=%5.3f, speed=%5.3f, nRecvTotal =%5.2f, TotalSize=%d, ReadSize=%d \n", 
fd, nDiffTime, (fTotal-g_info.fTotalLast)/nDiffTime, fTotal, g_info.TotalSize, g_info.ReadSize); 
g_info.fTotalLast = fTotal;
}
if (ev->TotalSize == ev->ReadSize && ev->TotalSize>100){
//printf("fulled reset ok\n");
g_info.TotalSize=ev->TotalSize;
g_info.ReadSize=ev->ReadSize;
ev->TotalSize = ev->ReadSize = 0;
}

ev->last_active = time(NULL); 
// change to send event 
//EventSet(ev, fd, SendData, ev); 
//EventAdd(g_epollFd, EPOLLOUT|EPOLLMODEL, ev); 

*/
// send data 
void SendData(int fd, int events, void *arg) 

struct myevent_s *ev = (struct myevent_s*)arg; 
int len; 
// send data 
len = send(fd, ev->buff, ev->len, 0); 
ev->len = 0; 
//EventDel(g_epollFd, ev); 
if(len > 0) 

// change to receive event 
//EventSet(ev, fd, RecvData, ev); 
//EventAdd(g_epollFd, EPOLLIN|EPOLLMODEL, ev); 
ev->last_active = time(NULL); 

else 

close(ev->fd); 
printf("Err send[fd=%d] error[%d]\n", fd, errno); 


void InitListenSocket(int epollFd, short port) 

int snd_size = 0;   /* 发送缓冲区大小 */ 
int rcv_size = 0;    /* 接收缓冲区大小 */ 
socklen_t optlen;    /* 选项值长度 */ 

int listenFd = socket(AF_INET, SOCK_STREAM, 0); 
fcntl(listenFd, F_SETFL, O_NONBLOCK); // set non-blocking 
// bind & listen 
sockaddr_in sin; 
bzero(&sin, sizeof(sin));
sin.sin_family = AF_INET; 
sin.sin_addr.s_addr = INADDR_ANY; 
sin.sin_port = htons(port); 
bind(listenFd, (const sockaddr*)&sin, sizeof(sin)); 
listen(listenFd, 5); 
printf("server listen fd=%d\n", listenFd); 
EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]); 
// add listen socket 
EventAdd(epollFd, EPOLLIN|EPOLLMODEL, &g_Events[MAX_EVENTS]); 

rcv_size = 1024*8;   /* 接收缓冲区大小为8K */ 
optlen = sizeof(rcv_size); 
int err = setsockopt(listenFd,SOL_SOCKET,SO_RCVBUF, (char *)&rcv_size, optlen); 
if(err<0){ 
printf("设置接收缓冲区大小错误\n"); 
}else{
printf("设置接收缓冲区大小OK\n"); 
}


/************************************* 
 EPOLLIN : 表示对应的文件描述符可以读(包括对端SOCKET正常关闭); 
EPOLLOUT: 表示对应的文件描述符可以写; 
EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); 
EPOLLERR: 表示对应的文件描述符发生错误; 
EPOLLHUP: 表示对应的文件描述符被挂断; 
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。 
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
***************************************/
int main(int argc, char **argv) 

short port = 20000; // default port 
if(argc == 2){ 
port = atoi(argv[1]);


memset(&g_info, 0, sizeof(RecvInfo));
g_pBuf = new char[BUFFSIZE * 5];
// create epoll 
g_epollFd = epoll_create(MAX_EVENTS); 
if(g_epollFd <= 0) printf("create epoll failed.%d\n", g_epollFd); 
// create & bind listen socket, and add to epoll, set non-blocking 
InitListenSocket(g_epollFd, port); 
g_hFile = fopen("/home/rooter/cpp/sv.txt", "a+b");
if (g_hFile == NULL){
printf("open false\n");
}
// event loop 
struct epoll_event events[MAX_EVENTS]; 
printf("server running:port[%d], listenId=%d\n", port, g_Events[MAX_EVENTS].fd); 
int checkPos = 0; 
int res = 0;
int nCount = 0;
gettimeofday(&g_info.tBegin, 0);  
while(1){ 
// a simple timeout check here, every time 100, better to use a mini-heap, and add timer event 
//long now = time(NULL); 
/*for(int i = 0; i < 100; i++, checkPos++) // doesn't check listen fd 

if(checkPos == MAX_EVENTS) checkPos = 0; // recycle 
if(g_Events[checkPos].status != 1) continue; 
long duration = now - g_Events[checkPos].last_active; 
if(duration >= 60) // 60s timeout 

close(g_Events[checkPos].fd); 
printf("[fd=%d] timeout[%d--%d].\n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now); 
EventDel(g_epollFd, &g_Events[checkPos]); 

}*/ 
// wait for events to happen 
int fds = epoll_wait(g_epollFd, events, MAX_EVENTS+1, -1); 
if(fds < 0){ 
printf("epoll_wait error, exit\n"); 
break; 


for(int i = 0; i < fds; i++){ 
if (( events[i].events & EPOLLERR) ||   
          (  events[i].events & EPOLLHUP) ||   
          (!(events[i].events & EPOLLIN)))   
       {   
          /* An error has occured on this fd, or the socket is not  
             ready for reading (why were we notified then?) */   
          //fprintf (stderr, "xxxxxxxxxxxxxx epoll event:%d error : %d\n", events[i].events, errno);  
          ev = (struct myevent_s*)events[i].data.ptr; 
          EventDel(g_epollFd, ev); 
 
          close (events[i].data.fd);   
          continue;   
       }   
myevent_s *ev = (struct myevent_s*)events[i].data.ptr; 
if(ev->fd == g_Events[MAX_EVENTS].fd)
{
ev->call_back(ev->fd, events[i].events, ev->arg);
//printf("\nxxxxxxxxxxxxxxxx accept ListenId=%d\n", ev->fd);
}
else if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event 

//printf("nCount = %d\n", nCount++);
ev->call_back(ev->fd, events[i].events, ev->arg); 
//bzero(&ev->buff, BUF_SIZE);
//sprintf(ev->buff, STR_WELCOME, ev->fd);
//printf("recvId=%d, %s\n", ev->fd, ev->buff);
//res = send(ev->fd, ev->buff, strlen(ev->buff), 0);
//if (res>0){
// printf("send dataLen=%d\n", res);
//}

else if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event 

printf("send \n");
ev->call_back(ev->fd, events[i].events, ev->arg); 
}


// free resource 
free(g_pBuf);
fclose(g_hFile);
return 0; 


说一下接收的体会:
epoll: 来数据了(消息),告诉你要收数据,如何收,收多少是应用如何处理的事,需要copy接收的数据,一次接收不完,还要接收多次.
iocp:  有数据了,告诉你数据已经存放在预先指定的缓存中,你要处理。若以收数据为主,可指定缓冲指针,移动缓冲指针接收数据,不需要copy数据; 对于数据结构为:数据长度+数据非常方便,但组包(粘包)处理比较麻烦.

补充:上面自己说法不够严谨,参考下面:

不同之处:

  1.  Epoll 用于 Linux 系统;而 IOCP 则是用于 Windows;
  2.  Epoll 是当事件资源满足时发出可处理通知消息;而 IOCP 则是当事件完成时发出完成通知消息。
  3.  从应用程序的角度来看, Epoll 本质上来讲是同步非阻塞的,而 IOCP 本质上来讲则是异步操作;这才是二者最大的不同。


阅读(1758) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~