Chinaunix首页 | 论坛 | 博客
  • 博客访问: 531831
  • 博文数量: 126
  • 博客积分: 851
  • 博客等级: 准尉
  • 技术积分: 1287
  • 用 户 组: 普通用户
  • 注册时间: 2012-10-06 11:21
个人简介

个人最新博客地址http://www.skylway.com/

文章分类

全部博文(126)

文章存档

2016年(2)

2014年(60)

2013年(35)

2012年(29)

分类:

2012-12-06 17:47:40

原文地址:perl的多线程处理 作者:snowtty

Perl语言的多线程(一)

Perl中的多线程的实现一般有两种办法,而老版本的办法实际上是一种多进程的办法。

一   Thread->New

该办法是传统的老办法,它与folk很类似,新建一个进程时,会把当前内存空间的所有变量都复制一份传到新的进程里面。已实现共享数据。而随着技术的发展,本文不针对该方法做深入研究。

二   IThread

这种方法是通过新建一个新的perl interpreter。 默认情况下,所有的数据和变量是不被线程共享的。 如果想共享一个变量,需通过threads::shared来实现。在使用此方法的时候,需要注意以下三点:

  1. 变量默认是不在线程中共享的。
  2. 通过"use threads"引用命名空间,不能通过 eval, do, 或者 require。
  3. 如果有变量需要共享,必须引用"threads::shared"。 并在定义变量的时候如下:

            my $var1 : shared = "value";

 以下是一个简单的使用perl 多线程的例子。

 


#!/usr/local/bin/perl   
use threads;   

@domain   
=   ("tom.com",   "chinadns.com",   "163.com",   "aol.com");   
for ($i=0;$i<4;$i++)
{   
    print   $i.
'.'.$domain[$i].'     ';   
}   
print   
"\n";   
    
my   $thr0   
=   threads->new(\&checkwhois,   '0',   $domain[0]);   
my   $thr1   
=   threads->new(\&checkwhois,   '1',   $domain[1]);   
my   $thr2   
=   threads->new(\&checkwhois,   '2',   $domain[2]);   
my   $thr3   
=   threads->new(\&checkwhois,   '3',   $domain[3]);   
    
sub   checkwhois()   
{   
    my ($l,$r)
=@_;   
    my $i
=0;   
    
while($i<1000000)   
    {   
          $i
*$i;   
          $i
++;   
    }   
    print   
"done  --$l\t\n";   
    print   $l.$r.
"   query   successful!   \n";    
}

$thr0
->join;  
$thr1
->join;   
$thr2
->join;   
$thr3
->join; 

 

这个简单的perl主要是新建了4个子线程去做不同的事情,然后调用join方法等待他们执行完成并让线程自动回收。但有时,还是需要结合folk 做一些复杂的工作,下面是关于这个的例外一个demo。

 


use strict;
use English;
use threads;
use threads::shared;

my $items 
= 20;
my $maxchild 
= 65;
my $pid;
my $forks : shared 
= 1;

print 
"startn\n";

my $item : shared 
= 0;
my $myid 
= 1;
my $main_pid 
= $PID;

print 
"$main_pid \n";

sub Process
{
    my $sid;
    
    {
        
lock($item);
        $item
++ if ($item < $items);
    }
    
    
if($sid < $items)
    {
        print 
"Child process ($PID/$myid) start : $sid/$forks\n";
        print 
"$sid \n";
        sleep(
1);
        print 
"Child process ($PID/$myid) end : $sid/$forks\n";
        
return 1;
    }
    elsif($main_pid 
== $PID)
    {
        wait;
        exit 
1;
    }
    
else
    {
        print 
"Child process ($PID/$myid) exit : $sid/$forks\n";
        exit 
1;
    }
}

while($item < $items)
{
    
if(($forks < $maxchild) && ($PID == $main_pid))
    {
        
if($pid = fork)
        {
            $
| = 1;
            $forks 
++;
            $myid
++;
            print 
"Starting Sub Process : ($pid/$PID)\n";
        }
        elsif(defined $pid)
        {
            $
| = 1;
            last unless (Process);
        }
        
else
        {
            die 
"cann't fork: $!\n"
        }
    }
}

 

该实例使用了folk 和共享数据等比较高级的用法。

在本文最后,给一个比较留下的perl 多线程的例子:上传文件到文件服务器ftp。

 


#use strict;
use File::Copy;
use File::stat;
use File::Find;
use Net::FTP;
use threads;
use threads::shared;

my $maxthread
=20;
# all running threads.
my $CurrentThreads : shared 
= 0;
# total files
my $total_files : shared 
= 0;
# succeed files
my $processed_files : shared 
= 0;
# skip files
my $skipped_files : shared 
= 0;
# ftp retry times
my $ftp_retrytimes : shared 
= 3;
# whether upload all the files or not, 
-1 indecate no and 1 indicate yes.
my $g_isAllFiles_uploadSuccess : shared 
= 1;

my $ftp_server
="";
my $ftp_dir
="";
my $ftp_uid
="";
my $ftp_pw
="";
my $ftp_timeout 
= 1800;
my $ftp_debug
=0;
my @src_dir_files
=();
my @src_dir_NameListFile
=();
my @wc_exclude
=("_vti"".lob""\\bak""\\data""server.inc");

my $logFileName 
= 'upload.log';
my $log_cnt
=0;
my $span
=0;

my $start_date 
= TimeString(time);
print $start_date . 
"\n";
my $g_uploadSuccess 
= 1;
my $g_strLastError
="";

################################################################################
################ Convert between 
"\"(backlash) and "/"  ########################
################################################################################
sub BacklashToLash
{
    my ($s) 
= @_;
    $s 
= s/\\/\//gis;
    return $s;


sub LashToBacklash
{
    my ($s) 
= @_;
    $s 
= s/\//\\/gis;
    return $s;


################################################################################
####################### format the time strings  ###############################
################################################################################
sub TimeString
{
    my ($tm) 
= @_;
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) 
= localtime($tm);
    
return sprintf("%04d-%02d-%02d %02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
}

sub ShortTimeString
{
    my ($tm) 
= @_;
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) 
= localtime($tm);
    
return sprintf("%04d-%02d-%02d_%02d_%02d", $year+1900, $mon+1, $mday, $hour, $min);
}

sub ScanDate
{
    # scan the date format 
"2009-03-29 09:09:51"
    my ($date) 
= @_;
    my ($year, $month, $day, $hour, $minute, $seconds);
    
    $year 
= substr($date, 04);
    $month 
= substr($date, 52);
    $day 
= substr($date, 82);
    $hour 
= substr($date, 112);
    $minute 
= substr($date, 142);
    $seconds 
= substr($date, 172);

    
return ($year, $month, $day, $hour, $minute, $seconds);
}

################################################################################
############### 
get the directory of current file name  ########################
################################################################################
sub GetDirFromFileName
{
    my ($s) 
= @_;
    my $pos 
= rindex($s, "\\");
    
return substr($s, 0, $pos);
}

################################################################################
######################## log method to log files  ##############################
################################################################################
my $HLOG;
sub LOG
{
    my ($text) 
= @_;
    my $time 
= TimeString(time);
    
    my $LOG_STEP 
= 10;
    FlushLogFile() 
if ($log_cnt % $LOG_STEP) == 0 or $log_cnt == 0;
    $log_cnt 
++;
    print HLOG 
"[$time] $text\n";
}

sub OpenLogFile
{
    CloseLogFile();
    open(HLOG, 
">>$logFileName") or die ("Open file error.");  
}

sub CloseLogFile
{
    close(HLOG) 
if defined HLOG;
}

sub FlushLogFile
{
    CloseLogFile();
    OpenLogFile();
}

################################################################################
########################   Process File method    ##############################
################################################################################
sub ProcessFile
{
    # The total thread number add one
    {
        
lock($CurrentThreads);
        $CurrentThreads
++;
    }
    
    # 
get the thread
    my ($srcThread, $dstThread, $dstdirThread) 
= @_;
    
    # Increase file number.
    {
        
lock($total_files);
        $total_files
++;
        LOG(
"Processing $total_files \"$srcThread\" ");
    }
    
    my $need_upload 
= 0;
    my $bPutResult 
= 0;
    
    my $t1 
= $lookup{$srcThread};
    my $t2 
= TimeString(stat($srcThread)->mtime);
    
    
if(not defined $t1)
    {
        $lookup{$srcThread} 
= $t2;
        $need_upload 
= 1;
    }
    
else
    {
        # time longer than 
5
        my $delta_sec 
= 10;
        $need_upload 
= 1 if $delta_sec > 5;
    }
    
    
if($need_upload > 0)
    {        
        
for(my $nProcessIndex = 1; $nProcessIndex < $ftp_retrytimes; $nProcessIndex++)
        {
            my $ftp 
= Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
            
if($@)
            {
                $g_strLastError 
= "Can't connect to the FTP server, the reason: " . $@;
                LOG(
"$g_strLastError\n");
            }
            
else
            {
                $ftp
->binary;
                LOG(
"The $nProcessIndex time to try upload file from \"$srcThread\" to \"$dstThread\". Current total thread number is $CurrentThreads");
                
                {
                    $bPutResult 
= 0;
                    $ftp
->mkdir($dstdirThread, 1);
                    $ftp
->put($srcThread, $dstThread) or $bPutResult = -1;
                }
                
                
if($bPutResult < 0)
                {
                    LOG(
"The $nProcessIndex time to try upload file FAILED from \"$srcThread\" to \"$dstThread\" (des-dir : \"$dstdirThread\").");
                    
if($@)
                    {
                        LOG(
"The reason is $@ \n");
                    }
                }
                
else
                {
                    LOG(
"The $nProcessIndex time to try upload file SUCCEED from \"$srcThread\" to \"$dstThread\"");
                    {
                        
lock($processed_files);
                        $processed_files
++;
                    }
                    
                    #close the connect
                    $ftp
->quit() if ($ftp);
                    last;
                }
            }
            $ftp
->quit() if ($ftp);
        }
        
        
if($bPutResult < 0)
        {
            # failed 
for $ftp_retrytimes and skipp
            {
                
lock($skipped_files);
                $skipped_files 
++;
                
lock($g_isAllFiles_uploadSuccess);
                $g_isAllFiles_uploadSuccess 
= -1;
            }
        }
    }
    
else
    {
        # skipp
        {
            
lock($skipped_files);
            $skipped_files 
++;
        }
    }
    
    # decrease current thread
    {
        
lock($CurrentThreads);
        $CurrentThreads
--;
    }
}

sub ProcessFiles
{
    my $srcdir 
= LashToBacklash($File::Find::dir);
    my $srcpath 
= LashToBacklash($File::Find::name);
    my $
base = LashToBacklash($File::Find::topdir);
    
    
foreach my $exclude (@wc_exclude)
    {
        
if(index($srcpath, $exclude) > -1)
        {
            $File::Find::prune 
= 1 if -d $srcpath;
            
return;
        }
    }
    
    
if(-d $srcpath)
    {
        
return;
    }
    
    my $dstdir 
= $srcdir;
    my $dstpath 
= $srcpath;
    $dstdir 
=~ s{\Q$base\E}{$ftp_dir}is;
    $dstpath 
=~ s{\Q$base\E}{$ftp_dir}is;
    $dstdir 
= BacklashToLash($dstdir);
    $dstpath 
= BacklashToLash($dstpath);

    # old way. one by one
    # processFile($srcpath, $dstpath, $detdir);
    
    # 
new way  threads
    
while(1)
    {
        
if($CurrentThreads < $maxthread)    
        {
            my $thread 
= threads->create('ProcessFile', $srcpath, $dstpath, $detdir);
            push(@$self, \$thread);
            $thread
->detach();
        }
        
else
        {
            LOG(
"-sleep 1 second");
            sleep 
1;
        }
    }
}
################################################################################
########################     Main GOES HERE      ###############################
################################################################################

# step 
1try to login the ftp.
$start_date 
= time();
LOG(
"Connecting to the ftp server($ftp_server)");
my $ftp 
= Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
if($@)
{
    $g_strLastError 
= "Can't connect to the FTP server, the reason: " . $@;
    LOG(
"$g_strLastError\n");
    $g_uploadSuccess 
= -1;
}
else
{
    $ftp
->login($ftp_uid, $ftp_pw);
    
if($@)
    {
        $g_strLastError 
= "Can't login to the FTP server, the reason: " . $@;
        LOG(
"$g_strLastError\n");
        $g_uploadSuccess 
= -1;
    }
    
else
    {
        LOG(
"Connect ftp server successful!");
        $ftp
->quit();
        
        # step 
2: upload the files
        my 
%lookup;
        LOG(
"Start to upload files in directory(@src_dir_files)");
        find(\
&ProcessFiles, @src_dir_files);
        LOG(
"The directoty(@src_dir_files) have been completed. The result: ");
        
        
foreach my $thread (@$self)
        {
            print(
"Joining thread\n");
            $$thread
->join();
        }
        
        #step 
3
        
if($g_isAllFiles_uploadSuccess > 0)
        {
            LOG(
"+==================================================================+");
            LOG(
"Start to upload files in directory(@src_dir_NameListFile)");
            find(\
&ProcessFiles, @src_dir_NameListFile);
            LOG(
"The directoty(@src_dir_NameListFile) have been completed. The result: ");
        
            
foreach my $thread (@$self)
            {
                print(
"Joining thread\n");
                $$thread
->join();
            }
            LOG(
"The directory (@rc_dir_NameListFile) has been completed.");
            LOG(
"+==================================================================+");
        }
        
else
        {
            LOG(
"+==================================================================+");
            LOG(
"These files will not be upload for directory(@src_dir_files) failed.");
            LOG(
"+==================================================================+");
        }
        
        #Step 
4: log time
        $span 
= time() - $start_date;
        LOG(
"Upload succeed! \nTime:$span second. the total files is $total_files. \
        \nSucceed are $processed_files and skipped are $skipped_files.\n");
    }
    
    CloseLogFile();
}
 
 
################################################################################

很多时候使用perl多线程可以达到很不错的效果,可以节约很多时间完成很复杂的工作。但通过perl threads模块的描述文件可以看到,它也有很多缺点。比如说在使用perl多线程的时候,必须的保证所有引用的模块都是支持thread。而在实际应用中,我们很难做到这样。比如我们要多线程,但同时要应用OLE模块去操作activex。 此用例应该是一种很常见的用例。那是不是意味着此时我们不得不放弃使用多线程呢。 非也, 本文介绍一种可以使用多线程和ole的例子。

 在官方网站上对这种情况给出的方案是:

If the module will only be used inside a thread, you can try loading the module from inside the thread entry point function using require (and import if needed):

     sub thr_func
{
require Unsafe::Module
# Unsafe::Module->import(...);
....
}

      If the module is needed inside the main thread, try modifying your application so that the module is loaded (again using require and ->import()) after any threads are started, and in such a way that no other threads are started afterwards。

再次,主要讨论一下第二种情况,既主要是该非thread模块放到方法中引用。下面是一个demo。

 


use threads;
use threads::shared;
use Thread::Queue;
no warnings 
'threads';

# Time 
out const
my $TIMEOUT : shared;
$TIMEOUT 
= 1;
# the sig 
for end thread 
my $TERM : shared;
$TERM 
= 0;
#my $excel;

$SIG{
'INT'= $SIG{'TERM'= sub{ print("\n>>> Terminating <<<\n"); $TERM=1;};
$SIG{
'KILL'= sub{ printf("%3d <- Killed\n", threads->tid()); 
                    threads
->detach() if !threads->is_detached();
                    threads
->exit(); };

sub ThreadWatcher
{
    my $queue 
= shift;
    my 
%timers;
    
    
while(!$TERM)
    {
        #print 
"ThreadWatcher -- TERM : $TERM\n";
        
while(my $tid = $queue->dequeue_nb())
        {
            
if (! defined($timers{$tid}{'timeout'= $queue->dequeue()) ||
                
! defined($timers{$tid}{'thread'}  = threads->object($tid)))
            {
                # No timeout 
- unregister thread
                delete($timers{$tid});
            }
        }
               
        
        
foreach my $tid (keys(%timers))
        {
            #print 
"$timers{$tid}{'thread'} \n";
            
if(--$timers{$tid}{'timeout'< 0)
            {
                print 
"thread $timers{$tid}{'thread'} will be killed.\n";
                $timers{$tid}{
'thread'}->kill('KILL');
                delete($timers{$tid});
            }
        }
        
        # tick tock
        sleep(
1);
    }
}

sub Worker
{
    #eval {use Win32::OLE::Variant;};
    
    my ($queue, $dataqueue) 
= @_;
    # 
get the thread id and register with watch
    my $tid 
= threads->tid();
    printf(
"Working -> %3d\n", $tid);
    $queue
->enqueue($tid, $TIMEOUT);
    print 
"Worker -- TERM : $TERM\n";
    
while(!$TERM)
    {
        
        #my $App 
= $dataqueue->dequeue();
        my $data 
= $dataqueue->dequeue();
        #deal with the data
        #print 
"Worker -- DATA : $App\n";
        print 
"Worker -- DATA : $data\n";
        
        #my $var 
= Win32::OLE::Variant->new(VT_BSTR, $data);
        #print 
"Worker VAR: $var\n";
    }
    
    # Remove signal handler
    $SIG{
'KILL'= sub {};

    # Unregister with timer thread
    $queue
->enqueue($tid, undef);

    # Tell user we
're done
    printf("%3d <- Finished\n", $tid);
   
    threads
->detach() if ! threads->is_detached();
    threads
->exit();
}

# create time thread

my $watchQueue 
= Thread::Queue->new();
threads
->create('ThreadWatcher', $watchQueue)->detach();

# create work thread
my $dataQueue 
= Thread::Queue->new();
threads
->create('Worker', $watchQueue, $dataQueue);

NoneSafeModelScript(
'C:\Joe_Chen\Perl_Projects\Threads\update.xlsx');

WairLongTime(
10);


sub WairLongTime
{
    my $temp 
= $_[0];
    $temp 
= $temp * 10000000;
    
for(my $index = 0; $index < $temp; $index++)
    {
        $index 
* $index;
    }
    
return $index;
}

sub NoneSafeModelScript
{
    eval 
'use Win32::OLE';  
    eval 
'use Win32::OLE::Variant';
    
    my $excel;
    
for(my $index = 0; $index < 600; $index++)
    {
        print 
"Getting the Excel ActiveObject. Try # $index \n";
        WairLongTime(
1);
        eval
        {
            $excel 
= Win32::OLE->GetActiveObject('Excel.Application'|| Win32::OLE->new('Excel.Application''Quit');
        };
        
if($@ or $excel == undef)
        {
            print 
"Unsuccessful: $@ \n";
            
if($index == 599)
            {
                print 
"ERROR:Don\'t got the Excel Application";
            }
        }
        
else
        {
            last;
        }
    }
    
    my $path 
= $_[0];
    
    my $book 
= $excel->workbooks->open($path);
    my $sheet 
= $book->worksheets(1);
    
    my $values 
= $sheet->Range("A1:D5")->{Value};
    my $row_counts 
= $sheet->Range("A1:C3")->{Rows}->{Count};
    my $column_counts 
= $sheet->Range("A1:C3")->{Columns}->{Count};
        
    print 
"NoneSafeModelScript : $row_counts \n";
    print 
"NoneSafeModelScript : $column_counts \n";
    
    
for(my $row=1; $row<$row_counts + 1; $row++)
    {
        my $array_ref 
= $sheet->Cells($row,1)->{Value};
        print 
"NoneSafeModelScript : $array_ref \n";
        
        my $var 
= Variant(Win32::OLE::Variant->VT_BSTR, $array_ref);
        my $v 
= ref($var);
        #my $v 
= $var->Type();
        print 
"NoneSafeModelScript VAR: $var\n";
        print 
"NoneSafeModelScript VAR: $v\n";
        #$dataQueue
->enqueue($var);
        $dataQueue
->enqueue($array_ref);
        WairLongTime(
2);
    }
    
    my $v 
= Variant(VT_DATE, "April 1 99");
    print $v
->Type, "\n";
    print $v
->Date(DATE_LONGDATE), "\n";
    print $v
->Date("ddd',' MMM dd yy"), "\n";
    
    print Win32::OLE::Variant
->VT_BSTR , "\n";
    
    $book
->Close;
    $excel
->Quit;
}

sub Wrap
{
    my $value 
= $_[0];
    my $var 
= Variant(VT_DATE, 'Jan 1,1970');
    print 
"Wrap : $var \n"
    
return $var;
}

 

在此例子中,用到了queue,它的作用是将非thread 安全的数据通过管道传输,这样能避免他们互相调用。

 

转自

http://www.cnblogs.com/joechen/archive/2009/04/27/1444602.html

 
阅读(1023) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~