Chinaunix首页 | 论坛 | 博客
  • 博客访问: 343215
  • 博文数量: 82
  • 博客积分: 3353
  • 博客等级: 中校
  • 技术积分: 742
  • 用 户 组: 普通用户
  • 注册时间: 2008-11-20 19:13
文章分类

全部博文(82)

文章存档

2015年(1)

2014年(1)

2013年(1)

2012年(12)

2011年(3)

2010年(25)

2009年(37)

2008年(2)

我的朋友

分类: Python/Ruby

2010-04-12 11:34:43

使用 Python 进行线程编程

 

对于 Python 来说,并不缺少并发选项,其标准库中包括了对线程、进程和异步 I/O 的支持。在许多情况下,通过创建诸如异步、线程和子进程之类的高层模块,Python 简化了各种并发方法的使用。除了标准库之外,还有一些第三方的解决方案,例如 TwistedStackless 和进程模块。本文重点关注于使用 Python 的线程,并使用了一些实际的示例进行说明。虽然有许多很好的联机资源详细说明了线程 API,但本文尝试提供一些实际的示例,以说明一些常见的线程使用模式。

  首先弄清进程和线程之间的区别,这一点是非常重要的。线程与进程的不同之处在于,它们共享状态、内存和资源。对于线程来说,这个简单的区别既是它的优势,又是它的缺点。一方面,线程是轻量级的,并且相互之间易于通信,但另一方面,它们也带来了包括死锁、争用条件和高复杂性在内的各种问题。幸运的是,由于 GIL 和队列模块,与采用其他的语言相比,采用 Python 语言在线程实现的复杂性上要低得多。

1、使用 Python 线程

#!/usr/local/bin/python

#thread_exp2.py

import threading

import datetime

 #first  example

class ThreadClass(threading.Thread):

     def run(self):

        now = datetime.datetime.now()

        print "%s says hello at %s" % (self.getName(),now)

 

 for i in range(2):

    t = ThreadClass()

    t.start()

 #second example

 def test():

    now = datetime.datetime.now()

    print "the time is:%s" % now

 

t = threading.Thread(target = test)

t.start()

 

         上面例子中,threading.py文件中的类Thread(),生成一个对象,即新线程。初始化为:

def __init__(self, group=None, target=None, name=None,

                 args=(), kwargs=None, verbose=None):

        assert group is None, "group argument must be None for now"

        _Verbose.__init__(self, verbose)

        if kwargs is None:

            kwargs = {}

        self.__target = target

        self.__name = str(name or _newname())

        self.__args = args

        self.__kwargs = kwargs

        self.__daemonic = self._set_daemon()

        self.__started = False

        self.__stopped = False

        self.__block = Condition(Lock())

        self.__initialized = True

        # sys.stderr is not stored in the class like

        # sys.exc_info since it can be changed between instances

        self.__stderr = _sys.stderr

 

其中,target是我们传入的函数,是在线程中执行的对象,argskwargs是对应的参数。

从上面的例子中可以看出,start()是启动线程的,在threading.py中,它其实是调用了thread.start_new_thread(),这才是生成新线程的语句。然后调用self.run(),在这里,才调用通过target传入的函数。

         在上面的例子中,分别用了run()方法重载和通过target参数载入函数,所执行的结果为:

[zhoulm@sanpdev python]$ python thread_exp2.py

Thread-1 says hello at 2010-04-09 15:58:42.649991

Thread-2 says hello at 2010-04-09 15:58:42.650172

the time is:2010-04-09 15:58:42.650308

 

self.getName()Thread类中的方法,获得线程名。有getName(),那就会有setName(str),用于设置线程名。还有方法isAlive(),判断线程是否在运行。

对于大多数情况来说,从 threading.Thread 进行继承是一种最佳实践,因为它创建了用于线程编程的常规 API

 

 

2使用线程队列

  如前所述,当多个线程需要共享数据或者资源的时候,可能会使得线程的使用变得复杂。线程模块提供了许多同步原语,包括信号量、条件变量、事件和锁。当这些选项存在时,最佳实践是转而关注于使用队列。相比较而言,队列更容易处理,并且可以使得线程编程更加安全,因为它们能够有效地传送单个线程对资源的所有访问,并支持更加清晰的、可读性更强的设计模式。

在下一个示例中,您将首先创建一个以串行方式或者依次执行的程序,获取网站的 URL,并显示页面的前 1024 个字节。有时使用线程可以更快地完成任务,下面就是一个典型的示例。首先,让我们使用 urllib2 模块以获取这些页面(一次获取一个页面),并且对代码的运行时间进行计时:

URL 获取序列

#!/usr/local/bin/python

#get data from url,no use thread

 

import time

import urllib2

 

hosts = ["","","",\

            "",""]

 

start = time.time()

 

for host in hosts:

    url = urllib2.urlopen(host)

    print url.read(1024)

 

print "Elapsed Time:%s" % (time.time() - start)

 

 

 URL 获取线程化

#!/usr/local/bin/python

 

import Queue

import time

import threading

import urllib2

 

 

hosts = ["","","",\

            "",""]

queue = Queue.Queue()

 

class threadUrl(threading.Thread):

    def __init__(self,queue):

        threading.Thread.__init__(self)

        self.queue = queue

    def run(self):

        while True:

          #grabs host from queue

          host = self.queue.get()

          #grabs urls of hosts and prints first 1024 bytes of page

          url = urllib2.urlopen(host)

          print url.read(1024)

          #signal to queue job is done

          self.queue.task_done()         #当前的任务完成时,将会通知之前的任务

start = time.time()

def main():

   #spawn a pool of threads, and pass them queue instance

   for i in range(5):

      t = threadUrl(queue)

      t.setDaemon(True)         #守护线程?

      t.start()

   #populate queue with data

   for host in hosts:

     queue.put(host)

 

   #wait on the queue until everything has been processed

   queue.join()     

#在队列中的所有项均执行后,再推出,如果没有这句,函数将立即推出

     #这是join()执行的核心:     while self.unfinished_tasks:

     #                               self.all_tasks_done.wait()

main()

print "Elapsed Time:%s" % (time.time() - start)

 

对于这个示例,有更多的代码需要说明,但与第一个线程示例相比,它并没有复杂多少,这正是因为使用了队列模块。在 Python 中使用线程时,这个模式是一种很常见的并且推荐使用的方式。具体工作步骤描述如下:

  创建一个 Queue.Queue() 的实例,然后使用数据对它进行填充。

  将经过填充数据的实例传递给线程类,后者是通过继承 threading.Thread 的方式创建的。

  生成守护线程池。

  每次从队列中取出一个项目,并使用该线程中的数据和 run 方法以执行相应的工作。

  在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号。

  对队列执行 join 操作,实际上意味着等到队列为空,再退出主程序。

  在使用这个模式时需要注意一点:通过将守护线程设置为 true,将允许主线程或者程序仅在守护线程处于活动状态时才能够退出。这种方式创建了一种简单的方式以控制程序流程,因为在退出之前,您可以对队列执行 join 操作、或者等到队列为空。队列模块文档详细说明了实际的处理过程,请参见参考资料:

  join()

保持阻塞状态,直到处理了队列中的所有项目为止。在将一个项目添加到该队列时,未完成的任务的总数就会增加。当使用者线程调用 task_done() 以表示检索了该项目、并完成了所有的工作时,那么未完成的任务的总数就会减少。当未完成的任务的总数减少到零时,join() 就会结束阻塞状态。

 

3、使用多个队列

  因为上面介绍的模式非常有效,所以可以通过连接附加线程池和队列来进行扩展,这是相当简单的。在上面的示例中,您仅仅输出了 Web 页面的开始部分。而下一个示例则将返回各线程获取的完整 Web 页面,然后将结果放置到另一个队列中。然后,对加入到第二个队列中的另一个线程池进行设置,然后对 Web 页面执行相应的处理。这个示例中所进行的工作包括使用一个名为 Beautiful Soup 的第三方 Python 模块来解析 Web 页面。使用这个模块,您只需要两行代码就可以提取所访问的每个页面的 title 标记,并将其打印输出。

#!/usr/local/bin/python
import Queue
import time
import threading
import urllib2
from BeautifulSoup import BeautifulSoup

 hosts = ["","","",\

            "",""]

 queue = Queue.Queue()

out_queue = Queue.Queue()

 

class threadUrl(threading.Thread):

   def __init__(self,queue,out_queue):

        threading.Thread.__init__(self)

        self.queue = queue

        self.out_queue = out_queue

    def run(self):

         while True:

             host = self.queue.get()

             url = urllib2.urlopen(host)

             chunk = url.read()

             self.out_queue.put(chuck)

             self.queue.task_done()

 class datamineThread(threading.Thread):

   def __init__(self,out_queue):

         threading.Thread.__init__(self)

         self.out_queue = out_queue

         threading.Thread.__init__(self)

         self.out_queue = out_queue

    def run(self):

        while True:

            chuck = out_queue.get()

            soup = BeautifulSoup(chuck)

            print soup.findAll(['title'])

            self.out_queue.task_done()

 

start = time.time()

def main():

    for i in range(5):

       t = threadUrl(queue,out_queue)

       t.setDeamon(True)

       t.start()

    for host in hosts:

       queue.put(host)

    for i in range(5):

       dt = datamineThread(out_queue)

       dt.setDeamon(True)

       dt.start() 

    queue.join()

    out_queue.join()

main()

print "Escape Tiem is:",(time.time() - start)

 

分析这段代码时您可以看到,我们添加了另一个队列实例,然后将该队列传递给第一个线程池类 ThreadURL。接下来,对于另一个线程池类 DatamineThread,几乎复制了完全相同的结构。在这个类的 run 方法中,从队列中的各个线程获取 Web 页面、文本块,然后使用 Beautiful Soup 处理这个文本块。在这个示例中,使用 Beautiful Soup 提取每个页面的 title 标记、并将其打印输出。可以很容易地将这个示例推广到一些更有价值的应用场景,因为您掌握了基本搜索引擎或者数据挖掘工具的核心内容。一种思想是使用 Beautiful Soup 从每个页面中提取链接,然后按照它们进行导航。

  总结

  本文研究了 Python 的线程,并且说明了如何使用队列来降低复杂性和减少细微的错误、并提高代码可读性的最佳实践。尽管这个基本模式比较简单,但可以通过将队列和线程池连接在一起,以便将这个模式用于解决各种各样的问题。在最后的部分中,您开始研究如何创建更复杂的处理管道,它可以用作未来项目的模型。参考资料部分提供了很多有关常规并发性和线程的极好的参考资料。

  最后,还有很重要的一点需要指出,线程并不能解决所有的问题,对于许多情况,使用进程可能更为合适。特别是,当您仅需要创建许多子进程并对响应进行侦听时,那么标准库子进程模块可能使用起来更加容易。

 

 

阅读(1228) | 评论(0) | 转发(0) |
0

上一篇:defer的理解

下一篇:ASCII码表

给主人留下些什么吧!~~