Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2716394
  • 博文数量: 416
  • 博客积分: 10220
  • 博客等级: 上将
  • 技术积分: 4193
  • 用 户 组: 普通用户
  • 注册时间: 2006-12-15 09:47
文章分类

全部博文(416)

文章存档

2022年(1)

2021年(1)

2020年(1)

2019年(5)

2018年(7)

2017年(6)

2016年(7)

2015年(11)

2014年(1)

2012年(5)

2011年(7)

2010年(35)

2009年(64)

2008年(48)

2007年(177)

2006年(40)

我的朋友

分类: C/C++

2014-12-18 14:40:27

环境:机器xp系统 CPUi5-3470 + 4G内存, 在此基础上安装的虚拟机vmware10(ubuntu)
epoll: 虚拟机+ubuntu14,网卡显示为1000M,单线程
client: 单线程

iocp:xp,网卡显示为100M,多线程(没有测试其线程池模式)
相同的网络协议dataLen(int)+data(N字节)。
client: 多线程

接收到的数据不处理,保确定每次发送的数据头长度是否被覆盖,程序实现的思路基本差不多,只测试两个模式的吞吐能力。
测试结果:
iocp:
server                                    client
接近400MB/s,  CPU:18%        CPU:80%
epoll:   
1000MB/s,      CPU:2%           CPU:2%(800KB/次)
20MB/s,          CPU:1%           CPU:1%(20KB/次)

多个Client时,epoll接收数据成倍增加。

也许操作系统、网卡速率不同,差异很多,感觉做服务器,Linux有天生的优势.
当每次发的数据包比较小时,Server端的接收速率上不去,大概在20MB/s多点.

iocp server:
// IOCP_TCPIP_Socket_Server.cpp  
 
#include   
#include   
#include   
#include   
 
using namespace std; 
 
#pragma comment(lib, "Ws2_32.lib")      // Socket编程需用的动态链接库  
#pragma comment(lib, "Kernel32.lib")    // IOCP需要用到的动态链接库  
 
/**
 * 结构体名称:PER_IO_DATA
 * 结构体功能:重叠I/O需要用到的结构体,临时记录IO数据
 **/ 
 
#define BUFFSIZE 1024*4
#define SENDSIZE  1024*1

typedef struct{
 double nDataTotal;
 float fTotalLast;
 int  nActCount;
 int  nLastLen;
 int  TotalSize;
 int  ReadSize;
 DWORD tBegin;
 DWORD tEnd;
 DWORD tDiff;
}RecvInfo;

typedef struct 

    OVERLAPPED overlapped; 
    WSABUF databuff; 
    char buffer[ BUFFSIZE ];
 char *pHeader;
    int TotalSize;
 int ReadSize;
    int operationType; 
}PER_IO_OPERATEION_DATA, *LPPER_IO_OPERATION_DATA, *LPPER_IO_DATA, PER_IO_DATA; 
 
/**
 * 结构体名称:PER_HANDLE_DATA
 * 结构体存储:记录单个套接字的数据,包括了套接字的变量及套接字的对应的客户端的地址。
 * 结构体作用:当服务器连接上客户端时,信息存储到该结构体中,知道客户端的地址以便于回访。
 **/ 
typedef struct 

    SOCKET socket; 
    SOCKADDR_STORAGE ClientAddr; 
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA; 

char *g_pRecvData;
// 定义全局变量  
const int DefaultPort = 20000;        
vector < PER_HANDLE_DATA* > clientGroup;      // 记录客户端的向量组  
 
HANDLE hMutex = CreateMutex(NULL, FALSE, NULL); 
DWORD WINAPI ServerWorkThread(LPVOID CompletionPortID); 
DWORD WINAPI ServerSendThread(LPVOID IpParam); 
void  WINAPI IOWorkDummy(DWORD ExeCode, DWORD IOSize, LPOVERLAPPED pOVL);

RecvInfo g_info;
// 开始主函数  
int main() 

// 加载socket动态链接库  
    WORD wVersionRequested = MAKEWORD(2, 2); // 请求2.2版本的WinSock库  
    WSADATA wsaData;    // 接收Windows Socket的结构信息  
    DWORD err = WSAStartup(wVersionRequested, &wsaData); 
 
    if (0 != err){  // 检查套接字库是否申请成功  
        cerr << "Request Windows Socket Library Error!\n"; 
        system("pause"); 
        return -1; 
    } 
    if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2){// 检查是否申请了所需版本的套接字库  
        WSACleanup(); 
        cerr << "Request Windows Socket Version 2.2 Error!\n"; 
        system("pause"); 
        return -1; 
    } 
 
// 创建IOCP的内核对象  
    /**
     * 需要用到的函数的原型:
     * HANDLE WINAPI CreateIoCompletionPort(
     *    __in   HANDLE FileHandle,     // 已经打开的文件句柄或者空句柄,一般是客户端的句柄
     *    __in   HANDLE ExistingCompletionPort, // 已经存在的IOCP句柄
     *    __in   ULONG_PTR CompletionKey,   // 完成键,包含了指定I/O完成包的指定文件
     *    __in   DWORD NumberOfConcurrentThreads // 真正并发同时执行最大线程数,一般推介是CPU核心数*2
     * );
     **/ 
    HANDLE completionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0); 
    if (NULL == completionPort){    // 创建IO内核对象失败  
        cerr << "CreateIoCompletionPort failed. Error:" << GetLastError() << endl; 
        system("pause"); 
        return -1; 
    } 
 
// 创建IOCP线程--线程里面创建线程池  
 
    // 确定处理器的核心数量  
    SYSTEM_INFO mySysInfo; 
    GetSystemInfo(&mySysInfo); 
 
    // 基于处理器的核心数量创建线程  
    //for(DWORD i = 0; i < (mySysInfo.dwNumberOfProcessors * 2); ++i)
 for(DWORD i = 0; i < 4; ++i)
 { 
        // 创建服务器工作器线程,并将完成端口传递到该线程  
        HANDLE ThreadHandle = CreateThread(NULL, 0, ServerWorkThread, completionPort, 0, NULL); 
        if(NULL == ThreadHandle){ 
            cerr << "Create Thread Handle failed. Error:" << GetLastError() << endl; 
        system("pause"); 
            return -1; 
        } 
        CloseHandle(ThreadHandle); 
   } 
 
// 建立流式套接字  
    SOCKET srvSocket = socket(AF_INET, SOCK_STREAM, 0); 
 
// 绑定SOCKET到本机  
    SOCKADDR_IN srvAddr; 
    srvAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY); 
    srvAddr.sin_family = AF_INET; 
    srvAddr.sin_port = htons(DefaultPort); 
    int bindResult = bind(srvSocket, (SOCKADDR*)&srvAddr, sizeof(SOCKADDR)); 
    if(SOCKET_ERROR == bindResult){ 
        cerr << "Bind failed. Error:" << GetLastError() << endl; 
        system("pause"); 
        return -1; 
    } 
 
// 将SOCKET设置为监听模式  
    int listenResult = listen(srvSocket, 100); 
    if(SOCKET_ERROR == listenResult){ 
        cerr << "Listen failed. Error: " << GetLastError() << endl; 
        system("pause"); 
        return -1; 
    } 
     
// 开始处理IO数据  
    cout << "本服务器已准备就绪,正在等待客户端的接入...\n"; 
   memset(&g_info, 0, sizeof(RecvInfo));

 int rcv_size = 1024*1024*20;    /* 接收缓冲区大小为8K */
 int optlen = sizeof(rcv_size);
 err = setsockopt(srvSocket,SOL_SOCKET,SO_RCVBUF, (char *)&rcv_size, optlen);
 if(err<0){
  printf("设置接收缓冲区大小错误\n");
 }else{
  printf("设置接收缓冲区大小OK\n");
 }

 g_pRecvData = new char[1024*1024*500];
 if (g_pRecvData==NULL){
  printf("new g_pRecvData false\n");
 }
    // 创建用于发送数据的线程  
    HANDLE sendThread = CreateThread(NULL, 0, ServerSendThread, 0, 0, NULL); 
 /*if (0 == BindIoCompletionCallback((HANDLE)srvSocket, IOWorkDummy, 0))
 {
  printf("BindIoCompletionCallback is failed. Error code = %d", GetLastError());
 }*/

 int nPackIndex = 0;
    while(true){ 
        PER_HANDLE_DATA * PerHandleData = NULL; 
        SOCKADDR_IN saRemote; 
        int RemoteLen; 
        SOCKET acceptSocket; 
 
        // 接收连接,并分配完成端,这儿可以用AcceptEx()  
        RemoteLen = sizeof(saRemote); 
        acceptSocket = accept(srvSocket, (SOCKADDR*)&saRemote, &RemoteLen); 
        if(SOCKET_ERROR == acceptSocket){   // 接收客户端失败  
            cerr << "Accept Socket Error: " << GetLastError() << endl; 
            system("pause"); 
            return -1; 
        } 
         
        // 创建用来和套接字关联的单句柄数据信息结构  
        PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));  // 在堆中为这个PerHandleData申请指定大小的内存  
        PerHandleData -> socket = acceptSocket; 
        memcpy (&PerHandleData -> ClientAddr, &saRemote, RemoteLen); 
        clientGroup.push_back(PerHandleData);       // 将单个客户端数据指针放到客户端组中  
 
        // 将接受套接字和完成端口关联  
        CreateIoCompletionPort((HANDLE)(PerHandleData -> socket), completionPort, (DWORD)PerHandleData, 0); 
 
         
        // 开始在接受套接字上处理I/O使用重叠I/O机制  
        // 在新建的套接字上投递一个或多个异步  
        // WSARecv或WSASend请求,这些I/O请求完成后,工作者线程会为I/O请求提供服务      
        // 单I/O操作数据(I/O重叠)  
        LPPER_IO_OPERATION_DATA PerIoData = NULL; 
        PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATEION_DATA)); 
        ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); 
        PerIoData->databuff.len = BUFFSIZE; 
        PerIoData->databuff.buf = PerIoData->buffer;
  PerIoData->pHeader = g_pRecvData + nPackIndex*1024*100;
        PerIoData->operationType = 0;    // read  
  PerIoData->TotalSize = 0;
  PerIoData->ReadSize = 0;
  nPackIndex++;
 
        DWORD RecvBytes; 
        DWORD Flags = 0; 
        WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL); 
    } 
 
    system("pause");
 free(g_pRecvData);
    return 0; 

 
// 开始服务工作线程函数  
DWORD WINAPI ServerWorkThread(LPVOID IpParam) 

    HANDLE CompletionPort = (HANDLE)IpParam; 
    DWORD BytesTransferred; 
    LPOVERLAPPED IpOverlapped; 
    LPPER_HANDLE_DATA PerHandleData = NULL; 
    LPPER_IO_DATA PerIoData = NULL; 
    DWORD RecvBytes; 
    DWORD Flags = 0; 
    BOOL bRet = false; 
 
 g_info.tBegin = GetTickCount();
    while(true){ 
        bRet = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&IpOverlapped, INFINITE); 
        if(bRet == 0){ 
            cerr << "GetQueuedCompletionStatus Error: " << GetLastError() << endl; 
            return -1; 
        } 
        PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(IpOverlapped, PER_IO_DATA, overlapped); 
         
        // 检查在套接字上是否有错误发生  
        if(0 == BytesTransferred){ 
            closesocket(PerHandleData->socket); 
            GlobalFree(PerHandleData); 
            GlobalFree(PerIoData); 
            continue; 
        } 
         
        // 开始数据处理,接收来自客户端的数据  
  //cout << "A Client says: " << PerIoData->databuff.len << endl; 
        if (PerIoData->TotalSize == 0){
   // header1
   PerIoData->TotalSize = *((int *)PerIoData->databuff.buf);
   if (PerIoData->TotalSize>1024*2000 || PerIoData->TotalSize<0){
    //fwrite( g_pBuf, 1, g_lastLen, g_hFile );
    //sprintf(g_pBuf, "Error TotalSize2 = %d\n", PerIoData->TotalSize);
    //fwrite( g_pBuf, 1, strlen(g_pBuf), g_hFile );
    //fflush( g_hFile );
    printf("Error TotalSize2 = %d\n", PerIoData->TotalSize);
    //break;
   }
   
   PerIoData->ReadSize = BytesTransferred - 4;
  }else{
   PerIoData->ReadSize += BytesTransferred;
  }
  g_info.nDataTotal += BytesTransferred;
  
  //PerIoData->buff[len] = '\0';   
  if (PerIoData->TotalSize <= PerIoData->ReadSize && PerIoData->TotalSize>100){
   g_info.TotalSize=PerIoData->TotalSize;
   g_info.ReadSize=PerIoData->ReadSize;
   PerIoData->TotalSize = PerIoData->ReadSize = 0;
   //printf("full .... ");
  }
  
  if (g_info.nActCount++>50000){
   //if (((int)nDataTotal) % (BUFFSIZE*200) == 0){
   g_info.nActCount = 0;
   g_info.tEnd = GetTickCount();
   g_info.tDiff = g_info.tEnd - g_info.tBegin;

   float fTotal = (float)g_info.nDataTotal/(1024*1024);
   float nDiffTime = (float)g_info.tDiff/1000;
   g_info.tBegin = g_info.tEnd;
   printf("tm=%5.3f, v=%5.3f, Total=%5.2f, UT=%d, UR=%d \n",
     nDiffTime, (fTotal-g_info.fTotalLast)/nDiffTime, fTotal, g_info.TotalSize, g_info.ReadSize);
   g_info.fTotalLast = fTotal;
  }
  
        // 为下一个重叠调用建立单I/O操作数据  
        ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空内存  
  if (PerIoData->TotalSize==0){
   PerIoData->databuff.len = BUFFSIZE; 
   PerIoData->databuff.buf = PerIoData->buffer; 
   PerIoData->operationType = 0;    // read  
  }else{
   memcpy(PerIoData->pHeader, PerIoData->databuff.buf, PerIoData->ReadSize);
   PerIoData->databuff.len = PerIoData->TotalSize-PerIoData->ReadSize; 
   PerIoData->databuff.buf = PerIoData->pHeader+PerIoData->ReadSize;
   PerIoData->operationType = 0;    // read  
  }

       
        WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL); 
    } 
 
    return 0; 

 
void WINAPI IOWorkDummy(DWORD ExeCode, DWORD BytesTransferred, LPOVERLAPPED pOVL)
{
 LPOVERLAPPED IpOverlapped; 
 LPPER_HANDLE_DATA PerHandleData = NULL; 
 LPPER_IO_DATA PerIoData = NULL; 
 DWORD RecvBytes; 
 DWORD Flags = 0; 
 BOOL bRet = false; 

 g_info.tBegin = GetTickCount();
 while(true){ 
  PerIoData = (LPPER_IO_DATA)(pOVL); 

  // 检查在套接字上是否有错误发生  
  if(0 == BytesTransferred){ 
   closesocket(PerHandleData->socket); 
   GlobalFree(PerHandleData); 
   GlobalFree(PerIoData); 
   continue; 
  } 

  // 开始数据处理,接收来自客户端的数据  
  //cout << "A Client says: " << PerIoData->databuff.len << endl; 
  if (PerIoData->TotalSize == 0){
   // header1
   PerIoData->TotalSize = *((int *)PerIoData->databuff.buf);
   if (PerIoData->TotalSize>1024*2000 || PerIoData->TotalSize<0){
    //fwrite( g_pBuf, 1, g_lastLen, g_hFile );
    //sprintf(g_pBuf, "Error TotalSize2 = %d\n", PerIoData->TotalSize);
    //fwrite( g_pBuf, 1, strlen(g_pBuf), g_hFile );
    //fflush( g_hFile );
    printf("Error TotalSize2 = %d\n", PerIoData->TotalSize);
    //break;
   }

   PerIoData->ReadSize = BytesTransferred - 4;
  }else{
   PerIoData->ReadSize += BytesTransferred;
  }
  g_info.nDataTotal += BytesTransferred;

  //PerIoData->buff[len] = '\0';   
  if (PerIoData->TotalSize <= PerIoData->ReadSize && PerIoData->TotalSize>100){
   g_info.TotalSize=PerIoData->TotalSize;
   g_info.ReadSize=PerIoData->ReadSize;
   PerIoData->TotalSize = PerIoData->ReadSize = 0;
   //printf("full .... ");
  }

  if (g_info.nActCount++>50000){
   //if (((int)nDataTotal) % (BUFFSIZE*200) == 0){
   g_info.nActCount = 0;
   g_info.tEnd = GetTickCount();
   g_info.tDiff = g_info.tEnd - g_info.tBegin;

   float fTotal = (float)g_info.nDataTotal/(1024*1024);
   float nDiffTime = (float)g_info.tDiff/1000;
   g_info.tBegin = g_info.tEnd;
   printf("tm=%5.3f, v=%5.3f, Total=%5.2f, UT=%d, UR=%d \n",
    nDiffTime, (fTotal-g_info.fTotalLast)/nDiffTime, fTotal, g_info.TotalSize, g_info.ReadSize);
   g_info.fTotalLast = fTotal;
  }

  // 为下一个重叠调用建立单I/O操作数据  
  ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空内存  
  if (PerIoData->TotalSize==0){
   PerIoData->databuff.len = BUFFSIZE; 
   PerIoData->databuff.buf = PerIoData->buffer; 
   PerIoData->operationType = 0;    // read  
  }else{
   memcpy(PerIoData->pHeader, PerIoData->databuff.buf, PerIoData->ReadSize);
   PerIoData->databuff.len = PerIoData->TotalSize-PerIoData->ReadSize; 
   PerIoData->databuff.buf = PerIoData->pHeader+PerIoData->ReadSize;
   PerIoData->operationType = 0;    // read  
  }


  WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL); 
 } 
} // IOWorkerDummy()
 
// 发送信息的线程执行函数  
DWORD WINAPI ServerSendThread(LPVOID IpParam) 

    while(1){ 
        char talk[200]; 
        gets(talk); 
        int len; 
        for (len = 0; talk[len] != '\0'; ++len){ 
            // 找出这个字符组的长度  
        } 
        talk[len] = '\n'; 
        talk[++len] = '\0'; 
        printf("I Say:"); 
        cout << talk; 
        WaitForSingleObject(hMutex,INFINITE); 
        for(int i = 0; i < clientGroup.size(); ++i){ 
            send(clientGroup[i]->socket, talk, 200, 0);  // 发送信息  
        } 
        ReleaseMutex(hMutex);  
    } 
    return 0; 

阅读(5344) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~