最近开始看Tair的源码实现,Tair的通信使用的是淘宝的开源的网络库tbnet实现。具体来说是依靠tbnet::Transport类型实现,其源代码路径如下:
下面是Transport的简单类图:
下面介绍其通信流程:
1. 启动
Transport::start()完成其启动,主要工作是启动了两个线程:_readWriteThread和_timeoutThread. 这两个线程的实际入口函数式Tranport::run(), 下面是Transport::run的实现:
-
void Transport::run(tbsys::CThread *thread, void *arg) {
-
if (thread == &_timeoutThread) {
-
timeoutLoop();
-
} else {
-
eventLoop((SocketEvent*)arg);
-
}
-
}
A. 读写线程:
1. 读写线程使用epoll实现,在Transport中存在一个EPollSocketEvent _socketEvent 成员变量, arg传入的是这个成员变量的指针,EPollSocketEvent是epoll个操作的封装,在EPollSocketEvent的构造函数中会调用epoll_create初始化epoll。
2. eventLoop的实现步骤如下:
a. 调用socketEvent->getEvents() 取得发生的事件并放入到ioevents数组中
i. 调用epoll_wait等待读写事件;
ii. 每个事件的events[i].data.ptr存放有事件对应的IOComonet(socket 封装),可以用来处理读写事件;
b. 对于读写事件,分别调用ioc->handleReadEvent()和ioc->handleWriteEvent()处理;
B. timeout线程
1. 主要用于检查通信的socket是否超过一定时常没有使用,对于TCPComponent(封装通信socket),如果15分钟没有使用,则断开连接;
2. 监听
通过调用Transport::listen()完成监听, 主要完成以下步骤:
a. 创建TCPAcceptor,继承于IOComonet(socket 封装),并启动异步监听;
b. 调用addComponent(acceptor, true, false);
i. 创建epoll_event ev,设置ev.data.ptr = socket->getIOComponent(),用来处理读写事件的IOComponentsocket 封装)
ii. 调用epoll_ctl(_iepfd,EPOLL_CTL_ADD,socket->getSocketHandle(),&ev)注册监听socket上的读取事件;
c. 当客户端有connect请求过来的时候会触发监听socket上的读取事件,TCPAcceptor::handleReadEvent()会被调用。
3. 接受客户端的连接请求
系统在TCPAcceptor::handleReadEvent()中接受客户端的连接请求,主要步骤如下:
a. socket = ((ServerSocket*)_socket)->accept() 接受连接请求并得到通信socket;
b. 创建TCPComponent封装通信socket;
c. 调用addComponent(component, true, false) 注册当前socket的读取事件。
4. 数据通信
A. 数据读取
数据通信由通信socket完成,该socket在epoll中注册,当有数据需要读取的时候会触发读取事件并调用TCPComponent::handleReadEvent()处理。
B. 数据发送
当有数据需要发送时Connection::postPacket()函数, 这个函数中会调用_iocomponent->enableWrite(true),注册写入事件,并调用TCPComponent::handleWriteEvent()处理。
阅读(9603) | 评论(0) | 转发(1) |