Chinaunix首页 | 论坛 | 博客
  • 博客访问: 225697
  • 博文数量: 36
  • 博客积分: 1188
  • 博客等级: 军士长
  • 技术积分: 802
  • 用 户 组: 普通用户
  • 注册时间: 2010-04-08 21:45
文章分类

全部博文(36)

文章存档

2020年(1)

2017年(2)

2015年(1)

2014年(1)

2013年(1)

2012年(3)

2011年(27)

分类: PERL

2020-06-03 22:37:15

最近有个任务需使用到并发,试用了下perl6的supply与promise,挺好用的。

简介:
1、开启各15个线程的1个生产者与2个消费者A、B(AB同为耗时任务)
2、生产者按条件通过2个Channel向A与B分别发送数据(数据不需按原始顺序发送;按业务逻辑会发送不同数据,暂时按发送同一数据模拟)
3、数组@share为共享数据,消费者A、B会向其读取数据
4、消费者A处理接收的数据,不符合业务逻辑的数据会继续传入B进行处理  if $v % 2 {$supplierB.emit(1000);}
5、@aFir,@aSec(分别与@retAdrgFIR、@retAdrgSEC对应)为消费者A、B各线程返回的结果(每个线程的结果保存在各个数组中)
6、过滤掉运行结果为空的结果:@retAdrgFIR .=grep({ $_}); @retAdrgSEC .=grep({ $_});

最后要注意的是:
    供应者需 sleep 0;消费者@pA与@pB分别为 sleep 0.2 sleep 0.1,这样有利于各个线程跑的任务相对均衡(因我使用的是Windows操作系统,CPU竞争策略属于抢占式)。当然 @pA设成与@pB一样也没问题。如果@pA sleep时间大于@pB,@pA sleep时间不能太大,否则可能出现B先结束,而@pA仍向关闭的通道B发送数据,但发送不了,任务永远不会结束。


点击(此处)折叠或打开

  1. my $TIME = now;
  2.     my $supplierA = Supplier.new;
  3.     my $channelA = $supplierA.Supply.Channel;
  4.     my $supplierB = Supplier.new;
  5.     my $channelB = $supplierB.Supply.Channel;
  6.     my @share=("C","D");
  7.     my $threads=15;
  8.     my (@pA,@pB);
  9.     for 1 .. $threads {

  10.       @pA.push: start {
  11.         my @aFir;
  12.         react {
  13.             whenever $channelA -> $v {
  14.               sleep 0.2;
  15.               if $v % 2 {$supplierB.emit(1000);}
  16.               say "channelA shareArr {@share[0]}:Thread {$*THREAD.id} got $v";
  17.               #my $h={"Val"=>$v};
  18.               @aFir.push: $v*2;
  19.             }
  20.         }
  21.         @aFir;
  22.       }
  23.       @pB.push: start {

  24.         my @aSec;
  25.         react {
  26.             whenever $channelB -> $v {
  27.               sleep 0.1;
  28.               say "channelB shareArr {@share[1]}:Thread {$*THREAD.id} got $v";
  29.               #my $h={"Val"=>$v};
  30.               @aSec.push: $v*2;
  31.             }
  32.         }
  33.         @aSec;
  34.       }
  35.     }
  36.     sleep 1;
  37.     my @promises;
  38.     for ^1000 -> $r {
  39.       push @promises, start {
  40.             sleep 0;

  41.             $supplierA.emit($r);
  42.             $supplierB.emit($r);
  43.         };
  44.         if @promises == $threads {
  45.            await Promise.anyof(@promises);
  46.            @promises .= grep({ !$_ });
  47.         }
  48.     }

  49.     await @promises;

  50.     $supplierA.done;
  51.     my @retAdrgFIR=await @pA;
  52.     $supplierB.done;
  53.     my @retAdrgSEC=await @pB;

  54.     #@retAdrgFIR .=grep({ $_});
  55.     #@retAdrgSEC .=grep({ $_});
  56.     dd @retAdrgFIR;
  57.     dd @retAdrgSEC;

  58.     #查看各线程最终分配执行的任务数
  59.     my @nFIR=@retAdrgFIR.map( -> $n {$n.List.elems});
  60.     my @nSEC=@retAdrgSEC.map( -> $n {$n.List.elems});
  61.     dd @nFIR;
  62.     dd @nSEC;

  63.     $TIME = now - $TIME;
  64.     say $TIME

如果线程中有自增计数操作,需定义为atomicint类型的$total,使用$total'?此处符号有错,看官自行至raku管网文档搜索atomicint',相当于在代码临介点加LOOK。

点击(此处)折叠或打开

  1. my atomicint $total = 0;
  2. await start { for ^20000 { $total'?'++ } } xx 4;
  3. say $total;


阅读(1625) | 评论(0) | 转发(0) |
0

上一篇:利用Moose写Pileline模块

下一篇:没有了

给主人留下些什么吧!~~