Chinaunix首页 | 论坛 | 博客
  • 博客访问: 356702
  • 博文数量: 158
  • 博客积分: 52
  • 博客等级: 民兵
  • 技术积分: 613
  • 用 户 组: 普通用户
  • 注册时间: 2011-10-27 11:58
文章分类

全部博文(158)

文章存档

2017年(1)

2016年(5)

2015年(19)

2014年(8)

2013年(13)

2012年(80)

2011年(32)

分类: LINUX

2015-01-05 11:12:08

去年写过一篇tbnet的分析文章,主要介绍tbnet()框架结构及基本使用方法,最近又重读了下源码,有一些新的收获。

连接是核心

tbnet框架围绕网络连接(对应Connection类)展开,Connection主要包含如下成员。

class Connection {                           
    bool _isServer;                          // 区分自己是server还是client
    IPacketHandler *_defaultPacketHandler;   // 客户端收到包的回调
    IServerAdapter *_serverAdapter;          // server收到包时的回调    
    Socket *_socket;                         // 底层socke的封装
    IPacketStreamer *_streamer;              // 对包网络数据进行编解码的接口

    PacketQueue _outputQueue;                // 输出队列        
    PacketQueue _inputQueue;                 // 输入队列   
    ChannelPool _channelPool;                // ChannelPool,后面介绍             
}; 

Connection是客户端与服务器之间建立的网络连接的抽象,tbnet默认使用长连接,如果不主动关闭会一直保持,直到超过一定阈值时间(默认15分钟)连接上没有传输过数据,会将连接关闭掉,这么做主要是为了防止系统资源被大量的无用连接耗尽。

区分server与client

大部分的应用场景,server和client在处理连接的时候行为都不大相同,tbnet通过给Connection设置标记来区分server和client,并允许server和client设置不同的回调接口(接收到网络包后如何处理),分别为_serverAdapter和_defaultPacketHandler,IServerAdapter和IPacketHandler都是抽象接口,基于tbnet进行开发时,开发者需要自己实现这两个回调接口。

class IServerAdapter {
    virtual ~IServerAdapter();
    virtual IPacketHandler::HPRetCode handlePacket(Connection *connection, Packet *packet) = 0;
    // 这个用于扩展server批量处理网络请求
    virtual bool handleBatchPacket(Connection *connection, PacketQueue &packetQueue) {
        return false;
    }
};

class IPacketHandler {
    virtual ~IPacketHandler() {}
    virtual HPRetCode handlePacket(Packet *packet, void *args) = 0;
}; 

网络包

tbnet将网络上传输的数据抽象为网络包,对用Packet类,每个packet都拥有一个头部,头部是定长的,包含标识网络包的id,网络包的长度等信息,每个网络包都有一个encode和decode的接口,分别用于将网络包转化为二进制数据、以及将二进制数据转化为网络包。上层如果需要定义新的网络包,只需要继承Packet并实现编解码接口即可。

class PacketHeader {
    uint32_t _chid;    // channel id   
    int _pcode;        // 网络包id      
    int _dataLen;      // 网络包长度       
};

class Packet {
    virtual ~Packet();
    virtual bool encode(DataBuffer *output) = 0;
    virtual bool decode(DataBuffer *input, PacketHeader *header) = 0;

    PacketHeader _packetHeader;  // 包头
    int64_t _expireTime;         // 超时时间 
    Channel *_channel;           // packet对应的Channel
};


class Channel {  
uint32_t _id; // channel id  
void *_args; // extra args  
IPacketHandler *_handler; // 回调函数  
int64_t _expireTime; // channel超时时间  
}; 

上面已经多次提到Channel,channel在tbnet里的作用适用于“关联”其请求包和反馈包的,比如client发送一个请求A给server,server恢复一个B包给client,client如何得知B包是对A包的回复呢。tbnet的实现方式是在网络包发送时为其分配一个Channel,每个Channel有一个id唯一标识,这个id也将作为网络包头部的一部分被发送到server端,server在回包时,将接受到的请求包里的channel id设置为反馈包的channel id,client在接收到反馈包时,从channel id就知道这个包是对应哪个请求包的。

Chnanel里除了包含id外,还有hander和args成员,前者是发包是为这个包设置的回调函数(接收到针对这个包的反馈包时如何处理),后者是发包时指定的额外参数指针,在接收到反馈包时这个指针会被传递给IPacketHandler::handlePacket。

输入输出队列

每个连接对应一个网络包输入队列和一个输出队列_inputQueue,_outputQueue,tbnet抽象出的网络包处理逻辑如下:

  • 在连接上发包,直接将包添加到连接的输出队列,后续工作交给eventLoop处理(后面介绍)
  • 在连接上收到包,如果是server,调用IServerAdapter::handlePacket;如果是client,调用IPacketHandler::handlePacket处理网络包。如果server设置了batch模式,则收到的网络包会直接加入到连接的输入队列,当输入队列里包数超过某阈值时,调用IServerAdapter::handleBatchPacket批量处理输入队列里的网络包。

发送网络包由Connection::postPacket接口完成。

bool Connection::postPacket(Packet *packet, IPacketHandler *packetHandler, void *args, bool noblocking) {

    // 如果是client, 并且有queue长度的限制
    _outputCond.lock();
    _queueTotalSize = _outputQueue.size() + _channelPool.getUseListCount() + _myQueue.size();
    if (!_isServer && _queueLimit > 0 && noblocking && _queueTotalSize >= _queueLimit) {
        _outputCond.unlock();
        return false;
    }
    _outputCond.unlock();

    Channel *channel = NULL;
    packet->setExpireTime(_queueTimeout);           // 设置超时
    if (_streamer->existPacketHeader()) {           // 存在包头
        uint32_t chid = packet->getChannelId();     // 从packet中取
        if (_isServer) {
            assert(chid != 0);                      // server回包不能为空
        } else {
            channel = _channelPool.allocChannel();  // 客户端每次分配新的Channel
            channel->setHandler(packetHandler);     // 设置收到反馈包时
            channel->setArgs(args);                 // 将发包时的参数存在Channel里
            packet->setChannel(channel);            // 设置回去
        }
    }
    _outputCond.lock();
    // 写入到outputqueue中
    _outputQueue.push(packet);
    if (_iocomponent != NULL && _outputQueue.size() == 1U) {
        _iocomponent->enableWrite(true);
    }
    _outputCond.unlock();

    return true;
} 

接受网络包由Connection::handlePacket接口完成。

bool Connection::handlePacket(DataBuffer *input, PacketHeader *header) {
    Packet *packet;
    IPacketHandler::HPRetCode rc;
    void *args = NULL;
    Channel *channel = NULL;
    IPacketHandler *packetHandler = NULL;

    if (_streamer->existPacketHeader() && !_isServer) { // 存在包头
        uint32_t chid = header->_chid;    // 从header中取
        chid = (chid & 0xFFFFFFF);
        channel = _channelPool.offerChannel(chid);

        // channel没找到,说明请求已经超时了,Channel已经被移除
        if (channel == NULL) {
            input->drainData(header->_dataLen);
            TBSYS_LOG(WARN, "没找到channel, id: %u, %s", chid, tbsys::CNetUtil::addrToString(getServerId()).c_str());
            return false;
        }

        packetHandler = channel->getHandler();
        args = channel->getArgs();
    }

    // 将接收到的数据解码为Packet
    packet = _streamer->decode(input, header);  // 实际的编解码工作由DefaultPacketStreamer完成
    if (packet == NULL) {
        packet = &ControlPacket::BadPacket;
    } else {
        packet->setPacketHeader(header);
        // server端批量处理模式, 直接放入queue, 返回
        if (_isServer && _serverAdapter->_batchPushPacket) {
            if (_iocomponent) _iocomponent->addRef();
            _inputQueue.push(packet);
            if (_inputQueue.size() >= 15) { // 大于15个packet就调用一次
                _serverAdapter->handleBatchPacket(this, _inputQueue);
                _inputQueue.clear();
            }
            return true;
        }
    }

    // 回调处理接收到的网络包
    if (_isServer) {
        rc = _serverAdapter->handlePacket(this, packet);  // server端的回调
    } else {
        rc = packetHandler->handlePacket(packet, args);  // client的回调
    }

    return true;
} 

两个核心线程

上面介绍了tbnet在网络包层面上的处理逻辑,发送一个网络包,最终要将该网络包通过OS底层接口发送到tcp队列,这里就开始涉及tbnet的事件模型,tbnet默认使用epoll来处理读写事件。

eventLoop线程不断获取ready的读写事件进行处理,timeout线程将超时的请求从请求队列里移除掉(发了请求,但超过指定时间没有收到回复),timeout线程的处理逻辑比较简单,这里主要介绍eventLoop的具体实现。

void Transport::eventLoop(SocketEvent *socketEvent) {
    IOEvent events[MAX_SOCKET_EVENTS];

    while (!_stop) {
        // 检查是否有事件发生
        int cnt = socketEvent->getEvents(1000, events, MAX_SOCKET_EVENTS);

        // 循环处理每个事件
        for (int i = 0; i < cnt; i++) {
            IOComponent *ioc = events[i]._ioc;

            ioc->addRef();
            // 读写
            bool rc = true;
            if (events[i]._readOccurred) {
                rc = ioc->handleReadEvent();  // 可读,处理读事件
            }
            if (rc && events[i]._writeOccurred) {
                rc = ioc->handleWriteEvent(); // 可写,处理写事件
            }
            ioc->subRef();

            if (!rc) {
                removeComponent(ioc);
            }
        }
    }
} 

上面代码里的IOComponent是对Socket、Event和Connection的封装,eventLoop的工作就是不断的通过SocketEvent(封装epoll实现)来获取可读或者可写的事件,分别调用对应IOComponent的handleReadEvent和handleWriteEvent接口。

处理读写事件时,监听文件描述符和普通连接的文件描述符的处理是不同的,他们的行为分别在TcpAcceptor和TcpComponent两个类里实现。

先来看看针对监听描述符上的读写是如何处理的,实际上监听文件描述符上只会有读事件发生。

bool TCPAcceptor::handleReadEvent() {  
Socket *socket;  
while ((socket = ((ServerSocket*)_socket)->accept()) != NULL) {  
TCPComponent *component = new TCPComponent(\_owner, socket, \_streamer, _serverAdapter);

     if (!component->init(true)) {
          delete component;
          return true;
     }

     // 加入到iocomponents中,及注册可读到socketevent中
     _owner->addComponent(component, true, false);

     return true;
 } 

再看TcpComponent是如何处理读写事件的,最终TcpConnection的readData和writeData被调用。

bool TCPComponent::handleWriteEvent() {
    _lastUseTime = tbsys::CTimeUtil::getTime();
    bool rc = true;
    if (_state == TBNET_CONNECTED) {
        rc = _connection->writeData();   // 调用TcpConnection的writeData()
    } 
    return rc;
}

bool TCPComponent::handleReadEvent() {
    _lastUseTime = tbsys::CTimeUtil::getTime();
    bool rc = false;
    if (_state == TBNET_CONNECTED) {
        rc = _connection->readData();  // 调用TcpConnection的readData()
    }
    return rc;
} 

TcpConnection::writeData的处理比较简单,直接将输出队列里的网络包,逐个编码为二进制数据,然后调用write接口写到tcp层。

bool TCPConnection::writeData() {
    // 将输出队列里的包移到临时队列
    _outputCond.lock();
    _outputQueue.moveTo(&_myQueue);
    if (_myQueue.size() == 0 && _output.getDataLen() == 0) { // 返回
        _iocomponent->enableWrite(false);
        _outputCond.unlock();
        return true;
    }
    _outputCond.unlock();

    Packet *packet;
    int ret;
    int writeCnt = 0;
    int myQueueSize = _myQueue.size();

    do {
        while (_output.getDataLen() < READ_WRITE_SIZE) {

            if (myQueueSize == 0)
                break;

            packet = _myQueue.pop();             // 从队列取出网络包
            myQueueSize --;
            _streamer->encode(packet, &_output);  // 将网络包编码为二进制数据
            _channelPool.setExpireTime(packet->getChannel(), packet->getExpireTime());
            packet->free();
            TBNET_COUNT_PACKET_WRITE(1);
        }

        if (_output.getDataLen() == 0) {
            break;
        }

        // write data
        ret = _socket->write(_output.getData(), _output.getDataLen()); // 实际发送数据
        if (ret > 0) {
            _output.drainData(ret);
        }

        writeCnt ++;
    } while (ret > 0 && _output.getDataLen() == 0 && myQueueSize>0 && writeCnt < 10);

    return true;
} 

TcpConnection::readData的处理稍微复杂一些,主要是要考虑到网络包边界问题,比如上层在实现一个网络包的编解码协议时,编码的时候错误的多编码了几个字节,而在解码时,并没有将多余的字节从接受buffer里移除,如果不做任何处理,这多余的几个字节就被当成下一个网络包的包头部分处理,这样就会导致接下来所有的网络包都会解析错误。tbnet为了解决这个问题,每个数据包先添加一个魔数,代表一个网络包的开始,然后再是包头和实际的数据。在解析数据包时,只有遇到魔数才会认为是一个读到网络包头。(如果很不幸,多编码的数据刚好有跟魔数相等的,网络包的解析还是会乱掉)

bool TCPConnection::readData() {
    _input.ensureFree(READ_WRITE_SIZE);
    int ret = _socket->read(_input.getFree(), _input.getFreeLen());
    int readCnt = 0;
    int freeLen = 0;
    bool broken = false;

    while (ret > 0) {
        _input.pourData(ret);
        freeLen = _input.getFreeLen();

        while (1) {
            if (!_gotHeader) {
                _gotHeader = _streamer->getPacketInfo(&_input, &_packetHeader, &broken);
                if (broken) break;
            }
            // 如果有足够的数据, decode, 并且调用handlepacket
            if (_gotHeader && _input.getDataLen() >= _packetHeader._dataLen) {
                handlePacket(&_input, &_packetHeader);  // 调用Connection::handlePacket
                _gotHeader = false;
                _packetHeader._dataLen = 0;

                TBNET_COUNT_PACKET_READ(1);  
            } else {
                break;
            }
        }

        if (broken || freeLen > 0 || readCnt >= 10) {
            break;
        }

        if (_packetHeader._dataLen - _input.getDataLen() > READ_WRITE_SIZE) {
            _input.ensureFree(_packetHeader._dataLen - _input.getDataLen());
        } else {
            _input.ensureFree(READ_WRITE_SIZE);
        }
        ret = _socket->read(_input.getFree(), _input.getFreeLen());
        readCnt++;
    }

    return !broken;
} 

连接管理

客户端要发送数据时先通过Transport::connect建立到server的连接,然后就可以调用postPacekt发包了,为了达到长连接复用的目的,tbnet封装了ConnectionMananger,每次客户端建立都某server的连接,就将serverid与Connection加入到一个map中,下次如果需要像同一个server发包,直接根据serverid从map中取出Connection使用。

阅读(892) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~