Chinaunix首页 | 论坛 | 博客
  • 博客访问: 38902
  • 博文数量: 17
  • 博客积分: 1400
  • 博客等级: 上尉
  • 技术积分: 182
  • 用 户 组: 普通用户
  • 注册时间: 2009-12-01 12:14
文章分类

全部博文(17)

文章存档

2010年(2)

2009年(15)

我的朋友

分类:

2009-12-31 15:09:01

   好长时间没有上来更新自己的博客了,今年差点连用户名都忘记了,试了好几个才进来。
   最近一直在忙系统优化的事情,新上线了好多精简版的脚本,由于系统资源紧张,也并没有达到预期想到的效果。为了更好的保证重要数据的及时展现,将Teradata etlmaster这个服务进行了一个功能上的扩充,etlmaster服务负责queue目录下控制文件的调度和ETL_JOB_QUEUE表中的记录(根据记录在queue目录下产生控制文件),当重要的作业都处于Pending状态,不能得到及时的调度,导致后续的数据不能及时展现。
    目前采用了一个多线程的调度方式,可以满足暂时5个KPI脚本的优先调度,目前这个5个作业是写死的,日后可以进行扩展
#!/user/bin/perl
######################################################################
# Head Section
# Function:    将重要KPI作业强行Force 减少Pending时间
# Create Date: 2009-12-29 14:10下午
# Creator:     Chen Chuan Wei
# Reviewer:
# Comment:
#              采用5个线程取调取5个作业
#---------------------------------------------------------------------
use strict; # Declare using Perl strict syntax
use File::Basename; # Extract the base filename from a path
require hbdw;
use threads;
use threads::shared;

my $AUTO_HOME = $ENV{"AUTO_HOME"};
my $LogonFile = "${AUTO_HOME}/etc/LOGON_AUTO";
my $slave = $AUTO_HOME."";
my @JobList = ('KPI_PRD_EVT_DAILY','KPI_CALL_DAILY','KPI_PRD_INCOME_DAILY','KPI_MOB_BIL_INCOME_DAILY','KPI_MOB_TICKET_DAILY');
my $DIRDELI = "\\";
my $DSN = $ENV{"AUTO_DSN"};
my $AUTO_DB = $ENV{"AUTO_DB"};
my $dbcon;
my $ETL_QUEUE = "${AUTO_HOME}\\DATA\\queue";
my $ETL_PROCESS = "${AUTO_HOME}\\DATA\\process";
my @threads;
my $MaxThreads = 5;
my $CurrentThreads: shared = 0;
sub checkQueueDir
{
   ## 当前新建的线程数加一:
   {
      lock($CurrentThreads);
      $CurrentThreads++;
   }
     my $find = 0;
     my $filename;
     my $JobName = shift;
     print "当前作业名为:".$JobName."\n";
     print "Checking queue directory ".$ETL_QUEUE."...\n";
     # Open the queue directory for processing
     while(1)
     {
        unless ( opendir(QUE_DIR, $ETL_QUEUE) )
        {
          print "ERROR - Unable to open ${ETL_QUEUE}!";
          exit;
        }
        foreach my $filename(readdir(QUE_DIR))
        {      
          if ( $filename eq "." || $filename eq ".." ) { next; }
          # If the file is directory then skip it
          if ( -d "${ETL_QUEUE}${DIRDELI}${filename}" ) { next; }
          #print $filename."\n";
          if ($filename =~ /$JobName/)
          {
           $find = 1;
            HBDW::showTime();
#           print $JobName."\n";
#           print $filename."\n";
            print "找到控制文件:".$filename."\n";
            my $CONTROL_FILE = $filename;
            my $TX_DATE = substr($CONTROL_FILE, -12, 8);
            my $SYS = substr($CONTROL_FILE,0,3);
            my $JOB = substr($CONTROL_FILE,4,length($CONTROL_FILE)-17);
            #print $CONTROL_FILE.$TX_DATE.$SYS.$JOB."\n";
            ForceJob($CONTROL_FILE,$SYS,$JOB,$TX_DATE);       
            last;
          }
          closedir(QUE_DIR);
        }
       if ($find == 1)  
           {
            last;
            lock($CurrentThreads);
            $CurrentThreads--;
           }
       else
       {
        sleep 5;
       }
     }
 }
sub ForceJob
{
   my ($ctr_file,$etl_system,$etl_job,$tx_date) = @_;
   print "Force Job ".$etl_job."\n";
   rename("${ETL_QUEUE}${DIRDELI}${ctr_file}","${ETL_PROCESS}${DIRDELI}${ctr_file}");
   #my $cmd1 = "del  ${ETL_QUEUE}${DIRDELI}${ctr_file}";
   my $cmd = "perl $slave $etl_system $etl_job $tx_date"; 
   HBDW::showTime();
   print "\n";
   print "     ".$cmd."\n";
   system($cmd);
#   updateJobStatus($dbcon, $etl_system, $etl_job, "Running");
}      
sub updateJobStatus
{
   my ($dbh, $sys, $job, $status) = @_;
   my $sqlText = "UPDATE ${AUTO_DB}.ETL_Job " .
                 "   SET Last_JobStatus = '$status'" .
                 "   WHERE ETL_System = '$sys' AND ETL_Job = '$job'";
   my $sth = $dbh->prepare($sqlText);
   my $ret = $sth->execute();
   $sth->finish();
}
sub GetJobList
{
 my $JobList = "";
 foreach my $Job (@JobList)
 {
  $JobList = $JobList.",'".$Job."'";
 }
 $JobList = substr($JobList,1); #去除第一个逗号
 $JobList = "(".$JobList.")";   #加括号
 return $JobList;
}
sub CheckAllJobDone
{
  my $JobList = GetJobList();
  my $sqlText = "select COUNT(*) FROM ${AUTO_DB}.ETL_JOB WHERE last_jobstatus <> 'Pending' and last_TxDate = date - 1 and etl_job in".$JobList;
# print $sqlText."\n";
  my $sth = $dbcon->prepare($sqlText);
  $sth->execute();
  my $res = $sth->fetchrow;
  $sth->finish();
  my $JobCount = @JobList;
  #print $res.$JobCount;
  if ($res == $JobCount)
  {
   return 0
  }
  else {return 1; }
}
sub ConnectTD
{
    my ($DBUserID,$DBPassword);
    open(FLOGON,$LogonFile) || die "cann't open logon file:$!";
    my $LogonStr = ;
    close(FLOGON);
    $LogonStr  = `${AUTO_HOME}/bin/IceCode.exe "$LogonStr"`;
    $DBPassword = substr($LogonStr,7);
    chop($DBPassword);
    ($DBUserID,$DBPassword) = split(/[ ]*,[ ]*/,$DBPassword);
    $dbcon = DBI->connect("dbi:ODBC:$DSN",$DBUserID,$DBPassword,{ AutoCommit => 1, PrintError => 0, RaiseError => 0 } ) ;
    unless(defined($dbcon)) {
        sleep(60);
        $dbcon = DBI->connect("dbi:ODBC:$DSN",$DBUserID,$DBPassword,{ AutoCommit => 1, PrintError => 0, RaiseError => 0 } ) ;
        die "cann't connect database $!" unless(defined($dbcon));
    }
}
sub ForceJobs

    my $etl_job = shift;
    # 每次创建线程处理文件之前,先检查当前已启动的线程数目是否小于最大允许数目
    # 如果已经创建了足够多的线程,就不断地轮询
    while(1)
    {
        if($CurrentThreads < $MaxThreads)
        {
            # 创建线程,传入参数
            my $thread = threads->create('checkQueueDir',$etl_job);
            push(@threads, \$thread);
            print "创建线程".$etl_job."\n";
            last; # 退出循环
        }
        else
        {
            # 线程数达到最大时,等待一个时间段(120m):
            print "线程数已到达最大值,退出";
            last;
        }
    }
}
sub main
{
 ConnectTD();
 HBDW::showTime(); print "连接数据库成功!\n";
 foreach my $job(@JobList)
  {
    ForceJobs($job);
  }
  foreach my $thread (@threads)
 {
  $$thread->join();
 }
 if ($CurrentThreads == 0)
 {
   return 0;
   exit;
  }
}
######################################################################
# Program Section
open(STDERR, ">&STDOUT");
HBDW::showTime();
print "进程号PID:$$ \n";
exit(main());
阅读(2275) | 评论(0) | 转发(0) |
0

上一篇:Teradata view

下一篇:perl

给主人留下些什么吧!~~