Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1291987
  • 博文数量: 196
  • 博客积分: 4141
  • 博客等级: 中将
  • 技术积分: 2253
  • 用 户 组: 普通用户
  • 注册时间: 2009-03-21 20:04
文章存档

2019年(31)

2016年(1)

2014年(16)

2011年(8)

2010年(25)

2009年(115)

分类: 系统运维

2011-01-21 02:33:02

  1. #include <winsock2.h>
  2. #include <ws2tcpip.h>

  3. #include "public.h"
  4. #include "resolve.h"

  5. typedef SINGLE_LIST_HEADER BuffHeader;
  6. typedef SINGLE_LIST BuffObj;
  7. typedef SINGLE_LIST_HEADER TheadObjHeader;
  8. typedef SINGLE_LIST ThreadObj;
  9. typedef DOUBLE_LIST_HEADER SockObjHeader;
  10. typedef DOUBLE_LIST SockObj;

  11. typedef struct _SOCKET_OBJ
  12. {
  13.     SOCKET s; // Socket handle
  14.     HANDLE event; // Event handle
  15.     int listening; // Socket is a listening socket (TCP)
  16.     int closing; // Indicates whether the connection is closing

  17.     SOCKADDR_STORAGE addr; // Used for client's remote address
  18.     int addrlen; // Length of the address

  19.     BuffHeader buff;

  20.     DOUBLE_LIST entry;
  21. } SOCKET_OBJ;

  22. typedef struct _THREAD_OBJ
  23. {
  24.     SockObjHeader sockHeader;

  25.     HANDLE Event; // Used to signal new clients assigned
  26.                                        // to this thread
  27.     HANDLE Thread;

  28.     HANDLE Handles[MAXIMUM_WAIT_OBJECTS]; // Array of socket's event handles

  29.     CRITICAL_SECTION ThreadCritSec; // Protect access to SOCKET_OBJ lists

  30.     ThreadObj entry; // Next thread object in list
  31. } THREAD_OBJ;

  32. TheadObjHeader theadObjHeader;

  33. SOCKET_OBJ* GetSocketObj(SOCKET s, int listening) {
  34.     SOCKET_OBJ *sockobj = NULL;

  35.     sockobj = (SOCKET_OBJ*)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(SOCKET_OBJ));
  36.     if (sockobj == NULL) {
  37.         fprintf(stderr, "HeapAlloc failed.\n");
  38.         ExitProcess(-1);
  39.     }

  40.     sockobj->s = s;
  41.     sockobj->listening = listening;
  42.     sockobj->addrlen = sizeof(sockobj->addr);

  43.     sockobj->event = WSACreateEvent();
  44.     if (sockobj->event == NULL)
  45.     {
  46.         fprintf(stderr, "GetSocketObj: WSACreateEvent failed: %d\n", WSAGetLastError());
  47.         ExitProcess(-1);
  48.     }

  49.     InitializeCriticalSection(&sockobj->buff.SendRecvQueueCritSec);

  50.     return sockobj;
  51. }

  52. THREAD_OBJ *GetThreadObj() {
  53.     THREAD_OBJ *thread = NULL;

  54.     thread = (THREAD_OBJ*)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(THREAD_OBJ));
  55.     if (thread == NULL) {
  56.         fprintf(stderr, "HeapAllco failed.\n");
  57.         ExitProcess(-1);
  58.     }

  59.     thread->Event = WSACreateEvent();
  60.     if (thread->Event == NULL) {
  61.         fprintf(stderr, "WSACreateEvent failed.\n");
  62.         ExitProcess(-1);
  63.     }

  64.     thread->Handles[0] = thread->Event;

  65.     InitializeCriticalSection(&thread->ThreadCritSec);
  66.     InitializeDoubleHead(&thread->sockHeader);

  67.     return thread;
  68. }

  69. int InsertSocketObj(THREAD_OBJ *thread, SOCKET_OBJ *sockobj) {
  70.     int ret;

  71.     EnterCriticalSection(&thread->ThreadCritSec);

  72.     if (thread->sockHeader.count < MAXIMUM_WAIT_OBJECTS - 1) {
  73.         EnqueueDoubleListHead(&(thread->sockHeader), &(sockobj->entry));

  74.         thread->Handles[thread->sockHeader.count] = sockobj->event;

  75.         ret = NO_ERROR;
  76.     } else {
  77.         ret = SOCKET_ERROR;
  78.     }

  79.     LeaveCriticalSection(&thread->ThreadCritSec);

  80.     return ret;
  81. }

  82. SOCKET_OBJ *FindSocketObj(THREAD_OBJ *thread, int index) {
  83.     SOCKET_OBJ *sockobj = NULL;
  84.     int i;

  85.     EnterCriticalSection(&thread->ThreadCritSec);

  86.     SockObj *sptr = (SockObj *)GotoNextDoubleList(&thread->sockHeader, &(thread->sockHeader.head));
  87.     for (i = 0; i < index; ++i) {
  88.         if (sptr == NULL)
  89.         {
  90.             fprintf(stderr, "FindSocketobj failed.\n");
  91.             ExitProcess(-1);
  92.         }
  93.         sptr = (SockObj *)GotoNextDoubleList(&thread->sockHeader, sptr);
  94.     }

  95.     sockobj = (SOCKET_OBJ *)container_of(SOCKET_OBJ, entry, sptr);

  96.     LeaveCriticalSection(&thread->ThreadCritSec);

  97.     return sockobj;
  98. }

  99. void RemoveSocketObj(THREAD_OBJ *thread, SOCKET_OBJ *sock) {
  100.     EnterCriticalSection(&thread->ThreadCritSec);

  101.     RemoveDoubleList(&thread->sockHeader, &sock->entry);
  102.     WSASetEvent(thread->Event);

  103.     LeaveCriticalSection(&thread->ThreadCritSec);
  104. }

  105. void FreeSocketObj(SOCKET_OBJ *obj) {
  106.     BuffObj *ptr = NULL;
  107.     BUFFER_OBJ *blk = NULL;

  108.     while (true) {
  109.         ptr = DequeueSingleList(&obj->buff);
  110.         if (ptr == NULL)
  111.             break;

  112.         blk = (BUFFER_OBJ *)container_of(BUFFER_OBJ, next, ptr);
  113.         FreeBufferObj(blk);
  114.     }

  115.     WSACloseEvent(obj->event);

  116.     if (obj->s != INVALID_SOCKET) {
  117.         closesocket(obj->s);
  118.     }

  119.     HeapFree(GetProcessHeap(), 0, obj);
  120. }

  121. void RenumberThreadArray(THREAD_OBJ *thread) {
  122.     EnterCriticalSection(&thread->ThreadCritSec);

  123.     SOCKET_OBJ *obj = NULL;
  124.     int i = 0;

  125.     SockObj *sptr = NULL;
  126.     sptr = (SockObj *)GotoNextDoubleList(&thread->sockHeader, &(thread->sockHeader.head));
  127.     while (sptr) {
  128.         obj = (SOCKET_OBJ *)container_of(SOCKET_OBJ, entry, sptr);

  129.         thread->Handles[++i] = obj->event;

  130.         sptr = (SockObj *)GotoNextDoubleList(&thread->sockHeader, sptr);
  131.     }

  132.     LeaveCriticalSection(&thread->ThreadCritSec);
  133. }

  134. int ReceivePendingData(SOCKET_OBJ *sockobj) {
  135.     BUFFER_OBJ *buffobj=NULL;
  136.     int rc,
  137.                 ret;

  138.     // Get a buffer to receive the data
  139.     buffobj = GetBufferObj(gBufferSize);

  140.     ret = 0;

  141.     if (gProtocol == IPPROTO_TCP)
  142.     {
  143.         rc = recv(sockobj->s, buffobj->buf, buffobj->buflen, 0);
  144.     } else {
  145.         fprintf(stderr, "Tcp failed.\n");
  146.         ExitProcess(-1);
  147.     }

  148.     if (rc == SOCKET_ERROR) {
  149.         fprintf(stderr, "recv failed.\n");
  150.         ExitProcess(-1);
  151.     } else if (rc == 0) {
  152.         FreeBufferObj(buffobj);

  153.         sockobj->closing = TRUE;

  154.         if (sockobj->buff.head == NULL)
  155.         {
  156.             // If no sends are pending, close the socket for good
  157.             closesocket(sockobj->s);
  158.             sockobj->s = INVALID_SOCKET;
  159.             ret = -1;
  160.         }
  161.         else
  162.         {
  163.             ret = 0;
  164.         }
  165.     } else {
  166.         buffobj->buflen = rc;
  167.         EnqueueSingleList(&sockobj->buff, &buffobj->next);

  168.         ret = 1;
  169.     }

  170.     return ret;
  171. }

  172. int SendPendingData(SOCKET_OBJ *sock) {
  173.     BUFFER_OBJ *bufobj = NULL;
  174.     BuffObj *entry = NULL;
  175.     int nleft = 0,
  176.                 idx = 0,
  177.                 ret = 0,
  178.                 rc = 0;

  179.     while (entry = DequeueSingleList(&sock->buff)) {
  180.         bufobj = (BUFFER_OBJ *)container_of(BUFFER_OBJ, next, entry);

  181.         if (gProtocol == IPPROTO_TCP) {
  182.             nleft = bufobj->buflen;
  183.             idx = 0;

  184.             while (nleft > 0) {
  185.                 rc = send(sock->s, &(bufobj->buf[idx]), nleft, 0);
  186.                 if (rc == SOCKET_ERROR) {
  187.                     ExitProcess(-1);
  188.                 } else {
  189.                     idx += rc;
  190.                     nleft -= rc;
  191.                 }
  192.             }

  193.             printf("send %d.\n", bufobj->buflen);

  194.             FreeBufferObj(bufobj);
  195.         } else {
  196.             ExitProcess(-1);
  197.         }
  198.     }

  199.     if ((sock->buff.head == NULL) && (sock->closing == TRUE)) {
  200.         closesocket(sock->s);
  201.         sock->s = INVALID_SOCKET;
  202.         ret = -1;

  203.         printf("Closing Connection.\n");
  204.     }

  205.     return ret;
  206. }

  207. int HandleIo(THREAD_OBJ *thread, SOCKET_OBJ *sock) {
  208.     WSANETWORKEVENTS nevents;
  209.     int rc;

  210.     // Enumerate the events
  211.     rc = WSAEnumNetworkEvents(sock->s, sock->event, &nevents);
  212.     if (rc == SOCKET_ERROR)
  213.     {
  214.         fprintf(stderr, "HandleIo: WSAEnumNetworkEvents failed: %d\n", WSAGetLastError());
  215.         return SOCKET_ERROR;
  216.     }

  217.     if (nevents.lNetworkEvents & FD_READ) {
  218.         if (nevents.iErrorCode[FD_READ_BIT] == 0) {
  219.             rc = ReceivePendingData(sock);
  220.             if (rc == -1)
  221.             {
  222.                 RemoveSocketObj(thread, sock);
  223.                 FreeSocketObj(sock);
  224.                 return SOCKET_ERROR;
  225.             }
  226.             rc = SendPendingData(sock);
  227.             if (rc == -1)
  228.             {
  229.                 RemoveSocketObj(thread, sock);
  230.                 FreeSocketObj(sock);
  231.                 return SOCKET_ERROR;
  232.             }
  233.         } else {
  234.             fprintf(stderr, "HandleIo: FD_READ error %d\n", nevents.iErrorCode[FD_READ_BIT]);
  235.             RemoveSocketObj(thread, sock);
  236.             FreeSocketObj(sock);
  237.             return SOCKET_ERROR;
  238.         }
  239.     }
  240.     if (nevents.lNetworkEvents & FD_WRITE) {
  241.         if (nevents.iErrorCode[FD_WRITE_BIT] == 0)
  242.         {
  243.             rc = SendPendingData(sock);
  244.             if (rc == -1)
  245.             {
  246.                 RemoveSocketObj(thread, sock);
  247.                 FreeSocketObj(sock);
  248.                 return SOCKET_ERROR;
  249.             }
  250.         }
  251.         else
  252.         {
  253.             fprintf(stderr, "HandleIo: FD_WRITE error %d\n", nevents.iErrorCode[FD_WRITE_BIT]);
  254.             return SOCKET_ERROR;
  255.         }
  256.     }
  257.     if (nevents.lNetworkEvents & FD_CLOSE) {
  258.         if (nevents.iErrorCode[FD_CLOSE_BIT] == 0)
  259.         {
  260.             // Socket has been indicated as closing so make sure all the data
  261.             // has been read
  262.             printf("close.\n");
  263.             while (1)
  264.             {
  265.                 rc = ReceivePendingData(sock);
  266.                 if (rc == -1)
  267.                 {
  268.                     RemoveSocketObj(thread, sock);
  269.                     FreeSocketObj(sock);
  270.                     return SOCKET_ERROR;
  271.                 }
  272.                 else if (rc != 0)
  273.                 {
  274.                     continue;
  275.                 }
  276.                 else
  277.                 {
  278.                     break;
  279.                 }
  280.             }
  281.             // See if there is any data pending, if so try to send it
  282.             rc = SendPendingData(sock);
  283.             if (rc == -1)
  284.             {
  285.                 RemoveSocketObj(thread, sock);
  286.                 FreeSocketObj(sock);
  287.                 return SOCKET_ERROR;
  288.             }
  289.         }
  290.         else
  291.         {
  292.             fprintf(stderr, "HandleIo: FD_CLOSE error %d\n", nevents.iErrorCode[FD_CLOSE_BIT]);
  293.             RemoveSocketObj(thread, sock);
  294.             FreeSocketObj(sock);
  295.             return SOCKET_ERROR;
  296.         }
  297.     }

  298.     return NO_ERROR;
  299. }

  300. DWORD WINAPI ChildThread(LPVOID lpParam) {
  301.     THREAD_OBJ *thread=NULL;
  302.     SOCKET_OBJ *sptr=NULL,
  303.                *sockobj=NULL;
  304.     int index,
  305.                 rc,
  306.                 i;

  307.     thread = (THREAD_OBJ *)lpParam;

  308.     while (true) {
  309.         rc = WaitForMultipleObjects(thread->sockHeader.count + 1, thread->Handles, FALSE, INFINITE);
  310.         if (rc == WAIT_FAILED || rc == WAIT_TIMEOUT)
  311.         {
  312.             fprintf(stderr, "ChildThread: WaitForMultipleObjects failed: %d\n", GetLastError());
  313.             break;
  314.         } else {
  315.             for(i = 0; i < thread->sockHeader.count + 1; i++) {
  316.                 rc = WaitForSingleObject(thread->Handles[i], 0);

  317.                 if (rc == WAIT_FAILED)
  318.                 {
  319.                     fprintf(stderr, "ChildThread: WaitForSingleObject failed: %d\n", GetLastError());
  320.                     ExitThread(-1);
  321.                 }
  322.                 else if (rc == WAIT_TIMEOUT)
  323.                 {
  324.                     // This event isn't signaled, continue to the next one
  325.                     continue;
  326.                 }
  327.                 index = i;

  328.                 if (index == 0)
  329.                 {
  330.                     // If index 0 is signaled then rebuild the array of event
  331.                     // handles to wait on
  332.                     WSAResetEvent(thread->Handles[index]);

  333.                     RenumberThreadArray(thread);

  334.                     i = 1;
  335.                 } else {
  336.                     sockobj = FindSocketObj(thread, index-1);
  337.                     if (sockobj != NULL)
  338.                     {
  339.                         if (HandleIo(thread, sockobj) == SOCKET_ERROR)
  340.                         {
  341.                             RenumberThreadArray(thread);
  342.                         }
  343.                     }
  344.                     else
  345.                     {
  346.                         printf("Unable to find socket object!\n");
  347.                     }
  348.                 }
  349.             }
  350.         }
  351.     }
  352. }

  353. void AssignToFreeThread(SOCKET_OBJ *sock) {
  354.     ThreadObj *threadobj = NULL;
  355.     THREAD_OBJ *thread = NULL;

  356.     threadobj = (ThreadObj *)GotoNextSingleList(&theadObjHeader, theadObjHeader.head);
  357.     while (threadobj) {
  358.         thread = (THREAD_OBJ *)container_of(THREAD_OBJ, entry, threadobj);

  359.         if (InsertSocketObj(thread, sock) != SOCKET_ERROR) {
  360.             break;
  361.         }
  362.         threadobj = (ThreadObj *)GotoNextSingleList(&theadObjHeader, threadobj);
  363.     }

  364.     if (threadobj == NULL) {
  365.         thread = GetThreadObj();

  366.         thread->Thread = CreateThread(NULL, 0, ChildThread, (LPVOID)thread, 0, NULL);
  367.         if (thread->Thread == NULL)
  368.         {
  369.             fprintf(stderr, "AssignToFreeThread: CreateThread failed: %d\n", GetLastError());
  370.             ExitProcess(-1);
  371.         }

  372.         InsertSocketObj(thread, sock);

  373.         EnqueueSingleList(&theadObjHeader, &thread->entry);
  374.     }

  375.     WSASetEvent(thread->Event);
  376. }

  377. int _tmain(int argc, _TCHAR* argv[])
  378. {
  379.     WSADATA wsd;

  380.     struct addrinfo *res=NULL,
  381.                     *ptr=NULL;

  382.     THREAD_OBJ *thread=NULL;
  383.     SOCKET_OBJ *sockobj=NULL,
  384.                     *newsock=NULL;

  385.     int index,
  386.                      rc;

  387.     if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
  388.     {
  389.         fprintf(stderr, "unable to load Winsock!\n");
  390.         return -1;
  391.     }

  392.     res = ResolveAddress(gSrvAddr, gPort, gAddressFamily, gSocketType, gProtocol);
  393.     if (res == NULL)
  394.     {
  395.         fprintf(stderr, "ResolveAddress failed to return any addresses!\n");
  396.         return -1;
  397.     }

  398.     thread = GetThreadObj();

  399.     InitializeCriticalSection(&theadObjHeader.SendRecvQueueCritSec);
  400.     theadObjHeader.head = theadObjHeader.tail = NULL;

  401.     ptr = res;
  402.     while (ptr) {
  403.         sockobj = GetSocketObj(INVALID_SOCKET, (gProtocol == IPPROTO_TCP) ? TRUE : FALSE);

  404.         sockobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
  405.         if (sockobj->s == INVALID_SOCKET) {
  406.             fprintf(stderr, "create socket failed.\n");
  407.             ExitProcess(-1);
  408.         }

  409.         InsertSocketObj(thread, sockobj);

  410.         rc = bind(sockobj->s, ptr->ai_addr, ptr->ai_addrlen);
  411.         if (rc == SOCKET_ERROR)
  412.         {
  413.             fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
  414.             return -1;
  415.         }

  416.         if (gProtocol == IPPROTO_TCP) {
  417.             rc = listen(sockobj->s, 200);
  418.             if (rc == SOCKET_ERROR) {
  419.                 fprintf(stderr, "listen failed.\n");
  420.                 ExitProcess(-1);
  421.             }

  422.             rc = WSAEventSelect(sockobj->s, sockobj->event, FD_ACCEPT | FD_CLOSE);
  423.             if (rc == SOCKET_ERROR) {
  424.                 fprintf(stderr, "WSAEventSelect failed: %d\n", WSAGetLastError());
  425.                 ExitProcess(-1);
  426.             }
  427.         }

  428.         ptr = ptr->ai_next;
  429.     }

  430.     freeaddrinfo(res);

  431.     while (true) {
  432.         rc = WaitForMultipleObjects(thread->sockHeader.count + 1, thread->Handles, FALSE, 5000);
  433.         if (rc == WAIT_FAILED) {
  434.             fprintf(stderr, "WaitForMultipleObjects failed:%d\n", WSAGetLastError());
  435.             break;
  436.         } else if (rc == WAIT_TIMEOUT) {
  437.             continue;
  438.         } else {
  439.             index = rc - WAIT_OBJECT_0;

  440.             sockobj = FindSocketObj(thread, index - 1);

  441.             if (gProtocol == IPPROTO_TCP) {
  442.                 SOCKADDR_STORAGE sa;
  443.                 WSANETWORKEVENTS ne;
  444.                 SOCKET sc;
  445.                 int salen;

  446.                 rc = WSAEnumNetworkEvents(sockobj->s, thread->Handles[index], &ne);
  447.                 if (rc == SOCKET_ERROR) {
  448.                     fprintf(stderr, "WSAEnumNetworkEvents failed.\n");
  449.                     break;
  450.                 }

  451.                 while (true) {
  452.                     sc = INVALID_SOCKET;
  453.                     salen = sizeof(sa);

  454.                     sc = accept(sockobj->s, (SOCKADDR *)&sa, &salen);
  455.                     if ((sc == INVALID_SOCKET) && (WSAGetLastError() != WSAEWOULDBLOCK)) {
  456.                         fprintf(stderr, "accept failed.\n");
  457.                         break;
  458.                     } else if (sc == INVALID_SOCKET){
  459.                         continue;
  460.                     } else {
  461.                         newsock = GetSocketObj(INVALID_SOCKET, FALSE);

  462.                         memcpy(&newsock->addr, &sa, salen);
  463.                         newsock->addrlen = salen;
  464.                         newsock->s = sc;

  465.                         rc = WSAEventSelect(newsock->s, newsock->event, FD_READ | FD_WRITE | FD_CLOSE);
  466.                         if (rc == SOCKET_ERROR)
  467.                         {
  468.                             fprintf(stderr, "WSAEventSelect failed: %d\n", WSAGetLastError());
  469.                             break;
  470.                         }

  471.                         AssignToFreeThread(newsock);
  472.                     }
  473.                 }
  474.             }
  475.         }
  476.     }

  477.     WSACleanup();

  478.     return 0;
  479. }
阅读(3252) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~