Chinaunix首页 | 论坛 | 博客
  • 博客访问: 158390
  • 博文数量: 22
  • 博客积分: 425
  • 博客等级: 下士
  • 技术积分: 350
  • 用 户 组: 普通用户
  • 注册时间: 2012-05-15 09:43
个人简介

专注服务器设计、开发; https://github.com/justscu;

文章分类

全部博文(22)

文章存档

2014年(10)

2013年(2)

2012年(10)

分类: LINUX

2013-11-07 20:30:42

1. 基于上一篇文章(用epoll实现一个异步处理的模型(1)),来继续实现一个tcp服务器。
    (1)构造一个监听socket(socket-> bind-> listen),将监听socket的fd加入到epoll的监听中,等待客户端的连接。
    (2)客户端连接后,调用OnFDRead(),得到客户端socket的fd,并将fd加入到epoll的监听中。
    (3)处理客户端fd的读写事件。
2. 代码

点击(此处)折叠或打开  tcpeventhandler.h

  1. #pragma once
  2. #include "reactor.h"
  3. #include <sys/types.h> /* See NOTES */
  4. #include <sys/socket.h>
  5. #include <errno.h>
  6. #include <arpa/inet.h>
  7. #include <netinet/tcp.h>
  8. #include <fcntl.h>
  9. #include <string.h>
  10. #include <stdio.h>

  11. class SocketDataDecoderBase
  12. {
  13. public:
  14.     virtual int OnProcess(FDEventHanderBase* pSocket, const char* buf,
  15.      unsigned int buf_len) = 0;
  16. };

  17. /********************************
  18.  * 处理tcpServer事件的类
  19.  ********************************/
  20. class TcpServerEventHandler: public FDEventHanderBase
  21. {
  22. public:
  23.     TcpServerEventHandler(const char* ip, int port, SocketDataDecoderBase* decoder,
  24.      int fd, Reactor* pReactor) :
  25.         FDEventHanderBase(fd, pReactor), m_pDecoder(decoder)
  26.     {
  27.         memset(m_ip, 0x00, sizeof(m_ip));
  28.         strncpy(m_ip, ip, strlen(ip));
  29.         m_port = port;
  30.     }
  31.     virtual ~TcpServerEventHandler() {
  32.     }
  33.     // create listen socket. return 0, success.
  34.     int OnListen();
  35.     int OnAccept(int fd);
  36.     // 当epoll中有新的连接过来的时候,就会调用该函数
  37.     virtual void OnFDRead();
  38.     virtual void OnFDWrite();
  39.     virtual void Close();
  40. protected:
  41.     char m_ip[16]; //ipv4
  42.     int m_port;
  43.     SocketDataDecoderBase* m_pDecoder;
  44. };

  45. /* **************************
  46.  * 处理TCP客户端发过来的数据
  47.  * 这个地方,可以开辟2块缓冲区(读缓冲区和写缓冲区)
  48.  * (1)读,当socket可读时,epoll会被触发,回调OnFDRead()
  49.  * 将读取到的数据进行处理,或者放到读缓冲区
  50.  * (2)写,当有数据需要通过socket发出去时,先将数据放到写缓冲区中,
  51.  * 注册RegisterWriteEvent,之后,OnFDWrite就会被回调,
  52.  * 若LT模式,当数据没有写完时,OnFDWrite一直会被回调,直到写完后,
  53.  * 用户调用UnRegisteWriteEvent.
  54.  * ************************/
  55. class TcpSocketHandler: public FDEventHanderBase
  56. {
  57. public:
  58.     TcpSocketHandler(SocketDataDecoderBase* decoder, int fd, Reactor* pReactor) :
  59.         FDEventHanderBase(fd, pReactor), m_dataDecoder(decoder)
  60.     {

  61.     }
  62.     virtual ~TcpSocketHandler() {
  63.     }
  64.     // 读被触发时,调用该函数,处理客户端发过来的数据
  65.     virtual void OnFDRead();
  66.     // 写被触发时,调用该函数
  67.     virtual void OnFDWrite();
  68.     // 关闭fd
  69.     virtual void Close();
  70.     // 可以通过该函数,向客户端发送数据
  71.     virtual int SendBuf(const char* buf, int buf_len);
  72. private:
  73.     SocketDataDecoderBase* m_dataDecoder;
  74.     //Buffer* pReadBuf;
  75.     //Buffer* pWriteBuf;
  76. };

  77. /* *********************************
  78.  *          TcpServerBase类
  79.  * *********************************/
  80. class TcpServerBase
  81. {
  82. public:
  83.     TcpServerBase(const char* ip, int port, SocketDataDecoderBase* pDecoder)
  84.     {
  85.         memset(m_ip, 0x00, sizeof(m_ip));
  86.         strncpy(m_ip, ip, strlen(ip));
  87.         m_port = port;
  88.         m_pReactor = NULL;
  89.         m_pDecoder = pDecoder;
  90.     }
  91.     virtual int Init(){
  92.         m_pReactor = new LTReactor;
  93.         return m_pReactor->Init();
  94.     }
  95.     virtual int Run(){
  96.         TcpServerEventHandler h(m_ip, m_port, m_pDecoder, 0, m_pReactor);
  97.         if (0 == h.OnListen()) {
  98.             m_pReactor->Run();
  99.         }

  100.         return 0;
  101.     }
  102.     virtual int Exit(){
  103.         delete m_pReactor;
  104.         return 0;
  105.     }
  106. private:
  107.     char m_ip[16];//ipv4
  108.     int m_port;
  109.     Reactor* m_pReactor;
  110.     SocketDataDecoderBase* m_pDecoder;
  111. };

点击(此处)折叠或打开 tcpeventhandler.cpp

  1. #include "tcpeventhandler.h"

  2. /********************************
  3.  * 处理tcpServer事件的类
  4.  ********************************/
  5. // create listen socket. return 0, success.
  6. int TcpServerEventHandler::OnListen() {
  7.     m_fd = ::socket(AF_INET, SOCK_STREAM, 0);
  8.     if (m_fd == -1)
  9.     {
  10.         std::cout << "create socket error." << strerror(errno) << std::endl;
  11.         return -1;
  12.     }
  13.     sockaddr_in addr;
  14.     memset(&addr, 0x00, sizeof(sockaddr_in));
  15.     addr.sin_family = AF_INET;
  16.     addr.sin_port = htons(m_port);
  17.     socklen_t len = sizeof(sockaddr_in);

  18.     if (1 != inet_pton(AF_INET, m_ip, &addr.sin_addr))
  19.     {
  20.         std::cout << "inet_pton error." << strerror(errno) << std::endl;
  21.         close(m_fd);
  22.         return -1;
  23.     }
  24.     // 允许在同一端口上启动同一服务器的多个实例
  25.     int opVal = 1;
  26.     if (0 != setsockopt(m_fd, SOL_SOCKET, SO_REUSEADDR, &opVal, sizeof(opVal)))
  27.     {
  28.         std::cout << "socketopt error." << strerror(errno) << std::endl;
  29.         close(m_fd);
  30.         return -1;
  31.     }
  32.     // 设置非阻塞模式
  33.     SetNonBlock(m_fd, true);
  34.     // bind
  35.     if (-1 == bind(m_fd, (struct sockaddr*) &addr, len))
  36.     {
  37.         std::cout << "bind error." << strerror(errno) << std::endl;
  38.         close(m_fd);
  39.         return -1;
  40.     }
  41.     // listen
  42.     if (-1 == listen(m_fd, 200))
  43.     {
  44.         std::cout << "listen error." << strerror(errno) << std::endl;
  45.         close(m_fd);
  46.         return -1;
  47.     }

  48.     // 将该fd加入到 epoll中,监听客户端的连接
  49.     if (0 != RegisterReadEvent())
  50.     {
  51.         std::cout << "RegisterReadEvent error." << std::endl;
  52.         close(m_fd);
  53.         return -1;
  54.     }

  55.     return 0;
  56. }

  57. int TcpServerEventHandler::OnAccept(int fd) {
  58.     TcpSocketHandler* pHandler = new TcpSocketHandler(m_pDecoder, fd,
  59.      this->m_pReactor);
  60.     // 注册一个读事件,从TCP的客户端读取数据.
  61.     if (0 != pHandler->RegisterReadEvent())
  62.     {
  63.         pHandler->Close();
  64.         delete pHandler;
  65.         pHandler = NULL;
  66.     }
  67.     // 需要delete pHandler
  68.     return 0;
  69. }
  70. // 当epoll中有新的连接过来的时候,就会调用该函数
  71. void TcpServerEventHandler::OnFDRead() {
  72.     sockaddr_in clientAddr;
  73.     socklen_t len = sizeof(clientAddr);
  74.     // 客户端的fd,这个地方,其实可以建立一个(fd -- 客户端)对应关系
  75.     int sockFD = accept(m_fd, (struct sockaddr*) &clientAddr, &len);
  76.     if (sockFD < 0)
  77.     {
  78.         std::cout << "accept error." << std::endl;
  79.         return;
  80.     }
  81.     else
  82.     {
  83.         int port = ntohs(clientAddr.sin_port);
  84.         char addr[256] =
  85.         { 0 };
  86.         inet_ntop(AF_INET, &clientAddr.sin_addr.s_addr, addr,
  87.          (socklen_t) sizeof(addr));
  88.         printf("accept: [%s:%d] \n", addr, port);
  89.     }

  90.     OnAccept(sockFD);
  91. }
  92. void TcpServerEventHandler::OnFDWrite() {

  93. }
  94. void TcpServerEventHandler::Close() {
  95.     UnRegisterReadEvent();
  96.     UnRegisterWriteEvent();
  97.     close(m_fd);
  98. }

  99. /* ***********************
  100.  * 处理TCP客户端发过来的数据
  101.  * ************************/
  102. // 读被触发时,调用该函数,处理客户端发过来的数据
  103. void TcpSocketHandler::OnFDRead() {
  104.     // 在这个函数中,可以读取客户端发过来的数据,并对数据进行分析,TCP的数据,
  105.     // 可能会出现半包的情况,所以一般会有一个缓存来保存没有处理完毕的数据
  106.     char buf[1024] =
  107.     { 0 };
  108.     int iRet = read(m_fd, buf, sizeof(buf));
  109.     if (0 == iRet)
  110.     {
  111.         Close();// 客户端关闭socket.
  112.         return;
  113.     }
  114.     // 回调处理函数,对数据进行处理
  115.     m_dataDecoder->OnProcess(this, buf, iRet);
  116. }
  117. // 写被触发时,调用该函数
  118. void TcpSocketHandler::OnFDWrite() {

  119. }
  120. // 关闭fd
  121. void TcpSocketHandler::Close() {
  122.     UnRegisterReadEvent();
  123.     UnRegisterWriteEvent();
  124.     close(m_fd);
  125.     delete this;
  126. }
  127. // 可以通过该函数,向客户端发送数据
  128. int TcpSocketHandler::SendBuf(const char* buf, int buf_len) {
  129.     RegisterWriteEvent();
  130.     int iRet = write(m_fd, buf, buf_len);
  131.     UnRegisterWriteEvent();
  132.     return iRet;
  133. }
3.  测试代码
    测试代码实现了一个基本的tcp回显服务器。
    简单的测试,可以在命令行,nc localhost 56893,然后输入,就能够看到回显的效果。

测试代码(此处)

  1. #include "tcpeventhandler.h"

  2. /* *********************
  3.  * 处理客户端发过来的数据
  4.  * ********************/
  5. class TcpDataDecoder_echo_test: public SocketDataDecoderBase
  6. {
  7. public:
  8.     // 真正的处理数据的函数
  9.     virtual int OnProcess(FDEventHanderBase* pSocket, const char* buf,
  10.      unsigned int buf_len)
  11.     {
  12.         // 这个地方,将客户端的数据回写
  13.         if (0 == pSocket->RegisterWriteEvent())
  14.         {
  15.             write(pSocket->GetFD(), buf, buf_len);
  16.             pSocket->UnRegisterWriteEvent();
  17.         }

  18.         return 0;
  19.     }
  20. };


  21. void TcpServer_test() {
  22.     SocketDataDecoderBase* pDecoder = new TcpDataDecoder_echo_test;
  23.     TcpServerBase b("127.0.0.1", 56893, pDecoder);
  24.     b.Init();
  25.     b.Run();
  26.     b.Exit();
  27.     delete pDecoder;

  28. }
4. 一些需要注意的地方
    (1)服务器端,最好设置tcp keepalive,在服务器与客户端之间,维持一个心跳连接。防止一些假死的客户端占用服务器端资源。【上面的代码中,并没有体现出来】
    (2)由于服务器端的fd,是连续回收使用的,所以当close(fd) 后,需要将该fd的已经注册的读写handler,从EventHanderSet中删除掉。
    (3)在epoll的LT模式下,在写之前,需要注册RegistReadEvent(),之后,该fd可以一直写。当写完后,需要调用UnRegisterReadEvent()。
    (4)通常的接收和发送,是需要接收/发送缓存的,上面的代码中,并没有体现出来。
5. 同样的,可以使用unix_socket连接。
    unix socket,用于同一主机上不同进程间的交互。通过socket文件进行交互。

点击(此处)折叠或打开 unixsocketeventhandler.h

  1. #pragma once

  2. #include <stdlib.h>
  3. #include <stdio.h>
  4. #include <string.h>
  5. #include <sys/types.h> /* See NOTES */
  6. #include <sys/socket.h>
  7. #include <netinet/in.h>
  8. #include <sys/un.h>
  9. #include <iostream>
  10. #include <errno.h>
  11. #include <sys/stat.h>

  12. #include "reactor.h"
  13. #include "SocketDataDecoderBase.h"

  14. #define MAX_PATH 256

  15. /* ********************************
  16.  *    处理 unix socket server事件
  17.  * ********************************/
  18. class UnixSocketServerEventHandler: public FDEventHanderBase
  19. {
  20. public:
  21.     UnixSocketServerEventHandler(const char* path, SocketDataDecoderBase* decoder, int fd, Reactor* pReactor) :
  22.         FDEventHanderBase(fd, pReactor), m_pDecoder(decoder)
  23.     {
  24.         assert(strlen(path) > 0);
  25.         strncpy(m_path, path, sizeof(m_path));
  26.         unlink(m_path);
  27.     }
  28.     ~UnixSocketServerEventHandler() {

  29.     }
  30.     // create unix socket. return 0, success.
  31.     int OnListen();
  32.     int OnAccept(int fd);
  33.     // 读被触发时,调用该函数
  34.     virtual void OnFDRead();
  35.     // 写被触发时,调用该函数
  36.     virtual void OnFDWrite() {

  37.     }
  38.     // 关闭fd
  39.     virtual void Close() {
  40.         unlink(m_path);
  41.         close(m_fd);
  42.     }
  43. private:
  44.     char m_path[MAX_PATH];
  45.     SocketDataDecoderBase* m_pDecoder;
  46. };

  47. /* ********************************
  48.  *    处理 unix socket 客户端事件
  49.  * ********************************/
  50. class UnixSocketHandler: public FDEventHanderBase
  51. {
  52. public:
  53.     UnixSocketHandler(SocketDataDecoderBase* decoder, int fd, Reactor* pReactor) :
  54.         FDEventHanderBase(fd, pReactor), m_pDecoder(decoder)
  55.     {

  56.     }
  57.     ~UnixSocketHandler(){}
  58.     virtual void OnFDRead();
  59.     virtual void OnFDWrite();
  60.     virtual void Close();
  61.     virtual int SendBuf(const char* buf, int buf_len);
  62. private:
  63.     SocketDataDecoderBase* m_pDecoder;
  64. };

  65. /* *********************************
  66.  *          UnixSocketServerBase类
  67.  * *********************************/
  68. class UnixSocketServerBase
  69. {
  70. public:
  71.     explicit UnixSocketServerBase(const char* path, SocketDataDecoderBase* pDecoder)
  72.     {
  73.         memset(m_path, 0x00, sizeof(m_path));
  74.         strncpy(m_path, path, strlen(path));
  75.         m_pReactor = NULL;
  76.         m_pDecoder = pDecoder;
  77.     }
  78.     virtual int Init()
  79.     {
  80.         m_pReactor = new LTReactor;
  81.         return m_pReactor->Init();
  82.     }
  83.     virtual int Run()
  84.     {
  85.         UnixSocketServerEventHandler h(m_path, m_pDecoder, 0, m_pReactor);
  86.         if (0 == h.OnListen())
  87.         {
  88.             m_pReactor->Run();
  89.         }

  90.         return 0;
  91.     }
  92.     virtual int Exit()
  93.     {
  94.         delete m_pReactor;
  95.         return 0;
  96.     }
  97. private:
  98.     UnixSocketServerBase(const UnixSocketServerBase&);
  99.     const UnixSocketServerBase& operator=(const UnixSocketServerBase&);
  100. private:
  101.     char m_path[MAX_PATH];
  102.     Reactor* m_pReactor;
  103.     SocketDataDecoderBase* m_pDecoder;
  104. };

点击(此处)折叠或打开 unixsocketeventhandler.cpp

  1. #include "unixsocketeventhandler.h"

  2. // create unix socket. return 0, success.
  3. int UnixSocketServerEventHandler::OnListen() {
  4.     m_fd = socket(AF_UNIX, SOCK_STREAM, 0);
  5.     if (m_fd == -1)
  6.     {
  7.         std::cout << "create socket error." << strerror(errno) << std::endl;
  8.         return -1;
  9.     }

  10.     sockaddr_un addr;
  11.     memset(&addr, 0, sizeof(addr));
  12.     addr.sun_family = AF_UNIX;
  13.     strcpy(addr.sun_path, m_path);

  14.     // 设置非阻塞模式
  15.     SetNonBlock(m_fd, true);
  16.     // 在bind的时候,会生成socket文件。
  17.     socklen_t len = offsetof(struct sockaddr_un, sun_path)+strlen(addr.sun_path);
  18.     if (bind(m_fd, (struct sockaddr *) &addr, len) != 0)
  19.     {
  20.         std::cout << "unix socket bind error,path = " << m_path << std::endl;
  21.         return -1;
  22.     }

  23.     if (listen(m_fd, 500) != 0)
  24.     {
  25.         std::cout << "unix listen error" << std::endl;
  26.         return -1;
  27.     }
  28.     // 更改socket文件的访问权限.
  29.     if (0 != chmod(m_path, 0777))
  30.     {
  31.         std::cout << "chmod[777] " << m_path << "error " << strerror(errno)
  32.          << std::endl;
  33.         return -1;
  34.     }

  35.     // 将该fd加入到 epoll中,监听客户端的连接
  36.     if (0 != RegisterReadEvent())
  37.     {
  38.         std::cout << "RegisterReadEvent error." << std::endl;
  39.         close(m_fd);
  40.         return -1;
  41.     }
  42.     return 0;
  43. }

  44. // 读被触发时,调用该函数
  45. void UnixSocketServerEventHandler::OnFDRead() {
  46.     sockaddr_un clientAddr;
  47.     memset(&clientAddr, 0x00, sizeof(sockaddr_un));
  48.     socklen_t len = sizeof(clientAddr);
  49.     // 客户端的fd,这个地方,其实可以建立一个(fd -- 客户端)对应关系
  50.     int sockFD = accept(m_fd, (struct sockaddr*) &clientAddr, &len);
  51.     if (sockFD < 0)
  52.     {
  53.         std::cout << "accept error." << std::endl;
  54.         return;
  55.     }
  56.     else
  57.     {
  58.         len -= offsetof(struct sockaddr_un, sun_path);
  59.         clientAddr.sun_path[len] = 0;
  60.         std::cout << "accpet path = " << clientAddr.sun_path << std::endl;
  61.     }

  62.     OnAccept(sockFD);
  63. }

  64. int UnixSocketServerEventHandler::OnAccept(int fd) {
  65.     SetNonBlock(fd, true);
  66.     UnixSocketHandler* pHandler = new UnixSocketHandler(m_pDecoder, fd,
  67.      this->m_pReactor);
  68.     if (pHandler == NULL)
  69.     {
  70.         return -1;
  71.     }
  72.     if (0 != pHandler->RegisterReadEvent())
  73.     {
  74.         pHandler->Close();
  75.         delete pHandler;
  76.         pHandler = NULL;
  77.         return -1;
  78.     }
  79.     return 0;
  80. }

  81. /* ********************************
  82.  *    处理 unix socket 客户端事件
  83.  * ********************************/
  84. void UnixSocketHandler::OnFDRead() {
  85.     char buf[1024] =
  86.     { 0 };
  87.     int iRet = read(m_fd, buf, sizeof(buf));
  88.     if (0 == iRet)
  89.     {
  90.         Close();// 客户端关闭socket.
  91.         return;
  92.     }
  93.     // 回调处理函数,对数据进行处理
  94.     m_pDecoder->OnProcess(this, buf, iRet);
  95. }

  96. void UnixSocketHandler::Close() {
  97.     UnRegisterReadEvent();
  98.     UnRegisterWriteEvent();
  99.     close(m_fd);
  100.     delete this;
  101. }

  102. void UnixSocketHandler::OnFDWrite() {

  103. }

  104. // 可以通过该函数,向客户端发送数据
  105. int UnixSocketHandler::SendBuf(const char* buf, int buf_len) {
  106.     RegisterWriteEvent();
  107.     int iRet = write(m_fd, buf, buf_len);
  108.     UnRegisterWriteEvent();
  109.     return iRet;
  110. }

点击(此处)折叠或打开 测试代码: unixsocketeventhandler_test.cpp

  1. #include "unixsocketeventhandler.h"


  2. class UnixSocketDataDecoder_echo_test: public SocketDataDecoderBase
  3. {
  4. public:
  5.     virtual int OnProcess(FDEventHanderBase* pSocket, const char* buf,
  6.          unsigned int buf_len)
  7.     {
  8.         std::cout << "RecvString: " << buf << std::endl;
  9.         return ((UnixSocketHandler*)pSocket)->SendBuf(buf, buf_len);
  10.     }
  11. };

  12. void UnixSocketEventHandler_test()
  13. {
  14.     const char* path = "/home/ll/work/min_heap/unix_path.sock";
  15.     UnixSocketDataDecoder_echo_test* pDecoder = new UnixSocketDataDecoder_echo_test;
  16.     UnixSocketServerBase server(path, pDecoder);
  17.     server.Init();
  18.     server.Run();
  19.     server.Exit();

  20.     delete pDecoder;
  21.     pDecoder = NULL;
  22. }

  23. // 客户端测试程序
  24. int f_unix_socket_client()
  25. {
  26.     const char* path = "/home/ll/work/min_heap/unix_path.sock";
  27.     int sockFd = socket(AF_UNIX, SOCK_STREAM, 0);
  28.     if ( sockFd < 0 )
  29.     {
  30.         std::cout << "socket Error:" << strerror(errno) << std::endl;
  31.         return -1;
  32.     }

  33.     sockaddr_un server;
  34.     server.sun_family = AF_UNIX;
  35.     strcpy(server.sun_path, path);
  36.     // 直接去connect就好了,不需要再bind,否则会再生成一个socket文件。
  37.     int iRet = connect(sockFd, (struct sockaddr*) &server, sizeof(server));
  38.     if ( 0 != iRet )
  39.     {
  40.         std::cout << "connect Error:" << strerror(errno) << std::endl;
  41.         close(sockFd);
  42.         return 0;
  43.     }
  44.     while(true)
  45.     {
  46.         char inBuf[1024] = {0};
  47.         std::cin >> inBuf;
  48.         write(sockFd, inBuf, strlen(inBuf));

  49.         char outBuf[1024] = {0};
  50.         read(sockFd, outBuf, sizeof(outBuf));
  51.         std::cout << "Echo: " << outBuf << std::endl;
  52.     }

  53.     return 0;
  54. }





阅读(4235) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~