上接:
http://blog.chinaunix.net/uid-7713641-id-5707052.html
7. 使用queue.Queue() 做数据共享。
主线程往Queue() 写入0-99,开10个线程分别把这100个数字取出来。最后使用join 对线程做同步。
-
# Configure the root logger for all examples below: emit DEBUG and above, and
# prefix every message with a timestamp and the name of the emitting thread.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s (%(threadName)-2s) %(message)s',)
-
def thread_pool2(num_threads=10, num_items=100):
    """Share work between threads through a ``queue.Queue``.

    The calling thread starts ``num_threads`` workers, enqueues the integers
    ``0 .. num_items - 1``, waits until every item has been acknowledged with
    ``task_done()``, and then shuts the workers down by sending one ``None``
    sentinel per worker and joining them.

    Args:
        num_threads: Number of worker threads to start (default 10).
        num_items: How many consecutive integers, starting at 0, to enqueue
            (default 100).

    Returns:
        None. Each dequeued item is reported through ``logging.debug``.
    """

    def do_stuff(q):
        """Log every item taken from *q* until the ``None`` sentinel arrives."""
        while True:
            item = q.get()
            if item is None:
                # Stop sentinel: nothing waits on q.join() at this point, so
                # it is not acknowledged with task_done().
                break
            try:
                logging.debug(item)
            finally:
                # Always acknowledge the item; q.join() in the caller only
                # returns once every put() has a matching task_done().
                q.task_done()

    # maxsize=0 means the queue is unbounded, so put() never blocks.
    q = queue.Queue(maxsize=0)

    pool = [
        threading.Thread(target=do_stuff, args=(q,), daemon=False)
        for _ in range(num_threads)
    ]
    for thread in pool:
        thread.start()

    for x in range(num_items):
        q.put(x)

    # Block until all tasks are done.
    q.join()

    # Stop the workers: exactly one sentinel per thread, then wait for them.
    for _ in pool:
        q.put(None)
    for thread in pool:
        thread.join()
-
-
thread_pool2()
8. 简单的消费者/生产者,使用condition, 生产者生产了后通知消费者。传递condition.
-
#another way of synchronizing threads is through using a Condition object. Because the Condition uses a Lock, it can be tied to a shared resource.
-
#This allows threads to wait for the resource to be updated. In this example, the consumer() threads wait() for the Condition to be set before continuing.
-
#The producer() thread is responsible for setting the condition and notifying the other threads that they can continue
-
def consumer_producer():
-
def consumer(cond):
-
#wait for the condition and use the resource
-
logging.debug("starting consumer thread")
-
t=threading.currentThread()
-
with cond:
-
cond.wait()
-
logging.debug("Resource is available to consume")
-
-
&nban>pause))
-
time.sleep(pause)
-
c.increment()
-
logging.debug("Done")
-
-
counter=Counter()
-
for i in range(2):
-
t=threading.Thread(target=worker,args=(counter,))
-
t.start()
-
-
logging.debug("Waiting for worker threads")
-
main_thread=threading.currentThread()
-
for t in threading.enumerate():
-
if t is not main_thread:
-
t.join()
-
logging.debug("Counter: {}".format(counter.value))
-
-
thread_lock()
10. 两个线程使用Lock来共享数据区。
acquire(blocking=True,timeout=-1):
Acquire a lock, blocking or non-blocking.
When invoked with the blocking argument set to True (the default), block until the lock is unlocked, then set it to locked and return True.
When invoked with the blocking argument set to False, do not block. If a call with blocking set to True would block, return False immediately; otherwise, set the lock to locked and return True.
When invoked with the floating-point timeout argument set to a positive value, block for at most the number of seconds specified by timeout and as long as the lock cannot be acquired. A timeout argument of -1 specifies an unbounded wait. It is forbidden to specify a timeout when blocking is false.
The return value is True if the lock is acquired successfully, False if not (for example if the timeout expired).
Changed in version 3.2: The timeout parameter is new.
Changed in version 3.2: Lock acquires can now be interrupted by signals on POSIX.
下面这段代码中worker 一共成功获取了3次 lock;在某次运行中一共尝试了7次(具体尝试次数取决于线程调度)。
-
def who_have_lock(interval=0.5, target_acquires=3):
    """Show two threads contending for one ``threading.Lock``.

    A daemon thread named ``LockHolder`` repeatedly takes the lock, keeps it
    for ``interval`` seconds, releases it and rests for ``interval`` seconds.
    A second thread named ``Worker`` wakes up every ``interval`` seconds and
    makes a non-blocking attempt to take the lock, until it has succeeded
    ``target_acquires`` times. Every step is reported via ``logging.debug``.

    The function only starts the two threads; it does not wait for them.

    Args:
        interval: Seconds used for every sleep in both threads (default 0.5).
        target_acquires: Number of successful acquisitions after which the
            worker stops (default 3).

    Returns:
        None.
    """

    def lock_holder(lock):
        """Hold and release *lock* in an endless loop."""
        logging.debug("starting")
        while True:
            with lock:
                try:
                    logging.debug("Holding")
                    time.sleep(interval)
                finally:
                    # Logged while the lock is still held, right before the
                    # ``with`` block releases it.
                    logging.debug("Not holding")
            time.sleep(interval)

    def worker(lock):
        """Poll *lock* without blocking until it was acquired often enough."""
        logging.debug("starting")
        num_tries = 0
        num_acquires = 0
        while num_acquires < target_acquires:
            time.sleep(interval)
            logging.debug("Trying to acquire")
            num_tries += 1
            # Non-blocking attempt: returns False immediately when the
            # holder currently owns the lock.
            if not lock.acquire(blocking=False):
                logging.debug("Iteration {} :Not acquired".format(num_tries))
                continue
            try:
                logging.debug("Iteration {}: Acquired".format(num_tries))
                num_acquires += 1
            finally:
                lock.release()
        logging.debug("Done after {} iterations".format(num_tries))

    lock = threading.Lock()

    # The holder never terminates on its own, so it is a daemon thread and
    # does not keep the interpreter alive.
    holder = threading.Thread(
        target=lock_holder, args=(lock,), name='LockHolder', daemon=True
    )
    holder.start()

    worker_thread = threading.Thread(target=worker, args=(lock,), name="Worker")
    worker_thread.start()
-
-
who_have_lock()
11. semaphore的例子,初始化了一个计数为2的semaphore,允许两个线程同时访问,第3个、第4个线程的访问将会被阻塞,直到前面某个线程结束。
-
#Limiting Concurrent Access to Resources
-
#it is useful to allow more than one worker access to a resource at a time, while still limiting the overall number. For example,
-
#a connection pool might support a fixed number of simultaneous connections, or a network application might support a fixed number of concurrent downloads. A Semaphore is one way to manage those connections
-
-
def active_pool():
    """Limit concurrent access to a resource with a ``threading.Semaphore``.

    Four threads named ``"0"`` .. ``"3"`` are started. Each one must enter a
    semaphore initialised to 2 before it registers itself in a shared
    ``ActivePool``, sleeps for 0.1 seconds and unregisters again, so at most
    two thread names are registered at any moment. The pool contents are
    reported via ``logging.debug`` after every change.

    The function only starts the threads; it does not wait for them.

    Returns:
        None.
    """

    class ActivePool:
        """Lock-protected list of the thread names currently in the pool."""

        def __init__(self):
            super().__init__()
            self.active = []
            self.lock = threading.Lock()

        def makeActive(self, name):
            """Add *name* to the pool and log the new pool contents."""
            with self.lock:
                self.active.append(name)
                logging.debug("Running: {}".format(self.active))

        def makeInactive(self, name):
            """Remove *name* from the pool and log the new pool contents."""
            with self.lock:
                self.active.remove(name)
                logging.debug("Running {}".format(self.active))

    def worker(s, pool):
        """Enter semaphore *s*, then spend 0.1 s registered in *pool*."""
        logging.debug("Waiting to join the pool")
        with s:
            name = threading.current_thread().name
            pool.makeActive(name)
            time.sleep(0.1)
            pool.makeInactive(name)

    pool = ActivePool()
    # Two slots: the third and fourth thread block until a slot is released.
    s = threading.Semaphore(2)
    for i in range(4):
        t = threading.Thread(target=worker, name=str(i), args=(s, pool))
        t.start()
-
-
active_pool()
12 . 线程内部的数据。
-
#Thread-specific Data
-
#While some resources need to be locked so multiple threads can use them, others need to be protected so that they are hidden
-
#from view in threads that do not “own” them. The local() function creates an object capable of hiding values from view in separate threads.
-
def thread_private_data():
    """Show that attributes of ``threading.local()`` are private per thread.

    The calling thread logs the (missing) value, stores ``1000`` on a
    ``threading.local`` object and logs it again. Two worker threads are then
    handed the same object: each first logs that it sees no value, stores its
    own random integer between 1 and 100, and logs that.

    The function only starts the worker threads; it does not wait for them.

    Returns:
        None.
    """

    def show_value(data):
        """Log ``data.value`` or a placeholder if this thread has none yet."""
        try:
            val = data.value
        except AttributeError:
            logging.debug("No value yet")
        else:
            logging.debug("values={}".format(val))

    def worker(data):
        """Log the value before and after assigning a thread-private one."""
        show_value(data)
        data.value = random.randint(1, 100)
        show_value(data)

    local_data = threading.local()
    show_value(local_data)
    local_data.value = 1000
    show_value(local_data)

    for _ in range(2):
        threading.Thread(target=worker, args=(local_data,)).start()
-
-
thread_private_data()
-
#To initialize the settings so all threads start with the same value, use a subclass and set the attributes in __init__().
-
def thread_private_data2():
    """Give every thread the same initial thread-local value via a subclass.

    ``MyLocal`` subclasses ``threading.local`` and sets ``value`` in
    ``__init__``. The calling thread creates ``MyLocal(1000)`` and logs the
    value; two worker threads receiving the same object each log the value
    they see first, then store their own random integer between 1 and 100
    and log that.

    The function only starts the worker threads; it does not wait for them.

    Returns:
        None.
    """

    def show_value(data):
        """Log ``data.value`` or a placeholder if this thread has none yet."""
        try:
            val = data.value
        except AttributeError:
            logging.debug("No value yet")
        else:
            logging.debug("values={}".format(val))

    def worker(data):
        """Log the value before and after assigning a thread-private one."""
        show_value(data)
        data.value = random.randint(1, 100)
        show_value(data)

    class MyLocal(threading.local):
        """Thread-local storage whose ``value`` starts at a given default."""

        def __init__(self, value):
            logging.debug("Initializing {}".format(self))
            self.value = value

    local_data = MyLocal(1000)
    show_value(local_data)

    for _ in range(2):
        threading.Thread(target=worker, args=(local_data,)).start()
-
-
thread_private_data2()
Bonus:
RLock, 可以多次acquire()
-
#re-entrant Locks
-
#In a situation where separate code from the same thread needs to “re-acquire” the lock, use an RLock instead
-
#With a plain Lock() the second acquisition fails, and a blocking acquire would wait forever, so the attempts below use lock.acquire(timeout=0)
-
def re_entrant_locks():
    """Show that one thread can acquire the same ``threading.RLock`` repeatedly.

    The lock is acquired three times from the calling thread: once with the
    default blocking call and twice with ``timeout=0``. The result of every
    attempt is printed. Every successful acquisition is released again before
    returning, so the lock is not left held.

    Returns:
        None.
    """
    lock = threading.RLock()
    attempts = (
        ("First try {}", {}),
        ("Second try {}", {"timeout": 0}),
        ("third try {}", {"timeout": 0}),
    )

    held = 0
    try:
        for template, acquire_kwargs in attempts:
            acquired = lock.acquire(**acquire_kwargs)
            if acquired:
                held += 1
            print(template.format(acquired))
    finally:
        # An RLock must be released once per successful acquire.
        for _ in range(held):
            lock.release()
-
-
#re_entrant_locks()
阅读(1088) | 评论(0) | 转发(0) |