Chinaunix首页 | 论坛 | 博客
  • 博客访问: 485110
  • 博文数量: 279
  • 博客积分: 4467
  • 博客等级: 上校
  • 技术积分: 2830
  • 用 户 组: 普通用户
  • 注册时间: 2007-04-03 14:43
文章分类

全部博文(279)

文章存档

2013年(1)

2012年(39)

2011年(35)

2009年(29)

2008年(131)

2007年(44)

分类:

2008-03-12 16:58:32


#!/usr/bin/perl

#use strict;

use Net::FTP;
use DBI;
use Data::UUID;
use Fcntl ':flock';
require '/opt/ws1/run_control.pl';

my $ftphost=$ARGV[0];
my $ftpuser=$ARGV[1];
my $ftppass=$ARGV[2];
my $tbls=$ARGV[3];
my $tags=$ARGV[4];
my $mids=$ARGV[5];
my $fprefix=$ARGV[6];
my $fdir=$ARGV[7];

my $downloadfolder = "/var/tmp/";
my ($dst_fheader_tbl, $dst_dheader_tbl, $dst_ddetail_tbl, $dst_dfooter_tbl) = split(/,/, $tbls,4);
my @linetags = split(/,/, $tags);
my @mapids = split(/,/, $mids);

#print $mapids[0], $mapids[1], $mapids[2], $mapids[3];

#print $linetags[0], $linetags[1], $linetags[2], $linetags[3];

#exit;

# concurrency control

open(LOCKF, ">>", "/var/lock/$fprefix") or die "cannot acquire lock /var/lock/$fprefix";
flock(LOCKF, LOCK_EX | LOCK_NB) or die "another ftp2local $fprefix is running";
# concurrency control


#$size=@ARGV;

#if($size!=6){

#print "Usage: ftp2local.pl ,,, ,,, \n";

#}


# generate UUID

#print $ftphost, $ftpuser, $ftppass, $tbls, $maps, $fprefix, $fdir;

#$ug = new Data::UUID;

#$uuid1 = $ug->create_str();

#print "$uuid1\n";

# end generate UUID


my $ftp;
$ftp = Net::FTP->new("$ftphost", Debug => 0, Passive => 1)
      or die "Cannot connect to ftp server: $@";
$ftp->login("$ftpuser","$ftppass")
      or die "Cannot login ", $ftp->message;
$ftp->binary;
$ftp->cwd("$fdir");
# or die "Cannot change working directory ", $ftp->message;

my $flist;
@flist = $ftp->ls("$fprefix\*");

# or die "ls failed ", $ftp->message;

#print "array size=$#flist\n";


my $no_of_files = $#flist + 1;

if ($no_of_files <= 0) {
  $ftp->quit;
  goto BYE;
}
my @sflist = sort @flist;
my $downloadfile = $sflist[0];
my $localfile = $downloadfolder . $downloadfile;
my $remote_fsize = $ftp->size($downloadfile) or die "cannot get size", $ftp->message;
my $getstatus = $ftp->get($downloadfile, $localfile) or die "error downloading $downloadfile", $ftp->message;
if (!defined($getstatus)) {
  $ftp->quit;
  goto BYE;
}
my $local_fsize = -s "$localfile";
print "remote = $remote_fsize\n";
print "local = $local_fsize\n";
if ($remote_fsize != $local_fsize) {
  $ftp->quit;
  goto BYE;
}

#foreach (@flist) {

# print "$_\n" ;

#}


### BEGIN HANDLING THE DOWNLOADED FILE

my $stagedbh=DBI->connect("DBI:mysql:database=$localdatabase;host=$localdbhost","$localdbuser","$localdbpass",{'RaiseError'=>1,'AutoCommit' => 0}) or die "cannot connect!\n";

open(FH, "<:encoding(UTF-8)", $localfile) or die "cannot open $localfile:$!\n";

my $hline; # current line

#chomp($hline); # remove end of line char

my $cur_line = 0; # point to the current line

#my $fh_tableid = substr($hline, 0, 10);

#print "-$fh_tableid-\n";


$stagedbh->{AutoCommit} = 0; # disable autocommit for block level rollback

$stagedbh->{RaiseError}=1;
# use eval block to provide one exit point for error trapping to abort transaction as atomic

my @fheader_tbl=();
my @field_list=();
my @qmark_list=();
my @field_pos=();
eval{
  build_maps();
  foreach (@field_list) {
    print "---$_\n";
  }
## HANDLE HEADER LINE

  my $fileid = gen_uuid(); # unique across this session as key to refer to all 4 inbound table records

  my $create_time = get_datetime();
  my $record_id = 1; # repeating group for each row (1..n) for each group, header, details, footer

### HERE

  my $this_field_list;
  my $this_qmark_list;
  my $this_field_pos;
  my $this_fheader_tbl;
 while (<FH>) {
    chomp();
    $hline = $_;
    $cur_line++;
    if ($cur_line == 1) {
      print ("at file header\n");
      $this_fheader_tbl = $fheader_tbl[0];
      $this_field_list = @field_list[0];
      $this_qmark_list = @qmark_list[0];
      $this_field_pos = @field_pos[0];
    $record_id=1; # header line record count always 1

    }
    elsif (is_tag($hline, $linetags[1])) { # data header

    print ("at data header\n");
      $this_fheader_tbl = $fheader_tbl[1];
      $this_field_list = @field_list[1];
      $this_qmark_list = @qmark_list[1];
      $this_field_pos = @field_pos[1];
    $record_id=0; # reset line count for subsequent data details

    }
    elsif (is_tag($hline, $linetags[2])) { # data detail

    print ("at data detail\n");
      $this_fheader_tbl = $fheader_tbl[2];
      $this_field_list = @field_list[2];
      $this_qmark_list = @qmark_list[2];
      $this_field_pos = @field_pos[2];
    $record_id++; # add 1 for each data detail line

    }
    elsif (is_tag($hline, $linetags[3])) { # file footer

    print ("at file footer\n");
      $this_fheader_tbl = $fheader_tbl[3];
      $this_field_list = @field_list[3];
      $this_qmark_list = @qmark_list[3];
      $this_field_pos = @field_pos[3];
    $record_id = 1;
    }
    else {
      die("line not recognized");
    }
  my @fpos = split(/:/, $this_field_pos);
  my @bind_values;
  foreach (@fpos) {
    ($src_begin, $src_end)= split(/,/,$_); # in "01-05" format

    $src_begin += 0; # convert from string to number i.e. add 0 to force an internal conv

    $src_end += 0;
    $src_len = $src_end - $src_begin + 1;
# print "begin=$src_begin; len=$src_len\n";

    push(@bind_values, substr($hline, $src_begin-1, $src_len)); # get the file line fields into bind value array for insert execute

  } #foreach

  $this_field_list .= ", fileid, record_id, create_time";
  $this_qmark_list .= ", ?, ?, ?";
  push(@bind_values, $fileid);
  push(@bind_values, $record_id);
  push(@bind_values, $create_time);
  my $isql = "insert into $this_fheader_tbl ($this_field_list) VALUES ($this_qmark_list)";
  print "$isql\n";
  foreach (@bind_values) {
    print "value=-$_-\n";
  }
  $sth2=$stagedbh->prepare(qq/$isql/);
  $sth2->execute(@bind_values);
 }; # end while FH


 $stagedbh->commit(); # master commit

};
# exit eval block

if($@){
  print "Warning: Transaction aborted:$@";
  $stagedbh->rollback();
  goto BYE;
}


BYE:
$stagedbh->disconnect();
$ftp->quit;
close(FH);
#concurrency control

flock(LOCKF, LOCK_UN);
close(LOCKF);
# end conconcurrency control

exit;


sub build_maps {
  # mapids[0] is file header mapid

  $n1 = scalar(@mapids);
  for ($i = 0; $i < $n1; $i++) {
    $sth=$stagedbh->prepare(qq/select * from database_field_map where map_id=$mapids[$i] order by src_columnnum/);
    $sth->execute();
    my $map_ref=[];
    ## get header map in memory

    $map_ref = $sth->fetchall_arrayref();
# my @bind_values=(); # array to hold the field values to insert in sequence

    foreach $r ( @{$map_ref} ) {
      my @row= @$r;
  # print "size of row = $#ary\n";

  # for ($i = 0; $i <= $#row; $i++) {

  # print "$row[$i]\n";

  # }

  # print "***$row[2]\n";

      $fheader_tbl[$i] = $row[1]; # assume each map type one table

      $field_list[$i] = $field_list[$i] . ", " . $row[2]; # create the insert sql field list

      $qmark_list[$i] = $qmark_list[$i] . ", " . "?"; # create the insert value question mark list - placement variables

  # print "***$row[4]\n";

      ($src_begin, $src_end)= split(/-/,$row[4]); # in "01-05" format

      $src_begin += 0; # convert from string to number i.e. add 0 to force an internal conv

      $src_end += 0;
      $src_len = $src_end - $src_begin + 1;
  # print "begin=$src_begin; len=$src_len\n";

      $field_pos[$i] = $field_pos[$i] . ":" . $src_begin . "," . $src_end;
# push(@bind_values, substr($hline, $src_begin-1, $src_len)); # get the file line fields into bind value array for insert execute

    }
    $field_list[$i] = substr($field_list[$i],1); # cut the first comma

    $qmark_list[$i] = substr($qmark_list[$i],1); # cut the first comma

    $field_pos[$i] = substr($field_pos[$i],1); # cut the first comma

# $field_list[$i] .= ", fileid, record_id, create_time";

# $qmark_list .= ", ?, ?, ?";

# push(@bind_values, $fileid);

# push(@bind_values, $record_id);

# push(@bind_values, $create_time);

# my $isql = "insert into $fheader_tbl ($field_list) VALUES ($qmark_list)";

# print "$isql\n";

# foreach (@bind_values) {

# print "value=-$_-\n";

# }

  }
  return;
}

sub is_tag {
  $line = shift;
  $tag = shift;
  $linetag = substr($line, 0, 5);
  return ($tag eq $linetag);
}

阅读(398) | 评论(0) | 转发(0) |
0

上一篇:outbound to mysql(local2sql_direct)

下一篇:cmd

给主人留下些什么吧!~~