Chinaunix首页 | 论坛 | 博客
  • 博客访问: 47365
  • 博文数量: 9
  • 博客积分: 10
  • 博客等级: 民兵
  • 技术积分: 110
  • 用 户 组: 普通用户
  • 注册时间: 2012-07-24 12:19
文章分类

全部博文(9)

文章存档

2017年(4)

2016年(5)

我的朋友

分类: LINUX

2016-11-01 09:10:11

一、IO多路复用

所谓IO多路复用,就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

支持IO多路复用的系统调用有select、poll、epoll,这些调用都是内核级别的。但select、poll、epoll本质上都是同步I/O,先是block住等待就绪的socket,再是block住将数据从内核拷贝到用户内存。

当然select、poll、epoll之间也是有区别的,如下表:

\ select poll epoll
操作方式 遍历 遍历 回调
底层实现 数组 链表 哈希表
IO效率 每次调用都进行线性遍历,时间复杂度为O(n) 每次调用都进行线性遍历,时间复杂度为O(n) 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到rdllist里面。时间复杂度O(1)
最大连接数 1024(x86)或 2048(x64) 无上限 无上限
fd拷贝 每次调用select,都需要把fd集合从用户态拷贝到内核态 每次调用poll,都需要把fd集合从用户态拷贝到内核态 调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝

二、select示例

2.1 流程图

注:摘自。

2.2 相关函数

点击(此处)折叠或打开

  1. #include <sys/select.h>
  2. #include <sys/time.h>

  3. int select(int max_fd, fd_set *readset, fd_set *writeset, fd_set *exceptset, struct timeval *timeout)
  1. 该select()函数返回就绪描述符的数目,超时返回0,出错返回-1

  2. 第一个参数max_fd指待测试的fd个数,它的值是待测试的最大文件描述符加1,文件描述符从0开始到max_fd-1都将被测试。

  3. 中间三个参数readset、writeset和exceptset指定要让内核测试读、写和异常条件的fd集合,如果不需要测试可以设置为NULL。操作fd_set有四个宏:

    • void FD_ZERO(fd_set *fdset):清空集合
    • void FD_SET(int fd, fd_set *fdset):将一个给定的文件描述符加入集合之中
    • void FD_CLR(int fd, fd_set *fdset):将一个给定的文件描述符从集合中删除
    • int FD_ISSET(int fd, fd_set *fdset):判断指定描述符是否在集合中
  4. timeout是指 select 的等待时长,如果这段时间内所监听的 socket 没有事件就绪,超时返回。

2.3 示例程序

这里写一个程序,Client向Server发送消息,Server接收消息并原样发送给Client,Client再把消息输出到终端。


点击(此处)折叠或打开

  1. /*************************************************************************
  2.     > File Name: server.cpp
  3.     > Author: SongLee
  4.     > E-mail: lisong.shine@qq.com
  5.     > Created Time: 2016年04月28日 星期四 22时02分43秒
  6.     > Personal Blog: http://songlee24.github.io/
  7.  ************************************************************************/
  8. #include<netinet/in.h> // sockaddr_in
  9. #include<sys/types.h> // socket
  10. #include<sys/socket.h> // socket
  11. #include<arpa/inet.h>
  12. #include<unistd.h>
  13. #include<sys/select.h> // select
  14. #include<sys/ioctl.h>
  15. #include<sys/time.h>
  16. #include<iostream>
  17. #include<vector>
  18. #include<string>
  19. #include<cstdlib>
  20. #include<cstdio>
  21. #include<cstring>
  22. using namespace std;
  23. #define BUFFER_SIZE 1024

  24. struct PACKET_HEAD
  25. {
  26.     int length;
  27. };

  28. class Server
  29. {
  30. private:
  31.     struct sockaddr_in server_addr;
  32.     socklen_t server_addr_len;
  33.     int listen_fd; // 监听的fd
  34.     int max_fd; // 最大的fd
  35.     fd_set master_set; // 所有fd集合,包括监听fd和客户端fd
  36.     fd_set working_set; // 工作集合
  37.     struct timeval timeout;
  38. public:
  39.     Server(int port);
  40.     ~Server();
  41.     void Bind();
  42.     void Listen(int queue_len = 20);
  43.     void Accept();
  44.     void Run();
  45.     void Recv(int nums);
  46. };

  47. Server::Server(int port)
  48. {
  49.     bzero(&server_addr, sizeof(server_addr));
  50.     server_addr.sin_family = AF_INET;
  51.     server_addr.sin_addr.s_addr = htons(INADDR_ANY);
  52.     server_addr.sin_port = htons(port);
  53.     // create socket to listen
  54.     listen_fd = socket(PF_INET, SOCK_STREAM, 0);
  55.     if(listen_fd < 0)
  56.     {
  57.         cout << "Create Socket Failed!";
  58.         exit(1);
  59.     }
  60.     int opt = 1;
  61.     setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
  62. }

  63. Server::~Server()
  64. {
  65.     for(int fd=0; fd<=max_fd; ++fd)
  66.     {
  67.         if(FD_ISSET(fd, &master_set))
  68.         {
  69.             close(fd);
  70.         }
  71.     }
  72. }

  73. void Server::Bind()
  74. {
  75.     if(-1 == (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr))))
  76.     {
  77.         cout << "Server Bind Failed!";
  78.         exit(1);
  79.     }
  80.     cout << "Bind Successfully.\n";
  81. }

  82. void Server::Listen(int queue_len)
  83. {
  84.     if(-1 == listen(listen_fd, queue_len))
  85.     {
  86.         cout << "Server Listen Failed!";
  87.         exit(1);
  88.     }
  89.     cout << "Listen Successfully.\n";
  90. }

  91. void Server::Accept()
  92. {
  93.     struct sockaddr_in client_addr;
  94.     socklen_t client_addr_len = sizeof(client_addr);

  95.     int new_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len);
  96.     if(new_fd < 0)
  97.     {
  98.         cout << "Server Accept Failed!";
  99.         exit(1);
  100.     }

  101.     cout << "new connection was accepted.\n";
  102.     // 将新建立的连接的fd加入master_set
  103.     FD_SET(new_fd, &master_set);
  104.     if(new_fd > max_fd)
  105.     {
  106.         max_fd = new_fd;
  107.     }
  108. }

  109. void Server::Run()
  110. {
  111.     max_fd = listen_fd; // 初始化max_fd
  112.     FD_ZERO(&master_set);
  113.     FD_SET(listen_fd, &master_set); // 添加监听fd

  114.     while(1)
  115.     {
  116.         FD_ZERO(&working_set);
  117.         memcpy(&working_set, &master_set, sizeof(master_set));

  118.         timeout.tv_sec = 30;
  119.         timeout.tv_usec = 0;

  120.         int nums = select(max_fd+1, &working_set, NULL, NULL, &timeout);
  121.         if(nums < 0)
  122.         {
  123.             cout << "select() error!";
  124.             exit(1);
  125.         }

  126.         if(nums == 0)
  127.         {
  128.             //cout << "select() is timeout!";
  129.             continue;
  130.         }

  131.         if(FD_ISSET(listen_fd, &working_set))
  132.             Accept(); // 有新的客户端请求
  133.         else
  134.             Recv(nums); // 接收客户端的消息
  135.     }
  136. }

  137. void Server::Recv(int nums)
  138. {
  139.     for(int fd=0; fd<=max_fd; ++fd)
  140.     {
  141.         if(FD_ISSET(fd, &working_set))
  142.         {
  143.             bool close_conn = false; // 标记当前连接是否断开了

  144.             PACKET_HEAD head;
  145.             recv(fd, &head, sizeof(head), 0); // 先接受包头,即数据总长度

  146.             char* buffer = new char[head.length];
  147.             bzero(buffer, head.length);
  148.             int total = 0;
  149.             while(total < head.length)
  150.             {
  151.                 int len = recv(fd, buffer + total, head.length - total, 0);
  152.                 if(len < 0)
  153.                 {
  154.                     cout << "recv() error!";
  155.                     close_conn = true;
  156.                     break;
  157.                 }
  158.                 total = total + len;
  159.             }

  160.             if(total == head.length) // 将收到的消息原样发回给客户端
  161.             {
  162.                 int ret1 = send(fd, &head, sizeof(head), 0);
  163.                 int ret2 = send(fd, buffer, head.length, 0);
  164.                 if(ret1 < 0 || ret2 < 0)
  165.                 {
  166.                     cout << "send() error!";
  167.                     close_conn = true;
  168.                 }
  169.             }

  170.             delete buffer;

  171.             if(close_conn) // 当前这个连接有问题,关闭它
  172.             {
  173.                 close(fd);
  174.                 FD_CLR(fd, &master_set);
  175.                 if(fd == max_fd) // 需要更新max_fd;
  176.                 {
  177.                     while(FD_ISSET(max_fd, &master_set) == false)
  178.                         --max_fd;
  179.                 }
  180.             }
  181.         }
  182.     }
  183. }

  184. int main()
  185. {
  186.     Server server(15000);
  187.     server.Bind();
  188.     server.Listen();
  189.     server.Run();
  190.     return 0;
  191. }

点击(此处)折叠或打开

  1. /*************************************************************************
  2.     > File Name: client.cpp
  3.     > Author: SongLee
  4.     > E-mail: lisong.shine@qq.com
  5.     > Created Time: 2016年04月28日 星期四 23时10分15秒
  6.     > Personal Blog: http://songlee24.github.io/
  7.  ************************************************************************/
  8. #include<netinet/in.h> // sockaddr_in
  9. #include<sys/types.h> // socket
  10. #include<sys/socket.h> // socket
  11. #include<arpa/inet.h>
  12. #include<sys/ioctl.h>
  13. #include<unistd.h>
  14. #include<iostream>
  15. #include<string>
  16. #include<cstdlib>
  17. #include<cstdio>
  18. #include<cstring>
  19. using namespace std;
  20. #define BUFFER_SIZE 1024

  21. struct PACKET_HEAD
  22. {
  23.     int length;
  24. };

  25. class Client
  26. {
  27. private:
  28.     struct sockaddr_in server_addr;
  29.     socklen_t server_addr_len;
  30.     int fd;
  31. public:
  32.     Client(string ip, int port);
  33.     ~Client();
  34.     void Connect();
  35.     void Send(string str);
  36.     string Recv();
  37. };

  38. Client::Client(string ip, int port)
  39. {
  40.     bzero(&server_addr, sizeof(server_addr));
  41.     server_addr.sin_family = AF_INET;
  42.     if(inet_pton(AF_INET, ip.c_str(), &server_addr.sin_addr) == 0)
  43.     {
  44.         cout << "Server IP Address Error!";
  45.         exit(1);
  46.     }
  47.     server_addr.sin_port = htons(port);
  48.     server_addr_len = sizeof(server_addr);
  49.     // create socket
  50.     fd = socket(AF_INET, SOCK_STREAM, 0);
  51.     if(fd < 0)
  52.     {
  53.         cout << "Create Socket Failed!";
  54.         exit(1);
  55.     }
  56. }

  57. Client::~Client()
  58. {
  59.     close(fd);
  60. }

  61. void Client::Connect()
  62. {
  63.     cout << "Connecting......" << endl;
  64.     if(connect(fd, (struct sockaddr*)&server_addr, server_addr_len) < 0)
  65.     {
  66.         cout << "Can not Connect to Server IP!";
  67.         exit(1);
  68.     }
  69.     cout << "Connect to Server successfully." << endl;
  70. }

  71. void Client::Send(string str)
  72. {
  73.     PACKET_HEAD head;
  74.     head.length = str.size()+1; // 注意这里需要+1
  75.     int ret1 = send(fd, &head, sizeof(head), 0);
  76.     int ret2 = send(fd, str.c_str(), head.length, 0);
  77.     if(ret1 < 0 || ret2 < 0)
  78.     {
  79.         cout << "Send Message Failed!";
  80.         exit(1);
  81.     }
  82. }

  83. string Client::Recv()
  84. {
  85.     PACKET_HEAD head;
  86.     recv(fd, &head, sizeof(head), 0);

  87.     char* buffer = new char[head.length];
  88.     bzero(buffer, head.length);
  89.     int total = 0;
  90.     while(total < head.length)
  91.     {
  92.         int len = recv(fd, buffer + total, head.length - total, 0);
  93.         if(len < 0)
  94.         {
  95.             cout << "recv() error!";
  96.             break;
  97.         }
  98.         total = total + len;
  99.     }
  100.     string result(buffer);
  101.     delete buffer;
  102.     return result;
  103. }

  104. int main()
  105. {
  106.     Client client("127.0.0.1", 15000);
  107.     client.Connect();
  108.     while(1)
  109.     {
  110.         string msg;
  111.         getline(cin, msg);
  112.         if(msg == "exit")
  113.             break;
  114.         client.Send(msg);
  115.         cout << client.Recv() << endl;
  116.     }
  117.     return 0;
  118. }

对上述程序的一些说明:

  • 监听socket也由select来轮询,不需要单独的线程;
  • working_set每次都要重新设置,因为select调用后它所检测的集合working_set会被修改;
  • 接收很长一段数据时,需要循环多次recv。但是recv函数会阻塞,可以通过自定义包头(保存数据长度)。



阅读(1771) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~