最近有个任务需使用到并发,试用了下perl6的supply与promise,挺好用的。
简介:
1、开启各15个线程的1个生产者与2个消费者A、B(AB同为耗时任务)
2、生产者按条件通过2个Channel向A与B分别发送数据(数据不需按原始顺序发送;按业务逻辑会发送不同数据,暂时按发送同一数据模拟)
3、数组@share为共享数据,消费者A、B会向其读取数据
4、消费者A处理接收的数据,不符合业务逻辑的数据会继续传入B进行处理 if $v % 2 {$supplierB.emit(1000);}
5、@aFir,@aSec
(分别与@retAdrgFIR、@retAdrgSEC对应)为消费者A、B各线程返回的结果(每个线程的结果保存在各个数组中)
6、过滤掉运行结果为空的结果:@retAdrgFIR
.=grep
({ $_}
); @retAdrgSEC
.=grep
({ $_}
);
最后要注意的是:
供应者需 sleep 0;消费者@pA与@pB分别为 sleep 0.2 sleep
0.1,这样有利于各个线程跑的任务相对均衡(因我使用的是Windows操作系统,CPU竞争策略属于抢占式)。当然
@pA设成与@pB一样也没问题。如果@pA sleep时间大于@pB,@pA sleep时间不能太大,否则可能出现B先结束,而@pA仍向关闭的通道B发送数据,但发送不了,任务永远不会结束。
-
my $TIME = now;
-
my $supplierA = Supplier.new;
-
my $channelA = $supplierA.Supply.Channel;
-
my $supplierB = Supplier.new;
-
my $channelB = $supplierB.Supply.Channel;
-
my @share=("C","D");
-
my $threads=15;
-
my (@pA,@pB);
-
for 1 .. $threads {
-
-
@pA.push: start {
-
my @aFir;
-
react {
-
whenever $channelA -> $v {
-
sleep 0.2;
-
if $v % 2 {$supplierB.emit(1000);}
-
say "channelA shareArr {@share[0]}:Thread {$*THREAD.id} got $v";
-
#my $h={"Val"=>$v};
-
@aFir.push: $v*2;
-
}
-
}
-
@aFir;
-
}
-
@pB.push: start {
-
-
my @aSec;
-
react {
-
whenever $channelB -> $v {
-
sleep 0.1;
-
say "channelB shareArr {@share[1]}:Thread {$*THREAD.id} got $v";
-
#my $h={"Val"=>$v};
-
@aSec.push: $v*2;
-
}
-
}
-
@aSec;
-
}
-
}
-
sleep 1;
-
my @promises;
-
for ^1000 -> $r {
-
push @promises, start {
-
sleep 0;
-
-
$supplierA.emit($r);
-
$supplierB.emit($r);
-
};
-
if @promises == $threads {
-
await Promise.anyof(@promises);
-
@promises .= grep({ !$_ });
-
}
-
}
-
-
await @promises;
-
-
$supplierA.done;
-
my @retAdrgFIR=await @pA;
-
$supplierB.done;
-
my @retAdrgSEC=await @pB;
-
-
#@retAdrgFIR .=grep({ $_});
-
#@retAdrgSEC .=grep({ $_});
-
dd @retAdrgFIR;
-
dd @retAdrgSEC;
-
-
#查看各线程最终分配执行的任务数
-
my @nFIR=@retAdrgFIR.map( -> $n {$n.List.elems});
-
my @nSEC=@retAdrgSEC.map( -> $n {$n.List.elems});
-
dd @nFIR;
-
dd @nSEC;
-
-
$TIME = now - $TIME;
-
say $TIME
如果线程中有自增计数操作,需
定义为atomicint类型的$total,使用$total'?此处符号有错,看官自行至raku管网文档搜索atomicint',相当于在代码临介点加LOOK。
-
my atomicint $total = 0;
-
await start { for ^20000 { $total'?'++ } } xx 4;
-
say $total;
阅读(1625) | 评论(0) | 转发(0) |