LT模式:epoll就是一个快速版poll,可读可写就绪条件和传统poll一致
ET模式:为了避免Starvation,建议
1)文件描述符设置为非阻塞
2)只在read或write返回EAGAIN后,才能调用下一次epoll_wait
3)应用层维护一个就绪链表,进行轮询,可以防止大量IO时在一个描述符上长期read或write(因为只有等到read或 write返回EAGAIN后才 表示该描述符处理完毕)而令其它描述符starve
理解ET的含义后,上面那些操作其实都是显然的。以wirte为例说明,LT时只要有一定范围的空闲写缓存区,每次epoll_wait都是可写条件就 绪,但是ET时从第一次可写就绪后,epoll_wait不再得到该描述符可写就绪通知直到程序使描述符变为非可写就绪(比如write收到 EAGAIN)后,epoll_wait才可能继续收到可写就绪通知(比如有空闲可写缓存)
其实ET相对于LT来说,把文件描述符状态跟踪的部分责任由内核空间推到用户空间,内核只关心状态切换即从未就绪到就绪切换时才通知用户,至于保持就绪 状态的,内核不再通知用户,这样在实现非阻塞模型时更方便,不需要每次操作都先查看文件描述符状态。
使用linux epoll模型,水平触发模式(Level-Triggered), 当socket可写时,会不停的触发socket可写的事件,如何处理?
1. 当需要向socket写数据时,将该socket加入到epoll模型(epoll_ctl), 等待可写事件。接收到socket可写事件后,调用write()或send()发送数据。当数据全部写完后, 将socket描述符移出epoll模型。
这种方式的缺点是: 即使发送很少的数据,也要将socket加入、移出epoll模型。有一定的操作代价。
2. 向socket写数据时,不将socket加入到epoll模型;而是直接调用send()发送; 只有当或send()返回错误码EAGAIN(系统缓存满),才将socket加入到epoll模型,等待可写事件后,再发送数据。
全部数据发送完毕,再移出epoll模型。这种方案的优点: 当用户数据比较少时,不需要epool的事件处理。
3. 使用Edge-Triggered(边沿触发),这样socket有可写事件,只会触发一次。 可以在应用层做好标记。以避免频繁的调用 epoll_ctl( EPOLL_CTL_ADD, EPOLL_CTL_MOD)。这种方式是epoll的man手册里推荐的方式,性能最高。但如果处理不当容易出错,事件驱动停止。
4. 在epoll_ctl()使用EPOLLONESHOT标志,当事件触发以后,socket会被禁止再次触发。需要再次调用epoll_ctl(EPOLL_CTL_MOD), 才会接收下一次事件。这种方式可以禁止socket可写事件,应该也会同时禁止可读事件。会带来不便,同时并没有性能优势,因为epoll_ctl有一定的操作代价。
5. 在有些情况下,为了避免针对一个fd读取的字节数量过大而导致影响其他fd的处理,ET模式server并不会读完所以的数据,但是未读完的数据并不会引起fd handler被再此调用,除非有新的数据到来。这时可以先设置fd为可writable,这会使得该fd在下一次epoll_wait时被在此唤醒,此时可以将未读完数据读完。
Server代码
-
#include
-
#include
-
-
#include
-
#include
-
#include
-
-
#include
-
#include
-
#include
-
#include
-
#include
-
-
#include
-
#include"stdio.h"
-
#include"stdlib.h"
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
-
using namespace std;
-
-
intep;
-
struct epoll_event*event;
-
intnevent;
-
-
int
-
event_init(intsize)
-
{
-
intstatus;
-
-
ep=epoll_create(size);
-
if(ep<0){
-
return-1;
-
}
-
nevent=size;
-
-
event=(struct epoll_event*)malloc(sizeof(struct epoll_event)*size);
-
if(event==NULL){
-
status=close(ep);
-
return-1;
-
}
-
-
memset(event,0,sizeof(struct epoll_event)*size);
-
return 0;
-
}
-
-
-
int
-
event_add_conn(intsd)
-
{
-
intstatus;
-
struct epoll_event event;
-
-
//event.events=(uint32_t)(EPOLLIN|EPOLLOUT|EPOLLET);
-
//event.events=(uint32_t)(EPOLLIN|EPOLLOUT);
-
event.events=(uint32_t)(EPOLLIN);
-
event.data.fd=sd;
-
-
status=epoll_ctl(ep,EPOLL_CTL_ADD,sd,&event);
-
if(status<0)
-
printf("error: %s\n",strerror(errno));
-
return status;
-
}
-
-
-
int
-
event_del_conn(intsd)
-
{
-
intstatus;
-
-
status=epoll_ctl(ep,EPOLL_CTL_DEL,sd,NULL);
-
if(status<0){
-
cout<<"del error"<
-
}
-
-
return status;
-
}
-
-
int
-
event_add_out(intsd)
-
{
-
intstatus;
-
struct epoll_event event;
-
-
event.events=(uint32_t)(EPOLLIN|EPOLLOUT);
-
//event.events=(uint32_t)(EPOLLIN|EPOLLOUT|EPOLLET);
-
event.data.fd=sd;
-
-
status=epoll_ctl(ep,EPOLL_CTL_MOD,sd,&event);
-
if(status<0){
-
return-1;
-
}
-
-
return status;
-
}
-
-
int
-
event_del_out(intsd)
-
{
-
intstatus;
-
struct epoll_event event;
-
-
event.events=(uint32_t)(EPOLLIN);
-
//event.events=(uint32_t)(EPOLLIN|EPOLLET);
-
event.data.fd=sd;
-
-
status=epoll_ctl(ep,EPOLL_CTL_MOD,sd,&event);
-
if(status<0){
-
return-1;
-
}
-
-
return status;
-
}
-
-
int
-
event_wait(inttimeout)
-
{
-
intnsd;
-
-
for(;;){
-
nsd=epoll_wait(ep,event,nevent,timeout);
-
if(nsd>0){
-
return nsd;
-
}
-
-
if(nsd==0){
-
if(timeout==-1){
-
printf("timeout\n");
-
return-1;
-
}
-
-
return 0;
-
}
-
-
if(errno==EINTR){
-
continue;
-
}
-
return-1;
-
}
-
-
return 0;
-
}
-
-
void setnonblocking(intsock)
-
{
-
intopts;
-
opts=fcntl(sock,F_GETFL);
-
if(opts<0)
-
{
-
perror("fcntl(sock,GETFL)");
-
exit(1);
-
}
-
opts=opts|O_NONBLOCK;
-
if(fcntl(sock,F_SETFL,opts)<0)
-
{
-
perror("fcntl(sock,SETFL,opts)");
-
exit(1);
-
}
-
}
-
-
intlistenfd;
-
#define SERV_PORT 10000
-
-
-
int
-
set_tcpnodelay(intsd)
-
{
-
intnodelay;
-
socklen_tlen;
-
-
nodelay=1;
-
len=sizeof(nodelay);
-
-
return setsockopt(sd,IPPROTO_TCP,TCP_NODELAY,&nodelay,len);
-
}
-
-
-
intinit_server()
-
{
-
struct sockaddr_in serveraddr;
-
-
listenfd=socket(AF_INET,SOCK_STREAM,0);
-
setnonblocking(listenfd);
-
bzero(&serveraddr,sizeof(serveraddr));
-
serveraddr.sin_family=AF_INET;
-
char*local_addr=(char*)"127.0.0.1";
-
inet_aton(local_addr,&(serveraddr.sin_addr));
-
serveraddr.sin_port=htons(SERV_PORT);
-
bind(listenfd,(sockaddr*)&serveraddr,sizeof(serveraddr));
-
listen(listenfd,1024);
-
intret=event_add_conn(listenfd);
-
if(ret<0)
-
cout<<"error"<
-
return 0;
-
}
-
-
intprocess_handle()
-
{
-
for(;;){
-
intfd=accept(listenfd,NULL,NULL);
-
if(fd<0){
-
if(errno==EINTR){
-
cout<<"accept on not ready - eintr"<
-
continue;
-
}
-
-
if(errno==EAGAIN||errno==EWOULDBLOCK){
-
cout<<"accept on not ready - eagain"<
-
return 0;
-
}
-
-
return-1;
-
}
-
elseif(fd>0)
-
{
-
setnonblocking(fd);
-
set_tcpnodelay(fd);
-
intret=event_add_conn(fd);
-
cout<<"accept: "<
-
if(ret<0)
-
cout<<"error"<
-
}
-
}
-
}
-
-
intread_data(intfd)
-
{
-
-
for(;;)
-
{
-
char c;
-
intn=read(fd,&c,1);
-
if(n>0)
-
cout<
-
elseif(n==0)
-
{
-
cout<<"eof"<
-
event_del_conn(fd);
-
close(fd);
-
return 0;
-
}
-
elseif(n<0)
-
{
-
if(errno==EINTR)
-
continue;
-
elseif(errno==EAGAIN||errno==EWOULDBLOCK)
-
{
-
cout<<"again"<
-
return 0;
-
}
-
}
-
}
-
-
return 0;
-
}
-
-
intmain()
-
{
-
intret=event_init(100);
-
if(ret<0)
-
cout<<"error"<
-
-
init_server();
-
-
while(1)
-
{
-
inttimeout=-1;
-
intnsd=event_wait(timeout);
-
if(nsd<=0)
-
continue;
-
-
inti;
-
-
intready=0;
-
for(i=0;i
-
{
-
struct epoll_event*ev=&event[i];
-
intfd=(int)(ev->data.fd);
-
if(fd==listenfd){
-
cout<<"listen"<
-
process_handle();
-
}
-
else
-
{
-
uint32_t events=ev->events;
-
/*errortakes precedence over read|write*/
-
if(events&EPOLLERR){
-
printf("event error\n");
-
break;
-
}
-
-
/*read takes precedence over write*/
-
if(events&(EPOLLIN|EPOLLHUP)){
-
printf("event in\n");
-
read_data(fd);
-
if(ready==1)
-
{
-
event_add_out(fd);
-
}
-
}
-
-
if(events&(EPOLLOUT|EPOLLHUP))
-
{
-
printf("event out");
-
if(ready==0)
-
{
-
event_del_out(fd);
-
}
-
else
-
{
-
//send_data(fd);
-
}
-
/*the output will be addedinEPOLLIN
-
*handler*/
-
break;
-
}
-
}
-
}
-
}
-
-
return 0;
-
}
Client代码
-
#include
-
#include
-
#include
-
-
#include
-
#include
-
#include
-
#include
-
#include
-
-
#include
-
#include"stdio.h"
-
#include"stdlib.h"
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
#include
-
-
using namespace std;
-
-
#define SERV_PORT 10000
-
-
void setblocking(intsock)
-
{
-
intopts;
-
opts=fcntl(sock,F_GETFL);
-
if(opts<0)
-
{
-
perror("fcntl(sock,GETFL)");
-
exit(1);
-
}
-
-
opts=opts&~O_NONBLOCK;
-
if(fcntl(sock,F_SETFL,opts)<0)
-
{
-
perror("fcntl(sock,SETFL,opts)");
-
exit(1);
-
}
-
}
-
-
intmain()
-
{
-
struct sockaddr_in serveraddr;
-
-
intconnfd=socket(AF_INET,SOCK_STREAM,0);
-
bzero(&serveraddr,sizeof(serveraddr));
-
serveraddr.sin_family=AF_INET;
-
char*local_addr=(char*)"127.0.0.1";
-
inet_aton(local_addr,&(serveraddr.sin_addr));
-
serveraddr.sin_port=htons(SERV_PORT);
-
intstatus=connect(connfd,(sockaddr*)&serveraddr,sizeof(serveraddr));
-
if(status!=0){
-
if(errno==EINPROGRESS){
-
return 0;
-
}
-
return-1;
-
}
-
-
setblocking(connfd);
-
-
char buffer[1024]={"songweisongweisongwei"};
-
intsize=write(connfd,buffer,strlen(buffer));
-
cout<<"size: "<
-
-
sleep(5);
-
-
size=write(connfd,buffer,strlen(buffer));
-
cout<<"size: "<
-
-
return 0;
-
阅读(357) | 评论(0) | 转发(0) |