今天研究了一个以前的写的生产者-消费者的线程代码,这个代码会一直hang下去. 代码如下
-
# two entities here that try to share a common resource, a queue
-
import threading
-
from threading import Thread,Event
-
from queue import Queue
-
import time,random
-
import logging
-
import sys
-
-
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s (%(threadName)-10s) %(message)s',)
-
_sentinel=object()
-
#_sentinel = None
-
-
class Producer(threading.Thread):
-
def __init__(self,queue):
-
Thread.__init__(self)
-
self.queue=queue
-
self.daemon=True
-
-
def run(self):
-
for i in range(5):
-
item=random.randint(0,256)
-
self.queue.put(item)
-
logging.debug('Producer notify: item %d appended to queue by %s\n' %(item,self.name))
-
self.queue.put(_sentinel)
-
-
-
class Consumer(threading.Thread):
-
def __init__(self,queue):
-
Thread.__init__(self)
-
self.queue=queue
-
self.daemon=True
-
-
def run(self):
-
while True:
-
item=self.queue.get()
-
if item is _sentinel:
-
self.queue.put(_sentinel)
-
break
-
else:
-
logging.debug('Consumer notify: %d poped from queue by %s' %(item,self.name))
-
self.queue.task_done()
-
-
def main():
-
shared_queue=Queue()
-
consumers = []
-
#start the producer
-
p1=Producer(shared_queue)
-
p1.start()
-
-
#start 3 consumer
-
for i in range(0, 3):
-
t=Consumer(shared_queue)
-
t.start()
-
consumers.append(t)
-
-
-
#join the queue
-
shared_queue.join()
-
-
#stop producer
-
p1.join()
-
-
#stop consumers
-
for t in consumers:
-
t.join()
-
-
if __name__=='__main__':
-
main()
-
root@kali:/usr/local/src/py/network# ./thread_share_queue_hang.py
-
2018-09-05 20:20:01,481 (Thread-1 ) Producer notify: item 93 appended to queue by Thread-1
-
-
2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 144 appended to queue by Thread-1
-
-
2018-09-05 20:20:01,482 (Thread-2 ) Consumer notify: 93 poped from queue by Thread-2
-
2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 45 appended to queue by Thread-1
-
-
2018-09-05 20:20:01,482 (Thread-2 ) Consumer notify: 144 poped from queue by Thread-2
-
2018-09-05 20:20:01,482 (Thread-3 ) Consumer notify: 45 poped from queue by Thread-3
-
2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 106 appended to queue by Thread-1
-
-
2018-09-05 20:20:01,483 (Thread-4 ) Consumer notify: 106 poped from queue by Thread-4
-
2018-09-05 20:20:01,483 (Thread-1 ) Producer notify: item 219 appended to queue by Thread-1
-
-
2018-09-05 20:20:01,483 (Thread-2 ) Consumer notify: 219 poped from queue by Thread-2
-
^CTraceback (most recent call last):
-
File "./thread_share_queue_hang.py", line 73, in <module>
-
main()
-
File "./thread_share_queue_hang.py", line 63, in main
-
shared_queue.join()
这段代码会hang在63行,shared_queue.join(). 为什么呢?看官方文档的解释,join方法会阻塞直到queue里面所有的item都被处理完。producer 和3个consumer 都往queue里面放了一个None, 共有4个None, 而其中三个None被用来做sentinel 来使得3个consumer线程能够结束。最后留下一个None在queue里面,而没有人来处理最后的这个None,所以会导致queue.join()会一直hang.
-
Queue.join()
-
-
Blocks until all items in the queue have been gotten and processed.
那么问题找到了,我就重新写了这个main()部分,但是我用了不同的思路:
第一种:
main thread来检查是否只剩下最后一个线程了,如果只剩下最后一个线程(即它自己)
就从queue里面取出最后一个元素处理它.用task_done()函数。
-
def main():
-
shared_queue=Queue()
-
consumers = []
-
#start the producer
-
p1=Producer(shared_queue)
-
p1.start()
-
-
#start 3 consumer
-
for i in range(0, 3):
-
t=Consumer(shared_queue)
-
t.start()
-
consumers.append(t)
-
-
-
while True:
-
#if I - the main thread also am the last thread
-
#print(threading.active_count())
-
if threading.active_count() == 1:
-
#get the last elem from the queue
-
if shared_queue.get() == _sentinel:
-
shared_queue.task_done()
-
break
-
else:
-
time.sleep(0.5)
第二种:
使用qsize() 方法来检查如果queue里面只剩下一个elem,那么我处理它就可以了。
因为这时候其他线程都已经结束了。但是这个qsize() 根据文档来说并不可靠。not reliable
-
#create shared queue
-
shared_queue=Queue()
-
consumers = []
-
-
#start the producer
-
p1=Thread(target=producer, args=(shared_queue, ))
-
p1.start()
-
-
#start 3 consumer
-
for i in range(0, 3):
-
t=Thread(target=consumer, args=(shared_queue,))
-
t.start()
-
consumers.append(t)
-
-
-
while True:
-
#print(threading.active_count())
-
#qsize() method of queue.Queue instance
-
#Return the approximate size of the queue (not
-
#use the qsize() method to justify whether there is only 1 None left in the shared queue
-
if shared_queue.qsize() == 1:
-
#get the last elem from the queue
-
if shared_queue.get_nowait() is None:
-
shared_queue.task_done()
-
break
-
else:
-
time.sleep(0.5)
第三种:
干脆我不对queue做join操作,只要我的线程做join就可以了。
-
shared_queue=Queue()
-
consumers = []
-
#start the producer
-
p1=Producer(shared_queue)
-
p1.start()
-
-
#start 3 consumer
-
for i in range(0, 3):
-
t=Consumer(shared_queue)
-
t.start()
-
consumers.append(t)
-
-
#block until all task in shared_queue are done
-
#we can not do this, because we put 4 None in the queue, but only use 3 of them
-
#as sentinel for consumer, thus use shared_queue.join() will block forever
-
#shared_queue.join()
-
-
#stop producer
-
p1.join()
-
-
#stop consumers
-
for t in consumers:
-
t.join()
这三种写法都达到了同样的效果,但是重新看了queue的文档,让我感觉学到了新知识。
阅读(1642) | 评论(0) | 转发(0) |