Python提供了Queue模块来专门实现消息队列
Queue对象
Queue对象实现一个fifo队列(其他的还有lifo、priority队列,这里不再介绍)。queue只有maxsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:
Queue.qsize():返回消息队列的当前空间。返回的值不一定可靠。
Queue.empty():判断消息队列是否为空,返回True或False。同样不可靠。
Queue.full():类似上边,判断消息队列是否满
Queue.put(item, block=True, timeout=None):往消息队列中存放消息。block可以控制是否阻塞,timeout指定阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full
exception。
Queue.put_nowait(item):相当于put(item, False).
Queue.get(block=True, timeout=None):获取一个消息,其他同put。
以下两个函数用来判断消息对应的任务是否完成。
Queue.task_done():接受消息的线程通过调用这个函数来说明消息对应的任务已完成。
Queue.join(): 实际上意味着等到队列为空,再执行别的操作
程序示例代码如下:
-
#!/usr/bin/env python
-
# -*- coding:utf-8 -*-
-
# Author :Alvin.xie
-
# @Time :2017-12-18 16:48
-
# @file :4.py
-
-
import random, threading, time
-
from Queue import Queue
-
-
-
# Producer thread
-
class Producer(threading.Thread):
-
def __init__(self, t_name, queue):
-
threading.Thread.__init__(self, name=t_name)
-
self.data = queue
-
-
def run(self):
-
for i in range(10): # 随机产生10个数字 ,可以修改为任意大小
-
# randomnum=random.randint(1,99)
-
print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), i)
-
self.data.put(i) # 将数据依次存入队列
-
# time.sleep(1)
-
print "%s: %s finished!" % (time.ctime(), self.getName())
-
-
-
# Consumer thread
-
class Consumer_even(threading.Thread):
-
def __init__(self, t_name, queue):
-
threading.Thread.__init__(self, name=t_name)
-
self.data = queue
-
-
def run(self):
-
while 1:
-
try:
-
val_even = self.data.get(1, 5) # get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒
-
if val_even % 2 == 0:
-
print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_even)
-
time.sleep(2)
-
else:
-
self.data.put(val_even)
-
time.sleep(2)
-
except: # 等待输入,超过5秒 就报异常
-
print "%s: %s finished!" % (time.ctime(), self.getName())
-
break
-
-
-
class Consumer_odd(threading.Thread):
-
def __init__(self, t_name, queue):
-
threading.Thread.__init__(self, name=t_name)
-
self.data = queue
-
-
def run(self):
-
while 1:
-
try:
-
val_odd = self.data.get(1, 5)
-
if val_odd % 2 != 0:
-
print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)
-
time.sleep(2)
-
else:
-
self.data.put(val_odd)
-
time.sleep(2)
-
except:
-
print "%s: %s finished!" % (time.ctime(), self.getName())
-
break
-
-
-
# Main thread
-
def main():
-
queue = Queue()
-
producer = Producer('Pro.', queue)
-
consumer_even = Consumer_even('Con_even.', queue)
-
consumer_odd = Consumer_odd('Con_odd.', queue)
-
producer.start()
-
consumer_even.start()
-
consumer_odd.start()
-
producer.join()
-
consumer_even.join()
-
consumer_odd.join()
-
print 'All threads terminate!'
-
-
-
if __name__ == '__main__':
-
main()
执行结果如下:
Mon Dec 18 16:49:04 2017: Pro. is producing 0 to the queue!
Mon Dec 18 16:49:04 2017: Pro. is producing 1 to the queue!
Mon Dec 18 16:49:04 2017: Pro. is producing 2 to the queue!
Mon Dec 18 16:49:04 2017: Pro. is producing 3 to the queue!
Mon Dec 18 16:49:04 2017: Con_even. is consuming. 0 in the queue is consumed!
Mon Dec 18 16:49:04 2017: Pro. is producing 4 to the queue!
Mon Dec 18 16:49:04 2017: Con_odd. is consuming. 1 in the queue is consumed!
Mon Dec 18 16:49:04 2017: Pro. is producing 5 to the queue!
Mon Dec 18 16:49:04 2017: Pro. is producing 6 to the queue!
Mon Dec 18 16:49:04 2017: Pro. is producing 7 to the queue!
Mon Dec 18 16:49:04 2017: Pro. is producing 8 to the queue!
Mon Dec 18 16:49:04 2017: Pro. is producing 9 to the queue!
Mon Dec 18 16:49:04 2017: Pro. finished!
Mon Dec 18 16:49:06 2017: Con_even. is consuming. 2 in the queue is consumed!
Mon Dec 18 16:49:06 2017: Con_odd. is consuming. 3 in the queue is consumed!
Mon Dec 18 16:49:08 2017: Con_even. is consuming. 4 in the queue is consumed!
Mon Dec 18 16:49:08 2017: Con_odd. is consuming. 5 in the queue is consumed!
Mon Dec 18 16:49:10 2017: Con_even. is consuming. 6 in the queue is consumed!
Mon Dec 18 16:49:10 2017: Con_odd. is consuming. 7 in the queue is consumed!
Mon Dec 18 16:49:12 2017: Con_even. is consuming. 8 in the queue is consumed!
Mon Dec 18 16:49:12 2017: Con_odd. is consuming. 9 in the queue is consumed!
Mon Dec 18 16:49:19 2017: Con_even. finished!
Mon Dec 18 16:49:19 2017: Con_odd. finished!
All threads terminate!
Process finished with exit code 0
阅读(2003) | 评论(0) | 转发(0) |