参考:
实现zmqpolling.py.py
-
import zmq
-
import time
-
import sys
-
import random
-
from multiprocessing import Process
-
-
def server_push(port="5556"):
-
context = zmq.Context()
-
socket = context.socket(zmq.PUSH)
-
socket.bind("tcp://*:%s" % port)
-
print("Running server on port: ", port)
-
# serves only 5 request and dies
-
for reqnum in range(10):
-
if reqnum < 6:
-
socket.send("Continue".encode("ascii") )
-
else:
-
socket.send("Exit".encode("ascii") )
-
break
-
time.sleep (1)
-
-
def server_pub(port="5558"):
-
context = zmq.Context()
-
socket = context.socket(zmq.PUB)
-
socket.bind("tcp://*:%s" % port)
-
publisher_id = random.randrange(0,9999)
-
print ("Running server on port: ", port)
-
# serves only 5 request and dies
-
for reqnum in range(10):
-
# Wait for next request from client
-
topic = random.randrange(8,10)
-
messagedata = "server#%s" % publisher_id
-
print("%s %s" % (topic, messagedata))
-
socket.send( ("%d %s" % (topic, messagedata)).encode("ascii") )
-
time.sleep(1)
-
-
def client(port_push, port_sub):
-
context = zmq.Context()
-
socket_pull = context.socket(zmq.PULL)
-
socket_pull.connect ("tcp://localhost:%s" % port_push)
-
print ("Connected to server with port %s" % port_push)
-
socket_sub = context.socket(zmq.SUB)
-
socket_sub.connect ("tcp://localhost:%s" % port_sub)
-
socket_sub.setsockopt(zmq.SUBSCRIBE, "9".encode("ascii") )
-
print ("Connected to publisher with port %s" % port_sub)
-
# Initialize poll set
-
poller = zmq.Poller()
-
poller.register(socket_pull, zmq.POLLIN)
-
poller.register(socket_sub, zmq.POLLIN)
-
# Work on requests from both server and publisher
-
should_continue = True
-
while should_continue:
-
socks = dict(poller.poll())
-
if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
-
message = socket_pull.recv()
-
print ("Recieved control command: %s" % message)
-
if message == "Exit":
-
print ("Recieved exit command, client will stop recieving messages")
-
should_continue = False
-
-
if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
-
string = socket_sub.recv()
-
topic, messagedata = string.split()
-
print("Processing ... ", topic, messagedata)
-
-
-
-
if __name__ == "__main__":
-
# Now we can run a few servers
-
server_push_port = "5556"
-
server_pub_port = "5558"
-
Process(target=server_push, args=(server_push_port,)).start()
-
Process(target=server_pub, args=(server_pub_port,)).start()
-
Process(target=client, args=(server_push_port,server_pub_port,)).start()
运行:
#python3 zmqpolling.py.py
阅读(2265) | 评论(0) | 转发(0) |