Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2716391
  • 博文数量: 416
  • 博客积分: 10220
  • 博客等级: 上将
  • 技术积分: 4193
  • 用 户 组: 普通用户
  • 注册时间: 2006-12-15 09:47
文章分类

全部博文(416)

文章存档

2022年(1)

2021年(1)

2020年(1)

2019年(5)

2018年(7)

2017年(6)

2016年(7)

2015年(11)

2014年(1)

2012年(5)

2011年(7)

2010年(35)

2009年(64)

2008年(48)

2007年(177)

2006年(40)

我的朋友

分类: C/C++

2015-02-26 14:13:20

// IOCP_TCPIP_Socket_Server.cpp   
#include "stdafx.h"
#include "SafeLocker.h"  
#include   
#include
#include
#include
#include   
using namespace std;  
  
#pragma comment(lib, "Ws2_32.lib")      // Socket编程需用的动态链接库   
#pragma comment(lib, "Kernel32.lib")    // IOCP需要用到的动态链接库   
  
/** 
 * 结构体名称:PER_IO_DATA 
 * 结构体功能:重叠I/O需要用到的结构体,临时记录IO数据 
 **/  
 
#define BUFFSIZE (1024*10)
#define RECVSIZE (1024*10)
#define SENDSIZE (1024*5)
#define STATUS_ACCEPT 3
#define IO_TYPE_READ 5
#define IO_TYPE_WRITE 6
#define BUF_SIZE_TO_ACCEPT ((sizeof (sockaddr_in) + 16) * 2)
#define SOCKET_SIZE 20000
typedef struct{
double nDataTotal;
float fTotalLast;
int nActCount;
int nLastLen;
int TotalSize;
int ReadSize;
DWORD tBegin;
DWORD tEnd;
DWORD tDiff;
}RecvInfo;


typedef struct  
{  
    OVERLAPPED overlapped;  
    WSABUF databuff;  
    char buffer[ BUFFSIZE ];
BYTE BufferForAccept[BUF_SIZE_TO_ACCEPT];
char *pHeader;
    int TotalSize;
int ReadSize;
    int operationType;
bool bStart;
int NetSizeInfoTempLen;
BYTE NetSizeInfoTemp[NET_SIZE_INFO_LEN];


SOCKET skClient;
SOCKET skServer;
}PER_IO_OPERATEION_DATA, *LPPER_IO_OPERATION_DATA, *LPPER_IO_DATA, PER_IO_DATA;  
  
/** 
 * 结构体名称:PER_HANDLE_DATA 
 * 结构体存储:记录单个套接字的数据,包括了套接字的变量及套接字的对应的客户端的地址。 
 * 结构体作用:当服务器连接上客户端时,信息存储到该结构体中,知道客户端的地址以便于回访。 
 **/  
typedef struct  
{  
    SOCKET socket;  
    SOCKADDR_STORAGE ClientAddr;  
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;  


char *g_pRecvData;
CRITICAL_SECTION m_SlotLock;
// 定义全局变量   
const int DefaultPort = 20000;         
vector < PER_HANDLE_DATA* > clientGroup;      // 记录客户端的向量组   
  
HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);  
DWORD WINAPI ServerSendThread(LPVOID IpParam);  
void  WINAPI IOWorkDummy(DWORD ExeCode, DWORD IOSize, LPOVERLAPPED pOVL);


RecvInfo g_info;
LPFN_ACCEPTEX m_lpfnAcceptEx;
// 开始主函数   
int main()  
{  
// 加载socket动态链接库   
    WORD wVersionRequested = MAKEWORD(2, 2); // 请求2.2版本的WinSock库   
    WSADATA wsaData;    // 接收Windows Socket的结构信息   
    DWORD err = WSAStartup(wVersionRequested, &wsaData);  
  
    if (0 != err){  // 检查套接字库是否申请成功   
        cerr << "Request Windows Socket Library Error!\n";  
        system("pause");  
        return -1;  
    }  
    if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2){// 检查是否申请了所需版本的套接字库   
        WSACleanup();  
        cerr << "Request Windows Socket Version 2.2 Error!\n";  
        system("pause");  
        return -1;  
    }  
  
// 创建IOCP的内核对象   
    /** 
     * 需要用到的函数的原型: 
     * HANDLE WINAPI CreateIoCompletionPort( 
     *    __in   HANDLE FileHandle,     // 已经打开的文件句柄或者空句柄,一般是客户端的句柄 
     *    __in   HANDLE ExistingCompletionPort, // 已经存在的IOCP句柄 
     *    __in   ULONG_PTR CompletionKey,   // 完成键,包含了指定I/O完成包的指定文件 
     *    __in   DWORD NumberOfConcurrentThreads // 真正并发同时执行最大线程数,一般推介是CPU核心数*2 
     * ); 
     **/  
    HANDLE completionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0);  
    if (NULL == completionPort){    // 创建IO内核对象失败   
        cerr << "CreateIoCompletionPort failed. Error:" << GetLastError() << endl;  
        system("pause");  
        return -1;  
    }  
  
// 创建IOCP线程--线程里面创建线程池   
  
    // 确定处理器的核心数量   
    SYSTEM_INFO mySysInfo;  
    GetSystemInfo(&mySysInfo);  
  
    // 基于处理器的核心数量创建线程   
    
// 建立流式套接字   
    SOCKET srvSocket = socket(AF_INET, SOCK_STREAM, 0);  
  
// 绑定SOCKET到本机   
    SOCKADDR_IN srvAddr;  
    srvAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);  
    srvAddr.sin_family = AF_INET;  
    srvAddr.sin_port = htons(DefaultPort);  
    int bindResult = bind(srvSocket, (SOCKADDR*)&srvAddr, sizeof(SOCKADDR));  
    if(SOCKET_ERROR == bindResult){  
        cerr << "Bind failed. Error:" << GetLastError() << endl;  
        system("pause");  
        return -1;  
    }  
  
// 将SOCKET设置为监听模式   
    int listenResult = listen(srvSocket, 100);  
    if(SOCKET_ERROR == listenResult){  
        cerr << "Listen failed. Error: " << GetLastError() << endl;  
        system("pause");  
        return -1;  
    }  
      
// 开始处理IO数据   
    cout << "本服务器已准备就绪,正在等待客户端的接入...\n";  
  memset(&g_info, 0, sizeof(RecvInfo));


//int rcv_size = 1024*1024*10;    /* 接收缓冲区大小为8K */ 
//int optlen = sizeof(rcv_size); 
//err = setsockopt(srvSocket,SOL_SOCKET,SO_RCVBUF, (char *)&rcv_size, optlen); 
//if(err<0){ 
// printf("设置接收缓冲区大小错误\n"); 
//}else{
// printf("设置接收缓冲区大小OK\n"); 
//}


g_pRecvData = new char[1024*1024*500];
if (g_pRecvData==NULL){
printf("new g_pRecvData false\n");
}
    // 创建用于发送数据的线程   
    //HANDLE sendThread = CreateThread(NULL, 0, ServerSendThread, 0, 0, NULL);  
if (0 == BindIoCompletionCallback((HANDLE)srvSocket, IOWorkDummy, 0))
{
printf("BindIoCompletionCallback is failed. Error code = %d", GetLastError());
}


GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(srvSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx, sizeof (GuidAcceptEx),
&m_lpfnAcceptEx, sizeof (m_lpfnAcceptEx), &dwBytes, 0, 0))
{
printf("WSAIoctl is failed. Error code = %d", WSAGetLastError());
}


InitializeCriticalSection(&m_SlotLock);
int nPackIndex = 0;
    for(int i=0; i {  
        PER_HANDLE_DATA * PerHandleData = NULL;  
        SOCKADDR_IN saRemote;  
        int RemoteLen;  
        SOCKET acceptSocket;  
  
        // 接收连接,并分配完成端,这儿可以用AcceptEx()   
        RemoteLen = sizeof(saRemote);  
        //acceptSocket = accept(srvSocket, (SOCKADDR*)&saRemote, &RemoteLen);  
acceptSocket = ::WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP, NULL, 0,WSA_FLAG_OVERLAPPED);


        if(SOCKET_ERROR == acceptSocket){   // 接收客户端失败   
            cerr << "Accept Socket Error: " << GetLastError() << endl;  
            system("pause");  
            return -1;  
        }  
          
        // 创建用来和套接字关联的单句柄数据信息结构   
        PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));  // 在堆中为这个PerHandleData申请指定大小的内存   
        PerHandleData -> socket = acceptSocket;  
        memcpy (&PerHandleData -> ClientAddr, &saRemote, RemoteLen);  
        clientGroup.push_back(PerHandleData);       // 将单个客户端数据指针放到客户端组中   
  
          
        // 开始在接受套接字上处理I/O使用重叠I/O机制   
        // 在新建的套接字上投递一个或多个异步   
        // WSARecv或WSASend请求,这些I/O请求完成后,工作者线程会为I/O请求提供服务       
        // 单I/O操作数据(I/O重叠)   
        LPPER_IO_OPERATION_DATA PerIoData = NULL;  
        PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATEION_DATA));  
        ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED));  
        PerIoData->pHeader = g_pRecvData + nPackIndex*RECVSIZE;
PerIoData->databuff.len = RECVSIZE;  
PerIoData->databuff.buf = PerIoData->pHeader;
        PerIoData->operationType = STATUS_ACCEPT;    // read  
PerIoData->skServer = srvSocket;
PerIoData->skClient = acceptSocket;
PerIoData->bStart = true;
PerIoData->TotalSize = 0;
PerIoData->ReadSize = 0;
nPackIndex++;
  
        DWORD RecvBytes;  
        DWORD Flags = 0;  
// 将接受套接字和完成端口关联   
m_lpfnAcceptEx(srvSocket, PerHandleData->socket, reinterpret_cast(const_cast(PerIoData->BufferForAccept)), 
0,//将接收缓冲置为0,令AcceptEx直接返回,防止拒绝服务攻击
sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, NULL,
(LPOVERLAPPED)PerIoData);


if (0 == BindIoCompletionCallback((HANDLE)PerHandleData->socket, IOWorkDummy, 0))
{
printf("BindIoCompletionCallback is failed. Error code = %d", GetLastError());
}
    }  
g_info.tBegin = GetTickCount();
    system("pause"); 
// free resource


DeleteCriticalSection(&m_SlotLock);
for(int i=0; i
//GlobalFree
}
free(g_pRecvData);
    return 0;  
}


int Put(LPPER_IO_DATA PerIoData, BYTE *pDataPartial, DWORD IOSize)
{
BOOL bError = 0;
BYTE *pDataStart;
DWORD IOSizeRevised; //不带数据长度的数据大小(IOSize-4)
DWORD SizeToRead; //收到实际的数据大小
DWORD SizeRemain; //若包完整后还有数据的长度
BYTE *pDataIndex;
int NetSizeRevised; //数据长度位数
BOOL bDataChain; //数据是否

pDataIndex = pDataPartial;
do
{
// 1. 首次来包
if (PerIoData->bStart) // Don't forget '&'
{
PerIoData->bStart = false;
if (0 < PerIoData->NetSizeInfoTempLen)
{
NetSizeRevised = NET_SIZE_INFO_LEN - PerIoData->NetSizeInfoTempLen;
memcpy(&PerIoData->TotalSize, PerIoData->NetSizeInfoTemp, PerIoData->NetSizeInfoTempLen);
if (PerIoData->TotalSize>1024*512){// Error
printf("sockId1=0x%x, len=%d\n", PerIoData, PerIoData->TotalSize);
bError = 1;
goto ErrHand;
}
memcpy(&PerIoData->TotalSize + PerIoData->NetSizeInfoTempLen, pDataIndex, NetSizeRevised);
PerIoData->NetSizeInfoTempLen = 0; // initialize it.
}
else
{
// 2. 包头长度
NetSizeRevised = NET_SIZE_INFO_LEN;
memcpy(&PerIoData->TotalSize, pDataIndex, NetSizeRevised);
if (PerIoData->TotalSize>1024*512){// Error
printf("sockId2=0x%x, len=%d\n", PerIoData, PerIoData->TotalSize);
bError = 1;
goto ErrHand;
}
}

PerIoData->ReadSize = 0;
// 5. 数据位置(无数据长度)、数据大小
pDataStart = pDataIndex + NetSizeRevised; //
IOSizeRevised = IOSize - NetSizeRevised;
bDataChain = 0;
}
else
{
pDataStart = pDataIndex;
IOSizeRevised = IOSize;
bDataChain = 1;
}


if ((PerIoData->TotalSize - PerIoData->ReadSize) >= IOSizeRevised)
{
SizeToRead = IOSizeRevised;
SizeRemain = 0;
}
else
{
SizeToRead = PerIoData->TotalSize - PerIoData->ReadSize;
SizeRemain = IOSizeRevised - SizeToRead;
}
if (PerIoData->TotalSize>1024*512){ // Error trace
printf("sockId3=0x%x, len=%d\n", PerIoData, PerIoData->TotalSize);
bError = 1;
goto ErrHand;
}


//if (pBuffSock->BufferReadIORealDataHeader == pBuffSock->pFillingSlot->pData){ //first
// memcpy(pBuffSock->pFillingSlot->pData, pDataStart, SizeToRead);
// if (pBuffSock->pFillingSlot->TotalSize > SizeToRead){ //只有接收数据大于BUFFER_UNIT_SIZE
// pBuffSock->pFillingSlot->pData += SizeToRead;
// pBuffSock->BufferReadIO.buf = (char *)pBuffSock->pFillingSlot->pData;
// pBuffSock->BufferReadIO.len = pBuffSock->pFillingSlot->TotalSize - SizeToRead;
// }
//}else{
// if (pBuffSock->BufferReadIO.len == SizeToRead){ //restore
// pBuffSock->pFillingSlot->pData = pBuffSock->BufferReadIORealDataHeader;
// pBuffSock->BufferReadIO.buf = (char *)pBuffSock->BufferReadIOHeader;
// pBuffSock->BufferReadIO.len = sizeof (pBuffSock->BufferReadIORealData);
// }
//}
PerIoData->ReadSize += SizeToRead;


if (PerIoData->TotalSize == PerIoData->ReadSize && PerIoData->ReadSize>100)
{
//printf("xxxxxxxxx=0x%x, clientSock=%d, TotalSize=%d, ReadSize=%d\n", PerIoData, PerIoData->skClient, PerIoData->TotalSize, PerIoData->ReadSize);
g_info.TotalSize=PerIoData->TotalSize;
g_info.ReadSize=PerIoData->ReadSize;
PerIoData->TotalSize = PerIoData->ReadSize = 0;
PerIoData->bStart = true;
}


if (0 == SizeRemain)
{
break;
}
else if (NET_SIZE_INFO_LEN < SizeRemain)
{
IOSize = SizeRemain;
if (0 == bDataChain)
{
pDataIndex += (NET_SIZE_INFO_LEN + SizeToRead);
}
else
{
pDataIndex += SizeToRead;
}
}
else if (0 < SizeRemain && NET_SIZE_INFO_LEN >= SizeRemain)
{
memcpy(PerIoData->NetSizeInfoTemp, pDataStart + SizeToRead, SizeRemain);
PerIoData->NetSizeInfoTempLen = SizeRemain;
break;
}
else
{
printf("Odd case.");
break;
}
} while (0 < SizeRemain);


ErrHand:
return (1 != bError);
} // Put()


// 开始服务工作线程函数   
void WINAPI IOWorkDummy(DWORD ExeCode, DWORD BytesTransferred, LPOVERLAPPED pOVL)
{
LPOVERLAPPED IpOverlapped;  
LPPER_HANDLE_DATA PerHandleData = NULL;  
LPPER_IO_DATA PerIoData = NULL;  
DWORD RecvBytes;  
DWORD Flags = 0;  
BOOL bRet = false;  


PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(pOVL, PER_IO_OPERATEION_DATA, overlapped);
// 检查在套接字上是否有错误发生   
/*static int oldPId = 0;
if (oldPId != GetCurrentThreadId()) {
printf("pid = %d\n", GetCurrentThreadId());
oldPId = GetCurrentThreadId();
}*/
if (PerIoData->operationType == STATUS_ACCEPT){
int nRet = ::setsockopt(
PerIoData->skClient,SOL_SOCKET,SO_UPDATE_ACCEPT_CONTEXT,
(char *)&PerIoData->skServer,sizeof(SOCKET));
if (nRet != 0){
printf("setsockopt error=%d\n", GetLastError());
}else{
PerIoData->operationType = IO_TYPE_READ;
}
}else if (PerIoData->operationType == IO_TYPE_READ){
if(0 == BytesTransferred){  
//closesocket(PerHandleData->socket);  
//GlobalFree(PerHandleData);  
//GlobalFree(PerIoData);  
//continue;  
// 为下一个重叠调用建立单I/O操作数据   
ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空内存   
PerIoData->operationType = IO_TYPE_READ;


} else{
// 开始数据处理,接收来自客户端的数据   
//cout << "A Client says: " << PerIoData->databuff.len << endl;  
//OSafeLocker SLock(&m_SlotLock);
//SLock.Lock();
//if (PerIoData->bStart){
// // header1
// PerIoData->bStart = false;
// PerIoData->TotalSize = *((int *)PerIoData->databuff.buf);
// printf("PerIoData=0x%x, clientSock=%d, TotalSize=%d\n", PerIoData, PerIoData->skClient, PerIoData->TotalSize);
// if (PerIoData->TotalSize>1024*2000 || PerIoData->TotalSize<0){
// //fwrite( g_pBuf, 1, g_lastLen, g_hFile );
// //sprintf(g_pBuf, "Error TotalSize2 = %d\n", PerIoData->TotalSize);
// //fwrite( g_pBuf, 1, strlen(g_pBuf), g_hFile );
// //fflush( g_hFile );
// printf("Error TotalSize2 = %d\n", PerIoData->TotalSize);
// //break;
// }


// PerIoData->ReadSize = BytesTransferred - 4;
//}else{
// PerIoData->ReadSize += BytesTransferred;
//}
//g_info.nDataTotal += BytesTransferred;


////PerIoData->buff[len] = '\0';
//if (PerIoData->TotalSize <= PerIoData->ReadSize && PerIoData->TotalSize>100){
// printf("xxxxxxxxx=0x%x, clientSock=%d, TotalSize=%d, ReadSize=%d\n", PerIoData, PerIoData->skClient, PerIoData->TotalSize, PerIoData->ReadSize);
// g_info.TotalSize=PerIoData->TotalSize;
// g_info.ReadSize=PerIoData->ReadSize;
// PerIoData->TotalSize = PerIoData->ReadSize = 0;
// PerIoData->bStart = true;
// //printf("full .... ");
//}
//SLock.Unlock();
//


g_info.nDataTotal += BytesTransferred;
Put(PerIoData, (BYTE *)PerIoData->databuff.buf, BytesTransferred);
if (g_info.nActCount++>45000){
g_info.nActCount = 0;
g_info.tEnd = GetTickCount();
g_info.tDiff = g_info.tEnd - g_info.tBegin;


float fTotal = (float)g_info.nDataTotal/(1024*1024);
float nDiffTime = (float)g_info.tDiff/1000;
g_info.tBegin = g_info.tEnd;
printf("tm=%5.3f, v=%5.3f, Total=%5.2f, UT=%d, UR=%d \n", 
nDiffTime, (fTotal-g_info.fTotalLast)/nDiffTime, fTotal, g_info.TotalSize, g_info.ReadSize); 
g_info.fTotalLast = fTotal;
}


// 为下一个重叠调用建立单I/O操作数据   
ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空内存   
PerIoData->operationType = IO_TYPE_READ;
}
}
Flags = MSG_PARTIAL;
WSARecv(PerIoData->skClient, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);  
} // IOWorkDummy()
  
// 发送信息的线程执行函数   
DWORD WINAPI ServerSendThread(LPVOID IpParam)  
{  
    while(1){  
        char talk[200];  
        gets(talk);  
        int len;  
        for (len = 0; talk[len] != '\0'; ++len){  
            // 找出这个字符组的长度   
        }  
        talk[len] = '\n';  
        talk[++len] = '\0';  
        printf("I Say:");  
        cout << talk;  
        WaitForSingleObject(hMutex,INFINITE);  
        for(int i = 0; i < clientGroup.size(); ++i){  
            send(clientGroup[i]->socket, talk, 200, 0);  // 发送信息   
        }  
        ReleaseMutex(hMutex);   
    }  
    return 0;  
}  

以上是Server端测试程序,不带锁的,在xp下BindIoCompletionCallback为单线程,在发送每包大小25KB下,接收速度为330MB/s, 但在win8下为多线程(4线程),最高也只有200MB/s
线程切换能相差这么大? Client端程序不变的情况下测试的结果.

函数:int Put(LPPER_IO_DATA PerIoData, BYTE *pDataPartial, DWORD IOSize)处理粘包时不严谨
接收速度330MB/s计算有问题,有空验证之后纠正




阅读(3111) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~