Chinaunix首页 | 论坛 | 博客
  • 博客访问: 638313
  • 博文数量: 113
  • 博客积分: 10
  • 博客等级: 民兵
  • 技术积分: 4176
  • 用 户 组: 普通用户
  • 注册时间: 2012-11-15 20:22
个人简介

最大化我的市场价值

文章分类

全部博文(113)

文章存档

2013年(113)

分类: Windows平台

2013-02-26 18:11:35

ZeroMQ简介

又叫ZMQ,据说是一种快速的简单的,颠覆传统思想的网络编程框架。

下面有一段来自的简介:

?MQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ?MQ is from iMatix and is LGPL open source.

它支持下列语言:

C++ | C# | Clojure | CL | Erlang | F# | Felix | Go | Haskell | Java | Lua | Node.js | Objective-C | Perl | PHP | Python | Ruby | Scala | Tcl | Ada | Basic | Haxe | ooc

其中至少有四种语言以前没听过。。

不过ZMQ的消息经过了一层封装,导致其并非消息发送时并非原生的socket。如果发送方使用ZMQ,而接收方不使用ZMQ(或者反过来),就会出现问题。因此,ZMQ不能用来实现http server.

ZMQ中有以下几种通信模型以覆盖所有的应用程序:

Exclusive-Pair
Publish-Subscribe
Push-Pull
Request-Reply

使用ZMQ之后的话不像我们所认识一样的server必须在client之前启动。在ZMQ下面的话client完全可以在server之前启动。一旦连接顺序并不按照我们所想象的那样工作的话,连接顺序是无关的时候,那么客户端和服务端这两个概念就非常模糊了。我们必须重新考虑什么是server,什么是client.ZMQ给出一个非常实际的答案。我们将socket嵌入到网络拓扑的时候,server应该是网络结构中稳定的部分,而client应该是网络结构中比较易变的部分。

REQ/REPHelloWorld

REQ/REP模型

Request-Reply模型比较像原始的socket,于是官方Guide里就用了Request-Reply模型来做HelloWorld,其C++代码如下:

//
//  Hello World server in C++
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//
#include 
#include 
#include 
#include 

int main () {
    //  Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REP);
    socket.bind ("tcp://*:5555");

    while (true) {
        zmq::message_t request;

        //  Wait for next request from client
        socket.recv (&request);
        std::cout << "Received Hello" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (5);
        memcpy ((void *) reply.data (), "World", 5);
        socket.send (reply);
    }
    return 0;
}

再来看看python的HelloWorld

#
#   Hello World client in Python
#   Connects REQ socket to tcp://localhost:5555
#   Sends "Hello" to server, expects "World" back
#
import zmq

context = zmq.Context()

#  Socket to talk to server
print "Connecting to hello world server…"
socket = context.socket(zmq.REQ)
socket.connect ("tcp://localhost:5555")

#  Do 10 requests, waiting each time for a response
for request in range (1,10):
    print "Sending request ", request,"…"
    socket.send ("Hello")

    #  Get the reply.
    message = socket.recv()
    print "Received reply ", request, "[", message, "]"

是不是看起来很Simple?

ZMQ支持多种通信协议,包括:

tcp // tcp
ipc // 进程间通信。猜想底层应该是unix domain socket实现的.因为运行完毕之后我们可以看到socket文件。
inproc // 线程间通信。对于这种通讯协议来说的话底层IO线程没用使用。
pgm // ???
epgm // ???

使用的时候,许多时候只需要简单的在bind和connect的字符串里修改一下地址前缀就可以了~

比如

socket.bind("ipc://ipcname")  //server
socket.connect("ipc://ipcname") //client

当然,底层协议本身在使用时有一些特别需要注意的地方,但是差异不大,所以可以认为ZMQ在这个问题上解决还是比较好的。

pub/sub模式

Pub/Sub模型

这里的Publish-Subscribe模型是一个很典型的PUB-SUB模型,即发布者(Publisher)只能发送数据,它发送时指明发送数据的类型,而订阅者(Subscriber)则只接收它关心的类型的消息。

这里的发布与订阅角色是绝对的,即发布者无法使用recv,订阅者不能使用send,并且订阅者需要设置订阅条件"setsockopt"。

PUB/SUB Hello World

这里只贴出python代码:

Publisher:

import zmq
import time
from random import choice
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5000")

countries = ['netherlands','brazil','germany','portugal']
events = ['yellow card', 'red card', 'goal', 'corner', 'foul']

while True:
    msg = choice( countries ) +" "+ choice( events )
    print "->",msg
    socket.send( msg )
    time.sleep(1)

Subscriber:

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5000")
socket.setsockopt(zmq.SUBSCRIBE, "netherlands")
socket.setsockopt(zmq.SUBSCRIBE, "germany")

while True:
    print  socket.recv()
    time.sleep(5)

官方Guide里有下面一段话需要注意:

There is one more important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

也就是说,在PUB-SUB模型中,众所周知,如果发布者(Publisher)先发布消息,而订阅者(Subscriber)后订阅消息,则Subscriber无法收到它订阅之前的消息。但是,在ZMQ中,你即使让订阅者先启动,发布者后启动,如果发布者启动后立刻发送数据,则启动后刚发送的数据,依然经常会有部分丢失。主要原因是ZMQ是异步的去建立连接与发送数据的,在与订阅者建立起连接之前的一段时间里,发送的数据就会被丢弃。

不过,对于大部分的PUB-SUB模型的应用,上面这种丢失是能够接受的。如果应用确实不能接受的话,Guide里给出了两种解法,第一种方法,Guide的作者说它很Simple and Stupid,说自己从来不会在真正的项目里使用这种方法的:那就是connect后sleep一段时间来等待连接完成,然后再发送数据。

ZGuide里指出了以下几个需要注意的地方:

  • A subscriber can in fact connect to more than one publisher, using one 'connect' call each time. Data will then arrive and be interleaved so that no single publisher drowns out the others.

  • If a publisher has no connected subscribers, then it will simply drop all messages.

  • If you're using TCP, and a subscriber is slow, messages will queue up on the publisher. We'll look at how to protect publishers against this, using the "high-water mark" later.

  • In the current versions of ?MQ, filtering happens at the subscriber side, not the publisher side. This means, over TCP, that a publisher will send all messages to all subscribers, which will then drop messages they don't want.

翻译下就是:

  • 一个订阅者可以订阅多个发布者的消息,只需要多connect几次就可以了。数据会交替到达,以保证不会有哪一个发布者的数据到来过快时,使其它发布者的数据没时间处理。

  • 当无订阅者时,发布者会直接丢弃数据

  • 当使用TCP协议时,如果订阅者比较慢,消息会在发布者那里排队。可以使用high-water mark来解决发布者处堆积的数据过多的问题(随后会讲到)。

  • 当前版本的ZMQ,会在订阅者端进行过滤,也就是说,无论是否是订阅者关心的数据,都会全部发送到订阅者端,订阅者端会自动drop掉不需要的数据。

关于第三条,我用pyzmq试验,试验表明,当下游订阅者处理过慢时,发布端的内存并没有增大,反而是下游订阅端内存会一直往上狂涨。这点原因暂不明确,可能是ZGuide的文档没有更新?新版本的zmq实现已经更改了?(另一个可能的解释是我模拟的情况里,是下游处理过慢,但不是下游接收过慢,因为ZMQ是异步接收数据的,可能在程序没有接待recv的时候,数据已经被实际上接收到了订阅端。如果是网络过慢导致下游接收很慢时,这时必然是阻塞在了上游。)

关于第四条,这点我感觉似乎有点不可接受,至少我们的项目里不能接受这种实现,每个结点订阅的上游都很多,但实际每个结点订阅到的数据很少,如果全部发到下游再过滤,就会凭空多发数十倍不必要要的数据。不清楚现在最新版本的ZMQ到底是在哪端过滤的,是不是还是文档过旧了(待查证)。

未完待续


阅读(2024) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~