分类: C/C++
2014-12-18 14:40:27
环境:机器xp系统 CPUi5-3470 + 4G内存, 在此基础上安装的虚拟机vmware10(ubuntu)
epoll: 虚拟机+ubuntu14,网卡显示为1000M,单线程
client: 单线程
iocp:xp,网卡显示为100M,多线程(没有测试其线程池模式)
相同的网络协议dataLen(int)+data(N字节)。
client: 多线程
接收到的数据不处理,保确定每次发送的数据头长度是否被覆盖,程序实现的思路基本差不多,只测试两个模式的吞吐能力。
测试结果:
iocp:
server client
接近400MB/s, CPU:18% CPU:80%
epoll:
1000MB/s, CPU:2% CPU:2%(800KB/次)
20MB/s, CPU:1% CPU:1%(20KB/次)
多个Client时,epoll接收数据成倍增加。
也许操作系统、网卡速率不同,差异很多,感觉做服务器,Linux有天生的优势.
当每次发的数据包比较小时,Server端的接收速率上不去,大概在20MB/s多点.
iocp server:
// IOCP_TCPIP_Socket_Server.cpp
#include
#include
#include
#include
using namespace std;
#pragma comment(lib, "Ws2_32.lib") // Socket编程需用的动态链接库
#pragma comment(lib, "Kernel32.lib") // IOCP需要用到的动态链接库
/**
* 结构体名称:PER_IO_DATA
* 结构体功能:重叠I/O需要用到的结构体,临时记录IO数据
**/
#define BUFFSIZE 1024*4
#define SENDSIZE 1024*1
typedef struct{
double nDataTotal;
float fTotalLast;
int nActCount;
int nLastLen;
int TotalSize;
int ReadSize;
DWORD tBegin;
DWORD tEnd;
DWORD tDiff;
}RecvInfo;
typedef struct
{
OVERLAPPED overlapped;
WSABUF databuff;
char buffer[ BUFFSIZE ];
char *pHeader;
int TotalSize;
int ReadSize;
int operationType;
}PER_IO_OPERATEION_DATA, *LPPER_IO_OPERATION_DATA, *LPPER_IO_DATA, PER_IO_DATA;
/**
* 结构体名称:PER_HANDLE_DATA
* 结构体存储:记录单个套接字的数据,包括了套接字的变量及套接字的对应的客户端的地址。
* 结构体作用:当服务器连接上客户端时,信息存储到该结构体中,知道客户端的地址以便于回访。
**/
typedef struct
{
SOCKET socket;
SOCKADDR_STORAGE ClientAddr;
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
char *g_pRecvData;
// 定义全局变量
const int DefaultPort = 20000;
vector < PER_HANDLE_DATA* > clientGroup; // 记录客户端的向量组
HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);
DWORD WINAPI ServerWorkThread(LPVOID CompletionPortID);
DWORD WINAPI ServerSendThread(LPVOID IpParam);
void WINAPI IOWorkDummy(DWORD ExeCode, DWORD IOSize, LPOVERLAPPED pOVL);
RecvInfo g_info;
// 开始主函数
int main()
{
// 加载socket动态链接库
WORD wVersionRequested = MAKEWORD(2, 2); // 请求2.2版本的WinSock库
WSADATA wsaData; // 接收Windows Socket的结构信息
DWORD err = WSAStartup(wVersionRequested, &wsaData);
if (0 != err){ // 检查套接字库是否申请成功
cerr << "Request Windows Socket Library Error!\n";
system("pause");
return -1;
}
if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2){// 检查是否申请了所需版本的套接字库
WSACleanup();
cerr << "Request Windows Socket Version 2.2 Error!\n";
system("pause");
return -1;
}
// 创建IOCP的内核对象
/**
* 需要用到的函数的原型:
* HANDLE WINAPI CreateIoCompletionPort(
* __in HANDLE FileHandle, // 已经打开的文件句柄或者空句柄,一般是客户端的句柄
* __in HANDLE ExistingCompletionPort, // 已经存在的IOCP句柄
* __in ULONG_PTR CompletionKey, // 完成键,包含了指定I/O完成包的指定文件
* __in DWORD NumberOfConcurrentThreads // 真正并发同时执行最大线程数,一般推介是CPU核心数*2
* );
**/
HANDLE completionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0);
if (NULL == completionPort){ // 创建IO内核对象失败
cerr << "CreateIoCompletionPort failed. Error:" << GetLastError() << endl;
system("pause");
return -1;
}
// 创建IOCP线程--线程里面创建线程池
// 确定处理器的核心数量
SYSTEM_INFO mySysInfo;
GetSystemInfo(&mySysInfo);
// 基于处理器的核心数量创建线程
//for(DWORD i = 0; i < (mySysInfo.dwNumberOfProcessors * 2); ++i)
for(DWORD i = 0; i < 4; ++i)
{
// 创建服务器工作器线程,并将完成端口传递到该线程
HANDLE ThreadHandle = CreateThread(NULL, 0, ServerWorkThread, completionPort, 0, NULL);
if(NULL == ThreadHandle){
cerr << "Create Thread Handle failed. Error:" << GetLastError() << endl;
system("pause");
return -1;
}
CloseHandle(ThreadHandle);
}
// 建立流式套接字
SOCKET srvSocket = socket(AF_INET, SOCK_STREAM, 0);
// 绑定SOCKET到本机
SOCKADDR_IN srvAddr;
srvAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
srvAddr.sin_family = AF_INET;
srvAddr.sin_port = htons(DefaultPort);
int bindResult = bind(srvSocket, (SOCKADDR*)&srvAddr, sizeof(SOCKADDR));
if(SOCKET_ERROR == bindResult){
cerr << "Bind failed. Error:" << GetLastError() << endl;
system("pause");
return -1;
}
// 将SOCKET设置为监听模式
int listenResult = listen(srvSocket, 100);
if(SOCKET_ERROR == listenResult){
cerr << "Listen failed. Error: " << GetLastError() << endl;
system("pause");
return -1;
}
// 开始处理IO数据
cout << "本服务器已准备就绪,正在等待客户端的接入...\n";
memset(&g_info, 0, sizeof(RecvInfo));
int rcv_size = 1024*1024*20; /* 接收缓冲区大小为8K */
int optlen = sizeof(rcv_size);
err = setsockopt(srvSocket,SOL_SOCKET,SO_RCVBUF, (char *)&rcv_size, optlen);
if(err<0){
printf("设置接收缓冲区大小错误\n");
}else{
printf("设置接收缓冲区大小OK\n");
}
g_pRecvData = new char[1024*1024*500];
if (g_pRecvData==NULL){
printf("new g_pRecvData false\n");
}
// 创建用于发送数据的线程
HANDLE sendThread = CreateThread(NULL, 0, ServerSendThread, 0, 0, NULL);
/*if (0 == BindIoCompletionCallback((HANDLE)srvSocket, IOWorkDummy, 0))
{
printf("BindIoCompletionCallback is failed. Error code = %d", GetLastError());
}*/
int nPackIndex = 0;
while(true){
PER_HANDLE_DATA * PerHandleData = NULL;
SOCKADDR_IN saRemote;
int RemoteLen;
SOCKET acceptSocket;
// 接收连接,并分配完成端,这儿可以用AcceptEx()
RemoteLen = sizeof(saRemote);
acceptSocket = accept(srvSocket, (SOCKADDR*)&saRemote, &RemoteLen);
if(SOCKET_ERROR == acceptSocket){ // 接收客户端失败
cerr << "Accept Socket Error: " << GetLastError() << endl;
system("pause");
return -1;
}
// 创建用来和套接字关联的单句柄数据信息结构
PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA)); // 在堆中为这个PerHandleData申请指定大小的内存
PerHandleData -> socket = acceptSocket;
memcpy (&PerHandleData -> ClientAddr, &saRemote, RemoteLen);
clientGroup.push_back(PerHandleData); // 将单个客户端数据指针放到客户端组中
// 将接受套接字和完成端口关联
CreateIoCompletionPort((HANDLE)(PerHandleData -> socket), completionPort, (DWORD)PerHandleData, 0);
// 开始在接受套接字上处理I/O使用重叠I/O机制
// 在新建的套接字上投递一个或多个异步
// WSARecv或WSASend请求,这些I/O请求完成后,工作者线程会为I/O请求提供服务
// 单I/O操作数据(I/O重叠)
LPPER_IO_OPERATION_DATA PerIoData = NULL;
PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATEION_DATA));
ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED));
PerIoData->databuff.len = BUFFSIZE;
PerIoData->databuff.buf = PerIoData->buffer;
PerIoData->pHeader = g_pRecvData + nPackIndex*1024*100;
PerIoData->operationType = 0; // read
PerIoData->TotalSize = 0;
PerIoData->ReadSize = 0;
nPackIndex++;
DWORD RecvBytes;
DWORD Flags = 0;
WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);
}
system("pause");
free(g_pRecvData);
return 0;
}
// 开始服务工作线程函数
DWORD WINAPI ServerWorkThread(LPVOID IpParam)
{
HANDLE CompletionPort = (HANDLE)IpParam;
DWORD BytesTransferred;
LPOVERLAPPED IpOverlapped;
LPPER_HANDLE_DATA PerHandleData = NULL;
LPPER_IO_DATA PerIoData = NULL;
DWORD RecvBytes;
DWORD Flags = 0;
BOOL bRet = false;
g_info.tBegin = GetTickCount();
while(true){
bRet = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&IpOverlapped, INFINITE);
if(bRet == 0){
cerr << "GetQueuedCompletionStatus Error: " << GetLastError() << endl;
return -1;
}
PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(IpOverlapped, PER_IO_DATA, overlapped);
// 检查在套接字上是否有错误发生
if(0 == BytesTransferred){
closesocket(PerHandleData->socket);
GlobalFree(PerHandleData);
GlobalFree(PerIoData);
continue;
}
// 开始数据处理,接收来自客户端的数据
//cout << "A Client says: " << PerIoData->databuff.len << endl;
if (PerIoData->TotalSize == 0){
// header1
PerIoData->TotalSize = *((int *)PerIoData->databuff.buf);
if (PerIoData->TotalSize>1024*2000 || PerIoData->TotalSize<0){
//fwrite( g_pBuf, 1, g_lastLen, g_hFile );
//sprintf(g_pBuf, "Error TotalSize2 = %d\n", PerIoData->TotalSize);
//fwrite( g_pBuf, 1, strlen(g_pBuf), g_hFile );
//fflush( g_hFile );
printf("Error TotalSize2 = %d\n", PerIoData->TotalSize);
//break;
}
PerIoData->ReadSize = BytesTransferred - 4;
}else{
PerIoData->ReadSize += BytesTransferred;
}
g_info.nDataTotal += BytesTransferred;
//PerIoData->buff[len] = '\0';
if (PerIoData->TotalSize <= PerIoData->ReadSize && PerIoData->TotalSize>100){
g_info.TotalSize=PerIoData->TotalSize;
g_info.ReadSize=PerIoData->ReadSize;
PerIoData->TotalSize = PerIoData->ReadSize = 0;
//printf("full .... ");
}
if (g_info.nActCount++>50000){
//if (((int)nDataTotal) % (BUFFSIZE*200) == 0){
g_info.nActCount = 0;
g_info.tEnd = GetTickCount();
g_info.tDiff = g_info.tEnd - g_info.tBegin;
float fTotal = (float)g_info.nDataTotal/(1024*1024);
float nDiffTime = (float)g_info.tDiff/1000;
g_info.tBegin = g_info.tEnd;
printf("tm=%5.3f, v=%5.3f, Total=%5.2f, UT=%d, UR=%d \n",
nDiffTime, (fTotal-g_info.fTotalLast)/nDiffTime, fTotal, g_info.TotalSize, g_info.ReadSize);
g_info.fTotalLast = fTotal;
}
// 为下一个重叠调用建立单I/O操作数据
ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空内存
if (PerIoData->TotalSize==0){
PerIoData->databuff.len = BUFFSIZE;
PerIoData->databuff.buf = PerIoData->buffer;
PerIoData->operationType = 0; // read
}else{
memcpy(PerIoData->pHeader, PerIoData->databuff.buf, PerIoData->ReadSize);
PerIoData->databuff.len = PerIoData->TotalSize-PerIoData->ReadSize;
PerIoData->databuff.buf = PerIoData->pHeader+PerIoData->ReadSize;
PerIoData->operationType = 0; // read
}
WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);
}
return 0;
}
void WINAPI IOWorkDummy(DWORD ExeCode, DWORD BytesTransferred, LPOVERLAPPED pOVL)
{
LPOVERLAPPED IpOverlapped;
LPPER_HANDLE_DATA PerHandleData = NULL;
LPPER_IO_DATA PerIoData = NULL;
DWORD RecvBytes;
DWORD Flags = 0;
BOOL bRet = false;
g_info.tBegin = GetTickCount();
while(true){
PerIoData = (LPPER_IO_DATA)(pOVL);
// 检查在套接字上是否有错误发生
if(0 == BytesTransferred){
closesocket(PerHandleData->socket);
GlobalFree(PerHandleData);
GlobalFree(PerIoData);
continue;
}
// 开始数据处理,接收来自客户端的数据
//cout << "A Client says: " << PerIoData->databuff.len << endl;
if (PerIoData->TotalSize == 0){
// header1
PerIoData->TotalSize = *((int *)PerIoData->databuff.buf);
if (PerIoData->TotalSize>1024*2000 || PerIoData->TotalSize<0){
//fwrite( g_pBuf, 1, g_lastLen, g_hFile );
//sprintf(g_pBuf, "Error TotalSize2 = %d\n", PerIoData->TotalSize);
//fwrite( g_pBuf, 1, strlen(g_pBuf), g_hFile );
//fflush( g_hFile );
printf("Error TotalSize2 = %d\n", PerIoData->TotalSize);
//break;
}
PerIoData->ReadSize = BytesTransferred - 4;
}else{
PerIoData->ReadSize += BytesTransferred;
}
g_info.nDataTotal += BytesTransferred;
//PerIoData->buff[len] = '\0';
if (PerIoData->TotalSize <= PerIoData->ReadSize && PerIoData->TotalSize>100){
g_info.TotalSize=PerIoData->TotalSize;
g_info.ReadSize=PerIoData->ReadSize;
PerIoData->TotalSize = PerIoData->ReadSize = 0;
//printf("full .... ");
}
if (g_info.nActCount++>50000){
//if (((int)nDataTotal) % (BUFFSIZE*200) == 0){
g_info.nActCount = 0;
g_info.tEnd = GetTickCount();
g_info.tDiff = g_info.tEnd - g_info.tBegin;
float fTotal = (float)g_info.nDataTotal/(1024*1024);
float nDiffTime = (float)g_info.tDiff/1000;
g_info.tBegin = g_info.tEnd;
printf("tm=%5.3f, v=%5.3f, Total=%5.2f, UT=%d, UR=%d \n",
nDiffTime, (fTotal-g_info.fTotalLast)/nDiffTime, fTotal, g_info.TotalSize, g_info.ReadSize);
g_info.fTotalLast = fTotal;
}
// 为下一个重叠调用建立单I/O操作数据
ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空内存
if (PerIoData->TotalSize==0){
PerIoData->databuff.len = BUFFSIZE;
PerIoData->databuff.buf = PerIoData->buffer;
PerIoData->operationType = 0; // read
}else{
memcpy(PerIoData->pHeader, PerIoData->databuff.buf, PerIoData->ReadSize);
PerIoData->databuff.len = PerIoData->TotalSize-PerIoData->ReadSize;
PerIoData->databuff.buf = PerIoData->pHeader+PerIoData->ReadSize;
PerIoData->operationType = 0; // read
}
WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);
}
} // IOWorkerDummy()
// 发送信息的线程执行函数
DWORD WINAPI ServerSendThread(LPVOID IpParam)
{
while(1){
char talk[200];
gets(talk);
int len;
for (len = 0; talk[len] != '\0'; ++len){
// 找出这个字符组的长度
}
talk[len] = '\n';
talk[++len] = '\0';
printf("I Say:");
cout << talk;
WaitForSingleObject(hMutex,INFINITE);
for(int i = 0; i < clientGroup.size(); ++i){
send(clientGroup[i]->socket, talk, 200, 0); // 发送信息
}
ReleaseMutex(hMutex);
}
return 0;
}