Chinaunix首页 | 论坛 | 博客
  • 博客访问: 5393079
  • 博文数量: 1144
  • 博客积分: 11974
  • 博客等级: 上将
  • 技术积分: 12312
  • 用 户 组: 普通用户
  • 注册时间: 2005-04-13 20:06
文章存档

2017年(2)

2016年(14)

2015年(10)

2014年(28)

2013年(23)

2012年(29)

2011年(53)

2010年(86)

2009年(83)

2008年(43)

2007年(153)

2006年(575)

2005年(45)

分类: LINUX

2010-09-21 11:28:25

package ThreadsGroup;
use warnings;
use strict;
use threads;
use threads::shared;
use Thread::Semaphore;
use vars qw($VERSION);
$VERSION = 0.4;
sub new
{
        my $invocant = shift;
        my $class = ref($invocant) || $invocant;
        my $default_thread = 10;
        my $self = { 'max_thread' => $default_thread, @_ };
        my $thread_num_sem = Thread::Semaphore->new( $self->{'max_thread'} );
        $self->{'thread_num_sem'} = $thread_num_sem;
        # hashtable use to store working thread
        my %work_threads : shared = ( );
        my $no_more_job : shared = 0;
        $self->{'work_threads'} = \%work_threads;
        $self->{'no_more_job'} = \$no_more_job;
        my $thr = threads->create( "_Monitor", $thread_num_sem, \$no_more_job, \%work_threads );
        # monitor thread
        $self->{'monitor'} = $thr;
        return bless $self, $class;
}
sub _Monitor
{
        my $thread_num_sem = shift;
        my $no_more_job = shift;
        my $work_threads = shift;
        while( 1 )
        {
                # join all threads that can be joined.
                foreach ( keys %{$work_threads} )
                {
                        if( $work_threads->{$_}->{'thread'}->is_joinable() )
                        {
                                $work_threads->{$_}->{'thread'}->join( );
                                #print "thread ".$_." finish.\n";
                                $thread_num_sem->up();
                                delete $work_threads->{$_};
                        }
                }
                if( ${$no_more_job} == 1 )
                {
                        #print "waiting to quit.\n";
                        last;
                }
                sleep 2;
        }
        # last chance to join all threads
        foreach ( threads->list(threads::all) )
        {
                if( threads->self()->equal($_) )
                {
                        #print "get monitor thread, continue\n";
                        next;
                }
                $_->join( );
        }
        return;
}
sub AddJob
{
        my $self = shift;
        my $function = shift;
        my @args = @_;
        $self->{'thread_num_sem'}->down( );
        my $thr = threads->create( $function, @args );
        share( $thr );
        my $ref : shared;
        my %tmp_hash : shared = ( thread=>$thr );
        $ref = \%tmp_hash;
        $self->{'work_threads'}->{$thr->tid()} = $ref;
}
sub WaitAll
{
        my $self = shift;
        ${$self->{'no_more_job'}} = 1;
        $self->{'monitor'}->join();
        undef $self;
}
 
 
###########################################################
#!/usr/bin/perl
use warnings;
use strict;
use LWP::Simple;
use Data::Dumper;
use lib "./";
use ThreadsGroup;
my $pool = ThreadsGroup->new( 'max_thread' => 10 );
foreach ( 1 ... 15 )
{
        $pool->AddJob( \&test, $_ );
}
$pool->WaitAll();
$pool = ThreadsGroup->new( 'max_thread' => 3 );
my @tmp_arr = ( 1, 3, 9, 27 );
foreach ( 1 ... 8 )
{
        $pool->AddJob( \&test2, \@tmp_arr );
}
$pool->WaitAll();
$pool = ThreadsGroup->new( 'max_thread' => 3 );
foreach ( 1 ... 8 )
{
        $pool->AddJob( \&test3,  $_, $_ + 1 );
}
$pool->WaitAll();
sub test
{
        my $arg = shift;
        print $arg,"\n";
        sleep 3;
}
sub test2
{
 my $arg = shift;
 print Dumper($arg);
 sleep 2;
}
sub test3
{
 my $arg1 = shift;
 my $arg2 = shift;
 print "$arg1, $arg2\n";
 sleep 2;
}
阅读(1360) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~