Chinaunix首页 | 论坛 | 博客
  • 博客访问: 3356073
  • 博文数量: 1450
  • 博客积分: 11163
  • 博客等级: 上将
  • 技术积分: 11101
  • 用 户 组: 普通用户
  • 注册时间: 2005-07-25 14:40
文章分类

全部博文(1450)

文章存档

2017年(5)

2014年(2)

2013年(3)

2012年(35)

2011年(39)

2010年(88)

2009年(395)

2008年(382)

2007年(241)

2006年(246)

2005年(14)

分类: LINUX

2010-08-12 14:39:32

从头编写高性能服务程序12-区分读写事件

前一个版本很重要的BUG
就是sendfile在发送大文件的时候会发送不完整
这个bug指出了另一需求
就是需要用到EPOLLOUT事件

前面版本我们事件处理都是在EPOLLIN中进行
当有accept_fd数据进来之后
我们判断指令的内容
再直接进行数据的处理

我们更换一种方式
当accept_fd获取到数据之后
解析数据.
当需要输出的时候,将accept_fd的事件变为EPOLLOUT

这样一种fd会有两种状态
接受指令状态以及输出数据状态
慢慢的.我们会发现这个程序越来越像Nginx或者Lighttpd了
因为一个连接会有不同的状态
事件+状态机就是Nginx以及Lighttpd高效的原因
将一个链接分成不同的生命周期然后处理

经过进化
我们的event_handle结构拥有了读写两种hook钩子

typedef struct event_handle{
    ...
    int ( * read_handle  )( struct event_handle * ev );
    int ( * write_handle )( struct event_handle * ev );
    ...
}

而我们在针对的处理过程中
初始化时会针对不同的事件挂上不同的钩子函数

int init_evhandle(...){
    ...
    ev->read_handle = r_handle;
    ev->write_handle = w_handle;
    ...
}

接着在事件发生时调用不同的钩子函数

else if( events[i].events&EPOLLIN ){
    EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->read_handle;
    ...
}
else if( events[i].events&EPOLLOUT ){
    EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->write_handle;
    ...
}

当分析完指令的时候
将fd变为EPOLLOUT

int parse_request(
    ...
    ev_temp.data.ptr = ev;
    ev_temp.events = EPOLLOUT|EPOLLET;
    epoll_ctl( ev->epoll_fd, EPOLL_CTL_MOD, ev->socket_fd, &ev_temp );
    ...
}

加入这些处理之后
代码越来越长了
我们还需要加很多东西
比如进程管理,等等
这之后会发现一个mini的nginx骨架或者lighttpd骨架出现了

下载:
  1. #include <sys/socket.h>
  2. #include <sys/wait.h>
  3. #include <netinet/in.h>
  4. #include <netinet/tcp.h>
  5. #include <sys/epoll.h>
  6. #include <sys/sendfile.h> 
  7. #include <sys/stat.h>
  8. #include <unistd.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <strings.h>
  13. #include <fcntl.h>
  14. #include <errno.h>
  15.  
  16. #define HANDLE_INFO   1
  17. #define HANDLE_SEND   2
  18. #define HANDLE_DEL    3
  19. #define HANDLE_CLOSE  4
  20.  
  21. #define MAX_REQLEN          1024
  22. #define MAX_PROCESS_CONN    3
  23. #define FIN_CHAR            0x00
  24. #define SUCCESS  0
  25. #define ERROR   -1
  26.  
  27. typedef struct event_handle{
  28.     int socket_fd;
  29.     int file_fd;
  30.     int file_pos;
  31.     int epoll_fd;
  32.     char request[MAX_REQLEN];
  33.     int request_len;
  34.     int ( * read_handle )( struct event_handle * ev );
  35.     int ( * write_handle )( struct event_handle * ev );
  36.     int handle_method;
  37. } EV,* EH;
  38. typedef int ( * EVENT_HANDLE )( struct event_handle * ev );
  39.  
  40. int create_listen_fd( int port ){
  41.     int listen_fd;
  42.     struct sockaddr_in my_addr;
  43.     if( ( listen_fd = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1 ){
  44.         perror( "create socket error" );
  45.         exit( 1 );
  46.     }
  47.     int flag;
  48.     int olen = sizeof(int);
  49.     if( setsockopt( listen_fd, SOL_SOCKET, SO_REUSEADDR
  50.                         , (const void *)&flag, olen ) == -1 ){
  51.         perror( "setsockopt error" );
  52.     }
  53.     flag = 5;
  54.     if( setsockopt( listen_fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &flag, olen ) == -1 ){
  55.         perror( "setsockopt error" );
  56.     }
  57.     flag = 1;
  58.     if( setsockopt( listen_fd, IPPROTO_TCP, TCP_CORK, &flag, olen ) == -1 ){
  59.         perror( "setsockopt error" );
  60.     }
  61.     int flags = fcntl( listen_fd, F_GETFL, 0 );
  62.     fcntl( listen_fd, F_SETFL, flags|O_NONBLOCK );
  63.     my_addr.sin_family = AF_INET;
  64.     my_addr.sin_port = htons( port );
  65.     my_addr.sin_addr.s_addr = INADDR_ANY;
  66.     bzero( &( my_addr.sin_zero ), 8 );
  67.     if( bind( listen_fd, ( struct sockaddr * )&my_addr,
  68.     sizeof( struct sockaddr_in ) ) == -1 ) {
  69.         perror( "bind error" );
  70.         exit( 1 );
  71.     }
  72.     if( listen( listen_fd, 1 ) == -1 ){
  73.         perror( "listen error" );
  74.         exit( 1 );
  75.     }
  76.     return listen_fd;
  77. }
  78.  
  79. int create_accept_fd( int listen_fd ){
  80.     int addr_len = sizeof( struct sockaddr_in );
  81.     struct sockaddr_in remote_addr;
  82.     int accept_fd = accept( listen_fd,
  83.         ( struct sockaddr * )&remote_addr, &addr_len );
  84.     int flags = fcntl( accept_fd, F_GETFL, 0 );
  85.     fcntl( accept_fd, F_SETFL, flags|O_NONBLOCK );
  86.     return accept_fd;
  87. }
  88.  
  89. int fork_process( int process_num ){
  90.     int i;
  91.     int pid=-1;
  92.     for( i = 0; i < process_num; i++ ){
  93.         if( pid != 0 ){
  94.             pid = fork();
  95.         }
  96.     }
  97.     return pid;
  98. }
  99.  
  100. int init_evhandle(EH ev,int socket_fd,int epoll_fd,EVENT_HANDLE r_handle,EVENT_HANDLE w_handle){
  101.     ev->epoll_fd = epoll_fd;
  102.     ev->socket_fd = socket_fd;
  103.     ev->read_handle = r_handle;
  104.     ev->write_handle = w_handle;
  105.     ev->file_pos = 0;
  106.     ev->request_len = 0;
  107.     ev->handle_method = 0;
  108.     memset( ev->request, 0, 1024 );
  109. }
  110. //accept->accept_queue->request->request_queue->output->output_queue
  111. //multi process sendfile
  112. int parse_request(EH ev){
  113.     ev->request_len--;
  114.     *( ev->request + ev->request_len - 1 ) = 0x00;
  115.     int i;
  116.     for( i=0; i<ev->request_len; i++ ){
  117.         if( ev->request[i] == ':' ){
  118.             ev->request_len = ev->request_len-i-1;
  119.             char temp[MAX_REQLEN];
  120.             memcpy( temp, ev->request, i );
  121.             ev->handle_method = atoi( temp );
  122.             memcpy( temp, ev->request+i+1, ev->request_len );
  123.             memcpy( ev->request, temp, ev->request_len );
  124.             break;
  125.         }
  126.     }
  127.     //handle_request( ev );
  128.     //register to epoll EPOLLOUT
  129.    
  130.     struct epoll_event ev_temp;
  131.     ev_temp.data.ptr = ev;
  132.     ev_temp.events = EPOLLOUT|EPOLLET;
  133.     epoll_ctl( ev->epoll_fd, EPOLL_CTL_MOD, ev->socket_fd, &ev_temp );
  134.     return SUCCESS;
  135. }
  136.  
  137. int handle_request(EH ev){
  138.     struct stat file_info;
  139.     switch( ev->handle_method ){
  140.         case HANDLE_INFO:
  141.             ev->file_fd = open( ev->request, O_RDONLY );
  142.             if( ev->file_fd == -1 ){
  143.                 send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
  144.                 return -1;
  145.             }
  146.             fstat(ev->file_fd, &file_info);
  147.             char info[MAX_REQLEN];
  148.             sprintf(info,"file len:%d\n",file_info.st_size);
  149.             send( ev->socket_fd, info, strlen( info ), 0 );
  150.             break;
  151.         case HANDLE_SEND:
  152.             ev->file_fd = open( ev->request, O_RDONLY );
  153.             if( ev->file_fd == -1 ){
  154.                 send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
  155.                 return -1;
  156.             }
  157.             fstat(ev->file_fd, &file_info);
  158.             sendfile( ev->socket_fd, ev->file_fd, 0, file_info.st_size );
  159.             break;
  160.         case HANDLE_DEL:
  161.             break;
  162.         case HANDLE_CLOSE:
  163.             break;
  164.     }
  165.     finish_request( ev );
  166.     return SUCCESS;
  167. }
  168.  
  169. int finish_request(EH ev){
  170.     close(ev->socket_fd);
  171.     close(ev->file_fd);
  172.     ev->handle_method = -1;
  173.     clean_request( ev );
  174.     return SUCCESS;
  175. }
  176.  
  177. int clean_request(EH ev){
  178.     memset( ev->request, 0, MAX_REQLEN );
  179.     ev->request_len = 0;
  180. }
  181.  
  182. int read_hook_v2( EH ev ){
  183.     char in_buf[MAX_REQLEN];
  184.     memset( in_buf, 0, MAX_REQLEN );
  185.     int recv_num = recv( ev->socket_fd, &in_buf, MAX_REQLEN, 0 );
  186.     if( recv_num ==0 ){
  187.         close( ev->socket_fd );
  188.         return ERROR;
  189.     }
  190.     else{
  191.         //check ifoverflow
  192.         if( ev->request_len > MAX_REQLEN-recv_num ){
  193.             close( ev->socket_fd );
  194.             clean_request( ev );
  195.         }
  196.         memcpy( ev->request + ev->request_len, in_buf, recv_num );
  197.         ev->request_len += recv_num;
  198.         if( recv_num == 2 && ( !memcmp( &in_buf[recv_num-2], "\r\n", 2 ) ) ){
  199.             parse_request(ev);
  200.         }
  201.     }
  202.     return recv_num;
  203. }
  204.  
  205. int write_hook_v1( EH ev ){
  206.     struct stat file_info;
  207.     ev->file_fd = open( ev->request, O_RDONLY );
  208.     if( ev->file_fd == ERROR ){
  209.         send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 );
  210.         return ERROR;
  211.     }
  212.     fstat(ev->file_fd, &file_info);
  213.     int write_num;
  214.     while(1){
  215.         write_num = sendfile( ev->socket_fd, ev->file_fd, (off_t *)&ev->file_pos, 10240 );
  216.         ev->file_pos += write_num;
  217.         if( write_num == ERROR ){
  218.             if( errno == EAGAIN ){
  219.                 break;
  220.             }
  221.         }
  222.         else if( write_num == 0 ){
  223.             printf( "writed:%d\n", ev->file_pos );
  224.             //finish_request( ev );
  225.             break;
  226.         }
  227.     }
  228.     return SUCCESS;
  229. }
  230.  
  231. int main(){
  232.     int listen_fd = create_listen_fd( 3389 );
  233.     int pid = fork_process( 3 );
  234.     if( pid == 0 ){
  235.         int accept_handles = 0;
  236.         struct epoll_event ev, events[20];
  237.         int epfd = epoll_create( 256 );
  238.         int ev_s = 0;
  239.        
  240.         ev.data.fd = listen_fd;
  241.         ev.events = EPOLLIN|EPOLLET;
  242.         epoll_ctl( epfd, EPOLL_CTL_ADD, listen_fd, &ev );
  243.         struct event_handle ev_handles[256];
  244.         for( ;; ){
  245.             ev_s = epoll_wait( epfd, events, 20, 500 );
  246.             int i = 0;
  247.             for( i = 0; i<ev_s; i++ ){
  248.                 if( events[i].data.fd == listen_fd ){
  249.                     if( accept_handles < MAX_PROCESS_CONN ){
  250.                         accept_handles++;
  251.                         int accept_fd = create_accept_fd( listen_fd );
  252.                         init_evhandle(&ev_handles[accept_handles],accept_fd,epfd,read_hook_v2,write_hook_v1);
  253.                         ev.data.ptr = &ev_handles[accept_handles];
  254.                         ev.events = EPOLLIN|EPOLLET;
  255.                         epoll_ctl( epfd, EPOLL_CTL_ADD, accept_fd, &ev );
  256.                     }
  257.                 }
  258.                 else if( events[i].events&EPOLLIN ){
  259.                     EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->read_handle;
  260.                     EH current_event = ( EH )( events[i].data.ptr );
  261.                     ( *current_handle )( current_event );
  262.                 }
  263.                 else if( events[i].events&EPOLLOUT ){
  264.                     EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->write_handle;
  265.                     EH current_event = ( EH )( events[i].data.ptr );
  266.                     if( ( *current_handle )( current_event )  == 0 ){ 
  267.                         accept_handles--;
  268.                     }
  269.                 }
  270.             }
  271.         }
  272.     }
  273.     else{
  274.         //manager the process
  275.         int child_process_status;
  276.         wait( &child_process_status );
  277.     }
  278.    
  279.     return SUCCESS;
  280. }
阅读(1471) | 评论(0) | 转发(1) |
给主人留下些什么吧!~~