amy3701229所理解的完成端口原理是:创建一个完成端口与一个SOCKET绑定,所有通过该SOCKET通讯的数据都由完成端口机制来处理,完成端口是异步I/O,接收和发送都是提交后不管结果,所有结果都由完成端口的消息机制来做相应的通知,并且完成端口内部有线程池的概念
//www.cnblogs.com/jiayongzhi/archive/2011/05/17/2048286.html
首先来说为什么要使用完成端口:
原因还是因为为了解决recv方法为阻塞式的问题,WinSocket封装的WSARecv方法为非堵塞的方法。
int WSARecv(
SOCKET lpCompletionRoutine
);
WSARecv为非阻塞的方法,其中第二个参数是I/O请求成功时,数据保存的地址。
Socket的触发是属于网卡硬件的中断信号,只是此信号CPU不能直接获取状态,此时我们可以使之绑定Event事件,Event内核对象的状态时可以监听到的。
这也就是WSAEventSelect模型的原理,当然重叠模型的最终原理也是如此。但Event的方法有着其弊病:当模型处理多线程事件时要调用WSAWaitForMultipleEvents函数,
WSAWaitForMultipleEvents函数一次最多只能等待64个事件对象。所以当海量客户端连接服务器时,服务器将没有能力应对,于是我们使用完成端口。
完成端口:
HANDLE CreateIoCompletionPort(
HANDLE FileHandle, //要链接的Socket
HANDLE ExistingCompletionPort, //全局完成端口
//同完成端口关联到一起的句柄,此处可为链接的socket,或是id等等(目地使接收到的socket知道是哪个socket)
DWORD CompletionKey,
DWORD NumberOfConcurrentThreads
);
此函数创建创建Socket与完成端口的链接,CreateIoCompletionPort函数被用于完成两个工作:
- 用于创建—个完成端口对象。
- 将一个句柄同完成端口关联到一起。
用函数GetQueuedCompletionStatus等待全局完成端口的完成队列。
BOOL GetQueuedCompletionStatus(
HANDLE CompletionPort,
LPDWORD lpNumberOfBytes,
PULONG_PTR lpCompletionKey, //此参数为CreateIoCompletionPort第三个参数传过来的句柄,通过此参数获得socket
LPOVERLAPPED* lpOverlapped,
DWORD dwMilliseconds
);
完成端口的工作原理是,把Socket和完成端口绑定,通过关联句柄传递传递参数,使得获取到的Socket能得知是那个socket,参数可以自定义可以是socket本身也可以是id等等。
- #include "WinSock2.h"
- #pragma comment(lib, "ws2_32.lib")
- #define MESSAGESIZE 1024
- SOCKET serverSocket;
- DWORD WINAPI SocketProcAccept(LPVOID pParam);
- DWORD WINAPI SocketProcMain(LPVOID pParam);
- enum SOCKETOPERATE
- {
- soREVC
- };
- struct SOCKETDATA
- {
- WSAOVERLAPPED overlapped;
- WSABUF buf;
- char sMessage[MESSAGESIZE];
- DWORD dwBytes;
- DWORD flag;
- SOCKETOPERATE socketType;
- void Clear(SOCKETOPERATE type)
- {
- ZeroMemory(this, sizeof(SOCKETDATA));
- buf.buf = sMessage;
- buf.len = MESSAGESIZE;
- socketType = type;
- }
- };
- SOCKET CreateServiceSocket(int Port)
- {
- int iError;
- WSAData data;
- iError = WSAStartup(0x0202, &data);
- SOCKET tmp = socket(AF_INET,SOCK_STREAM,0);
- if(tmp == INVALID_SOCKET)
- {
- return INVALID_SOCKET;
- }
- SOCKADDR_IN addr;
- addr.sin_addr.s_addr = inet_addr("127.0.0.1");
- addr.sin_family = AF_INET;
- addr.sin_port = htons(Port);
- if((bind(tmp, (sockaddr*)&addr, sizeof(addr))) != 0)
- {
- closesocket(tmp);
- return INVALID_SOCKET;
- }
- if((listen(tmp, INFINITE)) != 0)
- {
- closesocket(tmp);
- return INVALID_SOCKET;
- }
- return tmp;
- }
- int _tmain(int argc, _TCHAR* argv[])
- {
- HANDLE CP = INVALID_HANDLE_VALUE;
- CP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
- SYSTEM_INFO systemInfo;
- GetSystemInfo(&systemInfo);
- for (int i = 0; i<systemInfo.dwNumberOfProcessors; i++)
- {
- CreateThread(NULL, NULL, &SocketProcMain, CP, NULL, NULL);
- }
- serverSocket = CreateServiceSocket(6565);
- if (serverSocket == INVALID_SOCKET)
- {
- return 0;
- }
- CreateThread(NULL, NULL, &SocketProcAccept, CP, NULL, NULL);
- while(1)
- {
- Sleep(10000);
- }
- CloseHandle(CP);
- closesocket(serverSocket);
- WSACleanup();
- return 0;
- }
- DWORD WINAPI SocketProcAccept(LPVOID pParam)
- {
- HANDLE CP = (HANDLE)pParam;
- SOCKADDR_IN addr;
- int len = sizeof(SOCKADDR_IN);
- SOCKET tmp;
- SOCKETDATA *lpSocketData;
- while(1)
- {
- tmp = accept(serverSocket, (sockaddr*)&addr, &len);
- printf("Client Accept:%s\t:%d\n", inet_ntoa(addr.sin_addr), htons(addr.sin_port));
- CreateIoCompletionPort((HANDLE)tmp, CP, (DWORD)tmp, INFINITE);
- lpSocketData = (SOCKETDATA *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(SOCKETDATA));
- lpSocketData->Clear(soREVC);
- WSARecv(tmp, &lpSocketData->buf, 1,&lpSocketData->dwBytes, &lpSocketData->flag, &lpSocketData->overlapped, NULL);
- }
- }
- DWORD WINAPI SocketProcMain(LPVOID pParam)
- {
- HANDLE CP = (HANDLE)pParam;
- SOCKADDR_IN addr;
- DWORD dwBytes;
- SOCKETDATA *lpSocketData;
- SOCKET clientSocket;
-
- while(1)
- {
- GetQueuedCompletionStatus(CP, &dwBytes, (PULONG_PTR)&clientSocket, (LPOVERLAPPED*)&lpSocketData, INFINITE);
- if(dwBytes == 0xFFFFFFFF)
- {
- return 0;
- }
- if(lpSocketData->socketType == soREVC)
- {
- if(dwBytes == 0)
- {
- closesocket(clientSocket);
- HeapFree(GetProcessHeap(), 0, lpSocketData);
- }else
- {
- lpSocketData->sMessage[dwBytes] = '\0';
- printf("%x\t:%s\n", (DWORD)clientSocket, lpSocketData->sMessage);
- lpSocketData->Clear(soREVC);
- WSARecv(clientSocket, &lpSocketData->buf, 1, &lpSocketData->dwBytes, &lpSocketData->flag, &lpSocketData->overlapped, NULL);
- }
- }
- }
- }
Socket 重叠I/O 完成端口模型详解
//blog.sina.com.cn/s/blog_3fb7f7270100fesw.html
- 2009年02月23日 星期一 下午 10:07
- #include "stdafx.h"
- #include <WINSOCK2.h>
- #include <stdio.h>
- #define PORT 5150
- #define MSGSIZE 1024
- #pragma comment(lib, "ws2_32.lib")
- typedef enum
- {
- RECV_POSTED
- }OPERATION_TYPE; //枚举,表示状态
- typedef struct
- {
- WSAOVERLAPPED overlap;
- WSABUF Buffer;
- char szMessage[MSGSIZE];
- DWORD NumberOfBytesRecvd;
- DWORD Flags;
- OPERATION_TYPE OperationType;
- }PER_IO_OPERATION_DATA, *LPPER_IO_OPERATION_DATA; //定义一个结构体保存IO数据
- DWORD WINAPI WorkerThread(LPVOID);
- int main()
- {
- WSADATA wsaData;
- SOCKET sListen, sClient;
- SOCKADDR_IN local, client;
- DWORD i, dwThreadId;
- int iaddrSize = sizeof(SOCKADDR_IN);
- HANDLE CompletionPort = INVALID_HANDLE_VALUE;
- SYSTEM_INFO systeminfo;
- LPPER_IO_OPERATION_DATA lpPerIOData = NULL;
- // Initialize Windows Socket library
- WSAStartup(0x0202, &wsaData);
- // 初始化完成端口
- CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
- // 有几个CPU就创建几个工作者线程
- GetSystemInfo(&systeminfo);
- for (i = 0; i < systeminfo.dwNumberOfProcessors; i++)
- {
- CreateThread(NULL, 0, WorkerThread, CompletionPort, 0, &dwThreadId);
- }
- // 创建套接字
- sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
- // 绑定套接字
- local.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
- local.sin_family = AF_INET;
- local.sin_port = htons(PORT);
- bind(sListen, (struct sockaddr *)&local, sizeof(SOCKADDR_IN));
- // 开始监听!
- listen(sListen, 3);
- while (TRUE)//主进程的这个循环中循环等待客户端连接,若有连接,则将该客户套接字于完成端口绑定到一起
- //然后开始异步等待接收客户传来的数据。
- {
- // 如果接到客户请求连接,则继续,否则等待。
- sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
- //client中保存用户信息。
- printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));
- //将这个最新到来的客户套接字和完成端口绑定到一起。
- CreateIoCompletionPort((HANDLE)sClient, CompletionPort, ( ULONG_PTR)sClient, 0);
- //第三个参数表示传递的参数,这里就传递的客户套接字地址。最后一个参数为0 表示有和CPU一样的进程数。即1个CPU一个线程
-
- // 初始化结构体
- lpPerIOData = (LPPER_IO_OPERATION_DATA)HeapAlloc(
- GetProcessHeap(),
- HEAP_ZERO_MEMORY,
- sizeof(PER_IO_OPERATION_DATA));
- lpPerIOData->Buffer.len = MSGSIZE; // len=1024
- lpPerIOData->Buffer.buf = lpPerIOData->szMessage;
- lpPerIOData->OperationType = RECV_POSTED; //操作类型
- WSARecv(sClient, //异步接收消息,立刻返回。
- &lpPerIOData->Buffer, //获得接收的数据
- 1, //The number of WSABUF structures in the lpBuffers array.
- &lpPerIOData->NumberOfBytesRecvd, //接收到的字节数,如果错误返回0
- &lpPerIOData->Flags, //参数,先不管
- &lpPerIOData->overlap, //输入这个结构体咯。
- NULL);
- }
- //posts an I/O completion packet to an I/O completion port.
- PostQueuedCompletionStatus(CompletionPort, 0xFFFFFFFF, 0, NULL);
- CloseHandle(CompletionPort);
- closesocket(sListen);
- WSACleanup();
- return 0;
- }
- //工作者线程有一个参数,是指向完成端口的句柄
- DWORD WINAPI WorkerThread(LPVOID CompletionPortID)
- {
- HANDLE CompletionPort=(HANDLE)CompletionPortID;
- DWORD dwBytesTransferred;
- SOCKET sClient;
- LPPER_IO_OPERATION_DATA lpPerIOData = NULL;
- while (TRUE)
- {
- GetQueuedCompletionStatus( //遇到可以接收数据则返回,否则等待
- CompletionPort,
- &dwBytesTransferred, //返回的字数
- (PULONG_PTR&)sClient, //是响应的哪个客户套接字?
- (LPOVERLAPPED *)&lpPerIOData, //得到该套接字保存的IO信息
- INFINITE); //无限等待咯。不超时的那种。
- if (dwBytesTransferred == 0xFFFFFFFF)
- {
- return 0;
- }
-
- if (lpPerIOData->OperationType == RECV_POSTED) //如果受到数据
- {
- if (dwBytesTransferred == 0)
- {
- // Connection was closed by client
- closesocket(sClient);
- HeapFree(GetProcessHeap(), 0, lpPerIOData); //释放结构体
- }
- else
- {
- lpPerIOData->szMessage[dwBytesTransferred] = '\0';
- send(sClient, lpPerIOData->szMessage, dwBytesTransferred, 0); //将接收到的消息返回
-
- // Launch another asynchronous operation for sClient
- memset(lpPerIOData, 0, sizeof(PER_IO_OPERATION_DATA));
- lpPerIOData->Buffer.len = MSGSIZE;
- lpPerIOData->Buffer.buf = lpPerIOData->szMessage;
- lpPerIOData->OperationType = RECV_POSTED;
- WSARecv(sClient, //循环接收
- &lpPerIOData->Buffer,
- 1,
- &lpPerIOData->NumberOfBytesRecvd,
- &lpPerIOData->Flags,
- &lpPerIOData->overlap,
- NULL);
- }
- }
- }
- return 0;
- }
首先,说说主线程:
1.创建完成端口对象
2.创建工作者线程(这里工作者线程的数量是按照CPU的个数来决定的,这样可以达到最佳性能)
3.创建监听套接字,绑定,监听,然后程序进入循环
4.在循环中,我做了以下几件事情:
(1).接受一个客户端连接
(2).将该客户端套接字与完成端口绑定到一起(还是调用CreateIoCompletionPort,但这次的作用不同),
注意,按道理来讲,此时传递给CreateIoCompletionPort的第三个参数应该是一个完成键,
一般来讲,程序都是传递一个单句柄数据结构的地址,该单句柄数据包含了和该客户端连接有关的信息,
由于我们只关心套接字句柄,所以直接将套接字句柄作为完成键传递;
(3).触发一个WSARecv异步调用,用到了“尾随数据”,使接收数据所用的缓冲区紧跟在WSAOVERLAPPED对象之后,
此外,还有操作类型等重要信息。
在工作者线程的循环中,我们
1.调用GetQueuedCompletionStatus取得本次I/O的相关信息(例如套接字句柄、传送的字节数、单I/O数据结构的地址等等)
2.通过单I/O数据结构找到接收数据缓冲区,然后将数据原封不动的发送到客户端
3.再次触发一个WSARecv异步操作
自己的理解,有任何问题都可以与我联系
阅读(3008) | 评论(0) | 转发(0) |