相比iocp, epoll的cpu占用率比较高,接收数据的速度要差一点,都能达到108MB/s左右,iocp能达到125MB/s,但iocp cpu只有15%左右,而epoll达到100%。
在Server端用单线程或多线程测试,结果都差不多.
这里是epoll的server程序.
测试环境:
server: 虚拟机下
ubuntu + epoll
client: xp
网卡: 1000Mb/s
server.cpp
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <time.h>
#include <iostream>
using namespace std;
// Number of client slots in g_Events; the extra slot at index MAX_EVENTS is
// reserved for the listening socket.
#define MAX_EVENTS 50
// Size in bytes of each connection's private buffer (myevent_s::buff).
// The expansion is parenthesized so the macro stays correct inside larger
// expressions such as `x % BUF_SIZE` or `x / BUF_SIZE`.
#define BUF_SIZE (1024*25)
#define STR_WELCOME "Hello World Epoll! You ID is: Server #%d"
// Extra flag OR-ed into every epoll registration. 0 keeps the default
// level-triggered behaviour; switch to the EPOLLET definition below for
// edge-triggered mode.
//#define EPOLLMODEL EPOLLET
#define EPOLLMODEL 0
// Stores (end - begin) in result and returns true.
// Returns false, leaving result untouched, when begin is later than end.
// A negative microsecond difference is folded back into the seconds field.
bool SubTimeval(timeval &result, timeval &begin, timeval &end)
{
    const bool beginIsLater =
        (begin.tv_sec > end.tv_sec) ||
        (begin.tv_sec == end.tv_sec && begin.tv_usec > end.tv_usec);
    if (beginIsLater) {
        return false;
    }

    result.tv_sec = end.tv_sec - begin.tv_sec;
    result.tv_usec = end.tv_usec - begin.tv_usec;
    if (result.tv_usec < 0) {
        // Borrow one second to bring the microseconds back above zero.
        result.tv_sec -= 1;
        result.tv_usec += 1000000;
    }
    return true;
}
// Per-descriptor bookkeeping record. EventAdd stores a pointer to the record
// in epoll_event.data.ptr, and the event loop uses it to dispatch to call_back.
struct myevent_s
{
int fd; // descriptor handed to epoll_ctl for this record
void (*call_back)(int fd, int events, void *arg); // handler invoked by the event loop
int events; // event mask most recently requested through EventAdd
void *arg; // opaque argument passed to call_back (callers in this file pass the record itself)
int status; // 1: in epoll wait list, 0 not in
char buff[BUF_SIZE]; // per-connection data buffer; SendData transmits from it
int len; // number of bytes of buff that SendData sends
long last_active; // last active time, as returned by time(NULL)
int TotalSize; // NOTE(review): looks like the expected size of the current message - confirm against the sender's framing
int ReadSize; // NOTE(review): looks like the bytes received so far for that message - confirm
};
// Global receive statistics, updated by RecvData and printed periodically.
typedef struct{
double nDataTotal; // running total of bytes received on all connections
float fTotalLast; // value of the total (in MiB) at the previous report
int nActCount; // RecvData invocations since the previous report
int nLastLen; // byte count returned by the most recent successful recv
int TotalSize; // copy of a connection's TotalSize taken when its counters are reset
int ReadSize; // copy of a connection's ReadSize taken when its counters are reset
timeval tBegin; // start of the current measuring interval
timeval tEnd; // time at which the current report is produced
timeval tDiff; // tEnd - tBegin, as computed by SubTimeval
}RecvInfo;
// Single shared instance; zeroed at the start of main().
RecvInfo g_info;
// set event
// Initializes (or re-initializes) an event record for descriptor `fd`.
//   ev        - record to fill in; every field is overwritten
//   fd        - descriptor the record will track
//   call_back - handler to run when the descriptor becomes ready
//   arg       - opaque pointer passed back to call_back
// The record is left unregistered (status == 0); call EventAdd afterwards.
void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)
{
    ev->fd = fd;
    ev->call_back = call_back;
    ev->events = 0;
    ev->arg = arg;
    ev->status = 0;
    memset(ev->buff, 0, sizeof(ev->buff));
    // These three were previously written with `==`, i.e. comparisons whose
    // result was discarded, so a reused slot kept the previous connection's
    // counters. They must be real assignments.
    ev->len = 0;
    ev->TotalSize = 0;
    ev->ReadSize = 0;
    ev->last_active = time(NULL);
}
// add/mod an event to epoll
// Registers `ev` with the epoll instance, or updates its event mask when it
// is already registered (ev->status == 1).
//   epollFd - epoll instance to modify
//   events  - epoll event mask to watch for
//   ev      - record to register; its address is stored in data.ptr
// On success ev->events is updated and ev->status becomes 1. On failure the
// record is left exactly as it was, so a slot whose ADD failed is not
// mistaken for a live registration.
void EventAdd(int epollFd, int events, myevent_s *ev)
{
    struct epoll_event epv = {0, {0}};
    epv.data.ptr = ev;
    epv.events = events;

    const int op = (ev->status == 1) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
    if (epoll_ctl(epollFd, op, ev->fd, &epv) < 0) {
        printf("Event Add failed[fd=%d]\n", ev->fd);
        return;
    }

    // Commit the bookkeeping only once the kernel has accepted the change.
    ev->events = events;
    ev->status = 1;
    printf("Event Add OK[fd=%d], op=%d\n", ev->fd, op);
}
// delete an event from epoll
// Unregisters `ev` from the epoll instance and marks it as free
// (status == 0). Records that are not currently registered are ignored.
// The result of epoll_ctl is not inspected.
void EventDel(int epollFd, myevent_s *ev)
{
    if (ev->status == 1) {
        struct epoll_event request = {0, {0}};
        request.data.ptr = ev;
        ev->status = 0;
        epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &request);
    }
}
// epoll instance shared by all handlers; created in main().
int g_epollFd;
// Event records: indices [0, MAX_EVENTS) are client connection slots.
myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd
// Handlers defined later in the file; declared here so AcceptConn can
// reference RecvData when it registers a new connection.
void RecvData(int fd, int events, void *arg);
void SendData(int fd, int events, void *arg);
// accept new connections from clients
// Handler for the listening socket: accepts one pending connection, puts it
// in non-blocking mode, and registers it for read events in a free slot of
// g_Events.
//   fd     - listening socket
//   events - epoll events that fired (unused)
//   arg    - event record of the listening socket (unused)
// Whenever the connection cannot be tracked (table full, fcntl failure, or
// registration failure) the accepted descriptor is closed so it does not leak.
void AcceptConn(int fd, int events, void *arg)
{
    (void)events;
    (void)arg;

    struct sockaddr_in sin;
    socklen_t len = sizeof(struct sockaddr_in);
    int nfd = accept(fd, (struct sockaddr*)&sin, &len);
    if (nfd == -1) {
        // EAGAIN/EINTR simply mean "nothing to accept right now".
        if (errno != EAGAIN && errno != EINTR) {
            printf("%s: bad accept", __func__);
        }
        return;
    }

    // Find the first slot that is not registered with epoll.
    int i;
    for (i = 0; i < MAX_EVENTS; i++) {
        if (g_Events[i].status == 0) {
            break;
        }
    }
    if (i == MAX_EVENTS) {
        printf("%s:max connection limit[%d]\n", __func__, MAX_EVENTS);
        close(nfd);
        return;
    }

    // set nonblocking
    if (fcntl(nfd, F_SETFL, O_NONBLOCK) < 0) {
        close(nfd);
        return;
    }

    // add a read event for receive data
    EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]);
    EventAdd(g_epollFd, EPOLLIN|EPOLLMODEL, &g_Events[i]);
    if (g_Events[i].status != 1) {
        // The slot is not marked as registered, so no event will ever be
        // delivered for nfd; release it instead of leaking it.
        close(nfd);
    }
}
// receive data
// Base unit for the shared receive buffer: g_pBuf is allocated, and read
// into, in chunks of BUFFSIZE * 5 bytes. The expansion is parenthesized so
// the macro stays correct inside expressions such as `x % BUFFSIZE`.
#define BUFFSIZE (1024*20)
// Log file opened by main(); only the disabled receive path writes to it.
FILE *g_hFile;
// Shared receive buffer of BUFFSIZE * 5 bytes, allocated by main().
char *g_pBuf;
// Counts one RecvData invocation and, once more than 5000 have accumulated,
// prints the elapsed time, throughput (MiB/s) and running total, then starts
// a new measuring interval.
static void ReportRecvStats(int fd)
{
    if (g_info.nActCount++ <= 5000) {
        return;
    }
    g_info.nActCount = 0;

    gettimeofday(&g_info.tEnd, 0);
    if (!SubTimeval(g_info.tDiff, g_info.tBegin, g_info.tEnd)) {
        // The clock went backwards; report an empty interval rather than a
        // stale difference left over from the previous report.
        g_info.tDiff.tv_sec = 0;
        g_info.tDiff.tv_usec = 0;
    }

    float fTotal = (float)g_info.nDataTotal/(1024*1024);
    float nDiffTime = g_info.tDiff.tv_sec + ((float)g_info.tDiff.tv_usec)/1000000;
    // Guard against a zero-length interval so the speed is never inf/NaN.
    float fSpeed = (nDiffTime > 0) ? (fTotal-g_info.fTotalLast)/nDiffTime : 0;
    g_info.tBegin = g_info.tEnd;
    printf("recv end C[%d], tm=%5.3f, speed=%5.3f, nRecvTotal =%5.2f, TotalSize=%d, ReadSize=%d \n",
           fd, nDiffTime, fSpeed, fTotal, g_info.TotalSize, g_info.ReadSize);
    g_info.fTotalLast = fTotal;
}

// Read handler for client connections: drains the socket into the shared
// buffer g_pBuf (the payload itself is discarded) and updates the global
// receive statistics.
//   fd     - connected client socket
//   events - epoll events that fired (unused)
//   arg    - the connection's myevent_s record
// The connection is unregistered and closed when the peer shuts down
// (recv returns 0) or when recv fails with anything other than
// EAGAIN/EWOULDBLOCK/EINTR. Those three only mean "no more data for now":
// the descriptor is registered without EPOLLONESHOT, so it needs no re-arming
// and the next epoll_wait will report it again when data arrives.
void RecvData(int fd, int events, void *arg)
{
    (void)events;
    struct myevent_s *ev = (struct myevent_s*)arg;
    const int chunk = BUFFSIZE * 5;   // capacity of g_pBuf
    int len;

    // receive data
    while ((len = recv(fd, g_pBuf, chunk, MSG_DONTWAIT)) > 0) {
        g_info.nDataTotal += len;
        g_info.nLastLen = len;
        if (len < chunk) {
            // Short read: the socket buffer has been drained.
            break;
        }
    }

    if (len == 0) {
        // Orderly shutdown by the peer.
        EventDel(g_epollFd, ev);
        close(fd);
    } else if (len < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
        // Hard error: report it and drop the connection instead of leaving a
        // dead descriptor registered.
        perror("recv read error");
        EventDel(g_epollFd, ev);
        close(fd);
    }

    ReportRecvStats(fd);

    if (ev->TotalSize == ev->ReadSize && ev->TotalSize > 100) {
        // Publish the per-connection counters, then reset them.
        g_info.TotalSize = ev->TotalSize;
        g_info.ReadSize = ev->ReadSize;
        ev->TotalSize = ev->ReadSize = 0;
    }
    ev->last_active = time(NULL);
}
/*
void RecvData(int fd, int events, void *arg)
{
int len;
struct myevent_s *ev = (struct myevent_s*)arg;
//printf("recv start ");
//EventDel(g_epollFd, ev);
// receive data
while((len = recv(fd, ev->buff, sizeof(ev->buff), MSG_DONTWAIT))>0){
//fwrite( ev->buff, 1, len, g_hFile );
//fflush( g_hFile );
if (ev->TotalSize==0){
ev->TotalSize = *((int *)ev->buff);
ev->ReadSize = len - 4;
if (ev->TotalSize>1024*200){
//fwrite( g_pBuf, 1, g_lastLen, g_hFile );
sprintf(g_pBuf, "Error TotalSize2 = %d\n", ev->TotalSize);
//fwrite( g_pBuf, 1, strlen(g_pBuf), g_hFile );
//fflush( g_hFile );
printf("Error TotalSize2 = %d\n", ev->TotalSize);
//break;
}
bzero(&ev->buff, BUF_SIZE);
}else{
ev->ReadSize += len;
}
g_info.nDataTotal += len;
g_info.nLastLen = len;
//memcpy(g_pBuf, ev->buff, len);
ev->len = len;
//ev->buff[len] = '\0';
if (ev->TotalSize == ev->ReadSize && ev->TotalSize>100){
g_info.TotalSize=ev->TotalSize;
g_info.ReadSize=ev->ReadSize;
ev->TotalSize = ev->ReadSize = 0;
//printf("full .... ");
}
if (len< sizeof(ev->buff)) {
break;
}
}
if (len == -1 && errno != EAGAIN) {
perror("recv read error");
}
if (g_info.nActCount++>5000){
g_info.nActCount = 0;
gettimeofday(&g_info.tEnd, 0);
SubTimeval(g_info.tDiff, g_info.tBegin, g_info.tEnd);
float fTotal = (float)g_info.nDataTotal/(1024*1024);
float nDiffTime = g_info.tDiff.tv_sec + ((float)g_info.tDiff.tv_usec)/1000000;
g_info.tBegin = g_info.tEnd;
printf("recv end C[%d], tm=%5.3f, speed=%5.3f, nRecvTotal =%5.2f, TotalSize=%d, ReadSize=%d \n",
fd, nDiffTime, (fTotal-g_info.fTotalLast)/nDiffTime, fTotal, g_info.TotalSize, g_info.ReadSize);
g_info.fTotalLast = fTotal;
}
if (ev->TotalSize == ev->ReadSize && ev->TotalSize>100){
//printf("fulled reset ok\n");
g_info.TotalSize=ev->TotalSize;
g_info.ReadSize=ev->ReadSize;
ev->TotalSize = ev->ReadSize = 0;
}
ev->last_active = time(NULL);
// change to send event
//EventSet(ev, fd, SendData, ev);
//EventAdd(g_epollFd, EPOLLOUT|EPOLLMODEL, ev);
}
*/
// send data
void SendData(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s*)arg;
int len;
// send data
len = send(fd, ev->buff, ev->len, 0);
ev->len = 0;
//EventDel(g_epollFd, ev);
if(len > 0)
{
// change to receive event
//EventSet(ev, fd, RecvData, ev);
//EventAdd(g_epollFd, EPOLLIN|EPOLLMODEL, ev);
ev->last_active = time(NULL);
}
else
{
close(ev->fd);
printf("Err send[fd=%d] error[%d]\n", fd, errno);
}
}
// Creates the non-blocking TCP listening socket on INADDR_ANY:port and
// registers it for read events in the reserved slot g_Events[MAX_EVENTS],
// with AcceptConn as its handler.
//   epollFd - epoll instance to register the listener with
//   port    - TCP port in host byte order
// If socket(), bind() or listen() fails, the error is reported with perror,
// the descriptor (if any) is closed, and nothing is registered.
void InitListenSocket(int epollFd, short port)
{
    int listenFd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenFd < 0) {
        perror("socket");
        return;
    }
    // set non-blocking
    if (fcntl(listenFd, F_SETFL, O_NONBLOCK) < 0) {
        perror("fcntl(O_NONBLOCK)");
    }

    // The receive buffer size has to be configured before listen() for it to
    // take effect on the connections accepted from this socket (see tcp(7)).
    int rcv_size = 1024*8;                  /* receive buffer size: 8K */
    socklen_t optlen = sizeof(rcv_size);    /* length of the option value */
    int err = setsockopt(listenFd, SOL_SOCKET, SO_RCVBUF, (char *)&rcv_size, optlen);
    if (err < 0) {
        printf("设置接收缓冲区大小错误\n");
    } else {
        printf("设置接收缓冲区大小OK\n");
    }

    // bind & listen
    sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = INADDR_ANY;
    sin.sin_port = htons((unsigned short)port);
    if (bind(listenFd, (const sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        close(listenFd);
        return;
    }
    if (listen(listenFd, 5) < 0) {
        perror("listen");
        close(listenFd);
        return;
    }
    printf("server listen fd=%d\n", listenFd);

    // add listen socket
    EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]);
    EventAdd(epollFd, EPOLLIN|EPOLLMODEL, &g_Events[MAX_EVENTS]);
}
/*************************************
EPOLLIN : 表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT: 表示对应的文件描述符可以写;
EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR: 表示对应的文件描述符发生错误;
EPOLLHUP: 表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
***************************************/
int main(int argc, char **argv)
{
short port = 20000; // default port
if(argc == 2){
port = atoi(argv[1]);
}
memset(&g_info, 0, sizeof(RecvInfo));
g_pBuf = new char[BUFFSIZE * 5];
// create epoll
g_epollFd = epoll_create(MAX_EVENTS);
if(g_epollFd <= 0) printf("create epoll failed.%d\n", g_epollFd);
// create & bind listen socket, and add to epoll, set non-blocking
InitListenSocket(g_epollFd, port);
g_hFile = fopen("/home/rooter/cpp/sv.txt", "a+b");
if (g_hFile == NULL){
printf("open false\n");
}
// event loop
struct epoll_event events[MAX_EVENTS];
printf("server running:port[%d], listenId=%d\n", port, g_Events[MAX_EVENTS].fd);
int checkPos = 0;
int res = 0;
int nCount = 0;
gettimeofday(&g_info.tBegin, 0);
while(1){
// a simple timeout check here, every time 100, better to use a mini-heap, and add timer event
//long now = time(NULL);
/*for(int i = 0; i < 100; i++, checkPos++) // doesn't check listen fd
{
if(checkPos == MAX_EVENTS) checkPos = 0; // recycle
if(g_Events[checkPos].status != 1) continue;
long duration = now - g_Events[checkPos].last_active;
if(duration >= 60) // 60s timeout
{
close(g_Events[checkPos].fd);
printf("[fd=%d] timeout[%d--%d].\n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now);
EventDel(g_epollFd, &g_Events[checkPos]);
}
}*/
// wait for events to happen
int fds = epoll_wait(g_epollFd, events, MAX_EVENTS+1, -1);
if(fds < 0){
printf("epoll_wait error, exit\n");
break;
}
for(int i = 0; i < fds; i++){
if (( events[i].events & EPOLLERR) ||
( events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN)))
{
/* An error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
//fprintf (stderr, "xxxxxxxxxxxxxx epoll event:%d error : %d\n", events[i].events, errno);
ev = (struct myevent_s*)events[i].data.ptr;
EventDel(g_epollFd, ev);
close (events[i].data.fd);
continue;
}
myevent_s *ev = (struct myevent_s*)events[i].data.ptr;
if(ev->fd == g_Events[MAX_EVENTS].fd)
{
ev->call_back(ev->fd, events[i].events, ev->arg);
//printf("\nxxxxxxxxxxxxxxxx accept ListenId=%d\n", ev->fd);
}
else if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event
{
//printf("nCount = %d\n", nCount++);
ev->call_back(ev->fd, events[i].events, ev->arg);
//bzero(&ev->buff, BUF_SIZE);
//sprintf(ev->buff, STR_WELCOME, ev->fd);
//printf("recvId=%d, %s\n", ev->fd, ev->buff);
//res = send(ev->fd, ev->buff, strlen(ev->buff), 0);
//if (res>0){
// printf("send dataLen=%d\n", res);
//}
}
else if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event
{
printf("send \n");
ev->call_back(ev->fd, events[i].events, ev->arg);
}
}
}
// free resource
free(g_pBuf);
fclose(g_hFile);
return 0;
}
说一下接收的体会:
epoll: 来数据了(消息),告诉你要收数据,如何收,收多少是应用如何处理的事,需要copy接收的数据,一次接收不完,还要接收多次.
iocp: 有数据了,告诉你数据已经存放在预先指定的缓存中,你要处理。若以收数据为主,可指定缓冲指针,移动缓冲指针接收数据,不需要copy数据; 对于数据结构为:数据长度+数据非常方便,但组包(粘包)处理比较麻烦.
补充:上面自己说法不够严谨,参考下面:
不同之处:
- Epoll 用于 Linux 系统;而 IOCP 则是用于 Windows;
- Epoll 是当事件资源满足时发出可处理通知消息;而 IOCP 则是当事件完成时发出完成通知消息。
- 从应用程序的角度来看, Epoll 本质上来讲是同步非阻塞的,而 IOCP 本质上来讲则是异步操作;这才是二者最大的不同。
阅读(1863) | 评论(0) | 转发(0) |