通过消息队列来模拟了一个任务producer,在exchange以后,被均匀的分配到两个各队列当中,来进行作业的时间执行。可以根据实际的需要来进行任务调度或是负载均衡。
将任务分发给两个worker
以及通过参数prefetch来实现负载的均衡
- #!C:\Python27\Scripts
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='task_queue', durable=True)
- message = ' '.join(sys.argv[1:]) or "Hello World!"
- channel.basic_publish(exchange='',
- routing_key='task_queue',
- body=message,
- properties=pika.BasicProperties(
- delivery_mode = 2, # make message persistent
- ))
- print " [x] Sent %r" % (message,)
- connection.close()
- #!C:\Python27\Scripts
- import pika
- import time
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='task_queue', durable=True)
- print ' [*] Waiting for messages. To exit press CTRL+C'
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
- time.sleep( body.count('.') )
- print " [x] Done"
- ch.basic_ack(delivery_tag = method.delivery_tag)
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(callback,
- queue='task_queue')
- channel.start_consuming()
程序的执行顺序,首先执行work.py,在两个console分别执行
然后运行producer程序,new_task 程序
H:\mq>new_task.py f1......
[x] Sent 'f1......'
H:\mq>new_task.py f1.......
[x] Sent 'f1.......'
H:\mq>new_task.py f1........
[x] Sent 'f1........'
H:\mq>new_task.py f1.........
[x] Sent 'f1.........'
H:\mq>new_task.py f1..........
[x] Sent 'f1..........'
H:\mq>new_task.py f1...........
[x] Sent 'f1...........'
work_task py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'f1.......'
[x] Done
[x] Received 'f1.........'
[x] Done
[x] Received 'f1...........'
[x] Done
work_task py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'f1......'
[x] Done
[x] Received 'f1........'
[x] Done
[x] Received 'f1..........'
[x] Done
可以确认的是任务确实实在量给worker之间进行fair dispatcher
此例程中需要注意的地方时:
delivery_mode = 2, # make message persistent用来保证队列中消息的持久化prefetch参数告诉exchange,只有exchange在收到task queue的ackonowledge后才能发送新的消息channel.basic_qos(prefetch_count=1)
阅读(1074) | 评论(0) | 转发(0) |