一、IO多路复用
所谓IO多路复用,就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
支持IO多路复用的系统调用有select、poll、epoll,这些调用都是内核级别的。但select、poll、epoll本质上都是同步I/O,先是block住等待就绪的socket,再是block住将数据从内核拷贝到用户内存。
当然select、poll、epoll之间也是有区别的,如下表:
\
|
select
|
poll
|
epoll
|
操作方式
|
遍历
|
遍历
|
回调
|
底层实现
|
数组
|
链表
|
哈希表
|
IO效率
|
每次调用都进行线性遍历,时间复杂度为O(n)
|
每次调用都进行线性遍历,时间复杂度为O(n)
|
事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到rdllist里面。时间复杂度O(1)
|
最大连接数
|
1024(x86)或 2048(x64)
|
无上限
|
无上限
|
fd拷贝
|
每次调用select,都需要把fd集合从用户态拷贝到内核态
|
每次调用poll,都需要把fd集合从用户态拷贝到内核态
|
调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝
|
二、select示例
2.1 流程图
注:摘自。
2.2 相关函数
-
#include <sys/select.h>
-
#include <sys/time.h>
-
-
int select(int max_fd, fd_set *readset, fd_set *writeset, fd_set *exceptset, struct timeval *timeout)
-
该select()函数返回就绪描述符的数目,超时返回0,出错返回-1
-
第一个参数max_fd指待测试的fd个数,它的值是待测试的最大文件描述符加1,文件描述符从0开始到max_fd-1都将被测试。
-
中间三个参数readset、writeset和exceptset指定要让内核测试读、写和异常条件的fd集合,如果不需要测试可以设置为NULL。操作fd_set有四个宏:
-
void FD_ZERO(fd_set *fdset):清空集合
-
void FD_SET(int fd, fd_set *fdset):将一个给定的文件描述符加入集合之中
-
void FD_CLR(int fd, fd_set *fdset):将一个给定的文件描述符从集合中删除
-
int FD_ISSET(int fd, fd_set *fdset):判断指定描述符是否在集合中
-
timeout是指 select 的等待时长,如果这段时间内所监听的 socket 没有事件就绪,超时返回。
2.3 示例程序
这里写一个程序,Client向Server发送消息,Server接收消息并原样发送给Client,Client再把消息输出到终端。
-
/*************************************************************************
-
> File Name: server.cpp
-
> Author: SongLee
-
> E-mail: lisong.shine@qq.com
-
> Created Time: 2016年04月28日 星期四 22时02分43秒
-
> Personal Blog: http://songlee24.github.io/
-
************************************************************************/
-
#include<netinet/in.h> // sockaddr_in
-
#include<sys/types.h> // socket
-
#include<sys/socket.h> // socket
-
#include<arpa/inet.h>
-
#include<unistd.h>
-
#include<sys/select.h> // select
-
#include<sys/ioctl.h>
-
#include<sys/time.h>
-
#include<iostream>
-
#include<vector>
-
#include<string>
-
#include<cstdlib>
-
#include<cstdio>
-
#include<cstring>
-
using namespace std;
-
#define BUFFER_SIZE 1024
-
-
struct PACKET_HEAD
-
{
-
int length;
-
};
-
-
class Server
-
{
-
private:
-
struct sockaddr_in server_addr;
-
socklen_t server_addr_len;
-
int listen_fd; // 监听的fd
-
int max_fd; // 最大的fd
-
fd_set master_set; // 所有fd集合,包括监听fd和客户端fd
-
fd_set working_set; // 工作集合
-
struct timeval timeout;
-
public:
-
Server(int port);
-
~Server();
-
void Bind();
-
void Listen(int queue_len = 20);
-
void Accept();
-
void Run();
-
void Recv(int nums);
-
};
-
-
Server::Server(int port)
-
{
-
bzero(&server_addr, sizeof(server_addr));
-
server_addr.sin_family = AF_INET;
-
server_addr.sin_addr.s_addr = htons(INADDR_ANY);
-
server_addr.sin_port = htons(port);
-
// create socket to listen
-
listen_fd = socket(PF_INET, SOCK_STREAM, 0);
-
if(listen_fd < 0)
-
{
-
cout << "Create Socket Failed!";
-
exit(1);
-
}
-
int opt = 1;
-
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
-
}
-
-
Server::~Server()
-
{
-
for(int fd=0; fd<=max_fd; ++fd)
-
{
-
if(FD_ISSET(fd, &master_set))
-
{
-
close(fd);
-
}
-
}
-
}
-
-
void Server::Bind()
-
{
-
if(-1 == (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr))))
-
{
-
cout << "Server Bind Failed!";
-
exit(1);
-
}
-
cout << "Bind Successfully.\n";
-
}
-
-
void Server::Listen(int queue_len)
-
{
-
if(-1 == listen(listen_fd, queue_len))
-
{
-
cout << "Server Listen Failed!";
-
exit(1);
-
}
-
cout << "Listen Successfully.\n";
-
}
-
-
void Server::Accept()
-
{
-
struct sockaddr_in client_addr;
-
socklen_t client_addr_len = sizeof(client_addr);
-
-
int new_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len);
-
if(new_fd < 0)
-
{
-
cout << "Server Accept Failed!";
-
exit(1);
-
}
-
-
cout << "new connection was accepted.\n";
-
// 将新建立的连接的fd加入master_set
-
FD_SET(new_fd, &master_set);
-
if(new_fd > max_fd)
-
{
-
max_fd = new_fd;
-
}
-
}
-
-
void Server::Run()
-
{
-
max_fd = listen_fd; // 初始化max_fd
-
FD_ZERO(&master_set);
-
FD_SET(listen_fd, &master_set); // 添加监听fd
-
-
while(1)
-
{
-
FD_ZERO(&working_set);
-
memcpy(&working_set, &master_set, sizeof(master_set));
-
-
timeout.tv_sec = 30;
-
timeout.tv_usec = 0;
-
-
int nums = select(max_fd+1, &working_set, NULL, NULL, &timeout);
-
if(nums < 0)
-
{
-
cout << "select() error!";
-
exit(1);
-
}
-
-
if(nums == 0)
-
{
-
//cout << "select() is timeout!";
-
continue;
-
}
-
-
if(FD_ISSET(listen_fd, &working_set))
-
Accept(); // 有新的客户端请求
-
else
-
Recv(nums); // 接收客户端的消息
-
}
-
}
-
-
void Server::Recv(int nums)
-
{
-
for(int fd=0; fd<=max_fd; ++fd)
-
{
-
if(FD_ISSET(fd, &working_set))
-
{
-
bool close_conn = false; // 标记当前连接是否断开了
-
-
PACKET_HEAD head;
-
recv(fd, &head, sizeof(head), 0); // 先接受包头,即数据总长度
-
-
char* buffer = new char[head.length];
-
bzero(buffer, head.length);
-
int total = 0;
-
while(total < head.length)
-
{
-
int len = recv(fd, buffer + total, head.length - total, 0);
-
if(len < 0)
-
{
-
cout << "recv() error!";
-
close_conn = true;
-
break;
-
}
-
total = total + len;
-
}
-
-
if(total == head.length) // 将收到的消息原样发回给客户端
-
{
-
int ret1 = send(fd, &head, sizeof(head), 0);
-
int ret2 = send(fd, buffer, head.length, 0);
-
if(ret1 < 0 || ret2 < 0)
-
{
-
cout << "send() error!";
-
close_conn = true;
-
}
-
}
-
-
delete buffer;
-
-
if(close_conn) // 当前这个连接有问题,关闭它
-
{
-
close(fd);
-
FD_CLR(fd, &master_set);
-
if(fd == max_fd) // 需要更新max_fd;
-
{
-
while(FD_ISSET(max_fd, &master_set) == false)
-
--max_fd;
-
}
-
}
-
}
-
}
-
}
-
-
int main()
-
{
-
Server server(15000);
-
server.Bind();
-
server.Listen();
-
server.Run();
-
return 0;
-
}
-
/*************************************************************************
-
> File Name: client.cpp
-
> Author: SongLee
-
> E-mail: lisong.shine@qq.com
-
> Created Time: 2016年04月28日 星期四 23时10分15秒
-
> Personal Blog: http://songlee24.github.io/
-
************************************************************************/
-
#include<netinet/in.h> // sockaddr_in
-
#include<sys/types.h> // socket
-
#include<sys/socket.h> // socket
-
#include<arpa/inet.h>
-
#include<sys/ioctl.h>
-
#include<unistd.h>
-
#include<iostream>
-
#include<string>
-
#include<cstdlib>
-
#include<cstdio>
-
#include<cstring>
-
using namespace std;
-
#define BUFFER_SIZE 1024
-
-
struct PACKET_HEAD
-
{
-
int length;
-
};
-
-
class Client
-
{
-
private:
-
struct sockaddr_in server_addr;
-
socklen_t server_addr_len;
-
int fd;
-
public:
-
Client(string ip, int port);
-
~Client();
-
void Connect();
-
void Send(string str);
-
string Recv();
-
};
-
-
Client::Client(string ip, int port)
-
{
-
bzero(&server_addr, sizeof(server_addr));
-
server_addr.sin_family = AF_INET;
-
if(inet_pton(AF_INET, ip.c_str(), &server_addr.sin_addr) == 0)
-
{
-
cout << "Server IP Address Error!";
-
exit(1);
-
}
-
server_addr.sin_port = htons(port);
-
server_addr_len = sizeof(server_addr);
-
// create socket
-
fd = socket(AF_INET, SOCK_STREAM, 0);
-
if(fd < 0)
-
{
-
cout << "Create Socket Failed!";
-
exit(1);
-
}
-
}
-
-
Client::~Client()
-
{
-
close(fd);
-
}
-
-
void Client::Connect()
-
{
-
cout << "Connecting......" << endl;
-
if(connect(fd, (struct sockaddr*)&server_addr, server_addr_len) < 0)
-
{
-
cout << "Can not Connect to Server IP!";
-
exit(1);
-
}
-
cout << "Connect to Server successfully." << endl;
-
}
-
-
void Client::Send(string str)
-
{
-
PACKET_HEAD head;
-
head.length = str.size()+1; // 注意这里需要+1
-
int ret1 = send(fd, &head, sizeof(head), 0);
-
int ret2 = send(fd, str.c_str(), head.length, 0);
-
if(ret1 < 0 || ret2 < 0)
-
{
-
cout << "Send Message Failed!";
-
exit(1);
-
}
-
}
-
-
string Client::Recv()
-
{
-
PACKET_HEAD head;
-
recv(fd, &head, sizeof(head), 0);
-
-
char* buffer = new char[head.length];
-
bzero(buffer, head.length);
-
int total = 0;
-
while(total < head.length)
-
{
-
int len = recv(fd, buffer + total, head.length - total, 0);
-
if(len < 0)
-
{
-
cout << "recv() error!";
-
break;
-
}
-
total = total + len;
-
}
-
string result(buffer);
-
delete buffer;
-
return result;
-
}
-
-
int main()
-
{
-
Client client("127.0.0.1", 15000);
-
client.Connect();
-
while(1)
-
{
-
string msg;
-
getline(cin, msg);
-
if(msg == "exit")
-
break;
-
client.Send(msg);
-
cout << client.Recv() << endl;
-
}
-
return 0;
-
}
对上述程序的一些说明:
-
监听socket也由select来轮询,不需要单独的线程;
-
working_set每次都要重新设置,因为select调用后它所检测的集合working_set会被修改;
-
接收很长一段数据时,需要循环多次recv。但是recv函数会阻塞,可以通过自定义包头(保存数据长度)。
阅读(1771) | 评论(0) | 转发(0) |