Perl中的多线程的实现一般有两种办法,而老版本的办法实际上是一种多进程的办法。
一 Thread->New
该办法是传统的老办法,它与folk很类似,新建一个进程时,会把当前内存空间的所有变量都复制一份传到新的进程里面。已实现共享数据。而随着技术的发展,本文不针对该方法做深入研究。
二 IThread
这种方法是通过新建一个新的perl interpreter。 默认情况下,所有的数据和变量是不被线程共享的。 如果想共享一个变量,需通过threads::shared来实现。在使用此方法的时候,需要注意以下三点:
- 变量默认是不在线程中共享的。
- 通过"use threads"引用命名空间,不能通过 eval, do, 或者 require。
- 如果有变量需要共享,必须引用"threads::shared"。 并在定义变量的时候如下:
my $var1 : shared = "value";
以下是一个简单的使用perl 多线程的例子。
Code
#!/usr/local/bin/perl
use threads;
@domain = ("tom.com", "chinadns.com", "163.com", "aol.com");
for ($i=0;$i<4;$i++)
{
print $i.'.'.$domain[$i].' ';
}
print "\n";
my $thr0 = threads->new(\&checkwhois, '0', $domain[0]);
my $thr1 = threads->new(\&checkwhois, '1', $domain[1]);
my $thr2 = threads->new(\&checkwhois, '2', $domain[2]);
my $thr3 = threads->new(\&checkwhois, '3', $domain[3]);
sub checkwhois()
{
my ($l,$r)=@_;
my $i=0;
while($i<1000000)
{
$i*$i;
$i++;
}
print "done --$l\t\n";
print $l.$r." query successful! \n";
}
$thr0->join;
$thr1->join;
$thr2->join;
$thr3->join;
这个简单的perl主要是新建了4个子线程去做不同的事情,然后调用join方法等待他们执行完成并让线程自动回收。但有时,还是需要结合folk 做一些复杂的工作,下面是关于这个的例外一个demo。
Code
use strict;
use English;
use threads;
use threads::shared;
my $items = 20;
my $maxchild = 65;
my $pid;
my $forks : shared = 1;
print "startn\n";
my $item : shared = 0;
my $myid = 1;
my $main_pid = $PID;
print "$main_pid \n";
sub Process
{
my $sid;
{
lock($item);
$item++ if ($item < $items);
}
if($sid < $items)
{
print "Child process ($PID/$myid) start : $sid/$forks\n";
print "$sid \n";
sleep(1);
print "Child process ($PID/$myid) end : $sid/$forks\n";
return 1;
}
elsif($main_pid == $PID)
{
wait;
exit 1;
}
else
{
print "Child process ($PID/$myid) exit : $sid/$forks\n";
exit 1;
}
}
while($item < $items)
{
if(($forks < $maxchild) && ($PID == $main_pid))
{
if($pid = fork)
{
$| = 1;
$forks ++;
$myid++;
print "Starting Sub Process : ($pid/$PID)\n";
}
elsif(defined $pid)
{
$| = 1;
last unless (Process);
}
else
{
die "cann't fork: $!\n";
}
}
}
该实例使用了folk 和共享数据等比较高级的用法。
在本文最后,给一个比较留下的perl 多线程的例子:上传文件到文件服务器ftp。
Code
#use strict;
use File::Copy;
use File::stat;
use File::Find;
use Net::FTP;
use threads;
use threads::shared;
my $maxthread=20;
# all running threads.
my $CurrentThreads : shared = 0;
# total files
my $total_files : shared = 0;
# succeed files
my $processed_files : shared = 0;
# skip files
my $skipped_files : shared = 0;
# ftp retry times
my $ftp_retrytimes : shared = 3;
# whether upload all the files or not, -1 indecate no and 1 indicate yes.
my $g_isAllFiles_uploadSuccess : shared = 1;
my $ftp_server="";
my $ftp_dir="";
my $ftp_uid="";
my $ftp_pw="";
my $ftp_timeout = 1800;
my $ftp_debug=0;
my @src_dir_files=();
my @src_dir_NameListFile=();
my @wc_exclude=("_vti", ".lob", "\\bak", "\\data", "server.inc");
my $logFileName = 'upload.log';
my $log_cnt=0;
my $span=0;
my $start_date = TimeString(time);
print $start_date . "\n";
my $g_uploadSuccess = 1;
my $g_strLastError="";
################################################################################
################ Convert between "\"(backlash) and "/" ########################
################################################################################
sub BacklashToLash
{
my ($s) = @_;
$s = s/\\/\//gis;
return $s;
}
sub LashToBacklash
{
my ($s) = @_;
$s = s/\//\\/gis;
return $s;
}
################################################################################
####################### format the time strings ###############################
################################################################################
sub TimeString
{
my ($tm) = @_;
my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);
return sprintf("%04d-%02d-%02d %02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
}
sub ShortTimeString
{
my ($tm) = @_;
my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);
return sprintf("%04d-%02d-%02d_%02d_%02d", $year+1900, $mon+1, $mday, $hour, $min);
}
sub ScanDate
{
# scan the date format "2009-03-29 09:09:51"
my ($date) = @_;
my ($year, $month, $day, $hour, $minute, $seconds);
$year = substr($date, 0, 4);
$month = substr($date, 5, 2);
$day = substr($date, 8, 2);
$hour = substr($date, 11, 2);
$minute = substr($date, 14, 2);
$seconds = substr($date, 17, 2);
return ($year, $month, $day, $hour, $minute, $seconds);
}
################################################################################
############### get the directory of current file name ########################
################################################################################
sub GetDirFromFileName
{
my ($s) = @_;
my $pos = rindex($s, "\\");
return substr($s, 0, $pos);
}
################################################################################
######################## log method to log files ##############################
################################################################################
my $HLOG;
sub LOG
{
my ($text) = @_;
my $time = TimeString(time);
my $LOG_STEP = 10;
FlushLogFile() if ($log_cnt % $LOG_STEP) == 0 or $log_cnt == 0;
$log_cnt ++;
print HLOG "[$time] $text\n";
}
sub OpenLogFile
{
CloseLogFile();
open(HLOG, ">>$logFileName") or die ("Open file error.");
}
sub CloseLogFile
{
close(HLOG) if defined HLOG;
}
sub FlushLogFile
{
CloseLogFile();
OpenLogFile();
}
################################################################################
######################## Process File method ##############################
################################################################################
sub ProcessFile
{
# The total thread number add one
{
lock($CurrentThreads);
$CurrentThreads++;
}
# get the thread
my ($srcThread, $dstThread, $dstdirThread) = @_;
# Increase file number.
{
lock($total_files);
$total_files++;
LOG("Processing $total_files \"$srcThread\" ");
}
my $need_upload = 0;
my $bPutResult = 0;
my $t1 = $lookup{$srcThread};
my $t2 = TimeString(stat($srcThread)->mtime);
if(not defined $t1)
{
$lookup{$srcThread} = $t2;
$need_upload = 1;
}
else
{
# time longer than 5
my $delta_sec = 10;
$need_upload = 1 if $delta_sec > 5;
}
if($need_upload > 0)
{
for(my $nProcessIndex = 1; $nProcessIndex < $ftp_retrytimes; $nProcessIndex++)
{
my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
if($@)
{
$g_strLastError = "Can't connect to the FTP server, the reason: " . $@;
LOG("$g_strLastError\n");
}
else
{
$ftp->binary;
LOG("The $nProcessIndex time to try upload file from \"$srcThread\" to \"$dstThread\". Current total thread number is $CurrentThreads");
{
$bPutResult = 0;
$ftp->mkdir($dstdirThread, 1);
$ftp->put($srcThread, $dstThread) or $bPutResult = -1;
}
if($bPutResult < 0)
{
LOG("The $nProcessIndex time to try upload file FAILED from \"$srcThread\" to \"$dstThread\" (des-dir : \"$dstdirThread\").");
if($@)
{
LOG("The reason is $@ \n");
}
}
else
{
LOG("The $nProcessIndex time to try upload file SUCCEED from \"$srcThread\" to \"$dstThread\"");
{
lock($processed_files);
$processed_files++;
}
#close the connect
$ftp->quit() if ($ftp);
last;
}
}
$ftp->quit() if ($ftp);
}
if($bPutResult < 0)
{
# failed for $ftp_retrytimes and skipp
{
lock($skipped_files);
$skipped_files ++;
lock($g_isAllFiles_uploadSuccess);
$g_isAllFiles_uploadSuccess = -1;
}
}
}
else
{
# skipp
{
lock($skipped_files);
$skipped_files ++;
}
}
# decrease current thread
{
lock($CurrentThreads);
$CurrentThreads--;
}
}
sub ProcessFiles
{
my $srcdir = LashToBacklash($File::Find::dir);
my $srcpath = LashToBacklash($File::Find::name);
my $base = LashToBacklash($File::Find::topdir);
foreach my $exclude (@wc_exclude)
{
if(index($srcpath, $exclude) > -1)
{
$File::Find::prune = 1 if -d $srcpath;
return;
}
}
if(-d $srcpath)
{
return;
}
my $dstdir = $srcdir;
my $dstpath = $srcpath;
$dstdir =~ s{\Q$base\E}{$ftp_dir}is;
$dstpath =~ s{\Q$base\E}{$ftp_dir}is;
$dstdir = BacklashToLash($dstdir);
$dstpath = BacklashToLash($dstpath);
# old way. one by one
# processFile($srcpath, $dstpath, $detdir);
# new way threads
while(1)
{
if($CurrentThreads < $maxthread)
{
my $thread = threads->create('ProcessFile', $srcpath, $dstpath, $detdir);
push(@$self, \$thread);
$thread->detach();
}
else
{
LOG("-sleep 1 second");
sleep 1;
}
}
}
################################################################################
######################## Main GOES HERE ###############################
################################################################################
# step 1: try to login the ftp.
$start_date = time();
LOG("Connecting to the ftp server($ftp_server)");
my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
if($@)
{
$g_strLastError = "Can't connect to the FTP server, the reason: " . $@;
LOG("$g_strLastError\n");
$g_uploadSuccess = -1;
}
else
{
$ftp->login($ftp_uid, $ftp_pw);
if($@)
{
$g_strLastError = "Can't login to the FTP server, the reason: " . $@;
LOG("$g_strLastError\n");
$g_uploadSuccess = -1;
}
else
{
LOG("Connect ftp server successful!");
$ftp->quit();
# step 2: upload the files
my %lookup;
LOG("Start to upload files in directory(@src_dir_files)");
find(\&ProcessFiles, @src_dir_files);
LOG("The directoty(@src_dir_files) have been completed. The result: ");
foreach my $thread (@$self)
{
print("Joining thread\n");
$$thread->join();
}
#step 3:
if($g_isAllFiles_uploadSuccess > 0)
{
LOG("+==================================================================+");
LOG("Start to upload files in directory(@src_dir_NameListFile)");
find(\&ProcessFiles, @src_dir_NameListFile);
LOG("The directoty(@src_dir_NameListFile) have been completed. The result: ");
foreach my $thread (@$self)
{
print("Joining thread\n");
$$thread->join();
}
LOG("The directory (@rc_dir_NameListFile) has been completed.");
LOG("+==================================================================+");
}
else
{
LOG("+==================================================================+");
LOG("These files will not be upload for directory(@src_dir_files) failed.");
LOG("+==================================================================+");
}
#Step 4: log time
$span = time() - $start_date;
LOG("Upload succeed! \nTime:$span second. the total files is $total_files. \
\nSucceed are $processed_files and skipped are $skipped_files.\n");
}
CloseLogFile();
}
################################################################################
很多时候使用perl多线程可以达到很不错的效果,可以节约很多时间完成很复杂的工作。但通过perl threads模块的描述文件可以看到,它也有很多缺点。比如说在使用perl多线程的时候,必须的保证所有引用的模块都是支持thread。而在实际应用中,我们很难做到这样。比如我们要多线程,但同时要应用OLE模块去操作activex。 此用例应该是一种很常见的用例。那是不是意味着此时我们不得不放弃使用多线程呢。 非也, 本文介绍一种可以使用多线程和ole的例子。
在官方网站上对这种情况给出的方案是:
If the module will only be used inside a thread, you can try loading the module from inside the thread entry point function using require
(and import
if needed):
sub thr_func
{
require Unsafe::Module
# Unsafe::Module->import(...);
....
}
If the module is needed inside the main thread, try modifying your application so that the module is loaded (again using require
and ->import()
) after any threads are started, and in such a way that no other threads are started afterwards。
再次,主要讨论一下第二种情况,既主要是该非thread模块放到方法中引用。下面是一个demo。
Code
use threads;
use threads::shared;
use Thread::Queue;
no warnings 'threads';
# Time out const
my $TIMEOUT : shared;
$TIMEOUT = 1;
# the sig for end thread
my $TERM : shared;
$TERM = 0;
#my $excel;
$SIG{'INT'} = $SIG{'TERM'} = sub{ print("\n>>> Terminating <<<\n"); $TERM=1;};
$SIG{'KILL'} = sub{ printf("%3d <- Killed\n", threads->tid());
threads->detach() if !threads->is_detached();
threads->exit(); };
sub ThreadWatcher
{
my $queue = shift;
my %timers;
while(!$TERM)
{
#print "ThreadWatcher -- TERM : $TERM\n";
while(my $tid = $queue->dequeue_nb())
{
if (! defined($timers{$tid}{'timeout'} = $queue->dequeue()) ||
! defined($timers{$tid}{'thread'} = threads->object($tid)))
{
# No timeout - unregister thread
delete($timers{$tid});
}
}
foreach my $tid (keys(%timers))
{
#print "$timers{$tid}{'thread'} \n";
if(--$timers{$tid}{'timeout'} < 0)
{
print "thread $timers{$tid}{'thread'} will be killed.\n";
$timers{$tid}{'thread'}->kill('KILL');
delete($timers{$tid});
}
}
# tick tock
sleep(1);
}
}
sub Worker
{
#eval {use Win32::OLE::Variant;};
my ($queue, $dataqueue) = @_;
# get the thread id and register with watch
my $tid = threads->tid();
printf("Working -> %3d\n", $tid);
$queue->enqueue($tid, $TIMEOUT);
print "Worker -- TERM : $TERM\n";
while(!$TERM)
{
#my $App = $dataqueue->dequeue();
my $data = $dataqueue->dequeue();
#deal with the data
#print "Worker -- DATA : $App\n";
print "Worker -- DATA : $data\n";
#my $var = Win32::OLE::Variant->new(VT_BSTR, $data);
#print "Worker VAR: $var\n";
}
# Remove signal handler
$SIG{'KILL'} = sub {};
# Unregister with timer thread
$queue->enqueue($tid, undef);
# Tell user we're done
printf("%3d <- Finished\n", $tid);
threads->detach() if ! threads->is_detached();
threads->exit();
}
# create time thread
my $watchQueue = Thread::Queue->new();
threads->create('ThreadWatcher', $watchQueue)->detach();
# create work thread
my $dataQueue = Thread::Queue->new();
threads->create('Worker', $watchQueue, $dataQueue);
NoneSafeModelScript('C:\Joe_Chen\Perl_Projects\Threads\update.xlsx');
WairLongTime(10);
sub WairLongTime
{
my $temp = $_[0];
$temp = $temp * 10000000;
for(my $index = 0; $index < $temp; $index++)
{
$index * $index;
}
return $index;
}
sub NoneSafeModelScript
{
eval 'use Win32::OLE';
eval 'use Win32::OLE::Variant';
my $excel;
for(my $index = 0; $index < 600; $index++)
{
print "Getting the Excel ActiveObject. Try # $index \n";
WairLongTime(1);
eval
{
$excel = Win32::OLE->GetActiveObject('Excel.Application') || Win32::OLE->new('Excel.Application', 'Quit');
};
if($@ or $excel == undef)
{
print "Unsuccessful: $@ \n";
if($index == 599)
{
print "ERROR:Don\'t got the Excel Application";
}
}
else
{
last;
}
}
my $path = $_[0];
my $book = $excel->workbooks->open($path);
my $sheet = $book->worksheets(1);
my $values = $sheet->Range("A1:D5")->{Value};
my $row_counts = $sheet->Range("A1:C3")->{Rows}->{Count};
my $column_counts = $sheet->Range("A1:C3")->{Columns}->{Count};
print "NoneSafeModelScript : $row_counts \n";
print "NoneSafeModelScript : $column_counts \n";
for(my $row=1; $row<$row_counts + 1; $row++)
{
my $array_ref = $sheet->Cells($row,1)->{Value};
print "NoneSafeModelScript : $array_ref \n";
my $var = Variant(Win32::OLE::Variant->VT_BSTR, $array_ref);
my $v = ref($var);
#my $v = $var->Type();
print "NoneSafeModelScript VAR: $var\n";
print "NoneSafeModelScript VAR: $v\n";
#$dataQueue->enqueue($var);
$dataQueue->enqueue($array_ref);
WairLongTime(2);
}
my $v = Variant(VT_DATE, "April 1 99");
print $v->Type, "\n";
print $v->Date(DATE_LONGDATE), "\n";
print $v->Date("ddd',' MMM dd yy"), "\n";
print Win32::OLE::Variant->VT_BSTR , "\n";
$book->Close;
$excel->Quit;
}
sub Wrap
{
my $value = $_[0];
my $var = Variant(VT_DATE, 'Jan 1,1970');
print "Wrap : $var \n";
return $var;
}
在此例子中,用到了queue,它的作用是将非thread 安全的数据通过管道传输,这样能避免他们互相调用。
转自
http://www.cnblogs.com/joechen/archive/2009/04/27/1444602.html
阅读(2261) | 评论(0) | 转发(1) |