众所周知,Spark的ML Pipelines类库用于构建机器学习的工作流,每一个PipelineStage 都会完成一个任务,如数据集处理转化,模型训练,参数设置或数据预测等。
Spark的ML Pipelines工作流大概是这个样子的。
val pipeline = new
Pipeline().setStages(Array(labelIndexer,vectorAssembler,rfClassifier,labelConverter))
val model = pipeline.fit(trainingData)
而Perl的CPAN上也有个Pipeline模块,作者是用Perl的旧式OOP实现的。在分析了其源码并结合个人需求后,我用Moose改写出了自己的Pipeline,主要实现了类似ML Pipelines的DataFrame(改用Perl的Data::Table模块)与Transformer功能。以下介绍如何使用Moose写Pipeline模块。
一、UML类图设计
二、模块间关系说明
-
Pipeline类的dispatcher属性是PipelineDispatch的实例对象
-
PipelineDispatch类继承自PipelineBase,属性segments是利用Moose的属性委托功能实现的用于存放PipelineSegment实例对象的数组引用
-
PipelineDispatch类内,属性dfhash是利用Moose的属性委托功能实现的用于存放各PipelineSegment->dispatch()返回值的hash引用:{ref($PipelineSegment)=>$df},即(YourSegmentClassName=>$df)
-
PipelineSegment类的store属性是PipelineStore的实例对象
-
PipelineStore类的功能,主要是在不同的PipelineSegment间存入数据或取出数据
dispatch()是核心方法,抽象方法如下:
-
dispatch()属Pipeline类方法
-
dispatch_loop()属Pipeline类方法,由dispatch()在内部调用
-
next()属PipelineDispatch类方法
内部调用关系如下:
-
Pipeline->dispatch(
-
Pipeline->dispatch_loop(
-
Pipeline::Dispatch->next(
-
Pipeline::Segment->prepare_dispatch(Pipeline);
-
my $df = Pipeline::Segment->dispatch();
-
);
-
);
-
);
注:自己写的继承自Pipeline::Segment的Segment类,即相当于Spark的ML Pipelines中的一个个Transformer
三、模块源码
3.1Pipeline类
-
package Pipeline;

# Front-end of the pipeline: owns a Pipeline::Dispatch (the queue of
# segments) and a Pipeline::Store (data shared between segments), and
# drives the segments one after another.

use Moose;

#use namespace::clean;

use Pipeline::Dispatch;
use Pipeline::Store;    # loaded explicitly: the 'store' default constructs one

# Debug level; copied onto the dispatcher at the start of every run.
has 'debug' => (
    is      => 'rw',
    isa     => 'Int',
    default => 0,
);

# The dispatcher holds the segment queue.  get_segment / add_segment /
# del_segment are delegated to its get / add / delete methods.
has 'dispatcher' => (
    is      => 'ro',
    isa     => 'Pipeline::Dispatch',
    default => sub { Pipeline::Dispatch->new() },
    handles => {
        get_segment => 'get',
        add_segment => 'add',
        del_segment => 'delete',
    },
);

# Store object handed to every segment (via prepare_dispatch) so that
# segments can exchange data.
has 'store' => (
    is      => 'rw',
    isa     => 'Pipeline::Store',
    default => sub { Pipeline::Store->new() },
);

# Get or set the dispatcher's segment list; arguments are passed through
# unchanged to Pipeline::Dispatch->segments.
sub segments {
    my ( $self, @args ) = @_;
    return $self->dispatcher->segments(@args);
}

# Run every queued segment, then reset the dispatcher so the same
# pipeline can be dispatched again.  Returns whatever the dispatcher's
# reset() returns.
sub dispatch {
    my ($self) = @_;
    $self->dispatch_loop();
    return $self->dispatcher->reset();
}

# Propagate the debug level to the dispatcher and call its next() until
# no segment is left in the queue.
sub dispatch_loop {
    my ($self) = @_;

    # Go through the accessors rather than the object hash so that
    # subclasses and attribute options (lazy, triggers, ...) are honoured.
    my $dispatcher = $self->dispatcher;
    $dispatcher->debug( $self->debug );

    while ( $dispatcher->segment_available ) {
        $dispatcher->next($self);
    }
    return;
}

# Return the value recorded for the segment class named $segname, i.e.
# what that segment's dispatch() returned during the last run.
sub getDf {
    my ( $self, $segname ) = @_;
    return $self->dispatcher->getDf($segname);
}

#__PACKAGE__->meta->make_immutable;

1;
3.2Pipeline::Dispatch类
-
package Pipeline::Dispatch;

# Queue of pipeline segments.  Segments are shifted off 'segments' one at
# a time by next(), remembered in 'dispatched_segments', and moved back
# by reset() so the pipeline can be run again.

use Moose;
use Scalar::Util qw(blessed);

use Pipeline::Store;
use Pipeline::Segment;

extends 'Pipeline::Base';

use Data::Printer;    # debugging aid only; not used by the methods below

# Segments waiting to be dispatched.
has 'segments' => (
    traits  => ['Array'],
    is      => 'rw',
    isa     => 'ArrayRef[Pipeline::Segment]',
    default => sub { [] },
    handles => {
        get               => 'get',
        add               => 'push',
        get_next_segment  => 'shift',
        delete            => 'delete',
        segment_available => 'count',
    },
);

# Segments already handed to next() during the current run.
has 'dispatched_segments' => (
    is      => 'rw',
    isa     => 'ArrayRef[Pipeline::Segment]',
    default => sub { [] },
);

# Result of each segment's dispatch(), keyed by the segment's class name.
has 'dfhash' => (
    traits  => ['Hash'],
    is      => 'ro',
    isa     => 'HashRef',
    default => sub { {} },
    handles => {
        _set_opt => 'set',
        getDf    => 'get',
    },
);

# Record $df as the result of segment object $obj, keyed by ref($obj).
# Does nothing when $obj is undefined.  Returns $self.
sub setDf {
    my ( $self, $obj, $df ) = @_;

    if ( defined $obj ) {
        $self->_set_opt( ref($obj), $df );
    }
    return $self;
}

# Dispatch the next queued segment on behalf of pipeline $pipe and record
# its result.  Returns the number of segments dispatched so far in this
# run, or nothing when the queue is empty.
sub next {
    my ( $self, $pipe ) = @_;

    my $segment = $self->get_next_segment();
    return unless defined $segment;    # empty queue: nothing to do

    # Remember the segment before running it, so that a segment whose
    # dispatch() dies is still known to reset() and is not dropped from
    # the pipeline.
    push @{ $self->dispatched_segments }, $segment;

    $segment->prepare_dispatch($pipe);

    # emit() is a no-op unless debug is enabled.
    $self->emit( "dispatching to " . ref($segment) );

    my $df = $segment->dispatch();

    # Store a clone of a returned table so that later segments mutating
    # the shared table do not alter this segment's recorded result.
    # isa() (rather than a string compare on ref) also covers subclasses
    # of Data::Table.
    my $is_table = blessed($df) && $df->isa('Data::Table');
    $self->setDf( $segment, $is_table ? $df->clone() : $df );

    return scalar @{ $self->dispatched_segments };
}

# Make the pipeline runnable again: already-dispatched segments go back
# to the front of the queue, ahead of any segments that have not run yet
# (there are none after a complete run).  Returns the new, empty
# dispatched list.
sub reset {
    my ($self) = @_;

    $self->segments(
        [ @{ $self->dispatched_segments }, @{ $self->segments } ] );
    return $self->dispatched_segments( [] );
}

#__PACKAGE__->meta->make_immutable;

1;
3.3Pipeline::Base类
-
package Pipeline::Base;

# Common base class providing a debug flag and simple logging to STDERR.

use Moose;

#use namespace::clean;

# Integer debug level; any true value turns emit() on.
has debug => (
    isa     => 'Int',
    is      => 'rw',
    default => 0,
);

# Log $mesg (formatted with the object's class name) when debugging is
# enabled; otherwise do nothing.
sub emit {
    my $self = shift;
    my $mesg = shift;
    return $self->debug && $self->log( $self->_format_message($mesg) );
}

# Write $mesg verbatim to STDERR.
sub log {
    my $self = shift;
    my $mesg = shift;
    return print {*STDERR} $mesg;
}

# Build the log line: "[ClassName] message" followed by a newline.
sub _format_message {
    my $self = shift;
    my $mesg = shift;
    return sprintf "[%s] %s\n", ref($self), $mesg;
}

#__PACKAGE__->meta->make_immutable;

1;
3.4Pipeline::Segment类
-
package Pipeline::Segment;

# Base class for one step of a pipeline.  Subclasses override dispatch()
# to do the actual work.

use Moose;

# Loaded explicitly so that this class (and subclasses) can be used
# without first loading Pipeline: the 'store' default constructs one.
use Pipeline::Store;

# Store used to exchange data with other segments.  Replaced by the
# pipeline's own store in prepare_dispatch().
has 'store' => (
    is      => 'rw',
    isa     => 'Pipeline::Store',
    default => sub { Pipeline::Store->new() },
);

# Do the segment's work.  The base implementation does nothing and
# returns the segment itself; subclasses override it.
sub dispatch {
    my ($self) = @_;
    return $self;
}

# Called before dispatch(): adopt the store of pipeline $pipe so all
# segments of that pipeline share the same data.
sub prepare_dispatch {
    my ( $self, $pipe ) = @_;
    return $self->store( $pipe->store );
}

#__PACKAGE__->meta->make_immutable;

1;
3.5Pipeline::Store类
-
package Pipeline::Store;

# Shared object store: holds one object per class name so that pipeline
# segments can pass data to each other.

use Moose;

# Class name => object.  get() is delegated straight to the hash;
# _set_opt() is the raw setter used by set() below.
has storehash => (
    is      => 'ro',
    isa     => 'HashRef[Object]',
    traits  => ['Hash'],
    default => sub { +{} },
    handles => {
        get      => 'get',
        _set_opt => 'set',
    },
);

# Store $obj under its class name (ref($obj)).  An undefined $obj is
# ignored.  Returns the store itself in both cases.
sub set {
    my ( $self, $obj ) = @_;

    return $self unless defined $obj;

    $self->_set_opt( ref($obj), $obj );
    return $self;
}

#__PACKAGE__->meta->make_immutable;

1;
四、Example
4.1example.pl
-
package MyDf;

# Container segment: wraps a Data::Table so it can be placed in the
# pipeline store under the key 'MyDf'.

use Moose;

extends 'Pipeline::Segment';

# The wrapped table.
has df => (
    isa => 'Data::Table',
    is  => 'rw',
);
-
-
package MyData;

# First segment of the example: publishes its table to the pipeline
# store (wrapped in a MyDf) so later segments can work on it.

use Moose;

extends 'Pipeline::Segment';

# The input table.
has 'df' => (
    is  => 'rw',
    isa => 'Data::Table',
);

# Put the table into the store as a MyDf object and return the table.
sub dispatch {
    my ($self) = @_;

    # Use the accessor instead of poking into the object hash.
    my $table = $self->df;

    $self->store->set( MyDf->new( df => $table ) );
    return $table;
}
-
-
package MySeg1;

# Example segment: adds a totals row to the table held in the store.

use Moose;

extends 'Pipeline::Segment';

# Fetch the MyDf wrapper from the store, add a totals row to its table
# (row index 3) and return the table.  Dies when no MyDf has been stored.
sub dispatch {
    my ($self) = @_;

    my $holder = $self->store->get('MyDf')
        or die "MySeg1: no MyDf object in the store\n";

    # Go through MyDf's accessor rather than its internal hash.
    my $table = $holder->df;

    # MySeg1 adds a totals row to the MyDf table.
    $table->addRow( [ '合计', 8, undef ], 3 );
    return $table;
}
-
-
package MySeg2;

# Example segment: adds a "total" amount column to the table held in the
# store.

use Moose;

extends 'Pipeline::Segment';

# Fetch the MyDf wrapper from the store, add a "total" column to its
# table (column index 3) and return the table.  Dies when no MyDf has
# been stored.
sub dispatch {
    my ($self) = @_;

    my $holder = $self->store->get('MyDf')
        or die "MySeg2: no MyDf object in the store\n";

    # Go through MyDf's accessor rather than its internal hash.
    my $table = $holder->df;

    # MySeg2 adds a total-amount column to the MyDf table.  The column
    # has four values, so this presumably relies on MySeg1 having already
    # added its totals row -- verify if the segment order changes.
    $table->addCol( [ 100, 100, 100, 300 ], "total", 3 );
    return $table;
}
-
-
package main;

# Example driver: builds a three-row table, runs it through the
# MyData -> MySeg1 -> MySeg2 pipeline and prints the table as recorded
# after each segment.

use strict;
use warnings;

use lib './lib';
use Pipeline;
use Data::Table;
use Data::Printer;

my $headers = [ 'name', 'count', 'price' ];

my $rows = [
    [ 'A', '1', '100' ],
    [ 'B', '2', '50' ],
    [ 'C', '5', '20' ],
];

my $df = Data::Table->new( $rows, $headers, 0 );

# To inspect the input table:
#   my $input_csv = $df->csv; p $input_csv;

my $pipeline = Pipeline->new();
$pipeline->debug(1);

my $mydata = MyData->new( df => $df );
my $seg1   = MySeg1->new();
my $seg2   = MySeg2->new();
$pipeline->add_segment( $mydata, $seg1, $seg2 );

$pipeline->dispatch();

# To inspect the shared table left in the store:
#   my $shared_csv = $pipeline->store->get('MyDf')->df->csv; p $shared_csv;

# Print the snapshot recorded for each segment.  The CSV text is put in a
# variable first because Data::Printer's p() expects a variable, not an
# arbitrary expression, when its prototypes are enabled (the default).
for my $segment_class (qw(MyData MySeg1 MySeg2)) {
    my $csv = $pipeline->getDf($segment_class)->csv;
    p $csv;
}
-
#Author's blog:tianyv.github.io
阅读(2515) | 评论(0) | 转发(0) |