全部博文(290)
分类: 系统运维
2011-07-27 15:11:02
[转]感觉zeromq的特点是:将所有io处理由多个io线程单独完成,io线程与应用线程独立。每一个io操作的对象都与一个io线程绑定,它的所有
io操作都有该io线程完成。线程间通过一种command_t格式,通过传送命令类型和对象指针,将本该由应用线程调用的函数,交给io线程完成。
app线程发送和接收时通过队列和io线程交换数据。
处理流程
1. zmq初始化(zmq_init())。
主要实现在ctx_t::ctx_t()中,初始化了signalers数组,创建了多个io线程(io_thread_t),每个io线程和app线程
都有一个signaler,他处理线程间的command_t,在unix中的实现是本地套接。io_thread_t在构造时,根据编译宏决定了使用
select,poll,
kqueue,epoll等那种方式监听io,以后所有的io监听也都采用该方式,将io_thread_t线程的signaler也添加到监听事件。之
后调用io_thread_t->start(),开启多线程。假设采用kqueue,
则最终会调用kqueue_t::loop(),kqueue_t::loop()是个while循环,当有监听事件发生时,则交给对应的hook进行处
理。对于io_thread_t线程的signaler,如果有读事件发生,则交由io_thread_t::in_event()处理。
2. 创建socket(zmq_socket())。
主要实现在ctx_t::create_socket()中。创建app线程(app_thread_t),与一个signaler进行绑定。调用
app_thread_t::create_socket,
确定app线程该应用采用哪种连接模式:ZMQ_PAIR、ZMQ_PUB、ZMQ_SUB、ZMQ_REQ、ZMQ_REP、ZMQ_XREQ、
ZMQ_XREP、ZMQ_PULL、ZMQ_PUSH几类。分别对应pair_t、pub_t、sub_t、req_t、rep_t、xreq_t、
xrep_t、pull_t、push_t。它们继承自socket_base_t,其中每一个类实现了以下接口:xattach_pipes()、
xdetach_inpipe()、xdetach_outpipe()、xkill()、xrevive()、xsetsockopt()、
xsend()、xrecv()、xhas_in()、xhas_out()。程序中调用的发送和接收数据会调用到具体某个模式的
xsend(),xrecv()。
3 监听连接(zmq_bind())
解析地址类型inproc,tcp,ipc。这里以tcp为例,创建一个zmq_listener_t对象,调用
zmq_listener_t::set_address()完成socket(),bind(),listen()系统调用。调用
send_plug(),向io线程发送command_t,类型为plug,对象指针为刚创建的zmq_listener_t。io线程接到该命令后,
最终会有zmq_listener_t::process_plug()来处理,这里完成在kqueue上添加监听tcp的fd。该fd上收到连接请求
时,交由zmq_listener_t::in_event()处理。
4 服务端建立连接
当有新的连接时,zmq_listener_t::in_event()调用tcp_listener::accept()完成accept()操作,获
取新连接的fd。选择一个负载最低的io线程,创建一个
zmq_init_t对象,调用send_plug(),对象指针为zmq_init_t,负载最低io线程接到该命令后,完成以下调用
zmq_init_t::process_plug()->zmq_engine::plug(),zmq_engine::plug()中在
kqueue上添加监听新的fd。
5 客户端建立连接(zmq_connect())
以tcp为例,socket_base_t::connect()中,创建session_t(用于app线程与io线程发送与接收时数据的传送),构造
zmq_connecter_t,调用zmq_connecter_t::set_address()设置连接地址。调用send_plug(),对象指
针为zmq_connecter_t。io线程接到该命令后,完成以下调用
zmq_connecter_t::process_plug()->zmq_connecter_t::start_connecting()->tcp_connecter_t::open
() ,tcp_connecter_t::open () 中会完成socket (),connect()操作,在kqueue上添加监听该连接。
6 数据的接收(zmq_recv())
连接建立起来后,收到读事件则由zmq_engine::in_event()处理。调用tcp_socket_t::read()完成
recv()系统调用。然后调用zmq_init::flush->zmq_init_t::finalise ()
,创建一个session_t,调用send_attach()发送attach命令。io线程收到命令后调用
session_t::process_attach(),
process_attach()中创建pipe_t(pipe_t的实现是一个队列),调用send_bind()发送bind命令。io线程收到命令
后调用session_t::process_attach(),将创建的pipe_t与对应的app线程对应的连接模式进行关联,之后app线程的接收
数据都从pipe_t中获取。