Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1792710
  • 博文数量: 335
  • 博客积分: 4690
  • 博客等级: 上校
  • 技术积分: 4341
  • 用 户 组: 普通用户
  • 注册时间: 2010-05-08 21:38
个人简介

无聊之人--除了技术,还是技术,你懂得

文章分类

全部博文(335)

文章存档

2016年(29)

2015年(18)

2014年(7)

2013年(86)

2012年(90)

2011年(105)

分类: Python/Ruby

2012-12-05 21:56:12

天天说C/S结构,偶尔看到publish/subscribe,就想动手测试一下。
 
这个图片同前面两个例子不同的地方就在于多了一个exchange,
官方解释如下

Let's quickly go over what we covered in the previous tutorials:

  • producer is a user application that sends messages.
  • queue is a buffer that stores messages.
  • consumer is a user application that receives messages.

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

个人理解就是exchange负责message的路由,它决定了消息的处理方式,即是deliver给某一个queue还是所有的queue,后者丢弃,同时她首先要从producer哪里接受消息。
exchange的type也就决定了消息的趋向,主要后三种,fanout,direct,topic
其中,fanout的意思是所有同exchange做过bind的queue都会接受到消息
direct是指在exchange会直接决定message去那一个queue
而topic则是exchange根据通配符进行匹配,deliver给匹配的queue

说完了理论我们来看一看我们的Publish/Subscribe,我们的任务场景是:
一个logging系统,一个emit ,一个receiver,
即所有的queue都会受到message,因此对应的类型应该是fanout
为了减少代码量,我们使用temporary queue

点击(此处)折叠或打开

  1. #!C:\Python27\Scripts
  2. import pika
  3. import sys

  4. connection = pika.BlockingConnection(pika.ConnectionParameters(
  5.         host='localhost'))
  6. channel = connection.channel()

  7. channel.exchange_declare(exchange='logs',
  8.                          type='fanout')

  9. message = ' '.join(sys.argv[1:]) or "info: Hello World!"
  10. channel.basic_publish(exchange='logs',
  11.                       routing_key='',
  12.                       body=message)
  13. print " [x] Sent %r" % (message,)
  14. connection.close()

点击(此处)折叠或打开

  1. #!C:\Python27\Scripts
  2. import pika

  3. connection = pika.BlockingConnection(pika.ConnectionParameters(
  4.         host='localhost'))
  5. channel = connection.channel()

  6. channel.exchange_declare(exchange='logs',
  7.                          type='fanout')

  8. result = channel.queue_declare(exclusive=True)
  9. queue_name = result.method.queue

  10. channel.queue_bind(exchange='logs',
  11.                    queue=queue_name)

  12. print ' [*] Waiting for logs. To exit press CTRL+C'

  13. def callback(ch, method, properties, body):
  14.     print " [x] %r" % (body,)

  15. channel.basic_consume(callback,
  16.                       queue=queue_name,
  17.                       no_ack=True)

  18. channel.start_consuming()
代码中需要注意的地方:
exchange的type 为fanout,因此rout_key的值为空,这应该很好理解,因为是广播形式,queue的值无需指定。
queue_name = result.method.queue 表明queue的名字有rabbitmq替你生成,不用担心queue的命名冲突。
result = channel.queue_declare(exclusive=True)
表明queue为创建他的进程独占,创建它的进程死亡,该queue也就消亡
再者需要注意的地方是bind ,就是讲queue与exchange进行关联,从而exchange知道向谁deliver message 
然后我们运行程序:

console 1  
:\mq>receiver_log.py  > log.txt
H:\mq>more  log.txt
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'this is error'
 [x] 'this is info'
 [x] 'this is warning'
consol2 
H:\mq>receiver_log.py
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'this is error'
 [x] 'this is info'
 [x] 'this is warning'
cosole  3 
H:\mq>emit.py  this is  error
 [x] Sent 'this is error'

H:\mq>emit.py  this is  info
 [x] Sent 'this is info'

H:\mq>emit.py  this is  warning
 [x] Sent 'this is warning'
这样我们就可以根据自己的需要选择数据以何种方式显示或是存储。




阅读(1473) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~