分类: C/C++
2008-05-28 16:23:58
说明
锁定模式下为了避免socket在进行IO操作时一直处于“凝固”状态,简单地用两个线程来实现:一个线程用于数据的接收;另一个线程则用于对接收到的数据进行相关的处理。因为由两个线程来实现,其特点:
① 对数据的处理可以是耗时操作,它不会对数据的接收带来影响。
② 对数据的接收与处理分别在不同的线程内,故应考虑数据的完整性,应避免在数据接收完全之前就对数据进行操作,可以以多种方式实现:如Mutex,Semphore,CriticalSection等同步技术。
③ 当数据接收完全之后,还需要及时的通知另一个线程,以便及时处理。其实现方法是:当接收完数据后,发送已完成信号。
详细说明参见<Windows网络编程>一书。
本代码中应用CriticalSection和event来实现。
需要注意的是:event为自动重置。
完整代码及注释
// zBlogSocket.cpp : 定义控制台应用程序的入口点。 // //多线程socket锁定 #include "stdafx.h" #include #include #include using namespace std; CRITICAL_SECTION cs; HANDLE hComplete; TCHAR buf[BUFSIZ]; int nBytes; //number of per receive const int nNumPerRecv = BUFSIZ; //从socket接收数据线程 DWORD WINAPI ReadThread(LPVOID lpParam); //对接受到的数据进行计算的线程 DWORD WINAPI ComptThread(LPVOID lpParam); int _tmain(int argc, _TCHAR* argv[]) { WSAData wsd; int nRet = WSAStartup(0x0202,&wsd); if (nRet != 0) { cout << "WSAStartup Error = " << WSAGetLastError() << endl; return 1; } SOCKET soRecv = socket(AF_INET,SOCK_STREAM,0); SOCKADDR_IN siRecv; siRecv.sin_addr.s_addr = inet_addr("127.0.0.1"); siRecv.sin_family = AF_INET; siRecv.sin_port = htons(5150); nRet = bind(soRecv,(struct sockaddr*)&siRecv,sizeof(siRecv)); if (nRet == SOCKET_ERROR) { cout << "bind Error = " << WSAGetLastError() << endl; return 1; } listen(soRecv,8); HANDLE hThreads[2]; DWORD dwThread[2]; //初始化cs InitializeCriticalSection(&cs); hThreads[0] = CreateThread(NULL,0,ReadThread,(LPVOID)soRecv,0,&dwThread[0]); hThreads[1] = CreateThread(NULL,0,ComptThread,NULL,0,&dwThread[1]); //创建自动重置的event对象,当ReadThread接收数据完毕 //后将信号置为signaled hComplete = CreateEvent(NULL,false,FALSE,"evt"); //等待创建的两个线程结束 WaitForMultipleObjects(2,hThreads,true,INFINITE); //清除cs DeleteCriticalSection(&cs); return 0; } //从socket接收数据线程 DWORD WINAPI ReadThread(LPVOID lpParam){ int nTotal = 0; int nRead = 0; int nLeft = 0; int nReadBytes = 0; int nBytes = 0; SOCKET pSoRecv = (SOCKET)lpParam; int dwSend; SOCKADDR_IN siSend; SOCKET soAccept; while (1) { nTotal = 0; nLeft = nNumPerRecv; /* 10014错误原因 * int dwSend = sizeof(dwSend); Error = 10014 */ int dwSend = sizeof(siSend); soAccept = accept(pSoRecv,(struct sockaddr*)&siSend,&dwSend); if (soAccept == SOCKET_ERROR) { cout << "accept Error = " << WSAGetLastError() << endl; system("pause"); } //接收到的数据为空 nBytes = 0; memset(buf,0,BUFSIZ); while (nTotal != nNumPerRecv) { //同步操作进入cs EnterCriticalSection(&cs); /* * Recv data from socket and place data in buf[nBytes] */ //nRead = recv(soAccept,&buf[BUFSIZ - nBytes],nLeft,0); nRead = recv(soAccept,&buf[nBytes],nLeft,0); if (nRead == -1) { cout << "recv error = " << WSAGetLastError() << endl; ExitThread(1); } nTotal += nRead; nLeft -= nRead; nBytes += nRead; //离开cs LeaveCriticalSection(&cs); } //激发event SetEvent(hComplete); } } //对接受到的数据进行计算的线程 DWORD WINAPI ComptThread(LPVOID lpParam){ //等待ReadThread将数据接受完成后再进行计算 while (1) { WaitForSingleObject(hComplete,INFINITE); //对全局变量操作进入cs EnterCriticalSection(&cs); cout << buf << endl; nBytes -= nNumPerRecv; //离开cs LeaveCriticalSection(&cs); } return 0; } |