分类: LINUX
2015-01-05 11:12:08
原文地址:淘宝网络框架tbnet源码分析 作者:zyd_cu
去年写过一篇tbnet的分析文章,主要介绍tbnet()框架结构及基本使用方法,最近又重读了下源码,有一些新的收获。
连接是核心
tbnet框架围绕网络连接(对应Connection类)展开,Connection主要包含如下成员。
class Connection { bool _isServer; // 区分自己是server还是client IPacketHandler *_defaultPacketHandler; // 客户端收到包的回调 IServerAdapter *_serverAdapter; // server收到包时的回调 Socket *_socket; // 底层socke的封装 IPacketStreamer *_streamer; // 对包网络数据进行编解码的接口 PacketQueue _outputQueue; // 输出队列 PacketQueue _inputQueue; // 输入队列 ChannelPool _channelPool; // ChannelPool,后面介绍 };
Connection是客户端与服务器之间建立的网络连接的抽象,tbnet默认使用长连接,如果不主动关闭会一直保持,直到超过一定阈值时间(默认15分钟)连接上没有传输过数据,会将连接关闭掉,这么做主要是为了防止系统资源被大量的无用连接耗尽。
区分server与client
大部分的应用场景,server和client在处理连接的时候行为都不大相同,tbnet通过给Connection设置标记来区分server和client,并允许server和client设置不同的回调接口(接收到网络包后如何处理),分别为_serverAdapter和_defaultPacketHandler,IServerAdapter和IPacketHandler都是抽象接口,基于tbnet进行开发时,开发者需要自己实现这两个回调接口。
class IServerAdapter { virtual ~IServerAdapter(); virtual IPacketHandler::HPRetCode handlePacket(Connection *connection, Packet *packet) = 0; // 这个用于扩展server批量处理网络请求 virtual bool handleBatchPacket(Connection *connection, PacketQueue &packetQueue) { return false; } }; class IPacketHandler { virtual ~IPacketHandler() {} virtual HPRetCode handlePacket(Packet *packet, void *args) = 0; };
网络包
tbnet将网络上传输的数据抽象为网络包,对用Packet类,每个packet都拥有一个头部,头部是定长的,包含标识网络包的id,网络包的长度等信息,每个网络包都有一个encode和decode的接口,分别用于将网络包转化为二进制数据、以及将二进制数据转化为网络包。上层如果需要定义新的网络包,只需要继承Packet并实现编解码接口即可。
class PacketHeader { uint32_t _chid; // channel id int _pcode; // 网络包id int _dataLen; // 网络包长度 }; class Packet { virtual ~Packet(); virtual bool encode(DataBuffer *output) = 0; virtual bool decode(DataBuffer *input, PacketHeader *header) = 0; PacketHeader _packetHeader; // 包头 int64_t _expireTime; // 超时时间 Channel *_channel; // packet对应的Channel }; class Channel { uint32_t _id; // channel id void *_args; // extra args IPacketHandler *_handler; // 回调函数 int64_t _expireTime; // channel超时时间 };
上面已经多次提到Channel,channel在tbnet里的作用适用于“关联”其请求包和反馈包的,比如client发送一个请求A给server,server恢复一个B包给client,client如何得知B包是对A包的回复呢。tbnet的实现方式是在网络包发送时为其分配一个Channel,每个Channel有一个id唯一标识,这个id也将作为网络包头部的一部分被发送到server端,server在回包时,将接受到的请求包里的channel id设置为反馈包的channel id,client在接收到反馈包时,从channel id就知道这个包是对应哪个请求包的。
Chnanel里除了包含id外,还有hander和args成员,前者是发包是为这个包设置的回调函数(接收到针对这个包的反馈包时如何处理),后者是发包时指定的额外参数指针,在接收到反馈包时这个指针会被传递给IPacketHandler::handlePacket。
输入输出队列
每个连接对应一个网络包输入队列和一个输出队列_inputQueue,_outputQueue,tbnet抽象出的网络包处理逻辑如下:
发送网络包由Connection::postPacket接口完成。
bool Connection::postPacket(Packet *packet, IPacketHandler *packetHandler, void *args, bool noblocking) { // 如果是client, 并且有queue长度的限制 _outputCond.lock(); _queueTotalSize = _outputQueue.size() + _channelPool.getUseListCount() + _myQueue.size(); if (!_isServer && _queueLimit > 0 && noblocking && _queueTotalSize >= _queueLimit) { _outputCond.unlock(); return false; } _outputCond.unlock(); Channel *channel = NULL; packet->setExpireTime(_queueTimeout); // 设置超时 if (_streamer->existPacketHeader()) { // 存在包头 uint32_t chid = packet->getChannelId(); // 从packet中取 if (_isServer) { assert(chid != 0); // server回包不能为空 } else { channel = _channelPool.allocChannel(); // 客户端每次分配新的Channel channel->setHandler(packetHandler); // 设置收到反馈包时 channel->setArgs(args); // 将发包时的参数存在Channel里 packet->setChannel(channel); // 设置回去 } } _outputCond.lock(); // 写入到outputqueue中 _outputQueue.push(packet); if (_iocomponent != NULL && _outputQueue.size() == 1U) { _iocomponent->enableWrite(true); } _outputCond.unlock(); return true; }
接受网络包由Connection::handlePacket接口完成。
bool Connection::handlePacket(DataBuffer *input, PacketHeader *header) { Packet *packet; IPacketHandler::HPRetCode rc; void *args = NULL; Channel *channel = NULL; IPacketHandler *packetHandler = NULL; if (_streamer->existPacketHeader() && !_isServer) { // 存在包头 uint32_t chid = header->_chid; // 从header中取 chid = (chid & 0xFFFFFFF); channel = _channelPool.offerChannel(chid); // channel没找到,说明请求已经超时了,Channel已经被移除 if (channel == NULL) { input->drainData(header->_dataLen); TBSYS_LOG(WARN, "没找到channel, id: %u, %s", chid, tbsys::CNetUtil::addrToString(getServerId()).c_str()); return false; } packetHandler = channel->getHandler(); args = channel->getArgs(); } // 将接收到的数据解码为Packet packet = _streamer->decode(input, header); // 实际的编解码工作由DefaultPacketStreamer完成 if (packet == NULL) { packet = &ControlPacket::BadPacket; } else { packet->setPacketHeader(header); // server端批量处理模式, 直接放入queue, 返回 if (_isServer && _serverAdapter->_batchPushPacket) { if (_iocomponent) _iocomponent->addRef(); _inputQueue.push(packet); if (_inputQueue.size() >= 15) { // 大于15个packet就调用一次 _serverAdapter->handleBatchPacket(this, _inputQueue); _inputQueue.clear(); } return true; } } // 回调处理接收到的网络包 if (_isServer) { rc = _serverAdapter->handlePacket(this, packet); // server端的回调 } else { rc = packetHandler->handlePacket(packet, args); // client的回调 } return true; }
两个核心线程
上面介绍了tbnet在网络包层面上的处理逻辑,发送一个网络包,最终要将该网络包通过OS底层接口发送到tcp队列,这里就开始涉及tbnet的事件模型,tbnet默认使用epoll来处理读写事件。
eventLoop线程不断获取ready的读写事件进行处理,timeout线程将超时的请求从请求队列里移除掉(发了请求,但超过指定时间没有收到回复),timeout线程的处理逻辑比较简单,这里主要介绍eventLoop的具体实现。
void Transport::eventLoop(SocketEvent *socketEvent) { IOEvent events[MAX_SOCKET_EVENTS]; while (!_stop) { // 检查是否有事件发生 int cnt = socketEvent->getEvents(1000, events, MAX_SOCKET_EVENTS); // 循环处理每个事件 for (int i = 0; i < cnt; i++) { IOComponent *ioc = events[i]._ioc; ioc->addRef(); // 读写 bool rc = true; if (events[i]._readOccurred) { rc = ioc->handleReadEvent(); // 可读,处理读事件 } if (rc && events[i]._writeOccurred) { rc = ioc->handleWriteEvent(); // 可写,处理写事件 } ioc->subRef(); if (!rc) { removeComponent(ioc); } } } }
上面代码里的IOComponent是对Socket、Event和Connection的封装,eventLoop的工作就是不断的通过SocketEvent(封装epoll实现)来获取可读或者可写的事件,分别调用对应IOComponent的handleReadEvent和handleWriteEvent接口。
处理读写事件时,监听文件描述符和普通连接的文件描述符的处理是不同的,他们的行为分别在TcpAcceptor和TcpComponent两个类里实现。
先来看看针对监听描述符上的读写是如何处理的,实际上监听文件描述符上只会有读事件发生。
bool TCPAcceptor::handleReadEvent() { Socket *socket; while ((socket = ((ServerSocket*)_socket)->accept()) != NULL) { TCPComponent *component = new TCPComponent(\_owner, socket, \_streamer, _serverAdapter); if (!component->init(true)) { delete component; return true; } // 加入到iocomponents中,及注册可读到socketevent中 _owner->addComponent(component, true, false); return true; }
再看TcpComponent是如何处理读写事件的,最终TcpConnection的readData和writeData被调用。
bool TCPComponent::handleWriteEvent() { _lastUseTime = tbsys::CTimeUtil::getTime(); bool rc = true; if (_state == TBNET_CONNECTED) { rc = _connection->writeData(); // 调用TcpConnection的writeData() } return rc; } bool TCPComponent::handleReadEvent() { _lastUseTime = tbsys::CTimeUtil::getTime(); bool rc = false; if (_state == TBNET_CONNECTED) { rc = _connection->readData(); // 调用TcpConnection的readData() } return rc; }
TcpConnection::writeData的处理比较简单,直接将输出队列里的网络包,逐个编码为二进制数据,然后调用write接口写到tcp层。
bool TCPConnection::writeData() { // 将输出队列里的包移到临时队列 _outputCond.lock(); _outputQueue.moveTo(&_myQueue); if (_myQueue.size() == 0 && _output.getDataLen() == 0) { // 返回 _iocomponent->enableWrite(false); _outputCond.unlock(); return true; } _outputCond.unlock(); Packet *packet; int ret; int writeCnt = 0; int myQueueSize = _myQueue.size(); do { while (_output.getDataLen() < READ_WRITE_SIZE) { if (myQueueSize == 0) break; packet = _myQueue.pop(); // 从队列取出网络包 myQueueSize --; _streamer->encode(packet, &_output); // 将网络包编码为二进制数据 _channelPool.setExpireTime(packet->getChannel(), packet->getExpireTime()); packet->free(); TBNET_COUNT_PACKET_WRITE(1); } if (_output.getDataLen() == 0) { break; } // write data ret = _socket->write(_output.getData(), _output.getDataLen()); // 实际发送数据 if (ret > 0) { _output.drainData(ret); } writeCnt ++; } while (ret > 0 && _output.getDataLen() == 0 && myQueueSize>0 && writeCnt < 10); return true; }
TcpConnection::readData的处理稍微复杂一些,主要是要考虑到网络包边界问题,比如上层在实现一个网络包的编解码协议时,编码的时候错误的多编码了几个字节,而在解码时,并没有将多余的字节从接受buffer里移除,如果不做任何处理,这多余的几个字节就被当成下一个网络包的包头部分处理,这样就会导致接下来所有的网络包都会解析错误。tbnet为了解决这个问题,每个数据包先添加一个魔数,代表一个网络包的开始,然后再是包头和实际的数据。在解析数据包时,只有遇到魔数才会认为是一个读到网络包头。(如果很不幸,多编码的数据刚好有跟魔数相等的,网络包的解析还是会乱掉)
bool TCPConnection::readData() { _input.ensureFree(READ_WRITE_SIZE); int ret = _socket->read(_input.getFree(), _input.getFreeLen()); int readCnt = 0; int freeLen = 0; bool broken = false; while (ret > 0) { _input.pourData(ret); freeLen = _input.getFreeLen(); while (1) { if (!_gotHeader) { _gotHeader = _streamer->getPacketInfo(&_input, &_packetHeader, &broken); if (broken) break; } // 如果有足够的数据, decode, 并且调用handlepacket if (_gotHeader && _input.getDataLen() >= _packetHeader._dataLen) { handlePacket(&_input, &_packetHeader); // 调用Connection::handlePacket _gotHeader = false; _packetHeader._dataLen = 0; TBNET_COUNT_PACKET_READ(1); } else { break; } } if (broken || freeLen > 0 || readCnt >= 10) { break; } if (_packetHeader._dataLen - _input.getDataLen() > READ_WRITE_SIZE) { _input.ensureFree(_packetHeader._dataLen - _input.getDataLen()); } else { _input.ensureFree(READ_WRITE_SIZE); } ret = _socket->read(_input.getFree(), _input.getFreeLen()); readCnt++; } return !broken; }
连接管理
客户端要发送数据时先通过Transport::connect建立到server的连接,然后就可以调用postPacekt发包了,为了达到长连接复用的目的,tbnet封装了ConnectionMananger,每次客户端建立都某server的连接,就将serverid与Connection加入到一个map中,下次如果需要像同一个server发包,直接根据serverid从map中取出Connection使用。