Application: in-process communication
Test code
import zmq


class MyPush:
    def __init__(self, context, addr):
        self._sock = context.socket(zmq.PUSH)
        self._sock.bind(addr)

    def push(self, datas):
        assert isinstance(datas, list)
        for data in datas:
            # send_string() encodes str to UTF-8 bytes before sending
            self._sock.send_string(data)
        print('push done')


class MyPull:
    def __init__(self, context, addr, num):
        assert 0 < num < 256
        self._socks = {}  # maps each PULL socket to its index
        self._poller = zmq.Poller()
        for i in range(num):
            sock = context.socket(zmq.PULL)
            sock.connect(addr)
            self._poller.register(sock, zmq.POLLIN)
            self._socks[sock] = i

    def close(self):
        for sock in self._socks:
            self._poller.unregister(sock)
            sock.close()

    def pull(self):
        # Keep polling until a round returns no readable socket.
        while True:
            # 1 ms timeout; without a timeout, poll() blocks
            socks = dict(self._poller.poll(1))
            if not socks:
                break
            for sock, event in socks.items():
                if event == zmq.POLLIN:
                    print(self._socks[sock], ' - ', sock.recv_string())


ctx = zmq.Context()
ps = MyPush(ctx, 'inproc://xxx.xx')
pl = MyPull(ctx, 'inproc://xxx.xx', 5)
datas = ['msg' + str(idx) for idx in range(12)]
ps.push(datas)
pl.pull()
pl.close()
Pipeline pattern
Excerpted from:
The pipeline pattern is used for distributing data to nodes arranged in a pipeline. Data always flows down the pipeline, and each stage of the pipeline is connected to at least one node. When a pipeline stage is connected to multiple nodes data is load-balanced among all connected nodes.
A socket of type ZMQ_PUSH is used by a pipeline node to send messages to downstream pipeline nodes. Messages are load-balanced to all connected downstream nodes. The zmq_recv() function is not implemented for this socket type.
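The PUSH behaviour described above can be observed with a small pyzmq sketch. It assumes pyzmq is installed; the function name `distribute`, the address `inproc://push-demo`, and the message names are made up for illustration. One PUSH socket binds, several PULL sockets connect, and the function counts how many messages each PULL socket receives. It also tries `recv()` on the PUSH socket, which is expected to fail because receiving is not implemented for that socket type.

```python
import zmq


def distribute(num_pulls=3, num_msgs=6, addr='inproc://push-demo'):
    """Send num_msgs through one PUSH socket; count what each PULL receives."""
    ctx = zmq.Context()
    push = ctx.socket(zmq.PUSH)
    push.bind(addr)  # for inproc, bind before the peers connect

    pulls = []
    for _ in range(num_pulls):
        sock = ctx.socket(zmq.PULL)
        sock.connect(addr)
        pulls.append(sock)

    for i in range(num_msgs):
        push.send_string('msg%d' % i)

    counts = []
    for sock in pulls:
        n = 0
        # poll() returns 0 when nothing arrives within the 100 ms timeout
        while sock.poll(100):
            sock.recv_string()
            n += 1
        counts.append(n)

    # Receiving on a PUSH socket is not supported, so this should raise.
    try:
        push.recv(zmq.NOBLOCK)
        recv_failed = False
    except zmq.ZMQError:
        recv_failed = True

    for sock in pulls + [push]:
        sock.close(linger=0)
    ctx.term()
    return counts, recv_failed


if __name__ == '__main__':
    counts, recv_failed = distribute()
    print('messages per PULL socket:', counts)
    print('recv() on PUSH raised an error:', recv_failed)
```

With all PULL sockets connected before the first send, the messages should be spread evenly across them, which is what "load-balanced" means here.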
A socket of type ZMQ_PULL is used by a pipeline node to receive messages from upstream pipeline nodes. Messages are fair-queued from among all connected upstream nodes. The zmq_send() function is not implemented for this socket type.
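The PULL side can be sketched the same way, again assuming pyzmq is installed. The function name `collect`, the address `inproc://pull-demo`, and the message format are hypothetical. Here one PULL socket binds and several PUSH sockets connect to it, so a single receiver collects messages from multiple upstream nodes. The sketch also tries `send()` on the PULL socket, which is expected to fail because sending is not implemented for that socket type.

```python
import zmq


def collect(num_pushes=2, msgs_each=3, addr='inproc://pull-demo'):
    """Several PUSH sockets feed one PULL socket; return what it receives."""
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    pull.bind(addr)  # for inproc, bind before the peers connect

    pushes = []
    for _ in range(num_pushes):
        sock = ctx.socket(zmq.PUSH)
        sock.connect(addr)
        pushes.append(sock)

    # Each upstream node sends its own numbered messages.
    for idx, sock in enumerate(pushes):
        for n in range(msgs_each):
            sock.send_string('push%d-msg%d' % (idx, n))

    received = []
    # poll() returns 0 when nothing arrives within the 100 ms timeout
    while pull.poll(100):
        received.append(pull.recv_string())

    # Sending on a PULL socket is not supported, so this should raise.
    try:
        pull.send(b'x', zmq.NOBLOCK)
        send_failed = False
    except zmq.ZMQError:
        send_failed = True

    for sock in pushes + [pull]:
        sock.close(linger=0)
    ctx.term()
    return received, send_failed


if __name__ == '__main__':
    received, send_failed = collect()
    print('received:', received)
    print('send() on PULL raised an error:', send_failed)
```

The interleaving between senders is decided by the fair-queuing inside the PULL socket, so the sketch only relies on every message arriving and on each sender's messages staying in their original order.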