全部博文(47)
2013年(47)
分类: PERL
2013-04-25 18:07:36
创建线程
线程作为 Perl 中的一种实体,其一生可以粗略的分为创建,运行与退出这三个阶段。创建使得线程从无到有,运行则是线程完成其主要工作的阶段,退出自然就是指线程的消亡。线程的运行和普通函数的执行非常类似,有其入口参数,一段特定的代码流程以及执行完毕后返回的一个或一组结果,唯一与普通函数调用的不同之处就在于新建线程的执行与当前线程的执行是并行的。
Perl 里创建一个新的线程非常简单,主要有两种方法,他们分别是:
use threads; sub say_hello { printf("Hello thread! @_.\n"); return( rand(10) ); } my $t1 = threads->create( \&say_hello, "param1", "param2" ); my $t2 = threads->create( "say_hello", "param3", "param4" ); my $t3 = threads->create( sub { printf("Hello thread! @_\n"); return( rand(10) ); }, "param5", "param6" ); |
#!/usr/bin/perl # use threads; my $t4 = async{ printf("Hello thread!\n"); }; |
join 方法和 detach 方法
线程一旦被成功创建,它就立刻开始运行了,这个时候你面临两种选择,分别是 join 或者 detach 这个新建线程。当然你也可以什么都不做,不过这可不是一个好习惯,后面我们会解释这是为什么。
我们先来看看 join 方法, 这也许是大多数情况下你想要的。从字面上来理解,join 就是把新创建的线程结合到当前的主线程中来,把它当成是主线程的一部分,使他们合二为一。join 会触发两个动作,首先,主线程会索取新建线程执行结束以后的返回值;其次,新建线程在执行完毕并返回结果以后会自动释放它自己所占用的系统资源。例如
#!/usr/bin/perl # use threads; sub func { sleep(1); return(rand(10)); } my $t1 = threads->create( \&func ); my $t2 = threads->create( \&func ); printf("do something in the main thread\n"); my $t1_res = $t1->join(); my $t2_res = $t2->join(); printf("t1_res = $t1_res\nt2_res = $t2_res\n"); |
由此我们不难发现,调用 join 的时机是一个十分有趣的问题。如果调用 join 方法太早,新建线程尚未执行完毕,自然就无法返回任何结果,那么这个时候,主线程就不得不被阻塞,直到新建线程执行完毕之后,才能获得返回值,然后资源会被释放,join 才能结束,这在很大程度上破话了线程之间的并行性。相反,如果调用 join 方法太晚,新建线程早已执行完毕,由于一直没有机会返回结果,它所占用的资源就一直无法得到释放,直到被 join 为止,这在很大程度上浪费了宝贵的系统资源。因此,join 新建线程的最好时机应该是在它刚刚执行完毕的时候,这样既不会阻塞当前线程的执行,又可以及时释放新建线程所占用的系统资源。
我们再来看看 detach 方法,这也许是最省心省力的处理方法了。从字面上来理解,detach 就是把新创建的线程与当前的主线程剥离开来,让它从此和主线程无关。当你使用 detach 方法的时候,表明主线程并不关心新建线程执行以后返回的结果,新建线程执行完毕后 Perl 会自动释放它所占用的资源。例如
#!/usr/bin/perl # use threads; use Config; sub say_hello { my ( $name ) = @_; printf("Hello World! I am $name.\n"); } my $t1 = threads->create( \&say_hello, "Alex" ); $t1->detach(); printf("doing something in main thread\n"); sleep(1); |
一个新建线程一旦被 detach 以后,就无法再 join 了。当你使用 detach 方法剥离线程的时候,有一点需要特别注意,那就是你需要保证被创建的线程先于主线程结束,否则你创建的线程会被迫结束,除非这种结果正是你想要的,否则这也许会造成异常情况的出现,并增加程序调试的难度。
本节的开始我们提到,新线程被创建以后,如果既不 join,也不 detach 不是一个好习惯,这是因为除非明确地调用 detach 方法剥离线程,Perl 会认为你也许要在将来的某一个时间点调用 join,所以新建线程的返回值会一直被保存在内存中以备不时之需,它所占用的系统资源也一直不会得到释放。然而实际上,你打算什么也不做,因此宝贵的系统资源直到整个 Perl 应用结束时才被释放。同时,由于你即没有调用 join 有没有调用 detach,应用结束时 Perl 还会返回给你一个线程非正常结束的警告。
线程的消亡
大多数情况下,你希望你创建的线程正常退出,这就意味着线程所对应的函数体在执行完毕后返回并释放资源。例如在清单 5 的示例中,新建线程被 join 以后的退出过程。可是,如果由于 detach 不当或者由于主线因某些意外的异常提前结束了,尽管它所创建的线程可能尚未执行完毕,但是他们还是会被强制中止,正所谓皮之不存,毛将焉附。这时你也许会得到一个类似于“Perl exited with active threads”的警告。
当然,你也可以显示地调用 exit() 方法来结束一个线程,不过值得注意的是,默认情况下,如果你在一个线程中调用了 exit() 方法, 其他线程都会随之一起结束,在很多情况下,这也许不是你想要的,如果你希望 exit() 方法只在调用它的线程内生效,那么你在创建该线程的时候就需要设置’ exit ’ => ’ thread_only ’。例如
#!/usr/bin/perl # use threads; sub say_hello { printf("Hello thread! @_.\n"); sleep(10); printf("Bye\n"); } sub quick_exit { printf("I will be exit in no time\n"); exit(1); } my $t1 = threads->create( \&say_hello, "param1", "param2" ); my $t2 = threads->create( {'exit'=>'thread_only'}, \&quick_exit ); $t1->join(); $t2->join(); |
如果你希望每个线程的 exit 方法都只对自己有效,那么在每次创建一个新线程的时候都去要显式设置’ exit ’ => ’ thread_only ’属性显然有些麻烦,你也可以在引入 threads 包的时候设置这个属性在全局范围内有效,例如
use threads ('exit' => 'threads_only'); sub func { ... if( $condition ) { exit(1); } } my $t1 = threads->create( \&func ); my $t2 = threads->create( \&func ); $t1->join(); $t2->join(); |
共享与同步
threads::shared
和现有大多数线程模型不同,在 Perl ithreads 线程模型中,默认情况下任何数据结构都不是共享的。当一个新线程被创建以后,它就已经包含了当前所有数据结构的一份私有拷贝,新建线程中对这份拷贝的数据结构的任何操作都不会在其他线程中有效。因此,如果需要使用任何共享的数据,都必须显式地申明。threads::shared 包可以用来实现线程间共享数据的目的。
#!/usr/bin/perl # use threads; use threads::shared; use strict; my $var :shared = 0; # use :share tag to define my @array :shared = (); # use :share tag to define my %hash = (); share(%hash); # use share() funtion to define sub start { $var = 100; @array[0] = 200; @array[1] = 201; $hash{'1'} = 301; $hash{'2'} = 302; } sub verify { sleep(1); # make sure thread t1 execute firstly printf("var = $var\n"); # var=100 for(my $i = 0; $i < scalar(@array); $i++) { printf("array[$i] = $array[$i]\n"); # array[0]=200; array[1]=201 } foreach my $key ( sort( keys(%hash) ) ) { printf("hash{$key} = $hash{$key}\n"); # hash{1}=301; hash{2}=302 } } my $t1 = threads->create( \&start ); my $t2 = threads->create( \&verify ); $t1->join(); $t2->join(); |
锁
多线程间既然有了共享的数据,那么就必须对共享数据进行小心地访问,否则,冲突在所难免。Perl ithreads 线程模型中内置的 lock 方法实现了线程间共享数据的锁机制。有趣的是,并不存在一个 unlock 方法用来显式地解锁,锁的生命周期以代码块为单位,也就是说,当 lock 操作所在的代码块执行结束之后,也就是锁被隐式释放之时。例如
use threads::shared; # in thread 1 { lock( $share ); # lock for 3 seconds sleep(3); # other threads can not lock again } # unlock implicitly now after the block # in thread 2 { lock($share); # will be blocked, as already locked by thread 1 $share++; # after thread 1 quit from the block } # unlock implicitly now after the block |
上面的示例中,我们在 thread 1 中使用 lock 方法锁住了一个普通的标量,这会导致 thread 2 在试图获取 $share 变量的锁时被阻塞,当 thread 1 从调用 lock 的代码块中退出时,锁被隐式地释放,从而 thread 2 阻塞结束,lock 成功以后,thread 2 才可以执行 $share++ 的操作。对于数组和哈希表来说,lock 必须用在整个数据结构上,而不是用在数组或哈希表的某一个元素上。例如
use threads; use threads::shared; { lock(@share); # the array has been locked lock(%hash); # the hash has been locked sleep(3); # other threads can not lock again } { lock($share[1]); # error will occur lock($hash{key}); # error will occur } |
假如一个线程对某一个共享变量实施了锁操作,在它没有释放锁之前,如果另外一个线程也对这个共享变量实施锁操作,那么这个线程就会被阻塞,阻塞不会被自动中止而是直到前一个线程将锁释放为止。这样的模式就带来了我们常见的死锁问题。例如
use threads; use threads::shared; # in thread 1 { lock($a); # lock for 3 seconds sleep(3); # other threads can not lock again lock($b); # dead lock here } # in thread 2 { lock($b); # will be blocked, as already locked by thread 1 sleep(3); # after thread 1 quit from the block lock($a); # dead lock here } |
死锁常常是多线程程序中最隐蔽的问题,往往难以发现与调试,也增加了排查问题的难度。为了避免在程序中死锁的问题,在程序中我们应该尽量避免同时获取多个共享变量的锁,如果无法避免,那么一是要尽量使用相同的顺序来获取多个共享变量的锁,另外也要尽可能地细化上锁的粒度,减少上锁的时间。
信号量
Thread::Semaphore 包为线程提供了信号量的支持。你可以创建一个自己的信号量,并通过 down 操作和 up 操作来实现对资源的同步访问。实际上,down 操作和 up 操作对应的就是我们所熟知的 P 操作和 V 操作。从内部实现上看,Thread::Semaphore 本质上就是加了锁的共享变量,无非是把这个加了锁的共享变量封装成了一个线程安全的包而已。由于信号量不必与任何变量绑定,因此,它非常灵活,可以用来控制你想同步的任何数据结构和程序行为。例如
use threads; use threads::shared; use Thread::Semaphore; my $s = Thread::Semaphore->new(); $s->down(); # P operation ... $s->up(); # V operation |
从本质上说,信号量是一个共享的整型变量的引用。默认情况下,它的初始值为 1,down 操作使它的值减 1,up 操作使它的值加 1。当然,你也可以自定义信号量初始值和每次 up 或 down 操作时信号量的变化。例如
use threads; use Thread::Semaphore; my $s = Thread::Semaphore->new(5); printf("s = " . ${$s} . "\n"); # s = 5 $s->down(3); printf("s = " . ${$s} . "\n"); # s = 2 ... $s->up(4); printf("s = " . ${$s} . "\n"); # s = 6 |
线程队列
Thread::Queue 包为线程提供了线程安全的队列支持。与信号量类似,从内部实现上看,Thread::Queue 也是把一个通过锁机制实现同步访问的共享队列封装成了一个线程安全的包,并提供统一的使用接口。Thread::Queue 在某些情况下可以大大简化线程间通信的难度和成本。例如在生产者 - 消费者模型中,生产者可以不断地在线程队列上做 enqueue 操作,而消费者只需要不断地在线程队列上做 dequeue 操作,这就很简单地实现了生产者和消费者之间同步的问题。例如
#!/usr/bin/perl # use threads; use Thread::Queue; my $q = Thread::Queue->new(); sub produce { my $name = shift; while(1) { my $r = int(rand(100)); $q->enqueue($r); printf("$name produce $r\n"); sleep(int(rand(3))); } } sub consume { my $name = shift; while(my $r = $q->dequeue()) { printf("consume $r\n"); } } my $producer1 = threads->create(\&produce, "producer1"); my $producer2 = threads->create(\&produce, "producer2"); my $consumer1 = threads->create(\&consume, "consumer2"); $producer1->join(); $producer2->join(); $consumer1->join(); |
其他有用的非核心包
本文前面讨论的所有内容都在 Perl 线程核心包的范畴之内。其实 CPAN 上还有其他一些与线程相关的非核心包,它们往往也会给 Perl 线程的使用带来很大的便利,这里我们选出两个稍加介绍,抛砖引玉。
Thread::Pool 包允许你在程序中创建一批线程去完成多个类似的任务。例如当你希望创建一个多线程程序去完成检验 1000 个 ip 地址是否都能 ping 通的任务时,Thread::Pool 包可以给你带来便利。
Thread::RWLock 包为线程中的读写操作提供了锁机制的支持。例如当你有多个 reader 和 writer 线程共同访问某一个或几个文件时,Thread::RWLock 包可以给你带来便利。
总结
本文主要介绍了 Perl 中线程的使用方法,包括线程的创建、执行与消亡,如何在线程中使用共享变量并通过锁机制、信号量和线程队列的方法来实现线程间的同步。Perl ithreads 线程模型与主流线程模型最大的不同之处在于默认情况下任何数据结构都是非共享的,或者说 Perl 中的 ithreads 是一个“非轻量级”的线程模型。虽然这样的线程模型增加了程序的开销,但它并不会在线程的功能性上打折扣,同时它也使得线程间的通讯和共享变得更加简单。这也符合了 Perl 一贯的简单而强大的理念和原则。
参考资料