#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
# ---------------------------------------------------------------------------
# Main program.
#
# Builds the list of target hosts, starts a pool of worker threads plus a
# queue-feeding thread, supervises the workers for timeouts from the main
# thread, and finally joins everything.
# ---------------------------------------------------------------------------
print "Starting main program\n";

# Host list: a comma-separated list given as the first command-line argument
# takes precedence; otherwise one host per line is read from the file "test"
# in the current directory.
my @ips;
if (defined $ARGV[0]) {
    @ips = split /,/, $ARGV[0];
}
else {
    open my $ip_fh, '<', 'test'
        or die "Cannot open host list file 'test': $!\n";
    @ips = <$ip_fh>;
    close $ip_fh;
    chomp @ips;
}

my $window_size = 40;             # size of the thread window (number of workers)
my $retry       = 0;              # maximum number of retries after a timeout
my $time_out    = 2 * 60 * 60;    # timeout, in seconds

# Never start more workers than there are hosts.
$window_size = scalar @ips if @ips < $window_size;

my $window_queue = Thread::Queue->new;
my $ip_queue     = Thread::Queue->new;
$ip_queue->enqueue(@ips);

# Per-worker time slots. Slot 0 counts the workers that have finished;
# slots 1 .. $window_size start at -1 and are later written by the workers.
my @thread_time : shared;
$thread_time[0] = 0;
$thread_time[$_] = -1 for 1 .. $window_size;

my @thread_ip_info : shared;      # what each worker slot is currently processing
my %thread_retry_times;           # remaining retries; key: ip, value: retry count
my %thread;                       # worker slot number => thread object

my $monitor_queue = threads->new(
    \&monitor_queue_add, $ip_queue, $window_queue, $window_size, \@thread_time,
);
for my $slot (1 .. $window_size) {
    $thread{$slot} = threads->new(
        \&do_something, $ip_queue, $window_queue, $window_size, $slot,
        \@thread_time, \@thread_ip_info,
    );
}

# The timeout supervisor runs here in the main thread (it is a plain call,
# not a thread). The leading '&' is kept on purpose: the sub is defined
# further down, and '&' makes the call independent of any prototype.
my $monitor_thread = &monitor_thread_time(
    $ip_queue, $window_queue, $window_size, $time_out,
    \@thread_time, \@thread_ip_info,
);

for my $slot (1 .. $window_size) {
    my $num = $thread{$slot}->join;
    print "线程 $num 执行成功\n";
}
print "线程 monitor_queue_add 执行成功\n" if($monitor_queue->join eq 'done');
print "线程 monitor_thread_time 执行成功\n" if($monitor_thread eq 'done');
print "--- $thread_time[0] ---\n";
print "全部程序执行成功\n";
# check_size($num, $ip)
#
# Runs "du -s /wls/DR/wls; df" on host $ip through deploytool and warns when
# the disaster-recovery tree is larger than the free space reported for the
# /wls volume (falling back to the "/" line when df lists no /wls line).
#
#   $num - caller-supplied number, only used as a "[N]" tag in messages
#   $ip  - target host
#
# Returns nothing; all results are reported with warn().
sub check_size {
    my ($num, $ip) = @_;

    # NOTE(review): $ip is pasted into a shell command line unescaped, so it
    # must come from a trusted host list.
    my $cmd = 'export DEPLOY_PASSWORD=deploy;export DEPLOY_SUPASS=xxxxxx;deploytool dremotecmd -s dummy -f -u -supass -i '.$ip.' "su - wls81 -c \"du -s /wls/DR/wls;df \"" 2>/dev/null;';
    my @lines = `$cmd`;

    my ($dr_size, $free, $used_percent);
    my ($root_free, $root_used_percent);
    for my $line (@lines) {
        # du line: "<size> /wls/DR/wls"
        $dr_size = $1 if $line =~ m{^(\S+)\s+/wls/DR/wls(?:\r|$)};

        # df line: the token just before the "NN%" column is taken as the
        # free space -- presumably df's "Available" column; verify against
        # the remote df output format.
        if ($line =~ m{(\S+)\s+(\S+%)\s+/wls(?:\r|$)}) {
            ($free, $used_percent) = ($1, $2);
        }
        elsif ($line =~ m{(\S+)\s+(\S+%)\s+/(?:\r|$)}) {
            # Remember the root volume separately so that it can never
            # overwrite figures already found for /wls.
            ($root_free, $root_used_percent) = ($1, $2);
        }
    }
    if (not defined $free) {
        ($free, $used_percent) = ($root_free, $root_used_percent);
    }

    warn "ERROR: [$num] $ip: 查询灾备文件占用空间大小失败\n" if(not defined $dr_size);
    warn "ERROR: [$num] $ip: 查询/wls卷剩余空间大小失败\n" if(not defined $free);

    if (defined $dr_size and defined $free and $dr_size * 1 > $free) {
        # Insert thousands separators. The look-behind keeps a comma from
        # being placed in front of the first digit when the digit count is a
        # multiple of three ("123456" must become "123,456", not ",123,456").
        for my $figure ($free, $dr_size) {
            $figure =~ s/(?<=\d)(?=(?:\d\d\d)+$)/,/g;
        }
        warn "ERROR: [$num] $ip: 灾难恢复空间不足,/wls卷现有空闲空间:$free,已使用百分比:$used_percent,灾备文件大小:$dr_size\n";
    }
    return;
}
# check_wls($num, $ip)
#
# Runs "ls -l -d /wls" on host $ip through deploytool and checks that the
# listing line for /wls matches the pattern drwxr?xr?x ... wls81 wls.
# Prints an ERROR line when the listing does not match, or when no listing
# line for /wls was found in the output at all.
#
#   $num - caller-supplied number (not used by this check)
#   $ip  - target host
#
# Returns nothing.
sub check_wls {
    my ($num, $ip) = @_;

    # NOTE(review): $ip is pasted into a shell command line unescaped.
    my $cmd = 'export DEPLOY_PASSWORD=deploy;export DEPLOY_SUPASS=xxxxxx;deploytool dremotecmd -s dummy -f -u -supass -i '.$ip.' "su - wls81 -c \"ls -l -d /wls\"" 2>/dev/null;';
    my @lines = `$cmd`;

    my $permission_state;
    for my $line (@lines) {
        # Only the line that ends in "<digit> /wls" is the directory listing.
        next if $line !~ m{\d\s+/wls(?:\r|$)};

        if ($line =~ m{^drwxr.xr.x\s+\d+\s+wls81\s+wls\s+.+\d\s+/wls(?:\r|$)}) {
            $permission_state = 'ok';
        }
        else {
            # Strip the line's own CR/LF so the message ends with exactly
            # one newline instead of a stray blank line.
            (my $shown = $line) =~ s/[\r\n]+\z//;
            print "ERROR: $ip: $shown\n";
            $permission_state = '不ok';
        }
    }
    print "ERROR: 没能获取$ip:/wls的权限信息\n" if not defined $permission_state;
    return;
}
# check_rand($num, $ip)
#
# Runs an ad-hoc command on host $ip through deploytool and echoes whatever
# the command prints. The command currently wired in creates the symlink
#   /wls/wls81/bea/weblogic81/server/lib/linux/x86_64 -> .../linux/i686
# An earlier variant ran "sudo -l" instead.
#
#   $num - caller-supplied number (not used here)
#   $ip  - target host
#
# Returns nothing.
sub check_rand {
    my ($num, $ip) = @_;

    # NOTE(review): $ip is pasted into a shell command line unescaped.
    my $cmd = 'export DEPLOY_PASSWORD=deploy;export DEPLOY_SUPASS=xxxxxx;deploytool dremotecmd -s dummy -f -u -supass -i '.$ip.' "su - wls81 -c \"ln -s /wls/wls81/bea/weblogic81/server/lib/linux/i686 /wls/wls81/bea/weblogic81/server/lib/linux/x86_64\"";';
    my @output = `$cmd`;
    print @output;
    return;
}
# backup_dr($num, $ip)
#
# On host $ip (through deploytool), creates /wls/DR_bak and archives
# /wls/DR/wls into /wls/DR_bak/wls.tgz with tar. The command's output is
# redirected to logs/tar_<ip>.log on the local machine, and a one-line
# success or failure message is printed depending on the exit status.
#
#   $num - caller-supplied number (not used here)
#   $ip  - target host
#
# Returns nothing.
sub backup_dr {
    my ($num, $ip) = @_;

    # The shell redirect below fails if the local log directory is missing,
    # which would be reported as a failed backup; create it on demand.
    mkdir 'logs' if not -d 'logs';

    # NOTE(review): $ip is pasted into a shell command line unescaped.
    my $cmd = 'export DEPLOY_PASSWORD=deploy;export DEPLOY_SUPASS=xxxxxx;deploytool dremotecmd -s dummy -f -u -supass -i '.$ip.' "su - wls81 -c \"mkdir -p /wls/DR_bak;cd /wls/DR/;sudo tar --same-permissions --same-owner -zcf /wls/DR_bak/wls.tgz wls \""'." >logs/tar_$ip.log ";

    # system() returns a non-zero wait status when the command fails.
    if (system($cmd) != 0) {
        print "ERROR: $ip 备份/wls/DR失败\n";
    }
    else {
        print "$ip: 备份/wls/DR成功\n";
    }
    return;
}
# check_backup($num, $ip)
#
# On host $ip (through deploytool), counts the entries found under
# /wls/DR/wls/ and the entries listed in /wls/DR_bak/wls.tgz, then prints
# whether the two counts agree. The remote command prints the markers "DR"
# and "DR_bak", each followed by a count on the next line.
#
#   $num - caller-supplied number (not used here)
#   $ip  - target host
#
# Returns nothing.
sub check_backup {
    my ($num, $ip) = @_;

    # NOTE(review): $ip is pasted into a shell command line unescaped.
    my $cmd = 'export DEPLOY_PASSWORD=deploy;export DEPLOY_SUPASS=xxxxxx;deploytool dremotecmd -s dummy -f -u -supass -i '.$ip.' "su - wls81 -c \"echo DR;find /wls/DR/wls/ |wc -l;echo DR_bak;tar -ztf /wls/DR_bak/wls.tgz |wc -l;\""';
    my $output = `$cmd`;
    $output = '' if not defined $output;    # the command could not be run

    # Line ends are matched as \r?\n so that both CRLF and plain LF output
    # are understood.
    if ($output =~ /^DR\r?\n(\d+)\r?\nDR_bak\r?\n(\d+)\r?\nlogout/m) {
        my ($dr_count, $bak_count) = ($1, $2);
        if ($dr_count != $bak_count) {
            print "$ip ERROR: DR size: $dr_count DR_bak size: $bak_count\n";
        }
        else {
            print "$ip DR size: $dr_count DR_bak size: $bak_count\n";
        }
    }
    elsif ($output =~ /^DR\r?\n(\d+)\r?\n/m) {
        # Only the live tree produced a count.
        print "$ip ERROR: 无法获得 /wls/DR_bak/wls.tgz 的信息\n";
    }
    elsif ($output =~ /^DR_bak\r?\n(\d+)\r?\nlogout/m) {
        # Only the archive produced a count.
        print "$ip ERROR: 无法获得 /wls/DR/wls/ 的信息\n";
    }
    else {
        print "$ip ERROR: 什么都没有\n";
    }
    return;
}
# recover_dr($num, $ip)
#
# On host $ip (through deploytool), unpacks /wls/DR_bak/wls.tgz into
# /wls/DR/ with tar. The command's output is redirected to
# logs/untar_<ip>.log on the local machine, and a one-line success or
# failure message is printed depending on the exit status.
#
# A disabled earlier variant of the command additionally ran, after the
# extraction: "sudo chown wls81:wls /wls/DR_bak/wls.tgz; rm
# /wls/DR_bak/wls.tgz; rm -r /wls/DR_bak/". The archive is currently left
# in place.
#
#   $num - caller-supplied number (not used here)
#   $ip  - target host
#
# Returns nothing.
sub recover_dr {
    my ($num, $ip) = @_;

    # The shell redirect below fails if the local log directory is missing,
    # which would be reported as a failed restore; create it on demand.
    mkdir 'logs' if not -d 'logs';

    # NOTE(review): $ip is pasted into a shell command line unescaped.
    my $cmd = 'export DEPLOY_PASSWORD=deploy;export DEPLOY_SUPASS=xxxxxx;deploytool dremotecmd -s dummy -f -u -supass -i '.$ip.' "su - wls81 -c \"mkdir -p /wls/DR_bak;cd /wls/DR/;sudo tar --same-permissions --same-owner -zxf /wls/DR_bak/wls.tgz;\""'." >logs/untar_$ip.log ";

    # system() returns a non-zero wait status when the command fails.
    if (system($cmd) != 0) {
        print "ERROR: $ip 恢复/wls/DR失败\n";
    }
    else {
        print "$ip: 恢复/wls/DR成功\n";
    }
    return;
}
# do_something($ip_q, $win_q, $win_s, $num, $thr_t_ref, $thr_i_ref)
#
# Worker thread body. Takes hosts from the window queue until both queues
# are empty, recording for the supervisor which host it is working on and
# when it started.
#
#   $ip_q      - queue of hosts not yet moved into the window
#   $win_q     - window queue the worker actually consumes from
#   $win_s     - window size (not used by the worker itself)
#   $num       - this worker's slot number (1-based)
#   $thr_t_ref - ref to the shared time array: slot $num holds -1 while the
#                worker has not picked up a host, the start stamp of the
#                current host while working, and 0 once the worker is done;
#                slot 0 counts finished workers
#   $thr_i_ref - ref to the shared array holding each worker's current host
#
# Returns $num.
#
# NOTE(review): the loop ends as soon as both queues look empty, so a host
# that the feeder has dequeued from $ip_q but not yet enqueued into $win_q
# at that instant can be missed -- confirm whether this matters in practice.
sub do_something {
    my ($ip_q, $win_q, $win_s, $num, $thr_t_ref, $thr_i_ref) = @_;
    print "线程 $num 启动......\n";

    # Lets the thread be terminated with $thr->kill('KILL').
    $SIG{'KILL'} = sub { threads->exit(); };

    $thr_t_ref->[$num] = -1;
    while ($ip_q->pending > 0 or $win_q->pending > 0) {
        my $ip_temp = $win_q->dequeue_nb;
        if (not $ip_temp) {
            # Nothing available right now: back off briefly instead of
            # spinning at full CPU speed.
            select(undef, undef, undef, 0.01);
            next;
        }

        $thr_i_ref->[$num] = $ip_temp;
        # '&' with explicit parentheses: no implicit @_ is passed and the
        # call does not depend on the callee's prototype.
        $thr_t_ref->[$num] = &get_now_time();
        print "$num: $ip_temp\n";

        # Enable the task(s) to run for each host:
        #&check_size($num,$ip_temp);
        #&check_wls($num,$ip_temp);
        #&check_rand($num,$ip_temp);
        #&backup_dr($num,$ip_temp);
        #&check_backup($num,$ip_temp);
        #&recover_dr($num,$ip_temp);
    }

    $thr_t_ref->[$num] = 0;
    $thr_i_ref->[$num] = 0;
    {
        # "++" on a shared element is a read-modify-write; without the lock
        # two workers finishing together could lose an increment, and the
        # monitors waiting for the counter to reach the window size would
        # never finish.
        lock(@{$thr_t_ref});
        $thr_t_ref->[0]++;
    }
    return $num;
}
# monitor_queue_add($ip_q, $win_q, $win_s, $thr_t_ref)
#
# Feeder thread body. Keeps the window queue topped up to $win_s entries by
# moving hosts over from $ip_q, until the finished-worker counter in
# $thr_t_ref->[0] has reached $win_s.
#
#   $ip_q      - queue of hosts still waiting
#   $win_q     - window queue consumed by the workers
#   $win_s     - window size
#   $thr_t_ref - ref to the shared time array; only slot 0 is read here
#
# Returns the string 'done'.
sub monitor_queue_add {
    my ($ip_q, $win_q, $win_s, $thr_t_ref) = @_;

    while ($thr_t_ref->[0] < $win_s) {
        my $moved      = 0;
        my $free_slots = $win_s - $win_q->pending;

        for (1 .. $free_slots) {
            my $ip_temp = $ip_q->dequeue_nb;
            last if not defined $ip_temp;    # source queue is empty
            next if not $ip_temp;            # blank entries are dropped
            $win_q->enqueue($ip_temp);
            $moved++;
        }

        # Nothing to move: pause briefly instead of spinning at full speed
        # while the workers are busy.
        select(undef, undef, undef, 0.05) if not $moved;
    }
    return 'done';
}
# monitor_thread_time($ip_q, $win_q, $win_s, $time_o, $thr_t_ref, $thr_i_ref)
#
# Timeout supervisor. Every 5 seconds it scans the worker slots; a worker
# whose current host has been running for $time_o seconds or longer is
# detached and replaced by a fresh worker in the same slot. While the host
# still has retries left it is put back on $ip_q. The loop ends once the
# finished-worker counter in $thr_t_ref->[0] has reached $win_s.
#
#   $ip_q, $win_q - the host queue and the window queue
#   $win_s        - window size
#   $time_o       - timeout in seconds
#   $thr_t_ref    - ref to the shared time array (slot 0: finished counter)
#   $thr_i_ref    - ref to the shared array of hosts being processed
#
# Also reads and updates the file-level %thread (slot => thread object),
# %thread_retry_times (host => remaining retries) and $retry.
#
# Returns the string 'done'.
#
# NOTE(review): a detached worker keeps running; when it eventually ends it
# still writes to its slot and bumps the finished counter, so the counter
# can be reached before the replacement worker has finished.
sub monitor_thread_time {
    my ($ip_q, $win_q, $win_s, $time_o, $thr_t_ref, $thr_i_ref) = @_;

    while ($thr_t_ref->[0] < $win_s) {
        for my $id (1 .. $#$thr_t_ref) {
            my $now_time = &get_now_time();

            # Read each shared value once: the worker may change its slot at
            # any moment, and re-reading it between the "is it running" test
            # and the subtraction could turn a just-finished worker (slot
            # value 0) into a false timeout.
            my $started = $thr_t_ref->[$id];
            my $ip      = $thr_i_ref->[$id];

            next if not($started > 0 and $now_time - $started >= $time_o);

            warn "WARNING: 线程 $id 处理 $ip 超时, $now_time - $started = ".($now_time - $started)."\n";

            if (not exists $thread_retry_times{$ip}) {
                $thread_retry_times{$ip} = $retry;
            }

            if ($thread_retry_times{$ip} > 0) {
                $thread_retry_times{$ip}--;
                warn "WARNING: 重启线程来处理 $ip, 还能尝试 $thread_retry_times{$ip} 次\n";
                $ip_q->enqueue($ip);    # put the host back on the queue
            }
            else {
                warn "CRITICAL: $ip 重试了 $retry 次都失败了,请检查\n";
            }

            # In both cases: abandon the stuck worker and start a fresh one
            # in the same slot. The slot is reset first so that the old
            # start stamp cannot trigger a second timeout before the new
            # worker has initialised it.
            $thread{$id}->detach;
            $thr_t_ref->[$id] = -1;
            $thread{$id} = threads->new(\&do_something, $ip_q, $win_q, $win_s, $id, $thr_t_ref, $thr_i_ref);
        }
        sleep 5;
    }
    return 'done';
}
# get_now_time()
#
# Returns the current time as a number of seconds, for use as a start stamp
# and in elapsed-time subtraction.
#
# The value is the epoch time rather than "seconds since local midnight":
# a seconds-of-day value wraps to 0 at midnight, which makes elapsed times
# computed across midnight negative, and a stamp of exactly 0 would be
# indistinguishable from the 0 that marks a finished worker slot.
sub get_now_time {
    return time();
}