Chinaunix首页 | 论坛 | 博客
  • 博客访问: 91029
  • 博文数量: 27
  • 博客积分: 1090
  • 博客等级: 少尉
  • 技术积分: 355
  • 用 户 组: 普通用户
  • 注册时间: 2009-09-07 10:30
文章分类

全部博文(27)

文章存档

2009年(27)

我的朋友

分类:

2009-10-29 10:51:36

一、声明
版权声明:
1、通讯模块代码版权归作者所有;
2、未经许可不得全部或部分用于任何项目开发;
3、未经许可不得部分修改后再利用源码。

免责声明:
1、 由于设计缺陷或其它Bug造成的后果,作者不承担责任;
2、未经许可的使用作者不提供任何技术支持服务。

权利和义务:
1、任何获得源码并发现Bug的个人或单位均有义务向作者反映;
2、作者保留追究侵权者法律责任的权利。

二、开发背景
部分代码由前项目分离而来,尚未有应用考验,但对于初学者的学习和进阶会有很大帮助,通过这个例程可以深入了解IOCP Socket。性能上尚未有定论,但应该不会令你失望。

三、功能说明
1、可以关闭Socket的Buffer;
2、可以关闭Nagle算法(即不等待数据凑满MTU才发送);
3、可以多IP或多端口监听;
4、可以重用socket(主动关闭除外);
5、可以0缓冲接收(Socket的Buffer = 0时,避免过多的锁定内存页);
6、可以0缓冲连接(客户端仅连接,不一定立即发数据);
7、可以条件编译:
a、是否使用内核Singly-linked lists;
b、是否使用处理线程(工作线程和处理线程分开);
c、是否使用内核锁来同步链表。
8、可以实现集群服务器模式的通讯(有客户端socket);
9、可以单独设置每个连接的Data项来实现连接和UserInfo的关联;
10、每个线程有OnBegin和OnEnd,用于设置线程独立的对象(数据库会话对象);
11、可以提供详细的运行情况,便于了解IOCP下的机制,以及进行调试分析;
12、可以发起巨量连接和数据(需要硬件配置来支持)。

四、缺陷
1、不支持UDP;
2、不兼容IPv6;
3、不带通讯协议,无法处理粘包;
4、工作线程和处理线程隔离还不是很明确;
5、设计尚需再完善和优化。

五、通讯速率测试部分截图
A机:单核台式机(服务端)
B机:双核笔记本(客户端)
网络:本地100M路由
由于台式机太老,用尽CPU还是不能用完带宽(其中大部分被“system”进程使用),因此改做服务器,由笔记本做客户端,发起密集数据,以堵塞的情况来满负荷使用网络,收发速率接近10MB/s。
注意:由于测试条件太差,下面截图不能说明任何权威结果。

服务器端设置:


服务器端运行信息:


客户端设置:


客户端运行信息:


客户端网络利用率:


客户端CPU利用率:


下载链接:



The end
 
 
 
 

 

 

 

 

 

 
/********************************************************************************
* *
* GSocket:完成端口通讯模块(IOCP Socket) *
* ——GTcpServer *
* *
* Copyright © 2009  代码客(卢益贵)  版权所有 *
* 未经许可,不得用于任何项目开发 *
*  QQ:48092788  E-Mail:gcode@qq.com  源码博客:http://blog.csdn.net/guestcode *
* *
* GSN:34674B4D-1F63-11D3-B64C-11C04F79498E *
********************************************************************************/

#include "stdafx.h"
#include <winsock2.h>
#include <mswsock.h>

#include "GLog.h"
#include "GMemory.h"
#include "GWorkerThread.h"
#include "GPerIoData.h"
#include "GPerHandleData.h"
#include "GSocketInside.h"
#include "GThread.h"
#include "GSocket.h"

#if(_GSOCKET_FUNC_TCP_SERVER)

/*
 * Wrappers around the extension-function pointers (pfnGSockAcceptEx,
 * pfnGSockGetAcceptExSockAddrs, pfnGSockDisconnectEx) used by the TCP server.
 * Every macro argument is parenthesised so that any expression can be passed.
 *
 * GTcpSvr_AcceptEx: posts an accept on pListener->Socket for pClient->Socket,
 *   using pIoData->cData as the receive/address buffer and pIoData itself as
 *   the OVERLAPPED pointer.
 *
 * GTcpSvr_GetAcceptExSockAddrs: extracts the addresses from an accept buffer.
 *   The same Addr/Len variables are passed for both the local and the remote
 *   out-parameters, so they end up holding whichever pair the callee stores
 *   last (presumably the remote address - confirm). pListener is unused.
 *
 * GTcpSvr_DisconnectEx: disconnects pClient->Socket with TF_REUSE_SOCKET and
 *   no OVERLAPPED structure.
 */
#define GTcpSvr_AcceptEx(pListener, pClient, pIoData) (pfnGSockAcceptEx((pListener)->Socket, (pClient)->Socket, (pIoData)->cData, dwGSockAcceptBytes, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, NULL, (LPWSAOVERLAPPED)(pIoData)))
#define GTcpSvr_GetAcceptExSockAddrs(pListener, Buf, Bytes, Addr, Len) (pfnGSockGetAcceptExSockAddrs((Buf), (Bytes), sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, (PSOCKADDR *)&(Addr), (int *)&(Len), (PSOCKADDR *)&(Addr), (int *)&(Len)))
#define GTcpSvr_DisconnectEx(pClient) (pfnGSockDisconnectEx((pClient)->Socket, NULL, TF_REUSE_SOCKET, 0))

/*********************************************************************************
                  变量定义和初始化
*********************************************************************************/
// Guard for the pending-accept list: a flag used with GSock_InterlockedSet when
// _USE_INTERLOCKED_IN_LIST is enabled, otherwise a critical section.
#if(_USE_INTERLOCKED_IN_LIST)
BOOL bGTcpSvrPendingAcceptLock = FALSE;
#else
CRITICAL_SECTION GTcpSvrPendingAcceptCS;
#endif
// Head of the doubly linked list of handles with an accept outstanding, and its length.
PGHND_DATA pGTcpSvrPendingAcceptHead = NULL;
DWORD dwGTcpSvrPendingAcceptCount = 0;

// Guard for the connected-client list (same two variants as above).
#if(_USE_INTERLOCKED_IN_LIST)
BOOL bGTcpSvrClientLock = FALSE;
#else
CRITICAL_SECTION GTcpSvrClientCS;
#endif
// Head of the doubly linked list of connected clients, and its length.
PGHND_DATA pGTcpSvrClientHead = NULL;
DWORD dwGTcpSvrClientCount = 0;  // a counter, so initialise with 0 rather than NULL

// Listener table: the first dwGTcpSvrListenerCount entries of pGTcpSvrListeners
// are traversed under GTcpSvrListenerCS.
CRITICAL_SECTION GTcpSvrListenerCS;
DWORD dwGTcpSvrListenerCount = 0;
PGHND_DATA pGTcpSvrListeners[WSA_MAXIMUM_WAIT_EVENTS];
HANDLE hGTcpSvrListenerEvents[WSA_MAXIMUM_WAIT_EVENTS];
// NOTE(review): the following three are not used in the visible part of the file;
// presumably they belong to the idle-timeout service thread - confirm.
PGHND_DATA* pGTcpSvrOvertimeClient = NULL;
DWORD pGTcpSvrOvertimeCount = 0;
GTHREAD GTcpSvrServiceThreadData;

// Gate checked by the traversal functions; they return immediately while this is FALSE.
BOOL bGTcpSvrIsActive = FALSE;

/*********************************************************************************
                内核锁
*********************************************************************************/
#if(_USE_INTERLOCKED_IN_LIST)
// Acquires the client-list lock flag by busy-waiting.
// Repeats GSock_InterlockedSet(bGTcpSvrClientLock, TRUE, FALSE) until it yields
// FALSE (presumably the previous flag value, i.e. the flag was free - confirm
// against the definition of GSock_InterlockedSet).
void GTcpSvr_LockClientList(void)
{
    while(FALSE != GSock_InterlockedSet(bGTcpSvrClientLock, TRUE, FALSE))
    {
        // keep spinning
    }
}

// Releases the client-list lock flag taken by GTcpSvr_LockClientList by
// switching bGTcpSvrClientLock from TRUE back to FALSE.
void GTcpSvr_UnlockClientList(void)
{
    GSock_InterlockedSet(bGTcpSvrClientLock, FALSE, TRUE);
}

// Acquires the pending-accept-list lock flag by busy-waiting.
// Repeats GSock_InterlockedSet(bGTcpSvrPendingAcceptLock, TRUE, FALSE) until it
// yields FALSE (presumably the previous flag value - confirm against the
// definition of GSock_InterlockedSet).
void GTcpSvr_LockPendingAcceptList(void)
{
    while(FALSE != GSock_InterlockedSet(bGTcpSvrPendingAcceptLock, TRUE, FALSE))
    {
        // keep spinning
    }
}

// Releases the pending-accept-list lock flag taken by
// GTcpSvr_LockPendingAcceptList by switching it from TRUE back to FALSE.
void GTcpSvr_UnlockPendingAcceptList(void)
{
    GSock_InterlockedSet(bGTcpSvrPendingAcceptLock, FALSE, TRUE);
}
#endif

/*********************************************************************************
                变量设置
*********************************************************************************/
// Returns the current length of the connected-client list.
// The counter is read without taking the list lock.
DWORD GTcpSvr_GetClientCount(void)
{
    return dwGTcpSvrClientCount;
}

// Returns the number of registered listeners.
// The counter is read without entering GTcpSvrListenerCS.
DWORD GTcpSvr_GetListenerCount(void)
{
    return dwGTcpSvrListenerCount;
}

// Returns the current length of the pending-accept list.
// The counter is read without taking the list lock.
DWORD GTcpSvr_GetPendingAcceptCount(void)
{
    return dwGTcpSvrPendingAcceptCount;
}

/*********************************************************************************
                Listener参数获取
*********************************************************************************/
// Returns the pData field of the listener identified by dwListenerId, cast to DWORD.
// dwListenerId is the listener's handle-data pointer passed as a DWORD; it is
// dereferenced without validation. The accept path increments/decrements this
// field per client, so it serves as the listener's connection counter.
DWORD GTcpSvr_GetListenerConnectCount(DWORD dwListenerId)
{
    PGHND_DATA pListener = PGHND_DATA(dwListenerId);

    return (DWORD)pListener->pData;
}

/*
 * Calls pfnOnProc once for every registered listener while holding
 * GTcpSvrListenerCS.
 *
 * dwParam   - opaque value handed to the callback unchanged.
 * pfnOnProc - invoked as pfnOnProc(dwParam, index, listener pointer as DWORD);
 *             returning zero stops the traversal early.
 *
 * Returns immediately when the server is not active.
 */
void GTcpSvr_TraversalListener(DWORD dwParam, PFN_ON_GSOCK_TRAVERSAL pfnOnProc)
{
    if(!bGTcpSvrIsActive)
        return;

    EnterCriticalSection(&GTcpSvrListenerCS);

    for(DWORD i = 0; i < dwGTcpSvrListenerCount; i++)
    {
        if(!pfnOnProc(dwParam, i, DWORD(pGTcpSvrListeners[i])))
            break;
    }

    LeaveCriticalSection(&GTcpSvrListenerCS);
}
/*********************************************************************************
                服务操作
*********************************************************************************/
/*
 * Pushes pClient onto the front of the connected-client list and increments
 * dwGTcpSvrClientCount, under the client-list lock.
 */
void GTcpSvr_InsertClientList(PGHND_DATA pClient)
{
#if(_USE_INTERLOCKED_IN_LIST)
    GTcpSvr_LockClientList();
#else
    EnterCriticalSection(&GTcpSvrClientCS);
#endif

    // New node becomes the head: no predecessor, old head as successor.
    pClient->pPrior = NULL;
    pClient->pNext = pGTcpSvrClientHead;
    if(pGTcpSvrClientHead)
        pGTcpSvrClientHead->pPrior = pClient;
    pGTcpSvrClientHead = pClient;
    dwGTcpSvrClientCount++;

#if(_USE_INTERLOCKED_IN_LIST)
    GTcpSvr_UnlockClientList();
#else
    LeaveCriticalSection(&GTcpSvrClientCS);
#endif
}

/*
 * Unlinks pClient from the connected-client list and decrements
 * dwGTcpSvrClientCount, under the client-list lock.
 *
 * pClient must currently be a member of the list: when it is not the head its
 * pPrior pointer is dereferenced without a check.
 */
void GTcpSvr_DeleteClientList(PGHND_DATA pClient)
{
#if(_USE_INTERLOCKED_IN_LIST)
    GTcpSvr_LockClientList();
#else
    EnterCriticalSection(&GTcpSvrClientCS);
#endif

    if(pClient == pGTcpSvrClientHead)
    {
        // Removing the head: its successor (if any) becomes the new head.
        pGTcpSvrClientHead = pGTcpSvrClientHead->pNext;
        if(pGTcpSvrClientHead)
            pGTcpSvrClientHead->pPrior = NULL;
    }else
    {
        // Interior or tail node: bridge predecessor and successor.
        pClient->pPrior->pNext = pClient->pNext;
        if(pClient->pNext)
            pClient->pNext->pPrior = pClient->pPrior;
    }
    dwGTcpSvrClientCount--;

#if(_USE_INTERLOCKED_IN_LIST)
    GTcpSvr_UnlockClientList();
#else
    LeaveCriticalSection(&GTcpSvrClientCS);
#endif
}

/*
 * Pushes pClient onto the front of the pending-accept list and increments
 * dwGTcpSvrPendingAcceptCount, under the pending-accept lock.
 */
void GTcpSvr_InsertPendingAcceptList(PGHND_DATA pClient)
{
#if(_USE_INTERLOCKED_IN_LIST)
    GTcpSvr_LockPendingAcceptList();
#else
    EnterCriticalSection(&GTcpSvrPendingAcceptCS);
#endif

    // New node becomes the head: old head as successor, no predecessor.
    pClient->pNext = pGTcpSvrPendingAcceptHead;
    if(pGTcpSvrPendingAcceptHead)
        pGTcpSvrPendingAcceptHead->pPrior = pClient;
    pClient->pPrior = NULL;
    pGTcpSvrPendingAcceptHead = pClient;
    dwGTcpSvrPendingAcceptCount++;

#if(_USE_INTERLOCKED_IN_LIST)
    GTcpSvr_UnlockPendingAcceptList();
#else
    LeaveCriticalSection(&GTcpSvrPendingAcceptCS);
#endif
}
/*
 * Unlinks pClient from the pending-accept list and decrements
 * dwGTcpSvrPendingAcceptCount, under the pending-accept lock.
 *
 * pClient must currently be a member of the list: when it is not the head its
 * pPrior pointer is dereferenced without a check.
 */
void GTcpSvr_DeletePendingAcceptList(PGHND_DATA pClient)
{
#if(_USE_INTERLOCKED_IN_LIST)
    GTcpSvr_LockPendingAcceptList();
#else
    EnterCriticalSection(&GTcpSvrPendingAcceptCS);
#endif

    if(pClient == pGTcpSvrPendingAcceptHead)
    {
        // Removing the head: its successor (if any) becomes the new head.
        pGTcpSvrPendingAcceptHead = pGTcpSvrPendingAcceptHead->pNext;
        if(pGTcpSvrPendingAcceptHead)
            pGTcpSvrPendingAcceptHead->pPrior = NULL;
    }else
    {
        // Interior or tail node: bridge predecessor and successor.
        pClient->pPrior->pNext = pClient->pNext;
        if(pClient->pNext)
            pClient->pNext->pPrior = pClient->pPrior;
    }
    dwGTcpSvrPendingAcceptCount--;

#if(_USE_INTERLOCKED_IN_LIST)
    GTcpSvr_UnlockPendingAcceptList();
#else
    LeaveCriticalSection(&GTcpSvrPendingAcceptCS);
#endif
}

/*
 * Calls pfnOnProc for the connected clients whose list position lies in
 * [dwFromIndex, dwFromIndex + dwCount), while holding the client-list lock.
 *
 * dwParam     - opaque value handed to the callback unchanged.
 * dwFromIndex - zero-based position of the first client to report.
 * dwCount     - maximum number of clients to report. The end of the window is
 *               clamped to the largest DWORD, so a very large count (for
 *               example (DWORD)-1 meaning "all") no longer wraps around and
 *               suppresses the traversal.
 * pfnOnProc   - invoked as pfnOnProc(dwParam, index, client pointer as DWORD);
 *               returning zero stops the traversal early.
 *
 * Returns immediately when the server is not active.
 */
void GTcpSvr_TraversalClient(DWORD dwParam, DWORD dwFromIndex, DWORD dwCount, PFN_ON_GSOCK_TRAVERSAL pfnOnProc)
{
    if(!bGTcpSvrIsActive)
        return;

#if(_USE_INTERLOCKED_IN_LIST)
    GTcpSvr_LockClientList();
#else
    EnterCriticalSection(&GTcpSvrClientCS);
#endif

    // Exclusive end of the window, saturated instead of overflowing.
    const DWORD dwMax = (DWORD)(-1);
    const DWORD dwEnd = (dwCount > dwMax - dwFromIndex) ? dwMax : (dwFromIndex + dwCount);

    DWORD dwIndex = 0;
    for(PGHND_DATA pClient = pGTcpSvrClientHead; pClient; pClient = pClient->pNext, dwIndex++)
    {
        if(dwIndex < dwFromIndex)
            continue;  // still before the requested window

        if((dwIndex >= dwEnd) || (!pfnOnProc(dwParam, dwIndex, (DWORD)pClient)))
            break;
    }

#if(_USE_INTERLOCKED_IN_LIST)
    GTcpSvr_UnlockClientList();
#else
    LeaveCriticalSection(&GTcpSvrClientCS);
#endif
}

 

 

 

 

 

 

 
/*
 * Error/close handler for a connected client's read or write operation.
 *
 * pClient - the client the operation belongs to.
 * pIoData - the per-I/O data of the failed operation; it is always released
 *           here and must not be used by the caller afterwards.
 *
 * Steps:
 *  1. For a write, report the unsent buffer through pfnOnGSockSendErrorSvr.
 *  2. Release pIoData (its operation type is captured first, because the
 *     structure must not be read once it has been handed back).
 *  3. Switch the client from CONNECTED to DISCONNECT; if it was not CONNECTED
 *     nothing more is done here.
 *  4. Remove the client from the client list, decrement the owning listener's
 *     pData counter and fire pfnOnGSockDisconnectSvr.
 *  5. Release the handle: for overflow / idle-timeout / explicit close the
 *     socket data is uninitialised (and re-initialised when _REUSED_SOCKET is
 *     set); otherwise the socket is disconnected for reuse or uninitialised.
 */
void GTcpSvr_OnReadWriteError(PGHND_DATA pClient, PGIO_DATA pIoData)
{
    if(GIO_WRITE_COMPLETED == pIoData->OperType)
        pfnOnGSockSendErrorSvr((DWORD)pClient, pIoData->cData, pIoData->WSABuf.len);

    // Capture the reason before pIoData is released.
    const int bIsOverflow = (GIO_CONNECTION_OVERFLOW == pIoData->OperType);
    const int bIsIdleOvertime = (GIO_IDLE_OVERTIME == pIoData->OperType);
    const int bIsClose = (GIO_CLOSE == pIoData->OperType);
    GIoDat_Free(pIoData);

    if(GHND_STATE_CONNECTED != GSock_InterlockedSet(pClient->hsState, GHND_STATE_DISCONNECT, GHND_STATE_CONNECTED))
        return;

    GTcpSvr_DeleteClientList(pClient);
    GSock_InterlockedDec(PGHND_DATA(pClient->pOwner)->pData);
    pfnOnGSockDisconnectSvr((DWORD)pClient);

    if(bIsOverflow || bIsIdleOvertime || bIsClose)
    {
        // Overflow and idle-timeout have a dedicated notification; a plain close has none.
        if(bIsOverflow)
            pfnOnGSockConnectionOverflow((DWORD)pClient);
        else if(bIsIdleOvertime)
            pfnOnGSockIdleOvertime((DWORD)pClient);

        GSock_UninitTcpHndData(pClient);
#if(_REUSED_SOCKET)
        GSock_InitTcpHndData(pClient);
#endif
        GHndDat_Free(pClient);
    }else
    {
#if(_REUSED_SOCKET)
        GTcpSvr_DisconnectEx(pClient);
#else
        GSock_UninitTcpHndData(pClient);
#endif
        GHndDat_Free(pClient);
    }
}

/*
 * Completion handler for a connected client's read or write operation.
 *
 * dwBytes - number of bytes transferred by the completed operation.
 * pClient - the client the operation belongs to.
 * pIoData - the per-I/O data of the completed operation.
 *
 * Zero bytes:
 *   When zero-byte receive mode is on, the client is still CONNECTED and the
 *   completed operation was a GIO_READ_COMPLETED read, a read of dwGBufSize
 *   bytes is posted (marked GIO_ZERO_READ_COMPLETED). In every other case, or
 *   if that post fails, the operation is routed to GTcpSvr_OnReadWriteError.
 *
 * Write completed:
 *   pfnOnGSockSendedSvr is notified and pIoData is released.
 *
 * Read completed:
 *   The activity tick is refreshed, the data is delivered, and a new read of
 *   dwGSockRecvBytes bytes is posted. A failed post goes to
 *   GTcpSvr_OnReadWriteError.
 */
void GTcpSvr_OnReadWrite(DWORD dwBytes, PGHND_DATA pClient, PGIO_DATA pIoData)
{
    if(!dwBytes)
    {
        if(bGSockIsZeroByteRecv && (GHND_STATE_CONNECTED == pClient->hsState) && (GIO_READ_COMPLETED == pIoData->OperType))
        {
            GIoDat_ResetIoDataOnRead(pIoData);
            pIoData->OperType = GIO_ZERO_READ_COMPLETED;
            pIoData->WSABuf.len = dwGBufSize;
            dwBytes = 0;
            DWORD dwFlag = 0;
            if((SOCKET_ERROR != WSARecv(pClient->Socket, &(pIoData->WSABuf), 1, &dwBytes, &dwFlag, LPWSAOVERLAPPED(pIoData), NULL)) ||
                (ERROR_IO_PENDING == WSAGetLastError()))
            {
                return;
            }
        }
        GTcpSvr_OnReadWriteError(pClient, pIoData);
        return;
    }

    if(GIO_WRITE_COMPLETED == pIoData->OperType)
    {
        pfnOnGSockSendedSvr((DWORD)pClient, pIoData->cData, dwBytes);
        GIoDat_Free(pIoData);
        return;
    }

    pClient->dwTickCountAcitve = GetTickCount();
#if(_USE_GPROTOCOL)
    // A TRUE result makes this handler continue with a freshly allocated IoData
    // (presumably the protocol layer keeps the old one - confirm).
    if(GCommProt_ProcessReceive(pClient, pIoData->cData, dwBytes, pfnOnGSockReceiveSvr))
    {
        pIoData = GIoDat_Alloc();
        if(!pIoData)
        {
            GLog_Write("GTcpSvr_OnReadWrite:IoData分配失败,无法再投递接收");
            return;
        }
        // Every other IoData posted for a client in this file carries the
        // client as its owner; do the same for the replacement buffer.
        pIoData->pOwner = pClient;
    }
#else
    pfnOnGSockReceiveSvr((DWORD)pClient, pIoData->cData, dwBytes);
#endif

    // Re-arm the receive.
    GIoDat_ResetIoDataOnRead(pIoData);
    pIoData->OperType = GIO_READ_COMPLETED;
    pIoData->WSABuf.len = dwGSockRecvBytes;
    dwBytes = 0;
    DWORD dwFlag = 0;
    if((SOCKET_ERROR == WSARecv(pClient->Socket, &(pIoData->WSABuf), 1, &dwBytes, &dwFlag, LPWSAOVERLAPPED(pIoData), NULL)) &&
        (ERROR_IO_PENDING != WSAGetLastError()))
    {
        GTcpSvr_OnReadWriteError(pClient, pIoData);
    }
}

/*
 * Posts up to dwCount accept operations on pListener.
 *
 * For each accept a client handle and an IoData are allocated, the client is
 * set up as a GHND_TYPE_TCP_SVR_CLIENT in state GHND_STATE_ACCEPTING, linked
 * into the pending-accept list, and GTcpSvr_AcceptEx is issued. Posting stops
 * early once connected + pending clients reach dwGSockMaxNumberConnection.
 *
 * Returns TRUE when the loop finishes without a failure (including the case
 * where the connection limit stopped it). On an allocation or AcceptEx
 * failure the number of accepts posted so far is returned, which is zero
 * (FALSE) when none were posted; everything allocated for the failed attempt
 * is released first.
 */
BOOL GTcpSvr_PostAccept(PGHND_DATA pListener, DWORD dwCount)
{
    int nCount = 0;

    while(dwCount && (dwGTcpSvrClientCount + dwGTcpSvrPendingAcceptCount < dwGSockMaxNumberConnection))
    {
        PGHND_DATA pClient = GHndDat_Alloc();
        if(!pClient)
        {
            GLog_Write("GTcpSvr_PostAccept:分配HndData失败,无法投递接受");
            return(nCount);
        }
#if(!_REUSED_SOCKET)
        GSock_InitTcpHndData(pClient);
#endif
        PGIO_DATA pIoData = GIoDat_Alloc();
        if(!pIoData)
        {
#if(!_REUSED_SOCKET)
            GSock_UninitTcpHndData(pClient);
#endif
            GHndDat_Free(pClient);
            GLog_Write("GTcpSvr_PostAccept:分配IoData失败,无法投递接受");
            return(nCount);
        }

        pClient->pfnOnIocpOper = &GTcpSvr_OnReadWrite;
        pClient->pfnOnIocpError = &GTcpSvr_OnReadWriteError;
        pClient->htType = GHND_TYPE_TCP_SVR_CLIENT;
        pClient->hsState = GHND_STATE_ACCEPTING;
        pClient->pOwner = pListener;
        pClient->pData = NULL;

        pIoData->OperType = GIO_CONNECTED;
        pIoData->pOwner = pClient;
        pIoData->WSABuf.len = dwGSockRecvBytes;

        // Link the client before issuing the accept, and unlink it again if the call fails.
        GTcpSvr_InsertPendingAcceptList(pClient);
        if((!GTcpSvr_AcceptEx(pListener, pClient, pIoData)) && (ERROR_IO_PENDING != WSAGetLastError()))
        {
            GTcpSvr_DeletePendingAcceptList(pClient);
            GIoDat_Free(pIoData);
#if(!_REUSED_SOCKET)
            GSock_UninitTcpHndData(pClient);
#endif
            GHndDat_Free(pClient);
            GLog_Write("GTcpSvr_PostAccept:执行pfnGTcpSvrAcceptEx失败,无法投递接受");
            return(nCount);
        }
        dwCount--;
        nCount++;
    }

    return(TRUE);
}

/*
 * Error/close handler for an outstanding accept.
 *
 * pListener - the listener the accept was posted on (not used here).
 * pIoData   - the accept's per-I/O data; its pOwner is the pending client.
 *             It is always released here.
 *
 * The operation type is captured before pIoData is released, because the
 * structure must not be read once it has been handed back. If the client was
 * still in GHND_STATE_ACCEPTING it is moved to DISCONNECT, unlinked from the
 * pending-accept list and released; an explicit GIO_CLOSE always uninitialises
 * the socket data, other errors disconnect it for reuse when _REUSED_SOCKET is set.
 */
void GTcpSvr_OnAcceptError(PGHND_DATA pListener, PGIO_DATA pIoData)
{
    PGHND_DATA pClient = PGHND_DATA(pIoData->pOwner);
    const int bIsClose = (GIO_CLOSE == pIoData->OperType);
    GIoDat_Free(pIoData);

    if(GHND_STATE_ACCEPTING != GSock_InterlockedSet(pClient->hsState, GHND_STATE_DISCONNECT, GHND_STATE_ACCEPTING))
        return;

    GTcpSvr_DeletePendingAcceptList(pClient);
    if(bIsClose)
    {
        GSock_UninitTcpHndData(pClient);
        GHndDat_Free(pClient);
    }else
    {
#if(_REUSED_SOCKET)
        GTcpSvr_DisconnectEx(pClient);
#else
        GSock_UninitTcpHndData(pClient);
#endif
        GHndDat_Free(pClient);
    }
}

void GTcpSvr_OnAccept(DWORD dwBytes, PGHND_DATA pListener, PGIO_DATA pIoData)
{
if((!dwBytes) && (dwGSockAcceptBytes))
{
GTcpSvr_OnAcceptError(pListener, pIoData);
return;
}

PGHND_DATA pClient;
PSOCKADDR_IN pAddr;
int nLen;

pClient = PGHND_DATA(pIoData->pOwner);
GTcpSvr_DeletePendingAcceptList(pClient);
setsockopt(pClient->Socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&(pListener->Socket), sizeof(pListener->Socket));
BOOL bDontLinger = FALSE;
setsockopt(pClient->Socket, SOL_SOCKET, SO_DONTLINGER, (const char *) &bDontLinger, sizeof(BOOL));

GTcpSvr_GetAcceptExSockAddrs(pListener, pIoData->cData, dwGSockAcceptBytes, pAddr, nLen);
pClient->dwAddr = pAddr->sin_addr.S_un.S_addr;
pClient->dwPort = htons(pAddr->sin_port);
pClient->hsState = GHND_STATE_CONNECTED;
pClient->dwTickCountAcitve = GetTickCount();

GTcpSvr_InsertClientList(pClient);
GSock_InterlockedAdd(PGHND_DATA(pClient->pOwner)->pData);
#if(_USE_GPROTOCOL)
if(GCommProt_ProcessReceive(pClient, pIoData->cData, dwBytes, pfnOnGSockConnectTcpSvr))
{
pIoData = GIoDat_Alloc();
if(!pIoData)
{
GLog_Write("GTcpSvr_OnAccept:IoData分配失败,连接后无法再投递接收");
return;
}
}
#else
pfnOnGSockConnectTcpSvr((DWORD)pClient, pIoData->cData, dwBytes);
#endif

ZeroMemory(pIoData, sizeof(WSAOVERLAPPED));
DWORD dwCount = dwGSockNumberPostRecv;
for(;;)
{
GIoDat_ResetIoDataOnRead(pIoData);
pIoData->OperType = GIO_READ_COMPLETED;
pIoData->WSABuf.len = dwGSockRecvBytes;
dwBytes = 0;
DWORD dwFlag = 0;
if((SOCKET_ERROR == WSARecv(pClient->Socket, &(pIoData->WSABuf), 1, &dwBytes, &dwFlag, LPWSAOVERLAPPED(pIoData), NULL)) &&
(ERROR_IO_PENDING != WSAGetLastError()))
{
GTcpSvr_OnReadWriteError(pClient, pIoData);
break;
}
dwCount--;
if(!dwCount)
break;
pIoData = GIoDat_Alloc();
if(!pIoData)
{
GLog_Write("GTcpSvr_OnAccept:申请IoData失败,连接后无法投递接收");
break;
}
pIoData->pOwner = pClient;
}
if(dwGTcpSvrClientCount >= dwGSockMaxNumberConnection)
{
pfnOnGSockConnectionOverflow((DWORD)pClient);
void GTcpSvr_DoCloseClient(PGHND_DATA pClient, PGHND_DATA pIoDataOwner, GIO_OPER_TYPE OperType);
GTcpSvr_DoCloseClient(pClient, pClient, GIO_CLOSE);
GLog_Write("GTcpSvr_OnAccept:连接超过最大值");
}
}

阅读(1457) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~