package ThreadsGroup;
use warnings;
use strict;
use threads;
use threads::shared;
use Thread::Semaphore;
use vars qw($VERSION);
$VERSION = 0.4;
sub new
{
my $invocant = shift;
my $class = ref($invocant) || $invocant;
my $default_thread = 10;
my $self = { 'max_thread' => $default_thread, @_ };
my $thread_num_sem = Thread::Semaphore->new( $self->{'max_thread'} );
$self->{'thread_num_sem'} = $thread_num_sem;
# hashtable use to store working thread
my %work_threads : shared = ( );
my $no_more_job : shared = 0;
$self->{'work_threads'} = \%work_threads;
$self->{'no_more_job'} = \$no_more_job;
my $thr = threads->create( "_Monitor", $thread_num_sem, \$no_more_job, \%work_threads );
# monitor thread
$self->{'monitor'} = $thr;
return bless $self, $class;
}
sub _Monitor
{
my $thread_num_sem = shift;
my $no_more_job = shift;
my $work_threads = shift;
while( 1 )
{
# join all threads that can be joined.
foreach ( keys %{$work_threads} )
{
if( $work_threads->{$_}->{'thread'}->is_joinable() )
{
$work_threads->{$_}->{'thread'}->join( );
#print "thread ".$_." finish.\n";
$thread_num_sem->up();
delete $work_threads->{$_};
}
}
if( ${$no_more_job} == 1 )
{
#print "waiting to quit.\n";
last;
}
sleep 2;
}
# last chance to join all threads
foreach ( threads->list(threads::all) )
{
if( threads->self()->equal($_) )
{
#print "get monitor thread, continue\n";
next;
}
$_->join( );
}
return;
}
sub AddJob
{
my $self = shift;
my $function = shift;
my @args = @_;
$self->{'thread_num_sem'}->down( );
my $thr = threads->create( $function, @args );
share( $thr );
my $ref : shared;
my %tmp_hash : shared = ( thread=>$thr );
$ref = \%tmp_hash;
$self->{'work_threads'}->{$thr->tid()} = $ref;
}
sub WaitAll
{
my $self = shift;
${$self->{'no_more_job'}} = 1;
$self->{'monitor'}->join();
undef $self;
}
###########################################################
#!/usr/bin/perl
use warnings;
use strict;
use LWP::Simple;
use Data::Dumper;
use lib "./";
use ThreadsGroup;
my $pool = ThreadsGroup->new( 'max_thread' => 10 );
foreach ( 1 ... 15 )
{
$pool->AddJob( \&test, $_ );
}
$pool->WaitAll();
$pool = ThreadsGroup->new( 'max_thread' => 3 );
my @tmp_arr = ( 1, 3, 9, 27 );
foreach ( 1 ... 8 )
{
$pool->AddJob( \&test2, \@tmp_arr );
}
$pool->WaitAll();
$pool = ThreadsGroup->new( 'max_thread' => 3 );
foreach ( 1 ... 8 )
{
$pool->AddJob( \&test3, $_, $_ + 1 );
}
$pool->WaitAll();
sub test
{
my $arg = shift;
print $arg,"\n";
sleep 3;
}
sub test2
{
my $arg = shift;
print Dumper($arg);
sleep 2;
}
sub test3
{
my $arg1 = shift;
my $arg2 = shift;
print "$arg1, $arg2\n";
sleep 2;
}
阅读(1350) | 评论(0) | 转发(0) |