Chinaunix首页 | 论坛 | 博客
  • 博客访问: 226500
  • 博文数量: 36
  • 博客积分: 1188
  • 博客等级: 军士长
  • 技术积分: 802
  • 用 户 组: 普通用户
  • 注册时间: 2010-04-08 21:45
文章分类

全部博文(36)

文章存档

2020年(1)

2017年(2)

2015年(1)

2014年(1)

2013年(1)

2012年(3)

2011年(27)

分类: Windows平台

2017-08-18 23:11:24

众所周知,Spark的ML Pipelines类库用于构建机器学习的工作流,每一个PipelineStage 都会完成一个任务,如数据集处理转化,模型训练,参数设置或数据预测等。
Spark的ML Pipelines工作流大概是这个样子的。
val pipeline = new   
Pipeline().setStages(Array(labelIndexer,vectorAssembler,rfClassifier,labelConverter))  
val model = pipeline.fit(trainingData)
而perl的cpan上也有个Pipeline模块,作者是用perl的旧版oop实现的,在分析了源码和个人需求后,用Moose改写出了自已的Pipeline,主要实现了类ML Pipelines的DataFrame(改用perl的Data::Table模块)与Transformer功能。以下介绍如何使用Moose写Pipeline模块。

一、UML类图设计


二、模块间关系说明
  • Pipeline类的dispatcher属性是PipelineDispatch的实例对象
  • PipelineDispatch类继承自ipelineBase,属性segments是利用Moose的属性委托功能实现的用于存放PipelineSegment实例对象的数组引用
  • PipelineDispatch类内,属性dfhash是利用Moose的属性委托功能实现的用于存放各PipelineSegment->dispatch()返回值的hash引用:{ref($PipelineSegment)=>$df}即(YouSegmentClassName=>$df)
  • PipelineSegment类的store属性是Pipelinestore的实例对象
  • Pipelinestore类的功能,主要是在不同的PipelineSegment间存入数据或取出数据
dispatch()是核心方法,抽象方法如下:
  • dispatch()属Pipeline类方法
  • dispatch_loop()属Pipeline类方法,是dispatch()的内置方法
    • next()属PipelineDispatch类方法
内部调用关系如下:

点击(此处)折叠或打开

  1. Pipeline->dispatch(
  2.        Pipeline->dispatch_loop(
  3.           Pipeline::Dispatch->next(
  4.              Pipeline::Segment->prepare_dispatch(Pipeline);
  5.              my $df = Pipeline::Segment->dispatch();
  6.           );
  7.         );
  8.     );

注:自已写的继承自Pipeline::Segment的Segment类,即是Spark的ML Pipelines类的一个个Transformer

三、模块源码

3.1Pipeline类

点击(此处)折叠或打开

  1. package Pipeline;

  2. use Moose;

  3. #use namespace::clean;
  4. use Pipeline::Dispatch;

  5. has 'debug' => (
  6.     is => 'rw',
  7.     isa => 'Int',
  8.     default => 0,
  9. );
  10. has 'dispatcher' => (
  11.     is => 'ro',
  12.     isa => 'Pipeline::Dispatch',
  13.     default => sub { Pipeline::Dispatch->new(); },
  14.     handles => {
  15.         get_segment => 'get',
  16.         add_segment => 'add',
  17.         del_segment => 'delete'
  18.     }
  19. );

  20. has 'store' => (
  21.     is => 'rw',
  22.     isa => 'Pipeline::Store',
  23.     default => sub { Pipeline::Store->new() },
  24. );

  25. sub segments {
  26.     my $self = shift;
  27.     return $self->{dispatcher}->segments(@_);
  28. }

  29. sub dispatch {
  30.     my $self = shift;
  31.     $self->dispatch_loop();
  32.     $self->{dispatcher}->reset();
  33. }

  34. sub dispatch_loop {
  35.     my $self = shift;
  36.     $self->{dispatcher}->debug( $self->{debug} );
  37.     while ( $self->{dispatcher}->segment_available ) {
  38.         $self->{dispatcher}->next($self);
  39.     }
  40. }
  41. sub getDf {
  42.     my ($self,$segname) = @_;
  43.     
  44.     $self->{dispatcher}->getDf($segname);
  45. }

  46. #__PACKAGE__->meta->make_immutable;

  47. 1;

3.2Pipeline::Dispatch类

点击(此处)折叠或打开

  1. package Pipeline::Dispatch;

  2. use Moose;
  3. use Pipeline::Store;
  4. use Pipeline::Segment;
  5. extends 'Pipeline::Base';

  6. use Data::Printer;

  7. has 'segments' => (
  8.     traits => ['Array'],
  9.     is => 'rw',
  10.     isa => 'ArrayRef[Pipeline::Segment]',
  11.     default => sub { [] },
  12.     handles => {
  13.         get => 'get',
  14.         add => 'push',
  15.         get_next_segment => 'shift',
  16.         delete => 'delete',
  17.         segment_available => 'count'
  18.     }
  19. );

  20. has 'dispatched_segments' => (
  21.     is => 'rw',
  22.     isa => 'ArrayRef[Pipeline::Segment]',
  23.     default => sub { [] }
  24. );
  25. has 'dfhash' => (
  26.     traits => ['Hash'],
  27.     is => 'ro',
  28.     isa => 'HashRef',
  29.     default => sub { {} },
  30.     handles => {
  31.         _set_opt => 'set',
  32.         getDf => 'get',
  33.     }
  34. );

  35. sub setDf {
  36.     my ( $self, $obj, $df ) = @_;

  37.     if ( defined($obj) ) {
  38.         $self->_set_opt( ref($obj), $df );
  39.     }
  40.     return $self;

  41. }

  42. sub next {
  43.     my $self = shift;
  44.     my $pipe = shift;

  45.     my $segment = $self->get_next_segment();
  46.     $segment->prepare_dispatch($pipe);
  47.     $self->emit( "dispatching to " . ref($segment) ) if $self->debug;

  48.     my $df = $segment->dispatch();

  49.     #将segment->dispatch的返回值克隆一份保存进dfhash
  50.     if ( ref($df) eq 'Data::Table' ) {
  51.         $self->setDf( $segment, $df->clone() );
  52.     }
  53.     else {
  54.         $self->setDf( $segment, $df );
  55.         $df=undef;
  56.     }
  57.     push @{ $self->{dispatched_segments} }, $segment;
  58. }

  59. sub reset {
  60.     my $self = shift;
  61.     $self->segments( $self->{dispatched_segments} );
  62.     $self->dispatched_segments( [] );
  63. }

  64. #__PACKAGE__->meta->make_immutable;

  65. 1;

3.3Pipeline::Base类

点击(此处)折叠或打开

  1. package Pipeline::Base;

  2. use Moose;
  3. #use namespace::clean;

  4. has 'debug' => (
  5.     is => 'rw',
  6.     isa => 'Int',
  7.     default => 0,
  8. );

  9. sub emit {
  10.     my ( $self, $mesg ) = @_;
  11.     $self->log( $self->_format_message($mesg) ) if $self->debug;
  12. }

  13. sub log {
  14.     my ( $self, $mesg ) = @_;
  15.     print STDERR $mesg;
  16. }

  17. sub _format_message {
  18.     my ( $self, $mesg ) = @_;
  19.     my $class = ref($self);
  20.     return "[$class] $mesg\n";
  21. }

  22. #__PACKAGE__->meta->make_immutable;

  23. 1;

3.4Pipeline::Segment类

点击(此处)折叠或打开

  1. package Pipeline::Segment;

  2. use Moose;

  3. has 'store' => (
  4.     is => 'rw',
  5.     isa => 'Pipeline::Store',
  6.     default => sub { Pipeline::Store->new() },
  7. );

  8. sub dispatch {
  9.     my $self = shift;
  10. }

  11. sub prepare_dispatch {
  12.     my ( $self, $pipe ) = @_;
  13.     $self->store( $pipe->store );
  14. }

  15. #__PACKAGE__->meta->make_immutable;

  16. 1;

3.5Pipeline::Store类

点击(此处)折叠或打开

  1. package Pipeline::Store;

  2. use Moose;

  3. has 'storehash' => (
  4.     traits => ['Hash'],
  5.     is => 'ro',
  6.     isa => 'HashRef[Object]',
  7.     default => sub { {} },
  8.     handles => {
  9.         _set_opt => 'set',
  10.         get => 'get',
  11.     }
  12. );

  13. sub set {
  14.   my $self = shift;
  15.   my $obj = shift;
  16.    if (defined( $obj )) {
  17.     $self->_set_opt(ref($obj),$obj);
  18.   }
  19.   return $self;
  20.   
  21. }

  22. #__PACKAGE__->meta->make_immutable;

  23. 1

四、Example
4.1example.pl

点击(此处)折叠或打开

  1. package MyDf;
  2. use Moose;

  3. extends 'Pipeline::Segment';

  4. has 'df' => (
  5.     is => 'rw',
  6.     isa => 'Data::Table',
  7. );

  8. package MyData;
  9. use Moose;

  10. extends 'Pipeline::Segment';

  11. has 'df' => (
  12.     is => 'rw',
  13.     isa => 'Data::Table',
  14. );

  15. sub dispatch {
  16.     my $self = shift;
  17.     $self->store->set( MyDf->new( df => $self->{df} ) );
  18.     return $self->{df};
  19. }

  20. package MySeg1;
  21. use Moose;

  22. extends 'Pipeline::Segment';

  23. sub dispatch {
  24.     my $self = shift;
  25.     my $df = $self->store->get('MyDf');
  26.     #MySeg1将MyDf增加了一行合计数
  27.     $df->{df}->addRow( ['合计',8,undef], 3 );
  28.     return $df->{df};
  29. }

  30. package MySeg2;
  31. use Moose;

  32. extends 'Pipeline::Segment';

  33. sub dispatch {
  34.     my $self = shift;

  35.     my $df = $self->store->get('MyDf');
  36.     #MySeg2将MyDf增加了一列总金额
  37.     $df->{df}->addCol( [100,100,100,300],"total" ,3 );
  38.     return $df->{df};
  39. }

  40. package main;
  41. use lib './lib';
  42. use Pipeline;
  43. use Data::Table;
  44. use Data::Printer;

  45. my $headers = [ 'name', 'count', 'price' ];

  46. my $rows = [ [ 'A', '1', '100' ],
  47.              [ 'B', '2', '50' ],
  48.              [ 'C', '5', '20' ]];
  49. my $df = Data::Table->new( $rows, $headers, 0 );

  50. #p $df->csv;

  51. my $pipeline = Pipeline->new();
  52. $pipeline->debug(1);

  53. my $mydata = MyData->new( df => $df );
  54. my $seg1 = MySeg1->new();
  55. my $seg2 = MySeg2->new();
  56. $pipeline->add_segment( $mydata, $seg1, $seg2 );
  57. my $production = $pipeline->dispatch();
  58. #p $pipeline->store->get('MyDf')->{df}->csv;
  59. p $pipeline->getDf("MyData")->csv;
  60. p $pipeline->getDf("MySeg1")->csv;
  61. p $pipeline->getDf("MySeg2")->csv;
  62. #Author's blog:tianyv.github.io



阅读(2515) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~