1. 基于上一篇文章(
用epoll实现一个异步处理的模型(1)),来继续实现一个tcp服务器。
(1)构造一个监听socket(socket-> bind-> listen),将监听socket的fd加入到epoll的监听中,等待客户端的连接。
(2)客户端连接后,调用OnFDRead(),得到客户端socket的fd,并将fd加入到epoll的监听中。
(3)处理客户端fd的读写事件。
2. 代码
点击(此处)折叠或打开 tcpeventhandler.h
-
#pragma once
-
#include "reactor.h"
-
#include <sys/types.h> /* See NOTES */
-
#include <sys/socket.h>
-
#include <errno.h>
-
#include <arpa/inet.h>
-
#include <netinet/tcp.h>
-
#include <fcntl.h>
-
#include <string.h>
-
#include <stdio.h>
-
-
class SocketDataDecoderBase
-
{
-
public:
-
virtual int OnProcess(FDEventHanderBase* pSocket, const char* buf,
-
unsigned int buf_len) = 0;
-
};
-
-
/********************************
-
* 处理tcpServer事件的类
-
********************************/
-
class TcpServerEventHandler: public FDEventHanderBase
-
{
-
public:
-
TcpServerEventHandler(const char* ip, int port, SocketDataDecoderBase* decoder,
-
int fd, Reactor* pReactor) :
-
FDEventHanderBase(fd, pReactor), m_pDecoder(decoder)
-
{
-
memset(m_ip, 0x00, sizeof(m_ip));
-
strncpy(m_ip, ip, strlen(ip));
-
m_port = port;
-
}
-
virtual ~TcpServerEventHandler() {
-
}
-
// create listen socket. return 0, success.
-
int OnListen();
-
int OnAccept(int fd);
-
// 当epoll中有新的连接过来的时候,就会调用该函数
-
virtual void OnFDRead();
-
virtual void OnFDWrite();
-
virtual void Close();
-
protected:
-
char m_ip[16]; //ipv4
-
int m_port;
-
SocketDataDecoderBase* m_pDecoder;
-
};
-
-
/* **************************
-
* 处理TCP客户端发过来的数据
-
* 这个地方,可以开辟2块缓冲区(读缓冲区和写缓冲区)
-
* (1)读,当socket可读时,epoll会被触发,回调OnFDRead(),
-
* 将读取到的数据进行处理,或者放到读缓冲区
-
* (2)写,当有数据需要通过socket发出去时,先将数据放到写缓冲区中,
-
* 注册RegisterWriteEvent,之后,OnFDWrite就会被回调,
-
* 若LT模式,当数据没有写完时,OnFDWrite一直会被回调,直到写完后,
-
* 用户调用UnRegisteWriteEvent.
-
* ************************/
-
class TcpSocketHandler: public FDEventHanderBase
-
{
-
public:
-
TcpSocketHandler(SocketDataDecoderBase* decoder, int fd, Reactor* pReactor) :
-
FDEventHanderBase(fd, pReactor), m_dataDecoder(decoder)
-
{
-
-
}
-
virtual ~TcpSocketHandler() {
-
}
-
// 读被触发时,调用该函数,处理客户端发过来的数据
-
virtual void OnFDRead();
-
// 写被触发时,调用该函数
-
virtual void OnFDWrite();
-
// 关闭fd
-
virtual void Close();
-
// 可以通过该函数,向客户端发送数据
-
virtual int SendBuf(const char* buf, int buf_len);
-
private:
-
SocketDataDecoderBase* m_dataDecoder;
-
//Buffer* pReadBuf;
-
//Buffer* pWriteBuf;
-
};
-
-
/* *********************************
-
* TcpServerBase类
-
* *********************************/
-
class TcpServerBase
-
{
-
public:
-
TcpServerBase(const char* ip, int port, SocketDataDecoderBase* pDecoder)
-
{
-
memset(m_ip, 0x00, sizeof(m_ip));
-
strncpy(m_ip, ip, strlen(ip));
-
m_port = port;
-
m_pReactor = NULL;
-
m_pDecoder = pDecoder;
-
}
-
virtual int Init(){
-
m_pReactor = new LTReactor;
-
return m_pReactor->Init();
-
}
-
virtual int Run(){
-
TcpServerEventHandler h(m_ip, m_port, m_pDecoder, 0, m_pReactor);
-
if (0 == h.OnListen()) {
-
m_pReactor->Run();
-
}
-
-
return 0;
-
}
-
virtual int Exit(){
-
delete m_pReactor;
-
return 0;
-
}
-
private:
-
char m_ip[16];//ipv4
-
int m_port;
-
Reactor* m_pReactor;
-
SocketDataDecoderBase* m_pDecoder;
-
};
点击(此处)折叠或打开 tcpeventhandler.cpp
-
#include "tcpeventhandler.h"
-
-
/********************************
-
* 处理tcpServer事件的类
-
********************************/
-
// create listen socket. return 0, success.
-
int TcpServerEventHandler::OnListen() {
-
m_fd = ::socket(AF_INET, SOCK_STREAM, 0);
-
if (m_fd == -1)
-
{
-
std::cout << "create socket error." << strerror(errno) << std::endl;
-
return -1;
-
}
-
sockaddr_in addr;
-
memset(&addr, 0x00, sizeof(sockaddr_in));
-
addr.sin_family = AF_INET;
-
addr.sin_port = htons(m_port);
-
socklen_t len = sizeof(sockaddr_in);
-
-
if (1 != inet_pton(AF_INET, m_ip, &addr.sin_addr))
-
{
-
std::cout << "inet_pton error." << strerror(errno) << std::endl;
-
close(m_fd);
-
return -1;
-
}
-
// 允许在同一端口上启动同一服务器的多个实例
-
int opVal = 1;
-
if (0 != setsockopt(m_fd, SOL_SOCKET, SO_REUSEADDR, &opVal, sizeof(opVal)))
-
{
-
std::cout << "socketopt error." << strerror(errno) << std::endl;
-
close(m_fd);
-
return -1;
-
}
-
// 设置非阻塞模式
-
SetNonBlock(m_fd, true);
-
// bind
-
if (-1 == bind(m_fd, (struct sockaddr*) &addr, len))
-
{
-
std::cout << "bind error." << strerror(errno) << std::endl;
-
close(m_fd);
-
return -1;
-
}
-
// listen
-
if (-1 == listen(m_fd, 200))
-
{
-
std::cout << "listen error." << strerror(errno) << std::endl;
-
close(m_fd);
-
return -1;
-
}
-
-
// 将该fd加入到 epoll中,监听客户端的连接
-
if (0 != RegisterReadEvent())
-
{
-
std::cout << "RegisterReadEvent error." << std::endl;
-
close(m_fd);
-
return -1;
-
}
-
-
return 0;
-
}
-
-
int TcpServerEventHandler::OnAccept(int fd) {
-
TcpSocketHandler* pHandler = new TcpSocketHandler(m_pDecoder, fd,
-
this->m_pReactor);
-
// 注册一个读事件,从TCP的客户端读取数据.
-
if (0 != pHandler->RegisterReadEvent())
-
{
-
pHandler->Close();
-
delete pHandler;
-
pHandler = NULL;
-
}
-
// 需要delete pHandler
-
return 0;
-
}
-
// 当epoll中有新的连接过来的时候,就会调用该函数
-
void TcpServerEventHandler::OnFDRead() {
-
sockaddr_in clientAddr;
-
socklen_t len = sizeof(clientAddr);
-
// 客户端的fd,这个地方,其实可以建立一个(fd -- 客户端)对应关系
-
int sockFD = accept(m_fd, (struct sockaddr*) &clientAddr, &len);
-
if (sockFD < 0)
-
{
-
std::cout << "accept error." << std::endl;
-
return;
-
}
-
else
-
{
-
int port = ntohs(clientAddr.sin_port);
-
char addr[256] =
-
{ 0 };
-
inet_ntop(AF_INET, &clientAddr.sin_addr.s_addr, addr,
-
(socklen_t) sizeof(addr));
-
printf("accept: [%s:%d] \n", addr, port);
-
}
-
-
OnAccept(sockFD);
-
}
-
void TcpServerEventHandler::OnFDWrite() {
-
-
}
-
void TcpServerEventHandler::Close() {
-
UnRegisterReadEvent();
-
UnRegisterWriteEvent();
-
close(m_fd);
-
}
-
-
/* ***********************
-
* 处理TCP客户端发过来的数据
-
* ************************/
-
// 读被触发时,调用该函数,处理客户端发过来的数据
-
void TcpSocketHandler::OnFDRead() {
-
// 在这个函数中,可以读取客户端发过来的数据,并对数据进行分析,TCP的数据,
-
// 可能会出现半包的情况,所以一般会有一个缓存来保存没有处理完毕的数据
-
char buf[1024] =
-
{ 0 };
-
int iRet = read(m_fd, buf, sizeof(buf));
-
if (0 == iRet)
-
{
-
Close();// 客户端关闭socket.
-
return;
-
}
-
// 回调处理函数,对数据进行处理
-
m_dataDecoder->OnProcess(this, buf, iRet);
-
}
-
// 写被触发时,调用该函数
-
void TcpSocketHandler::OnFDWrite() {
-
-
}
-
// 关闭fd
-
void TcpSocketHandler::Close() {
-
UnRegisterReadEvent();
-
UnRegisterWriteEvent();
-
close(m_fd);
-
delete this;
-
}
-
// 可以通过该函数,向客户端发送数据
-
int TcpSocketHandler::SendBuf(const char* buf, int buf_len) {
-
RegisterWriteEvent();
-
int iRet = write(m_fd, buf, buf_len);
-
UnRegisterWriteEvent();
-
return iRet;
-
}
3. 测试代码
测试代码实现了一个基本的tcp回显服务器。
简单的测试,可以在命令行,nc localhost 56893
,然后输入,就能够看到回显的效果。
-
#include "tcpeventhandler.h"
-
-
/* *********************
-
* 处理客户端发过来的数据
-
* ********************/
-
class TcpDataDecoder_echo_test: public SocketDataDecoderBase
-
{
-
public:
-
// 真正的处理数据的函数
-
virtual int OnProcess(FDEventHanderBase* pSocket, const char* buf,
-
unsigned int buf_len)
-
{
-
// 这个地方,将客户端的数据回写
-
if (0 == pSocket->RegisterWriteEvent())
-
{
-
write(pSocket->GetFD(), buf, buf_len);
-
pSocket->UnRegisterWriteEvent();
-
}
-
-
return 0;
-
}
-
};
-
-
-
void TcpServer_test() {
-
SocketDataDecoderBase* pDecoder = new TcpDataDecoder_echo_test;
-
TcpServerBase b("127.0.0.1", 56893, pDecoder);
-
b.Init();
-
b.Run();
-
b.Exit();
-
delete pDecoder;
-
-
}
4. 一些需要注意的地方
(1)服务器端,最好设置tcp keepalive,在服务器与客户端之间,维持一个心跳连接。防止一些假死的客户端占用服务器端资源。【上面的代码中,并没有体现出来】
(2)由于服务器端的fd,是连续回收使用的,所以当close(fd) 后,需要将该fd的已经注册的读写handler,从EventHanderSet中删除掉。
(3)在epoll的LT模式下,在写之前,需要注册RegistReadEvent(),之后,该fd可以一直写。当写完后,需要调用UnRegisterReadEvent()。
(4)通常的接收和发送,是需要接收/发送缓存的,上面的代码中,并没有体现出来。
5. 同样的,可以使用unix_socket连接。
unix socket,用于同一主机上不同进程间的交互。通过socket文件进行交互。
点击(此处)折叠或打开 unixsocketeventhandler.h
-
#pragma once
-
-
#include <stdlib.h>
-
#include <stdio.h>
-
#include <string.h>
-
#include <sys/types.h> /* See NOTES */
-
#include <sys/socket.h>
-
#include <netinet/in.h>
-
#include <sys/un.h>
-
#include <iostream>
-
#include <errno.h>
-
#include <sys/stat.h>
-
-
#include "reactor.h"
-
#include "SocketDataDecoderBase.h"
-
-
#define MAX_PATH 256
-
-
/* ********************************
-
* 处理 unix socket server事件
-
* ********************************/
-
class UnixSocketServerEventHandler: public FDEventHanderBase
-
{
-
public:
-
UnixSocketServerEventHandler(const char* path, SocketDataDecoderBase* decoder, int fd, Reactor* pReactor) :
-
FDEventHanderBase(fd, pReactor), m_pDecoder(decoder)
-
{
-
assert(strlen(path) > 0);
-
strncpy(m_path, path, sizeof(m_path));
-
unlink(m_path);
-
}
-
~UnixSocketServerEventHandler() {
-
-
}
-
// create unix socket. return 0, success.
-
int OnListen();
-
int OnAccept(int fd);
-
// 读被触发时,调用该函数
-
virtual void OnFDRead();
-
// 写被触发时,调用该函数
-
virtual void OnFDWrite() {
-
-
}
-
// 关闭fd
-
virtual void Close() {
-
unlink(m_path);
-
close(m_fd);
-
}
-
private:
-
char m_path[MAX_PATH];
-
SocketDataDecoderBase* m_pDecoder;
-
};
-
-
/* ********************************
-
* 处理 unix socket 客户端事件
-
* ********************************/
-
class UnixSocketHandler: public FDEventHanderBase
-
{
-
public:
-
UnixSocketHandler(SocketDataDecoderBase* decoder, int fd, Reactor* pReactor) :
-
FDEventHanderBase(fd, pReactor), m_pDecoder(decoder)
-
{
-
-
}
-
~UnixSocketHandler(){}
-
virtual void OnFDRead();
-
virtual void OnFDWrite();
-
virtual void Close();
-
virtual int SendBuf(const char* buf, int buf_len);
-
private:
-
SocketDataDecoderBase* m_pDecoder;
-
};
-
-
/* *********************************
-
* UnixSocketServerBase类
-
* *********************************/
-
class UnixSocketServerBase
-
{
-
public:
-
explicit UnixSocketServerBase(const char* path, SocketDataDecoderBase* pDecoder)
-
{
-
memset(m_path, 0x00, sizeof(m_path));
-
strncpy(m_path, path, strlen(path));
-
m_pReactor = NULL;
-
m_pDecoder = pDecoder;
-
}
-
virtual int Init()
-
{
-
m_pReactor = new LTReactor;
-
return m_pReactor->Init();
-
}
-
virtual int Run()
-
{
-
UnixSocketServerEventHandler h(m_path, m_pDecoder, 0, m_pReactor);
-
if (0 == h.OnListen())
-
{
-
m_pReactor->Run();
-
}
-
-
return 0;
-
}
-
virtual int Exit()
-
{
-
delete m_pReactor;
-
return 0;
-
}
-
private:
-
UnixSocketServerBase(const UnixSocketServerBase&);
-
const UnixSocketServerBase& operator=(const UnixSocketServerBase&);
-
private:
-
char m_path[MAX_PATH];
-
Reactor* m_pReactor;
-
SocketDataDecoderBase* m_pDecoder;
-
};
点击(此处)折叠或打开 unixsocketeventhandler.cpp
-
#include "unixsocketeventhandler.h"
-
-
// create unix socket. return 0, success.
-
int UnixSocketServerEventHandler::OnListen() {
-
m_fd = socket(AF_UNIX, SOCK_STREAM, 0);
-
if (m_fd == -1)
-
{
-
std::cout << "create socket error." << strerror(errno) << std::endl;
-
return -1;
-
}
-
-
sockaddr_un addr;
-
memset(&addr, 0, sizeof(addr));
-
addr.sun_family = AF_UNIX;
-
strcpy(addr.sun_path, m_path);
-
-
// 设置非阻塞模式
-
SetNonBlock(m_fd, true);
-
// 在bind的时候,会生成socket文件。
-
socklen_t len = offsetof(struct sockaddr_un, sun_path)+strlen(addr.sun_path);
-
if (bind(m_fd, (struct sockaddr *) &addr, len) != 0)
-
{
-
std::cout << "unix socket bind error,path = " << m_path << std::endl;
-
return -1;
-
}
-
-
if (listen(m_fd, 500) != 0)
-
{
-
std::cout << "unix listen error" << std::endl;
-
return -1;
-
}
-
// 更改socket文件的访问权限.
-
if (0 != chmod(m_path, 0777))
-
{
-
std::cout << "chmod[777] " << m_path << "error " << strerror(errno)
-
<< std::endl;
-
return -1;
-
}
-
-
// 将该fd加入到 epoll中,监听客户端的连接
-
if (0 != RegisterReadEvent())
-
{
-
std::cout << "RegisterReadEvent error." << std::endl;
-
close(m_fd);
-
return -1;
-
}
-
return 0;
-
}
-
-
// 读被触发时,调用该函数
-
void UnixSocketServerEventHandler::OnFDRead() {
-
sockaddr_un clientAddr;
-
memset(&clientAddr, 0x00, sizeof(sockaddr_un));
-
socklen_t len = sizeof(clientAddr);
-
// 客户端的fd,这个地方,其实可以建立一个(fd -- 客户端)对应关系
-
int sockFD = accept(m_fd, (struct sockaddr*) &clientAddr, &len);
-
if (sockFD < 0)
-
{
-
std::cout << "accept error." << std::endl;
-
return;
-
}
-
else
-
{
-
len -= offsetof(struct sockaddr_un, sun_path);
-
clientAddr.sun_path[len] = 0;
-
std::cout << "accpet path = " << clientAddr.sun_path << std::endl;
-
}
-
-
OnAccept(sockFD);
-
}
-
-
int UnixSocketServerEventHandler::OnAccept(int fd) {
-
SetNonBlock(fd, true);
-
UnixSocketHandler* pHandler = new UnixSocketHandler(m_pDecoder, fd,
-
this->m_pReactor);
-
if (pHandler == NULL)
-
{
-
return -1;
-
}
-
if (0 != pHandler->RegisterReadEvent())
-
{
-
pHandler->Close();
-
delete pHandler;
-
pHandler = NULL;
-
return -1;
-
}
-
return 0;
-
}
-
-
/* ********************************
-
* 处理 unix socket 客户端事件
-
* ********************************/
-
void UnixSocketHandler::OnFDRead() {
-
char buf[1024] =
-
{ 0 };
-
int iRet = read(m_fd, buf, sizeof(buf));
-
if (0 == iRet)
-
{
-
Close();// 客户端关闭socket.
-
return;
-
}
-
// 回调处理函数,对数据进行处理
-
m_pDecoder->OnProcess(this, buf, iRet);
-
}
-
-
void UnixSocketHandler::Close() {
-
UnRegisterReadEvent();
-
UnRegisterWriteEvent();
-
close(m_fd);
-
delete this;
-
}
-
-
void UnixSocketHandler::OnFDWrite() {
-
-
}
-
-
// 可以通过该函数,向客户端发送数据
-
int UnixSocketHandler::SendBuf(const char* buf, int buf_len) {
-
RegisterWriteEvent();
-
int iRet = write(m_fd, buf, buf_len);
-
UnRegisterWriteEvent();
-
return iRet;
-
}
点击(此处)折叠或打开 测试代码: unixsocketeventhandler_test.cpp
-
#include "unixsocketeventhandler.h"
-
-
-
class UnixSocketDataDecoder_echo_test: public SocketDataDecoderBase
-
{
-
public:
-
virtual int OnProcess(FDEventHanderBase* pSocket, const char* buf,
-
unsigned int buf_len)
-
{
-
std::cout << "RecvString: " << buf << std::endl;
-
return ((UnixSocketHandler*)pSocket)->SendBuf(buf, buf_len);
-
}
-
};
-
-
void UnixSocketEventHandler_test()
-
{
-
const char* path = "/home/ll/work/min_heap/unix_path.sock";
-
UnixSocketDataDecoder_echo_test* pDecoder = new UnixSocketDataDecoder_echo_test;
-
UnixSocketServerBase server(path, pDecoder);
-
server.Init();
-
server.Run();
-
server.Exit();
-
-
delete pDecoder;
-
pDecoder = NULL;
-
}
-
-
// 客户端测试程序
-
int f_unix_socket_client()
-
{
-
const char* path = "/home/ll/work/min_heap/unix_path.sock";
-
int sockFd = socket(AF_UNIX, SOCK_STREAM, 0);
-
if ( sockFd < 0 )
-
{
-
std::cout << "socket Error:" << strerror(errno) << std::endl;
-
return -1;
-
}
-
-
sockaddr_un server;
-
server.sun_family = AF_UNIX;
-
strcpy(server.sun_path, path);
-
// 直接去connect就好了,不需要再bind,否则会再生成一个socket文件。
-
int iRet = connect(sockFd, (struct sockaddr*) &server, sizeof(server));
-
if ( 0 != iRet )
-
{
-
std::cout << "connect Error:" << strerror(errno) << std::endl;
-
close(sockFd);
-
return 0;
-
}
-
while(true)
-
{
-
char inBuf[1024] = {0};
-
std::cin >> inBuf;
-
write(sockFd, inBuf, strlen(inBuf));
-
-
char outBuf[1024] = {0};
-
read(sockFd, outBuf, sizeof(outBuf));
-
std::cout << "Echo: " << outBuf << std::endl;
-
}
-
-
return 0;
-
}
阅读(4235) | 评论(0) | 转发(0) |