Chinaunix首页 | 论坛 | 博客
  • 博客访问: 4562038
  • 博文数量: 1214
  • 博客积分: 13195
  • 博客等级: 上将
  • 技术积分: 9105
  • 用 户 组: 普通用户
  • 注册时间: 2007-01-19 14:41
个人简介

C++,python,热爱算法和机器学习

文章分类

全部博文(1214)

文章存档

2021年(13)

2020年(49)

2019年(14)

2018年(27)

2017年(69)

2016年(100)

2015年(106)

2014年(240)

2013年(5)

2012年(193)

2011年(155)

2010年(93)

2009年(62)

2008年(51)

2007年(37)

分类: Python/Ruby

2016-08-13 10:40:45

原文地址:http://www.cnblogs.com/vamei/archive/2012/10/12/2721484.html

我们已经见过了使用subprocess包来创建子进程,但这个包有两个很大的局限性:1) 我们总是让subprocess运行外部的程序,而不是运行一个Python脚本内部编写的函数。2) 进程间只通过管道进行文本交流。以上限制了我们将subprocess包应用到更广泛的多进程任务。(这样的比较实际是不公平的,因为subprocessing本身就是设计成为一个shell,而不是一个多进程管理包)

 

threading和multiprocessing

(请尽量先阅读Python多线程与同步)

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

但在使用这些共享API的时候,我们要注意以下几点:

  • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
  • multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
  • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

Process.PID中保存有PID,如果进程还没有start(),则PID为None。

 

我们可以从下面的程序中看到Thread对象和Process对象在使用上的相似性与结果上的不同。各个线程和进程都做一件事:打印PID。但问题是,所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。使用Lock同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。

复制代码
# Similarity and difference of multi thread vs. multi process # Written by Vamei import os import threading import multiprocessing # worker function def worker(sign, lock):
    lock.acquire() print(sign, os.getpid())
    lock.release() # Main print('Main:',os.getpid()) # Multi-thread record = []
lock = threading.Lock() for i in range(5):
    thread = threading.Thread(target=worker,args=('thread',lock))
    thread.start()
    record.append(thread) for thread in record:
    thread.join() # Multi-process record = []
lock = multiprocessing.Lock() for i in range(5):
    process = multiprocessing.Process(target=worker,args=('process',lock))
    process.start()
    record.append(process) for process in record:
    process.join()
复制代码

所有Thread的PID都与主程序相同,而每个Process都有一个不同的PID。

(练习: 使用mutiprocessing包将Python多线程与同步中的多线程程序更改为多进程程序)

 

Pipe和Queue

正如我们在Linux多线程中介绍的管道PIPE和消息队列message queue,multiprocessing包中有PipeQueue类来分别支持这两种IPC机制。Pipe和Queue可以用来传送常见的对象。

 

1) Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。

下面的程序展示了Pipe的使用:

 

复制代码
# Multiprocessing with Pipe # Written by Vamei import multiprocessing as mul def proc1(pipe):
    pipe.send('hello') print('proc1 rec:',pipe.recv()) def proc2(pipe): print('proc2 rec:',pipe.recv())
    pipe.send('hello, too') # Build a pipe pipe = mul.Pipe() # Pass an end of the pipe to process 1 p1   = mul.Process(target=proc1, args=(pipe[0],)) # Pass the other end of the pipe to process 2 p2   = mul.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
复制代码

这里的Pipe是双向的。

Pipe对象建立的时候,返回一个含有两个元素的表,每个元素代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

 

2) Queue与Pipe相类似,都是先进先出的结构。但Queue允许多个进程放入,多个进程从队列取出对象。Queue使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。

下面的程序展示了Queue的使用:

复制代码
# Written by Vamei import os import multiprocessing import time #================== # input worker def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info) # output worker def outputQ(queue,lock):
    info = queue.get()
    lock.acquire() print (str(os.getpid()) + '(get):' + info)
    lock.release() #=================== # Main record1 = [] # store input processes record2 = [] # store output processes lock  = multiprocessing.Lock() # To prevent messy print queue = multiprocessing.Queue(3) # input processes for i in range(10):
    process = multiprocessing.Process(target=inputQ,args=(queue,))
    process.start()
    record1.append(process) # output processes for i in range(10):
    process = multiprocessing.Process(target=outputQ,args=(queue,lock))
    process.start()
    record2.append(process) for p in record1:
    p.join()

queue.close() # No more object will come, close the queue for p in record2:
    p.join()
复制代码

 一些进程使用put()在Queue中放入字符串,这个字符串中包含PID和时间。另一些进程从Queue中取出,并打印自己的PID以及get()的字符串。

 

总结

Process, Lock, Event, Semaphore, Condition

Pipe, Queue

阅读(1089) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~