#!/usr/bin/perl
#use strict;
use Net::FTP; use DBI; use Data::UUID; use Fcntl ':flock'; require '/opt/ws1/run_control.pl';
my $ftphost=$ARGV[0]; my $ftpuser=$ARGV[1]; my $ftppass=$ARGV[2]; my $tbls=$ARGV[3]; my $tags=$ARGV[4]; my $mids=$ARGV[5]; my $fprefix=$ARGV[6]; my $fdir=$ARGV[7];
my $downloadfolder = "/var/tmp/"; my ($dst_fheader_tbl, $dst_dheader_tbl, $dst_ddetail_tbl, $dst_dfooter_tbl) = split(/,/, $tbls,4); my @linetags = split(/,/, $tags); my @mapids = split(/,/, $mids);
#print $mapids[0], $mapids[1], $mapids[2], $mapids[3];
#print $linetags[0], $linetags[1], $linetags[2], $linetags[3];
#exit;
# concurrency control
open(LOCKF, ">>", "/var/lock/$fprefix") or die "cannot acquire lock /var/lock/$fprefix"; flock(LOCKF, LOCK_EX | LOCK_NB) or die "another ftp2local $fprefix is running"; # concurrency control
#$size=@ARGV;
#if($size!=6){
#print "Usage: ftp2local.pl ,,, ,,, \n";
#}
# generate UUID
#print $ftphost, $ftpuser, $ftppass, $tbls, $maps, $fprefix, $fdir;
#$ug = new Data::UUID;
#$uuid1 = $ug->create_str();
#print "$uuid1\n";
# end generate UUID
my $ftp; $ftp = Net::FTP->new("$ftphost", Debug => 0, Passive => 1) or die "Cannot connect to ftp server: $@"; $ftp->login("$ftpuser","$ftppass") or die "Cannot login ", $ftp->message; $ftp->binary; $ftp->cwd("$fdir"); # or die "Cannot change working directory ", $ftp->message;
my $flist; @flist = $ftp->ls("$fprefix\*");
# or die "ls failed ", $ftp->message;
#print "array size=$#flist\n";
my $no_of_files = $#flist + 1;
if ($no_of_files <= 0) { $ftp->quit; goto BYE; } my @sflist = sort @flist; my $downloadfile = $sflist[0]; my $localfile = $downloadfolder . $downloadfile; my $remote_fsize = $ftp->size($downloadfile) or die "cannot get size", $ftp->message; my $getstatus = $ftp->get($downloadfile, $localfile) or die "error downloading $downloadfile", $ftp->message; if (!defined($getstatus)) { $ftp->quit; goto BYE; } my $local_fsize = -s "$localfile"; print "remote = $remote_fsize\n"; print "local = $local_fsize\n"; if ($remote_fsize != $local_fsize) { $ftp->quit; goto BYE; }
#foreach (@flist) {
# print "$_\n" ;
#}
### BEGIN HANDLING THE DOWNLOADED FILE
my $stagedbh=DBI->connect("DBI:mysql:database=$localdatabase;host=$localdbhost","$localdbuser","$localdbpass",{'RaiseError'=>1,'AutoCommit' => 0}) or die "cannot connect!\n";
open(FH, "<:encoding(UTF-8)", $localfile) or die "cannot open $localfile:$!\n";
my $hline; # current line
#chomp($hline); # remove end of line char
my $cur_line = 0; # point to the current line
#my $fh_tableid = substr($hline, 0, 10);
#print "-$fh_tableid-\n";
$stagedbh->{AutoCommit} = 0; # disable autocommit for block level rollback
$stagedbh->{RaiseError}=1; # use eval block to provide one exit point for error trapping to abort transaction as atomic
my @fheader_tbl=(); my @field_list=(); my @qmark_list=(); my @field_pos=(); eval{ build_maps(); foreach (@field_list) { print "---$_\n"; } ## HANDLE HEADER LINE
my $fileid = gen_uuid(); # unique across this session as key to refer to all 4 inbound table records
my $create_time = get_datetime(); my $record_id = 1; # repeating group for each row (1..n) for each group, header, details, footer
### HERE
my $this_field_list; my $this_qmark_list; my $this_field_pos; my $this_fheader_tbl; while (<FH>) { chomp(); $hline = $_; $cur_line++; if ($cur_line == 1) { print ("at file header\n"); $this_fheader_tbl = $fheader_tbl[0]; $this_field_list = @field_list[0]; $this_qmark_list = @qmark_list[0]; $this_field_pos = @field_pos[0]; $record_id=1; # header line record count always 1
} elsif (is_tag($hline, $linetags[1])) { # data header
print ("at data header\n"); $this_fheader_tbl = $fheader_tbl[1]; $this_field_list = @field_list[1]; $this_qmark_list = @qmark_list[1]; $this_field_pos = @field_pos[1]; $record_id=0; # reset line count for subsequent data details
} elsif (is_tag($hline, $linetags[2])) { # data detail
print ("at data detail\n"); $this_fheader_tbl = $fheader_tbl[2]; $this_field_list = @field_list[2]; $this_qmark_list = @qmark_list[2]; $this_field_pos = @field_pos[2]; $record_id++; # add 1 for each data detail line
} elsif (is_tag($hline, $linetags[3])) { # file footer
print ("at file footer\n"); $this_fheader_tbl = $fheader_tbl[3]; $this_field_list = @field_list[3]; $this_qmark_list = @qmark_list[3]; $this_field_pos = @field_pos[3]; $record_id = 1; } else { die("line not recognized"); } my @fpos = split(/:/, $this_field_pos); my @bind_values; foreach (@fpos) { ($src_begin, $src_end)= split(/,/,$_); # in "01-05" format
$src_begin += 0; # convert from string to number i.e. add 0 to force an internal conv
$src_end += 0; $src_len = $src_end - $src_begin + 1; # print "begin=$src_begin; len=$src_len\n";
push(@bind_values, substr($hline, $src_begin-1, $src_len)); # get the file line fields into bind value array for insert execute
} #foreach
$this_field_list .= ", fileid, record_id, create_time"; $this_qmark_list .= ", ?, ?, ?"; push(@bind_values, $fileid); push(@bind_values, $record_id); push(@bind_values, $create_time); my $isql = "insert into $this_fheader_tbl ($this_field_list) VALUES ($this_qmark_list)"; print "$isql\n"; foreach (@bind_values) { print "value=-$_-\n"; } $sth2=$stagedbh->prepare(qq/$isql/); $sth2->execute(@bind_values); }; # end while FH
$stagedbh->commit(); # master commit
}; # exit eval block
if($@){ print "Warning: Transaction aborted:$@"; $stagedbh->rollback(); goto BYE; }
BYE: $stagedbh->disconnect(); $ftp->quit; close(FH); #concurrency control
flock(LOCKF, LOCK_UN); close(LOCKF); # end conconcurrency control
exit;
sub build_maps { # mapids[0] is file header mapid
$n1 = scalar(@mapids); for ($i = 0; $i < $n1; $i++) { $sth=$stagedbh->prepare(qq/select * from database_field_map where map_id=$mapids[$i] order by src_columnnum/); $sth->execute(); my $map_ref=[]; ## get header map in memory
$map_ref = $sth->fetchall_arrayref(); # my @bind_values=(); # array to hold the field values to insert in sequence
foreach $r ( @{$map_ref} ) { my @row= @$r; # print "size of row = $#ary\n";
# for ($i = 0; $i <= $#row; $i++) {
# print "$row[$i]\n";
# }
# print "***$row[2]\n";
$fheader_tbl[$i] = $row[1]; # assume each map type one table
$field_list[$i] = $field_list[$i] . ", " . $row[2]; # create the insert sql field list
$qmark_list[$i] = $qmark_list[$i] . ", " . "?"; # create the insert value question mark list - placement variables
# print "***$row[4]\n";
($src_begin, $src_end)= split(/-/,$row[4]); # in "01-05" format
$src_begin += 0; # convert from string to number i.e. add 0 to force an internal conv
$src_end += 0; $src_len = $src_end - $src_begin + 1; # print "begin=$src_begin; len=$src_len\n";
$field_pos[$i] = $field_pos[$i] . ":" . $src_begin . "," . $src_end; # push(@bind_values, substr($hline, $src_begin-1, $src_len)); # get the file line fields into bind value array for insert execute
} $field_list[$i] = substr($field_list[$i],1); # cut the first comma
$qmark_list[$i] = substr($qmark_list[$i],1); # cut the first comma
$field_pos[$i] = substr($field_pos[$i],1); # cut the first comma
# $field_list[$i] .= ", fileid, record_id, create_time";
# $qmark_list .= ", ?, ?, ?";
# push(@bind_values, $fileid);
# push(@bind_values, $record_id);
# push(@bind_values, $create_time);
# my $isql = "insert into $fheader_tbl ($field_list) VALUES ($qmark_list)";
# print "$isql\n";
# foreach (@bind_values) {
# print "value=-$_-\n";
# }
} return; }
sub is_tag { $line = shift; $tag = shift; $linetag = substr($line, 0, 5); return ($tag eq $linetag); }
|