天天说C/S结构,偶尔看到publish/subscribe,就想动手测试一下。
这个图片同前面两个例子不同的地方就在于多了一个exchange,
官方解释如下
Let's quickly go over what we covered in the previous tutorials:
- A producer is a user application that sends messages.
- A queue is a buffer that stores messages.
- A consumer is a user application that receives messages.
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
个人理解就是exchange负责message的路由,它决定了消息的处理方式,即是deliver给某一个queue还是所有的queue,后者丢弃,同时她首先要从producer哪里接受消息。
exchange的type也就决定了消息的趋向,主要后三种,fanout,direct,topic
其中,fanout的意思是所有同exchange做过bind的queue都会接受到消息
direct是指在exchange会直接决定message去那一个queue
而topic则是exchange根据通配符进行匹配,deliver给匹配的queue
说完了理论我们来看一看我们的Publish/Subscribe,我们的任务场景是:
一个logging系统,一个emit ,一个receiver,
即所有的queue都会受到message,因此对应的类型应该是fanout
为了减少代码量,我们使用temporary queue
- #!C:\Python27\Scripts
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='logs',
- type='fanout')
- message = ' '.join(sys.argv[1:]) or "info: Hello World!"
- channel.basic_publish(exchange='logs',
- routing_key='',
- body=message)
- print " [x] Sent %r" % (message,)
- connection.close()
- #!C:\Python27\Scripts
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='logs',
- type='fanout')
- result = channel.queue_declare(exclusive=True)
- queue_name = result.method.queue
- channel.queue_bind(exchange='logs',
- queue=queue_name)
- print ' [*] Waiting for logs. To exit press CTRL+C'
- def callback(ch, method, properties, body):
- print " [x] %r" % (body,)
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
- channel.start_consuming()
代码中需要注意的地方:
exchange的type 为fanout,因此rout_key的值为空,这应该很好理解,因为是广播形式,queue的值无需指定。
queue_name = result.method.queue 表明queue的名字有rabbitmq替你生成,不用担心queue的命名冲突。
result = channel.queue_declare(exclusive=True)
表明queue为创建他的进程独占,创建它的进程死亡,该queue也就消亡
再者需要注意的地方是bind ,就是讲queue与exchange进行关联,从而exchange知道向谁deliver message
然后我们运行程序:
console 1
:\mq>receiver_log.py > log.txt
H:\mq>more log.txt
[*] Waiting for logs. To exit press CTRL+C
[x] 'this is error'
[x] 'this is info'
[x] 'this is warning'
consol2
H:\mq>receiver_log.py
[*] Waiting for logs. To exit press CTRL+C
[x] 'this is error'
[x] 'this is info'
[x] 'this is warning'
cosole 3
H:\mq>emit.py this is error
[x] Sent 'this is error'
H:\mq>emit.py this is info
[x] Sent 'this is info'
H:\mq>emit.py this is warning
[x] Sent 'this is warning'
这样我们就可以根据自己的需要选择数据以何种方式显示或是存储。
阅读(1473) | 评论(0) | 转发(0) |