Java并行编程–从并行任务集获取反馈
在并行任务启动后,强制性地从并行任务得到反馈。
假想有一个程序,可以发送批邮件,还使用了多线程机制。你想知道有多少邮件成功发送吗?你想知道在实际发送过程期间,这个批处理工作的实时进展吗?
要实现多线程的这种反馈,我们可以使用Callable接口。此接口的工作方式基本上与Runnable相同,但是执行方法(call())会返回一个值,该值反映了执行计算的结果。
-
package com.ricardozuasti;
-
-
import java.util.concurrent.Callable;
-
-
public class FictionalEmailSender implements Callable{
-
private String to;
-
private String subject;
-
private String body;
-
public FictionalEmailSender(String to, String subject, String body){
-
this.to = to;
-
this.subject = subject;
-
this.body = body;
-
}
-
-
@Override
-
public Boolean call() throws InterruptedException {
-
-
Thread.sleep(Math.round(Math.random()*0.5*1000));
-
-
if(Math.random()>0.2){
-
return true;
-
}else{
-
return false;
-
}
-
}
-
-
}
注意:Callable接口可用于返回任意数据类型,因此我们的任务可以返回我们需要的任何信息。
现在,我们使用一个线程池ExecutorService来发送邮件,由于我们的任务是以Callable接口实现的,我们提交执行的每个新任务,都会得到一个Future引用。注意我们要使用直接的构造器创建ExecutorService,而不是使用来自Executors的工具方法创建。这是因为使用指定类ThreadPoolExecutor提供了一些方法可以派上用场。
-
package com.ricardozuasti;
-
-
import java.util.concurrent.Future;
-
import java.util.concurrent.LinkedBlockingQueue;
-
import java.util.concurrent.ThreadPoolExecutor;
-
import java.util.concurrent.TimeUnit;
-
import java.util.ArrayList;
-
import java.util.List;
-
-
public class Concurrency2 {
-
public static void main(String[] args){
-
try{
-
ThreadPoolExecutor executor = new ThreadPoolExecutor(30, 30, 1,
-
TimeUnit.SECONDS, new LinkedBlockingQueue());
-
List> futures = new ArrayList>(9000);
-
-
for(int i=1000; i<10000; i++){
-
futures.add(executor.submit(new FictionalEmailSender(i+"@sina.com",
-
"Knock, knock, Neo", "The Matrix has you...")));
-
}
-
-
System.out.println("Starting shutdown...");
-
executor.shutdown();
-
-
-
while(!executor.isTerminated()){
-
executor.awaitTermination(1, TimeUnit.SECONDS);
-
int progress = Math.round((executor.getCompletedTaskCount()
-
*100)/executor.getTaskCount());
-
System.out.println(progress + "% done (" +
-
executor.getCompletedTaskCount() + " emails have been sent).");
-
}
-
-
int errorCount = 0;
-
int successCount = 0;
-
for(Future future : futures){
-
if(future.get()){
-
successCount++;
-
}else{
-
errorCount++;
-
}
-
}
-
System.out.println(successCount + " emails were successfully sent, but " +
-
errorCount + " failed.");
-
}catch(Exception ex){
-
ex.printStackTrace();
-
}
-
}
-
}
执行这个类,输出结果如下:
-
Starting shutdown...
-
1% done (118 emails have been sent).
-
2% done (232 emails have been sent).
-
3% done (358 emails have been sent).
-
5% done (478 emails have been sent).
-
6% done (587 emails have been sent).
-
7% done (718 emails have been sent).
-
9% done (850 emails have been sent).
-
10% done (969 emails have been sent).
-
……
所有的任务都由ExecutorService提交,我们开始它的关闭(防止提交新任务)并使用一个循环(实时场景,可能你会继续做其它的事情)来等待,直至所有任务都被执行完成、计算和打印当前每次迭代的进度。
注意,你可以存储executor引用,也可以在任意时间从其它线程查询它的计算结果和报告进程进度。
最后,使用Future集合引用,我们得到ExecutorService提交的每个Callable接口,通知成功发送的邮件数量和发送失败的邮件数量。
此结构不但易于使用,还使得相关性得到清晰的隔离,在调度程序和实际任务之间提供了一个预定义的通信机制。
阅读(2172) | 评论(0) | 转发(0) |