Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1812811
  • 博文数量: 290
  • 博客积分: 10653
  • 博客等级: 上将
  • 技术积分: 3178
  • 用 户 组: 普通用户
  • 注册时间: 2007-10-24 23:08
文章存档

2013年(6)

2012年(15)

2011年(25)

2010年(86)

2009年(52)

2008年(66)

2007年(40)

分类: 系统运维

2011-07-27 15:11:02

   [转]感觉zeromq的特点是:将所有io处理由多个io线程单独完成,io线程与应用线程独立。每一个io操作的对象都与一个io线程绑定,它的所有 io操作都有该io线程完成。线程间通过一种command_t格式,通过传送命令类型和对象指针,将本该由应用线程调用的函数,交给io线程完成。 app线程发送和接收时通过队列和io线程交换数据。
 
处理流程
1. zmq初始化(zmq_init())。
    主要实现在ctx_t::ctx_t()中,初始化了signalers数组,创建了多个io线程(io_thread_t),每个io线程和app线程 都有一个signaler,他处理线程间的command_t,在unix中的实现是本地套接。io_thread_t在构造时,根据编译宏决定了使用 select,poll, kqueue,epoll等那种方式监听io,以后所有的io监听也都采用该方式,将io_thread_t线程的signaler也添加到监听事件。之 后调用io_thread_t->start(),开启多线程。假设采用kqueue, 则最终会调用kqueue_t::loop(),kqueue_t::loop()是个while循环,当有监听事件发生时,则交给对应的hook进行处 理。对于io_thread_t线程的signaler,如果有读事件发生,则交由io_thread_t::in_event()处理。
 
2. 创建socket(zmq_socket())。
    主要实现在ctx_t::create_socket()中。创建app线程(app_thread_t),与一个signaler进行绑定。调用 app_thread_t::create_socket, 确定app线程该应用采用哪种连接模式:ZMQ_PAIR、ZMQ_PUB、ZMQ_SUB、ZMQ_REQ、ZMQ_REP、ZMQ_XREQ、 ZMQ_XREP、ZMQ_PULL、ZMQ_PUSH几类。分别对应pair_t、pub_t、sub_t、req_t、rep_t、xreq_t、 xrep_t、pull_t、push_t。它们继承自socket_base_t,其中每一个类实现了以下接口:xattach_pipes()、 xdetach_inpipe()、xdetach_outpipe()、xkill()、xrevive()、xsetsockopt()、 xsend()、xrecv()、xhas_in()、xhas_out()。程序中调用的发送和接收数据会调用到具体某个模式的 xsend(),xrecv()。
 
3 监听连接(zmq_bind())
   解析地址类型inproc,tcp,ipc。这里以tcp为例,创建一个zmq_listener_t对象,调用 zmq_listener_t::set_address()完成socket(),bind(),listen()系统调用。调用 send_plug(),向io线程发送command_t,类型为plug,对象指针为刚创建的zmq_listener_t。io线程接到该命令后, 最终会有zmq_listener_t::process_plug()来处理,这里完成在kqueue上添加监听tcp的fd。该fd上收到连接请求 时,交由zmq_listener_t::in_event()处理。
 
4 服务端建立连接
   当有新的连接时,zmq_listener_t::in_event()调用tcp_listener::accept()完成accept()操作,获 取新连接的fd。选择一个负载最低的io线程,创建一个 zmq_init_t对象,调用send_plug(),对象指针为zmq_init_t,负载最低io线程接到该命令后,完成以下调用 zmq_init_t::process_plug()->zmq_engine::plug(),zmq_engine::plug()中在 kqueue上添加监听新的fd。
 
5 客户端建立连接(zmq_connect())
   以tcp为例,socket_base_t::connect()中,创建session_t(用于app线程与io线程发送与接收时数据的传送),构造 zmq_connecter_t,调用zmq_connecter_t::set_address()设置连接地址。调用send_plug(),对象指 针为zmq_connecter_t。io线程接到该命令后,完成以下调用 zmq_connecter_t::process_plug()->zmq_connecter_t::start_connecting()->tcp_connecter_t::open () ,tcp_connecter_t::open () 中会完成socket (),connect()操作,在kqueue上添加监听该连接。
 
6 数据的接收(zmq_recv())
    连接建立起来后,收到读事件则由zmq_engine::in_event()处理。调用tcp_socket_t::read()完成 recv()系统调用。然后调用zmq_init::flush->zmq_init_t::finalise () ,创建一个session_t,调用send_attach()发送attach命令。io线程收到命令后调用 session_t::process_attach(), process_attach()中创建pipe_t(pipe_t的实现是一个队列),调用send_bind()发送bind命令。io线程收到命令 后调用session_t::process_attach(),将创建的pipe_t与对应的app线程对应的连接模式进行关联,之后app线程的接收 数据都从pipe_t中获取。

阅读(3696) | 评论(0) | 转发(0) |
0

上一篇:ZeroMQ 的模式

下一篇:python zeromq 介绍

给主人留下些什么吧!~~