Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1789067
  • 博文数量: 335
  • 博客积分: 4690
  • 博客等级: 上校
  • 技术积分: 4341
  • 用 户 组: 普通用户
  • 注册时间: 2010-05-08 21:38
个人简介

无聊之人--除了技术,还是技术,你懂得

文章分类

全部博文(335)

文章存档

2016年(29)

2015年(18)

2014年(7)

2013年(86)

2012年(90)

2011年(105)

分类: Python/Ruby

2012-12-05 21:27:59

通过消息队列来模拟了一个任务producer,在exchange以后,被均匀的分配到两个各队列当中,来进行作业的时间执行。可以根据实际的需要来进行任务调度或是负载均衡。
将任务分发给两个worker
以及通过参数prefetch来实现负载的均衡

点击(此处)折叠或打开

  1. #!C:\Python27\Scripts
  2. import pika
  3. import sys

  4. connection = pika.BlockingConnection(pika.ConnectionParameters(
  5.         host='localhost'))
  6. channel = connection.channel()

  7. channel.queue_declare(queue='task_queue', durable=True)

  8. message = ' '.join(sys.argv[1:]) or "Hello World!"
  9. channel.basic_publish(exchange='',
  10.                       routing_key='task_queue',
  11.                       body=message,
  12.                       properties=pika.BasicProperties(
  13.                          delivery_mode = 2, # make message persistent
  14.                       ))
  15. print " [x] Sent %r" % (message,)
  16. connection.close()

点击(此处)折叠或打开

  1. #!C:\Python27\Scripts
  2. import pika
  3. import time

  4. connection = pika.BlockingConnection(pika.ConnectionParameters(
  5.         host='localhost'))
  6. channel = connection.channel()

  7. channel.queue_declare(queue='task_queue', durable=True)
  8. print ' [*] Waiting for messages. To exit press CTRL+C'

  9. def callback(ch, method, properties, body):
  10.     print " [x] Received %r" % (body,)
  11.     time.sleep( body.count('.') )
  12.     print " [x] Done"
  13.     ch.basic_ack(delivery_tag = method.delivery_tag)

  14. channel.basic_qos(prefetch_count=1)
  15. channel.basic_consume(callback,
  16.                       queue='task_queue')

  17. channel.start_consuming()
程序的执行顺序,首先执行work.py,在两个console分别执行
然后运行producer程序,new_task 程序
H:\mq>new_task.py  f1......
 [x] Sent 'f1......'

H:\mq>new_task.py  f1.......
 [x] Sent 'f1.......'

H:\mq>new_task.py  f1........
 [x] Sent 'f1........'

H:\mq>new_task.py  f1.........
 [x] Sent 'f1.........'

H:\mq>new_task.py  f1..........
 [x] Sent 'f1..........'

H:\mq>new_task.py  f1...........
 [x] Sent 'f1...........'
work_task py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'f1.......'
 [x] Done
 [x] Received 'f1.........'
 [x] Done
 [x] Received 'f1...........'
 [x] Done
work_task py 
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'f1......'
[x] Done
[x] Received 'f1........'
[x] Done
[x] Received 'f1..........'
[x] Done
可以确认的是任务确实实在量给worker之间进行fair  dispatcher
此例程中需要注意的地方时: 
delivery_mode = 2, # make message persistent用来保证队列中消息的持久化prefetch参数告诉exchange,只有exchange在收到task queue的ackonowledge后才能发送新的消息channel.basic_qos(prefetch_count=1)
阅读(1067) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~