Perl 对线程的支持最早可以追溯到 1998 年 7 月发布的 Perl v5.005。其发布申明指出,Perl v5.005 中加入了对操作系统级线程的支持,这个新特性是一个实验性的产品,这也就是我们现在所称的 5005threads 线程模型。对于 5005threads 线程模型来说,默认情况下,所有数据结构都是共享的,所以用户必须负责这些共享数据结构的同步访问。如今 5005threads 已经不再被推荐实用,Perl v5.10 以后的版本里,也将不会再支持 5005threads 线程模型。
2000 年 5 月发布的 Perl v5.6.0 中开始引入了一个全新的线程模型,即 interpreter threads, 或称为 ithreads,也正是在这个版本的发布申明中第一次提出了 5005threads 线程模型将来可能会被禁用的问题。尽管如此,ithreads 在那个时候还是一个新的实验性的线程模型,用户并不能直接使用它,唯一的办法是通过 fork 函数模拟。经过两年时间的发展,到 2002 年 7 月,Perl v5.8.0 正式发布,这时 ithreads 已经是一个相对成熟的线程模型,发布申明中也鼓励用户从老的 5005threads 线程模型转换到新的 ithreads 线程模型,并明确指出 5005threads 线程模型最终将被淘汰。本文后面所讨论的所有内容也都是基于新的 ithreads 线程模型。在 ithreads 线程模型中,最与众不同的特点就在于默认情况一下一切数据结构都不是共享的,这一点我们会在后面内容中有更深刻的体会。
既然 Perl 中有可能存在两种不同的线程模型,我们很自然地就需要判断现有 Perl 环境到底支持的是哪一种线程实现方式。归纳起来,我们有两种方法:
- 在 shell 里,我们可以通过执行 perl – V | grep usethreads 命令来获取当前线程模型的相关信息,例如
> perl -V | grep use.*threads config_args='-des -Doptimize=-O2 -g -pipe -m32 -march=i386 -mtune=pentium4 -Dversion=5.8.5 -Dmyhostname=localhost -Dperladmin=root@localhost -Dcc=gcc -Dcf_by=Red Hat, Inc. -Dinstallprefix=/usr -Dprefix=/usr -Darchname=i386-linux -Dvendorprefix=/usr -Dsiteprefix=/usr -Duseshrplib -Dusethreads -Duseithreads -Duselargefiles -Dd_dosuid -Dd_semctl_semun -Di_db -Ui_ndbm -Di_gdbm -Di_shadow -Di_syslog -Dman3ext=3pm -Duseperlio -Dinstallusrbinperl -Ubincompat5005 -Uversiononly -Dpager=/usr/bin/less -isr -Dinc_version_list=5.8.4 5.8.3 5.8.2 5.8.1 5.8.0' usethreads=define use5005threads=undef useithreads=define usemultiplicity=define |
从结果中不难看出,在当前的 Perl 环境中提供了对 ithreads 线程模型的支持。
- 在 Perl 程序中,我们也可以通过使用 Config 模块来动态获取 Perl 线程模型的相关信息,例如
#!/usr/bin/perl # use Config; if( $Config{useithreads} ) { printf("Hello ithreads\n") } elsif( $Config{use5005threads} ) { printf("Hello 5005threads\n"); } else { printf("Can not support thread in your perl environment\n"); exit( 1 ); } |
值得一提的是,对于 5005threads 和 ithreads 线程模型,Perl 同时只能支持其中的一种。你不可能在某一个 Perl 环境中同时使用这两种线程模型。本文后面讨论的所有内容都是基于 ithreads 线程模型的。
线程作为 Perl 中的一种实体,其一生可以粗略的分为创建,运行与退出这三个阶段。创建使得线程从无到有,运行则是线程完成其主要工作的阶段,退出自然就是指线程的消亡。线程的运行和普通函数的执行非常类似,有其入口参数,一段特定的代码流程以及执行完毕后返回的一个或一组结果,唯一与普通函数调用的不同之处就在于新建线程的执行与当前线程的执行是并行的。
Perl 里创建一个新的线程非常简单,主要有两种方法,他们分别是:
- 使用 threads 包的 create() 方法,例如
use threads; sub say_hello { printf("Hello thread! @_.\n"); return( rand(10) ); } my $t1 = threads->create( \&say_hello, "param1", "param2" ); my $t2 = threads->create( "say_hello", "param3", "param4" ); my $t3 = threads->create( sub { printf("Hello thread! @_\n"); return( rand(10) ); }, "param5", "param6" ); |
- 使用 async{} 块创建线程,例如
#!/usr/bin/perl # use threads; my $t4 = async{ printf("Hello thread!\n"); }; |
线程一旦被成功创建,它就立刻开始运行了,这个时候你面临两种选择,分别是 join 或者 detach 这个新建线程。当然你也可以什么都不做,不过这可不是一个好习惯,后面我们会解释这是为什么。
我们先来看看 join 方法, 这也许是大多数情况下你想要的。从字面上来理解,join 就是把新创建的线程结合到当前的主线程中来,把它当成是主线程的一部分,使他们合二为一。join 会触发两个动作,首先,主线程会索取新建线程执行结束以后的返回值;其次,新建线程在执行完毕并返回结果以后会自动释放它自己所占用的系统资源。例如
#!/usr/bin/perl # use threads; sub func { sleep(1); return(rand(10)); } my $t1 = threads->create( \&func ); my $t2 = threads->create( \&func ); printf("do something in the main thread\n"); my $t1_res = $t1->join(); my $t2_res = $t2->join(); printf("t1_res = $t1_res\nt2_res = $t2_res\n"); |
由此我们不难发现,调用 join 的时机是一个十分有趣的问题。如果调用 join 方法太早,新建线程尚未执行完毕,自然就无法返回任何结果,那么这个时候,主线程就不得不被阻塞,直到新建线程执行完毕之后,才能获得返回值,然后资源会被释放,join 才能结束,这在很大程度上破话了线程之间的并行性。相反,如果调用 join 方法太晚,新建线程早已执行完毕,由于一直没有机会返回结果,它所占用的资源就一直无法得到释放,直到被 join 为止,这在很大程度上浪费了宝贵的系统资源。因此,join 新建线程的最好时机应该是在它刚刚执行完毕的时候,这样既不会阻塞当前线程的执行,又可以及时释放新建线程所占用的系统资源。
我们再来看看 detach 方法,这也许是最省心省力的处理方法了。从字面上来理解,detach 就是把新创建的线程与当前的主线程剥离开来,让它从此和主线程无关。当你使用 detach 方法的时候,表明主线程并不关心新建线程执行以后返回的结果,新建线程执行完毕后 Perl 会自动释放它所占用的资源。例如
#!/usr/bin/perl # use threads; use Config; sub say_hello { my ( $name ) = @_; printf("Hello World! I am $name.\n"); } my $t1 = threads->create( \&say_hello, "Alex" ); $t1->detach(); printf("doing something in main thread\n"); sleep(1); |
一个新建线程一旦被 detach 以后,就无法再 join 了。当你使用 detach 方法剥离线程的时候,有一点需要特别注意,那就是你需要保证被创建的线程先于主线程结束,否则你创建的线程会被迫结束,除非这种结果正是你想要的,否则这也许会造成异常情况的出现,并增加程序调试的难度。
本节的开始我们提到,新线程被创建以后,如果既不 join,也不 detach 不是一个好习惯,这是因为除非明确地调用 detach 方法剥离线程,Perl 会认为你也许要在将来的某一个时间点调用 join,所以新建线程的返回值会一直被保存在内存中以备不时之需,它所占用的系统资源也一直不会得到释放。然而实际上,你打算什么也不做,因此宝贵的系统资源直到整个 Perl 应用结束时才被释放。同时,由于你即没有调用 join 有没有调用 detach,应用结束时 Perl 还会返回给你一个线程非正常结束的警告。
大多数情况下,你希望你创建的线程正常退出,这就意味着线程所对应的函数体在执行完毕后返回并释放资源。例如在清单 5 的示例中,新建线程被 join 以后的退出过程。可是,如果由于 detach 不当或者由于主线因某些意外的异常提前结束了,尽管它所创建的线程可能尚未执行完毕,但是他们还是会被强制中止,正所谓皮之不存,毛将焉附。这时你也许会得到一个类似于“Perl exited with active threads”的警告。
当然,你也可以显示地调用 exit() 方法来结束一个线程,不过值得注意的是,默认情况下,如果你在一个线程中调用了 exit() 方法, 其他线程都会随之一起结束,在很多情况下,这也许不是你想要的,如果你希望 exit() 方法只在调用它的线程内生效,那么你在创建该线程的时候就需要设置’ exit ’ => ’ thread_only ’。例如
#!/usr/bin/perl # use threads; sub say_hello { printf("Hello thread! @_.\n"); sleep(10); printf("Bye\n"); } sub quick_exit { printf("I will be exit in no time\n"); exit(1); } my $t1 = threads->create( \&say_hello, "param1", "param2" ); my $t2 = threads->create( {'exit'=>'thread_only'}, \&quick_exit ); $t1->join(); $t2->join(); |
如果你希望每个线程的 exit 方法都只对自己有效,那么在每次创建一个新线程的时候都去要显式设置’ exit ’ => ’ thread_only ’属性显然有些麻烦,你也可以在引入 threads 包的时候设置这个属性在全局范围内有效,例如
use threads ('exit' => 'threads_only'); sub func { ... if( $condition ) { exit(1); } } my $t1 = threads->create( \&func ); my $t2 = threads->create( \&func ); $t1->join(); $t2->join(); |
和现有大多数线程模型不同,在 Perl ithreads 线程模型中,默认情况下任何数据结构都不是共享的。当一个新线程被创建以后,它就已经包含了当前所有数据结构的一份私有拷贝,新建线程中对这份拷贝的数据结构的任何操作都不会在其他线程中有效。因此,如果需要使用任何共享的数据,都必须显式地申明。threads::shared 包可以用来实现线程间共享数据的目的。
#!/usr/bin/perl # use threads; use threads::shared; use strict; my $var :shared = 0; # use :share tag to define my @array :shared = (); # use :share tag to define my %hash = (); share(%hash); # use share() funtion to define sub start { $var = 100; @array[0] = 200; @array[1] = 201; $hash{'1'} = 301; $hash{'2'} = 302; } sub verify { sleep(1); # make sure thread t1 execute firstly printf("var = $var\n"); # var=100 for(my $i = 0; $i < scalar(@array); $i++) { printf("array[$i] = $array[$i]\n"); # array[0]=200; array[1]=201 } foreach my $key ( sort( keys(%hash) ) ) { printf("hash{$key} = $hash{$key}\n"); # hash{1}=301; hash{2}=302 } } my $t1 = threads->create( \&start ); my $t2 = threads->create( \&verify ); $t1->join(); $t2->join(); |
多线程间既然有了共享的数据,那么就必须对共享数据进行小心地访问,否则,冲突在所难免。Perl ithreads 线程模型中内置的 lock 方法实现了线程间共享数据的锁机制。有趣的是,并不存在一个 unlock 方法用来显式地解锁,锁的生命周期以代码块为单位,也就是说,当 lock 操作所在的代码块执行结束之后,也就是锁被隐式释放之时。例如
use threads::shared; # in thread 1 { lock( $share ); # lock for 3 seconds sleep(3); # other threads can not lock again } # unlock implicitly now after the block # in thread 2 { lock($share); # will be blocked, as already locked by thread 1 $share++; # after thread 1 quit from the block } # unlock implicitly now after the block |
上面的示例中,我们在 thread 1 中使用 lock 方法锁住了一个普通的标量,这会导致 thread 2 在试图获取 $share 变量的锁时被阻塞,当 thread 1 从调用 lock 的代码块中退出时,锁被隐式地释放,从而 thread 2 阻塞结束,lock 成功以后,thread 2 才可以执行 $share++ 的操作。对于数组和哈希表来说,lock 必须用在整个数据结构上,而不是用在数组或哈希表的某一个元素上。例如
use threads; use threads::shared; { lock(@share); # the array has been locked lock(%hash); # the hash has been locked sleep(3); # other threads can not lock again } { lock($share[1]); # error will occur lock($hash{key}); # error will occur } |
假如一个线程对某一个共享变量实施了锁操作,在它没有释放锁之前,如果另外一个线程也对这个共享变量实施锁操作,那么这个线程就会被阻塞,阻塞不会被自动中止而是直到前一个线程将锁释放为止。这样的模式就带来了我们常见的死锁问题。例如
use threads; use threads::shared; # in thread 1 { lock($a); # lock for 3 seconds sleep(3); # other threads can not lock again lock($b); # dead lock here } # in thread 2 { lock($b); # will be blocked, as already locked by thread 1 sleep(3); # after thread 1 quit from the block lock($a); # dead lock here } |
死锁常常是多线程程序中最隐蔽的问题,往往难以发现与调试,也增加了排查问题的难度。为了避免在程序中死锁的问题,在程序中我们应该尽量避免同时获取多个共享变量的锁,如果无法避免,那么一是要尽量使用相同的顺序来获取多个共享变量的锁,另外也要尽可能地细化上锁的粒度,减少上锁的时间。
Thread::Semaphore 包为线程提供了信号量的支持。你可以创建一个自己的信号量,并通过 down 操作和 up 操作来实现对资源的同步访问。实际上,down 操作和 up 操作对应的就是我们所熟知的 P 操作和 V 操作。从内部实现上看,Thread::Semaphore 本质上就是加了锁的共享变量,无非是把这个加了锁的共享变量封装成了一个线程安全的包而已。由于信号量不必与任何变量绑定,因此,它非常灵活,可以用来控制你想同步的任何数据结构和程序行为。例如
use threads; use threads::shared; use Thread::Semaphore; my $s = Thread::Semaphore->new(); $s->down(); # P operation ... $s->up(); # V operation |
从本质上说,信号量是一个共享的整型变量的引用。默认情况下,它的初始值为 1,down 操作使它的值减 1,up 操作使它的值加 1。当然,你也可以自定义信号量初始值和每次 up 或 down 操作时信号量的变化。例如
use threads; use Thread::Semaphore; my $s = Thread::Semaphore->new(5); printf("s = " . ${$s} . "\n"); # s = 5 $s->down(3); printf("s = " . ${$s} . "\n"); # s = 2 ... $s->up(4); printf("s = " . ${$s} . "\n"); # s = 6 |
Thread::Queue 包为线程提供了线程安全的队列支持。与信号量类似,从内部实现上看,Thread::Queue 也是把一个通过锁机制实现同步访问的共享队列封装成了一个线程安全的包,并提供统一的使用接口。Thread::Queue 在某些情况下可以大大简化线程间通信的难度和成本。例如在生产者 - 消费者模型中,生产者可以不断地在线程队列上做 enqueue 操作,而消费者只需要不断地在线程队列上做 dequeue 操作,这就很简单地实现了生产者和消费者之间同步的问题。例如
#!/usr/bin/perl # use threads; use Thread::Queue; my $q = Thread::Queue->new(); sub produce { my $name = shift; while(1) { my $r = int(rand(100)); $q->enqueue($r); printf("$name produce $r\n"); sleep(int(rand(3))); } } sub consume { my $name = shift; while(my $r = $q->dequeue()) { printf("consume $r\n"); } } my $producer1 = threads->create(\&produce, "producer1"); my $producer2 = threads->create(\&produce, "producer2"); my $consumer1 = threads->create(\&consume, "consumer2"); $producer1->join(); $producer2->join(); $consumer1->join(); |
本文前面讨论的所有内容都在 Perl 线程核心包的范畴之内。其实 CPAN 上还有其他一些与线程相关的非核心包,它们往往也会给 Perl 线程的使用带来很大的便利,这里我们选出两个稍加介绍,抛砖引玉。
Thread::Pool 包允许你在程序中创建一批线程去完成多个类似的任务。例如当你希望创建一个多线程程序去完成检验 1000 个 ip 地址是否都能 ping 通的任务时,Thread::Pool 包可以给你带来便利。
Thread::RWLock 包为线程中的读写操作提供了锁机制的支持。例如当你有多个 reader 和 writer 线程共同访问某一个或几个文件时,Thread::RWLock 包可以给你带来便利。
本文主要介绍了 Perl 中线程的使用方法,包括线程的创建、执行与消亡,如何在线程中使用共享变量并通过锁机制、信号量和线程队列的方法来实现线程间的同步。Perl ithreads 线程模型与主流线程模型最大的不同之处在于默认情况下任何数据结构都是非共享的,或者说 Perl 中的 ithreads 是一个“非轻量级”的线程模型。虽然这样的线程模型增加了程序的开销,但它并不会在线程的功能性上打折扣,同时它也使得线程间的通讯和共享变得更加简单。这也符合了 Perl 一贯的简单而强大的理念和原则。