Chinaunix首页 | 论坛 | 博客
  • 博客访问: 373814
  • 博文数量: 270
  • 博客积分: 285
  • 博客等级: 二等列兵
  • 技术积分: 2696
  • 用 户 组: 普通用户
  • 注册时间: 2010-03-06 22:04
  • 认证徽章:
个人简介

Linuxer, ex IBMer. GNU https://hmchzb19.github.io/

文章分类

全部博文(270)

文章存档

2018年(41)

2017年(80)

2016年(80)

2015年(58)

2014年(1)

2013年(8)

2012年(3)

分类: Python/Ruby

2018-09-05 20:46:20

今天研究了一个以前的写的生产者-消费者的线程代码,这个代码会一直hang下去. 代码如下


点击(此处)折叠或打开

  1. # two entities here that try to share a common resource, a queue
  2. import threading
  3. from threading import Thread,Event
  4. from queue import Queue
  5. import time,random
  6. import logging
  7. import sys

  8. logging.basicConfig(level=logging.DEBUG,format='%(asctime)s (%(threadName)-10s) %(message)s',)
  9. _sentinel=object()
  10. #_sentinel = None

  11. class Producer(threading.Thread):
  12.     def __init__(self,queue):
  13.         Thread.__init__(self)
  14.         self.queue=queue
  15.         self.daemon=True

  16.     def run(self):
  17.         for i in range(5):
  18.             item=random.randint(0,256)
  19.             self.queue.put(item)
  20.             logging.debug('Producer notify: item %d appended to queue by %s\n' %(item,self.name))
  21.         self.queue.put(_sentinel)
  22.         

  23. class Consumer(threading.Thread):
  24.     def __init__(self,queue):
  25.         Thread.__init__(self)
  26.         self.queue=queue
  27.         self.daemon=True
  28.     
  29.     def run(self):
  30.         while True:
  31.             item=self.queue.get()
  32.             if item is _sentinel:
  33.                 self.queue.put(_sentinel)
  34.                 break
  35.             else:
  36.                 logging.debug('Consumer notify: %d poped from queue by %s' %(item,self.name))
  37.                 self.queue.task_done()

  38. def main():
  39.     shared_queue=Queue()
  40.     consumers = []
  41.     #start the producer
  42.     p1=Producer(shared_queue)
  43.     p1.start()
  44.    
  45.     #start 3 consumer
  46.     for i in range(0, 3):
  47.         t=Consumer(shared_queue)
  48.         t.start()
  49.         consumers.append(t)

  50.      
  51.     #join the queue
  52.     shared_queue.join()
  53.     
  54.     #stop producer
  55.     p1.join()
  56.     
  57.     #stop consumers
  58.     for t in consumers:
  59.         t.join()

  60. if __name__=='__main__':
  61.     main()


点击(此处)折叠或打开

  1. root@kali:/usr/local/src/py/network# ./thread_share_queue_hang.py
  2. 2018-09-05 20:20:01,481 (Thread-1 ) Producer notify: item 93 appended to queue by Thread-1

  3. 2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 144 appended to queue by Thread-1

  4. 2018-09-05 20:20:01,482 (Thread-2 ) Consumer notify: 93 poped from queue by Thread-2
  5. 2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 45 appended to queue by Thread-1

  6. 2018-09-05 20:20:01,482 (Thread-2 ) Consumer notify: 144 poped from queue by Thread-2
  7. 2018-09-05 20:20:01,482 (Thread-3 ) Consumer notify: 45 poped from queue by Thread-3
  8. 2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 106 appended to queue by Thread-1

  9. 2018-09-05 20:20:01,483 (Thread-4 ) Consumer notify: 106 poped from queue by Thread-4
  10. 2018-09-05 20:20:01,483 (Thread-1 ) Producer notify: item 219 appended to queue by Thread-1

  11. 2018-09-05 20:20:01,483 (Thread-2 ) Consumer notify: 219 poped from queue by Thread-2
  12. ^CTraceback (most recent call last):
  13.   File "./thread_share_queue_hang.py", line 73, in <module>
  14.     main()
  15.   File "./thread_share_queue_hang.py", line 63, in main
  16.     shared_queue.join()

这段代码会hang在63行,shared_queue.join().  为什么呢?看官方文档的解释,join方法会阻塞直到queue里面所有的item都被处理完。producer 和3个consumer 都往queue里面放了一个None, 共有4个None, 而其中三个None被用来做sentinel 来使得3个consumer线程能够结束。最后留下一个None在queue里面,而没有人来处理最后的这个None,所以会导致queue.join()会一直hang.

点击(此处)折叠或打开

  1. Queue.join()

  2.     Blocks until all items in the queue have been gotten and processed.

那么问题找到了,我就重新写了这个main()部分,但是我用了不同的思路:
第一种:
main thread来检查是否只剩下最后一个线程了,如果只剩下最后一个线程(即它自己)
就从queue里面取出最后一个元素处理它.用task_done()函数。

点击(此处)折叠或打开

  1. def main():
  2.     shared_queue=Queue()
  3.     consumers = []
  4.     #start the producer
  5.     p1=Producer(shared_queue)
  6.     p1.start()
  7.    
  8.     #start 3 consumer
  9.     for i in range(0, 3):
  10.         t=Consumer(shared_queue)
  11.         t.start()
  12.         consumers.append(t)

  13.      
  14.     while True:
  15.         #if I - the main thread also am the last thread
  16.         #print(threading.active_count())
  17.         if threading.active_count() == 1:
  18.             #get the last elem from the queue
  19.             if shared_queue.get() == _sentinel:
  20.                 shared_queue.task_done()
  21.                 break
  22.         else:
  23.             time.sleep(0.5)

第二种:
使用qsize() 方法来检查如果queue里面只剩下一个elem,那么我处理它就可以了。
因为这时候其他线程都已经结束了。但是这个qsize() 根据文档来说并不可靠。not reliable

点击(此处)折叠或打开

  1. #create shared queue
  2.     shared_queue=Queue()
  3.     consumers = []

  4.     #start the producer
  5.     p1=Thread(target=producer, args=(shared_queue, ))
  6.     p1.start()
  7.    
  8.     #start 3 consumer
  9.     for i in range(0, 3):
  10.         t=Thread(target=consumer, args=(shared_queue,))
  11.         t.start()
  12.         consumers.append(t)

  13.      
  14.     while True:
  15.         #print(threading.active_count())
  16.         #qsize() method of queue.Queue instance
  17.         #Return the approximate size of the queue (not
  18.         #use the qsize() method to justify whether there is only 1 None left in the shared queue
  19.         if shared_queue.qsize() == 1:
  20.             #get the last elem from the queue
  21.             if shared_queue.get_nowait() is None:
  22.                 shared_queue.task_done()
  23.                 break
  24.         else:
  25.             time.sleep(0.5)

第三种:
干脆我不对queue做join操作,只要我的线程做join就可以了。

点击(此处)折叠或打开

  1. shared_queue=Queue()
  2.     consumers = []
  3.     #start the producer
  4.     p1=Producer(shared_queue)
  5.     p1.start()
  6.    
  7.     #start 3 consumer
  8.     for i in range(0, 3):
  9.         t=Consumer(shared_queue)
  10.         t.start()
  11.         consumers.append(t)

  12.     #block until all task in shared_queue are done
  13.     #we can not do this, because we put 4 None in the queue, but only use 3 of them
  14.     #as sentinel for consumer, thus use shared_queue.join() will block forever
  15.     #shared_queue.join()
  16.     
  17.     #stop producer
  18.     p1.join()
  19.     
  20.     #stop consumers
  21.     for t in consumers:
  22.         t.join()

这三种写法都达到了同样的效果,但是重新看了queue的文档,让我感觉学到了新知识。
阅读(19) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~
评论热议
请登录后评论。

登录 注册