#!/usr/bin/perl
require 'run_control.pl';
use strict;
use warnings;

# transform.pl -- copy the unprocessed rows (status_flag=0) of a source table
# into one or more destination tables, re-encoding each mapped column.
#
# Arguments (all six are required, in this order):
#   srcdatabase  database to connect to
#   srctable     source table to read from
#   destable     comma-separated destination table names
#   mapid        comma-separated numeric map ids, one per destination table
#   jobid        numeric job id
#   encoding     comma-separated encoding names, one per destination table
#
# Exit codes:
#   1  wrong argument count, or empty encoding
#   2  invalid jobid
#   3  invalid mapid
#   4  invalid destination table name
#   5  number of destination tables differs from number of map ids
#   6  number of destination tables differs from number of encodings
# A database error inside the transaction is rolled back and reported as a
# warning on stdout; the script then still exits with status 0.
#
# get_runid, get_datetime, gen_insert and encode_data are not defined in this
# file; presumably they (and the DBI module) are provided by run_control.pl --
# TODO confirm.

# Connection settings are read as package globals; presumably populated by
# run_control.pl -- TODO confirm.
our ($localhost, $localdbuser, $localdbpass);

if (scalar(@ARGV) != 6) {
    print "Usage: transform.pl srcdatabase srctable destable mapid jobid encoding\n";
    exit(1);
}
my ($srcdatabase, $srctable, $destable, $mapid, $jobid, $encoding) = @ARGV;

# All patterns are anchored so the WHOLE argument must be well formed, not
# merely contain one acceptable character somewhere.
# Table names: word characters, with '.' allowed for db-qualified names.
if ($destable !~ m/\A[\w.]+(?:,[\w.]+)*\z/) {
    print "Error: please input valid destination table name\n";
    exit(4);
}
if ($encoding eq "") {
    print "Error: please input valid encoding method\n";
    exit(1);
}
if ($jobid !~ m/\A\d+\z/) {
    print "Error: please input valid jobid\n";
    exit(2);
}
if ($mapid !~ m/\A\d+(?:,\d+)*\z/) {
    print "Error: please input valid mapid\n";
    exit(3);
}

my @destable_ar = split(/,/, $destable);
my @mapid_ar    = split(/,/, $mapid);
my @encoding_ar = split(/,/, $encoding);

if (scalar(@destable_ar) != scalar(@mapid_ar)) {
    print "Error: number of destination table not match with number of map id\n";
    exit(5);
}
# Each destination table is written with its own encoding, so a missing entry
# is rejected up front instead of failing halfway through the transaction.
if (scalar(@destable_ar) != scalar(@encoding_ar)) {
    print "Error: number of destination table not match with number of encoding\n";
    exit(6);
}

my $runid          = get_runid($jobid);
my $createtime     = get_datetime();
my $record_counter = 0;    # running counter across ALL destination tables

my $stagedbh = DBI->connect(
    "DBI:mysql:database=$srcdatabase;host=$localhost",
    "$localdbuser", "$localdbpass",
    { 'RaiseError' => 1, 'AutoCommit' => 0 }
) or die "cannot connect!\n";
$stagedbh->do("SET NAMES 'utf8'");

my $sth;    # declared outside the eval so it can be finished afterwards
eval {
    # NOTE(review): a table name cannot be a bind parameter, so $srctable is
    # interpolated exactly as given on the command line.
    $sth = $stagedbh->prepare(qq/select * from $srctable where status_flag=0/);

    for my $j (0 .. $#destable_ar) {
        my $enc = $encoding_ar[$j];

        # 1-based source column numbers for this map, in column order.
        my $fm = $stagedbh->selectcol_arrayref(
            q/select src_columnnum from database_field_map where map_id=? order by src_columnnum/,
            undef, $mapid_ar[$j]
        );
        my $field_count = scalar(@$fm);

        $stagedbh->do("SET NAMES " . $stagedbh->quote($enc));
        my $inh = $stagedbh->prepare(gen_insert($destable_ar[$j], $field_count));

        $sth->execute();
        while (my @row = $sth->fetchrow_array) {
            for my $k (0 .. $field_count - 1) {
                # Encode once and reuse the value for both the trace line and
                # the bind.
                my $value = encode_data($row[ $fm->[$k] - 1 ], $enc);
                print "parameter $k:" . (defined $value ? $value : '') . "\n";
                $inh->bind_param($k + 1, $value);
            }

            # Five trailing parameters follow the mapped columns: job id,
            # run id, record counter, creation time and a literal 0.
            $inh->bind_param($field_count + 1, encode_data($jobid,          $enc));
            $inh->bind_param($field_count + 2, encode_data($runid,          $enc));
            $inh->bind_param($field_count + 3, encode_data($record_counter, $enc));
            $inh->bind_param($field_count + 4, encode_data($createtime,     $enc));
            $inh->bind_param($field_count + 5, 0);

            $record_counter++;
            $inh->execute();
        }
    }

    # Marking the source rows as processed is currently disabled:
    #$stagedbh->do("update $srctable set status_flag=1 where status_flag=0");
    $stagedbh->commit();
};
if ($@) {
    print "Warning: Transaction aborted:$@";
    # With RaiseError set, rollback itself can die; report it instead of
    # skipping the cleanup below.
    eval { $stagedbh->rollback(); 1 } or print "Warning: rollback failed:$@";
}

# $sth is undef when the very first prepare failed.
$sth->finish() if $sth;
$stagedbh->disconnect();