参考
1)resultcollector.py
-
import time
-
import zmq
-
import pprint
-
-
def result_collector(expected_results=19999, report_at=999):
    """Count how many results each consumer delivered.

    Binds a PULL socket on tcp://127.0.0.1:5558, receives JSON messages
    and keeps a per-consumer tally keyed by each message's 'consumer'
    field. The tally is pretty-printed exactly once, right after message
    number ``report_at`` has been counted.

    Args:
        expected_results: Number of messages to receive before returning.
            The default matches the original ``range(1, 20000)`` loop.
        report_at: 1-based index of the message after which the tally is
            printed. Nothing is printed if it exceeds ``expected_results``.

    Note:
        ``recv_json`` blocks, so the function only returns once
        ``expected_results`` messages have arrived. The consumer script
        forwards only even 'num' values, so one producer run of 1..19999
        delivers fewer messages than the default; pass a smaller
        ``expected_results`` if the collector is supposed to terminate.
    """
    context = zmq.Context()
    results_receiver = context.socket(zmq.PULL)
    try:
        results_receiver.bind("tcp://127.0.0.1:5558")
        collecter_data = {}
        for received in range(1, expected_results + 1):
            result = results_receiver.recv_json()
            consumer_id = result['consumer']
            collecter_data[consumer_id] = collecter_data.get(consumer_id, 0) + 1
            if received == report_at:
                pprint.pprint(collecter_data)
    finally:
        # Release the socket and context explicitly instead of leaving
        # them to garbage collection at interpreter shutdown.
        results_receiver.close()
        context.term()


# Only start collecting when run as a script, not when imported.
if __name__ == "__main__":
    result_collector()
2)producer.py
-
import time
-
import zmq
-
-
def producer():
    """Push the work items {'num': 1} .. {'num': 19999} to the consumers.

    Binds a PUSH socket on tcp://127.0.0.1:5557 and sends one JSON
    message per number. The socket and context are closed explicitly
    once sending stops, whether the loop finished or raised.
    """
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    try:
        zmq_socket.bind("tcp://127.0.0.1:5557")
        # Start your result manager and workers before you start your producers
        for num in range(1, 20000):
            zmq_socket.send_json({'num': num})
    finally:
        # Explicit teardown instead of relying on garbage collection;
        # NOTE(review): term() is expected to wait for queued messages
        # according to the socket's linger setting - confirm for the
        # installed libzmq.
        zmq_socket.close()
        context.term()


# Only start producing when run as a script, not when imported.
if __name__ == "__main__":
    producer()
3)consumer.py
-
import time
-
import zmq
-
import random
-
-
def consumer():
    """Pull work items and forward the even-numbered ones as results.

    Picks a random consumer id, announces it on stdout, then loops
    forever: each JSON work message read from tcp://127.0.0.1:5557 has
    its 'num' field inspected, and when that number is even a
    {'consumer': <id>, 'num': <num>} message is pushed to
    tcp://127.0.0.1:5558. Odd numbers are dropped.

    The loop only ends through an exception (for example
    KeyboardInterrupt); the sockets and context are closed on the way
    out and the exception propagates unchanged.
    """
    consumer_id = random.randrange(1, 10005)
    print ("I am consumer #%s" % (consumer_id))
    context = zmq.Context()
    # receive work
    consumer_receiver = context.socket(zmq.PULL)
    # send work
    consumer_sender = context.socket(zmq.PUSH)
    try:
        consumer_receiver.connect("tcp://127.0.0.1:5557")
        consumer_sender.connect("tcp://127.0.0.1:5558")
        while True:
            work = consumer_receiver.recv_json()
            data = work['num']
            if data % 2 == 0:
                consumer_sender.send_json({'consumer': consumer_id, 'num': data})
    finally:
        # Both sockets must be closed before the context can terminate.
        consumer_receiver.close()
        consumer_sender.close()
        context.term()


# Only start consuming when run as a script, not when imported.
if __name__ == "__main__":
    consumer()
# Run each script in its own terminal (one process per script):
#   python3 resultcollector.py
#   python3 producer.py
#   python3 consumer.py
阅读(1621) | 评论(0) | 转发(0) |