好长时间没有上来更新自己的博客了,今年差点连用户名都忘记了,试了好几个才进来。
最近一直在忙系统优化的事情,新上线了好多精简版的脚本,但由于系统资源紧张,并没有达到预期的效果。为了更好地保证重要数据能及时展现,我对Teradata etlmaster服务做了一个功能上的扩充。etlmaster服务负责调度queue目录下的控制文件,并处理ETL_JOB_QUEUE表中的记录(根据记录在queue目录下生成控制文件);当重要作业都处于Pending状态、得不到及时调度时,后续的数据就不能及时展现。
目前采用多线程的调度方式,暂时可以满足5个KPI脚本的优先调度。这5个作业目前是写死在脚本里的,日后可以进行扩展。
#!/usr/bin/perl
######################################################################
# Head Section
# Function: 强制调度(Force)重要KPI作业,以减少其Pending时间 (force key KPI jobs to cut their Pending time)
# Create Date: 2009-12-29 14:10下午
# Creator: Chen Chuan Wei
# Reviewer:
# Comment:
# 采用5个线程分别调度5个作业 (one worker thread per KPI job, 5 in total)
#---------------------------------------------------------------------
use strict; # Declare using Perl strict syntax
use File::Basename; # Extract the base filename from a path
require hbdw;
use threads;
use threads::shared;
# ---- Settings taken from the environment ---------------------------------
my $AUTO_HOME = $ENV{"AUTO_HOME"};
my $AUTO_DB   = $ENV{"AUTO_DB"};
my $DSN       = $ENV{"AUTO_DSN"};

# ---- Derived paths --------------------------------------------------------
my $LogonFile   = "${AUTO_HOME}/etc/LOGON_AUTO";
# NOTE(review): $slave is just $AUTO_HOME itself, so ForceJob ends up running
# "perl <AUTO_HOME> ..."; the slave script's file name looks like it is
# missing here — confirm the intended path.
my $slave       = $AUTO_HOME."";
my $DIRDELI     = "\\";
my $ETL_QUEUE   = "${AUTO_HOME}\\DATA\\queue";
my $ETL_PROCESS = "${AUTO_HOME}\\DATA\\process";

# ---- KPI jobs that each get a dedicated worker thread (hard-coded) --------
my @JobList = (
    'KPI_PRD_EVT_DAILY',
    'KPI_CALL_DAILY',
    'KPI_PRD_INCOME_DAILY',
    'KPI_MOB_BIL_INCOME_DAILY',
    'KPI_MOB_TICKET_DAILY',
);

# ---- Runtime state --------------------------------------------------------
my $dbcon;                          # DBI handle set by ConnectTD
my @threads;                        # references to the created thread objects
my $MaxThreads = 5;                 # upper bound checked by ForceJobs
my $CurrentThreads : shared = 0;    # live worker count, shared across threads
sub checkQueueDir
{
    # Worker-thread body: poll the ETL queue directory every 5 seconds until
    # a control file whose name contains the given job name shows up, then
    # hand that file to ForceJob.
    #
    # Parameters:
    #   $JobName - job name searched for (literally) inside control-file names.
    # Returns:
    #   1 when a control file was found and forced, 0 when the queue
    #   directory could not be opened.
    #
    # The shared counter $CurrentThreads is incremented on entry and
    # decremented on every exit path, so main() can see all workers finished.
    {
        lock($CurrentThreads);
        $CurrentThreads++;
    }
    my $JobName = shift;
    my $find = 0;
    print "当前作业名为:".$JobName."\n";
    print "Checking queue directory ".$ETL_QUEUE."...\n";
    while (!$find)
    {
        my $que_dir;
        unless ( opendir($que_dir, $ETL_QUEUE) )
        {
            # exit() inside a thread would terminate the whole process
            # (every worker), so only this worker stops here.
            print "ERROR - Unable to open ${ETL_QUEUE}!";
            last;
        }
        # Snapshot the entries and release the handle before doing any work,
        # so the handle is closed exactly once per scan on every path.
        my @entries = readdir($que_dir);
        closedir($que_dir);
        foreach my $filename (@entries)
        {
            next if $filename eq "." || $filename eq "..";
            # Sub-directories are not control files.
            next if -d "${ETL_QUEUE}${DIRDELI}${filename}";
            # \Q...\E: treat the job name as a literal string, not a regex.
            next unless $filename =~ /\Q$JobName\E/;

            $find = 1;
            HBDW::showTime();
            print "找到控制文件:".$filename."\n";
            # Name layout implied by the offsets below: 3-char system,
            # 1 separator char, job name, 1 separator char, 8-char tx date,
            # then 4 trailing chars (presumably an extension — confirm).
            my $CONTROL_FILE = $filename;
            my $TX_DATE = substr($CONTROL_FILE, -12, 8);
            my $SYS = substr($CONTROL_FILE, 0, 3);
            my $JOB = substr($CONTROL_FILE, 4, length($CONTROL_FILE) - 17);
            ForceJob($CONTROL_FILE, $SYS, $JOB, $TX_DATE);
            last;
        }
        # Not queued yet: wait before rescanning the directory.
        sleep 5 unless $find;
    }
    {
        lock($CurrentThreads);
        $CurrentThreads--;
    }
    return $find;
}
sub ForceJob
{
    # Claim a control file by moving it from the queue directory to the
    # process directory, then run the slave script for that job.
    #
    # Parameters:
    #   $ctr_file   - control file name (relative to the queue directory)
    #   $etl_system - ETL system code parsed from the file name
    #   $etl_job    - ETL job name parsed from the file name
    #   $tx_date    - transaction date parsed from the file name
    # Returns:
    #   the status from system() for the slave run, or an empty return
    #   when the control file could not be moved (the slave is NOT started
    #   then, so a file already claimed elsewhere is not run twice).
    my ($ctr_file,$etl_system,$etl_job,$tx_date) = @_;
    print "Force Job ".$etl_job."\n";
    my $queued    = "${ETL_QUEUE}${DIRDELI}${ctr_file}";
    my $processed = "${ETL_PROCESS}${DIRDELI}${ctr_file}";
    unless ( rename($queued, $processed) )
    {
        print "ERROR - Unable to move ${queued} to ${processed}: $!\n";
        return;
    }
    # The string form is kept only for the log line; the command itself is
    # run in list form so file-name-derived arguments never reach a shell.
    my $cmd = "perl $slave $etl_system $etl_job $tx_date";
    HBDW::showTime();
    print "\n";
    print " ".$cmd."\n";
    # NOTE(review): updateJobStatus($dbcon, $etl_system, $etl_job, "Running")
    # was disabled here in the original.
    return system('perl', $slave, $etl_system, $etl_job, $tx_date);
}
sub updateJobStatus
{
    # Set Last_JobStatus of one ETL job in ${AUTO_DB}.ETL_Job.
    #
    # Parameters:
    #   $dbh    - connected DBI database handle
    #   $sys    - ETL_System value of the row to update
    #   $job    - ETL_Job value of the row to update
    #   $status - new Last_JobStatus value
    # Returns:
    #   the execute() result (rows affected / true on success), or an empty
    #   return when the statement could not be prepared.
    #
    # Values are bound as placeholders because $sys/$job originate from
    # control-file names; only the database name is interpolated.
    my ($dbh, $sys, $job, $status) = @_;
    my $sqlText = "UPDATE ${AUTO_DB}.ETL_Job " .
                  " SET Last_JobStatus = ?" .
                  " WHERE ETL_System = ? AND ETL_Job = ?";
    my $sth = $dbh->prepare($sqlText);
    # The handle is created with RaiseError => 0, so prepare can return undef.
    return unless $sth;
    my $ret = $sth->execute($status, $sys, $job);
    $sth->finish();
    return $ret;
}
sub GetJobList
{
    # Build an SQL IN-list from @JobList: every job name single-quoted,
    # comma-separated and wrapped in parentheses, e.g. ('A','B').
    # An empty @JobList yields "()".
    my @quoted = map { "'" . $_ . "'" } @JobList;
    return "(" . join(",", @quoted) . ")";
}
sub CheckAllJobDone
{
    # Count how many of the @JobList jobs are no longer 'Pending' for
    # yesterday's tx date (last_TxDate = date - 1).
    # Returns 0 when that count equals the number of listed jobs (all done),
    # 1 otherwise — i.e. shell-style success/failure, not a boolean "done".
    my $inList = GetJobList();
    my $sqlText = "select COUNT(*) FROM ${AUTO_DB}.ETL_JOB WHERE last_jobstatus <> 'Pending' and last_TxDate = date - 1 and etl_job in".$inList;
    my $sth = $dbcon->prepare($sqlText);
    $sth->execute();
    my $doneCount = $sth->fetchrow;
    $sth->finish();
    my $expected = scalar(@JobList);
    return ($doneCount == $expected) ? 0 : 1;
}
sub ConnectTD
{
    # Read the first line of $LogonFile, decode it with IceCode.exe, pull the
    # user id and password out of the decoded text and open the ODBC
    # connection into the file-level $dbcon.
    # Retries once after 60 seconds; dies when the logon file cannot be
    # opened/read or when the second connection attempt also fails.
    my ($DBUserID,$DBPassword);
    open(my $logon_fh, '<', $LogonFile) or die "cann't open logon file:$!";
    my $LogonStr = <$logon_fh>;
    close($logon_fh);
    die "cann't read logon file:$LogonFile" unless defined($LogonStr);
    # The raw line (newline included) is passed to the decoder unchanged.
    $LogonStr = `${AUTO_HOME}/bin/IceCode.exe "$LogonStr"`;
    # Drop the first 7 characters (presumably a ".logon " prefix — confirm)
    # and the final character (presumably a trailing ';' — confirm), then
    # split "user,password" on a comma with optional surrounding spaces.
    $DBPassword = substr($LogonStr,7);
    chop($DBPassword);
    ($DBUserID,$DBPassword) = split(/[ ]*,[ ]*/,$DBPassword);
    $dbcon = DBI->connect("dbi:ODBC:$DSN",$DBUserID,$DBPassword,{ AutoCommit => 1, PrintError => 0, RaiseError => 0 } );
    unless (defined($dbcon)) {
        sleep(60);
        $dbcon = DBI->connect("dbi:ODBC:$DSN",$DBUserID,$DBPassword,{ AutoCommit => 1, PrintError => 0, RaiseError => 0 } );
        # $DBI::errstr carries the driver's reason; $! is unrelated here.
        die "cann't connect database $DBI::errstr" unless(defined($dbcon));
    }
}
sub ForceJobs
{
    # Start one checkQueueDir worker thread for the given job, unless the
    # shared worker count has already reached $MaxThreads, in which case the
    # job is skipped after printing a message (there is no waiting/retry).
    # The created thread is stored in @threads as a reference to the thread
    # object; main() dereferences it to join.
    #
    # NOTE(review): workers increment $CurrentThreads themselves once they
    # start running, so this check can run before a just-created thread has
    # registered; the limit is therefore not strictly enforced.
    my $etl_job = shift;
    if ($CurrentThreads >= $MaxThreads)
    {
        print "线程数已到达最大值,退出";
        return;
    }
    my $thread = threads->create('checkQueueDir', $etl_job);
    push(@threads, \$thread);
    print "创建线程".$etl_job."\n";
}
sub main
{
    # Entry point: connect to Teradata, launch one worker per KPI job, wait
    # for every worker to finish, then report through the return value.
    ConnectTD();
    HBDW::showTime();
    print "连接数据库成功!\n";

    for my $kpi_job (@JobList)
    {
        ForceJobs($kpi_job);
    }

    # @threads holds references to thread objects, hence the extra deref.
    for my $thread_ref (@threads)
    {
        ${$thread_ref}->join();
    }

    # 0 when the shared worker counter is back to zero; otherwise the false
    # comparison result itself (numifies to 0). Either way the process exit
    # status produced by exit(main()) is 0.
    my $all_released = ($CurrentThreads == 0);
    return $all_released ? 0 : $all_released;
}
######################################################################
# Program Section
# Send STDERR to the same place as STDOUT so the log captures both streams.
open STDERR, ">&STDOUT";
HBDW::showTime();
print "进程号PID:$$ \n";
my $exit_code = main();
exit($exit_code);
阅读(2317) | 评论(0) | 转发(0) |