1) 基础知识,不一定常用
线程池相关,只要返回Future/CompletableFuture 都是异步,因为它支持回调。这个看使用都,用不好又回到同步编程了
ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newCachedThreadPool()
有 submit(Callable) / execute(Runnable) / shutdown() / shutdownNow() / awaitTermination(long timeout, TimeUnit unit)
不重要的方法:
getPoolSize(): This method returns the actual number of threads in the pool of the executor
getActiveCount(): This method returns the number of threads that are executing tasks in the executor
getCompletedTaskCount(): This method returns the number of tasks completed by the executor
注意不同的线程池种类:
newCachedThreadPool =》 The executor creates a new thread for each task that receives
有返回结果的:
One of the advantages of the Executor framework is that you can run concurrent tasks that return a result. The Java Concurrency API achieves this with the following two interfaces:
Callable: This interface has the call() method. In this method, you have to implement the logic of a task. The Callable interface is a parameterized interface, meaning you have to indicate the type of data the call() method will return.
Future: This interface has some methods to obtain the result generated by a Callable object and to manage its state.
如果只想取池中最先完成的线挰返回的结果
String result = executor.invokeAny(List
>)
如果想处理所有返回的结果:
List>resultList = executor.invokeAll(List>)
注意,循环处理结果的时候用CompletionService会效率更高些,因为它是按完成的顺序来组织结果列表的,避免了不必要的等待。
提交延迟执行的任务
Executors.newScheduledThreadPool(1)
ScheduledFuture> result = executor.schedule(task,i+1 , TimeUnit.SECONDS)
提交一个周期执行的任务
ScheduledFuture> result = executor.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
result.getDelay(TimeUnit.MILLISECONDS)
取消一个任务
Future result=executor.submit(Callable<>);
result.cancel(true);
如果要让任务的提交和处理区分开,需要用到CompletionService
ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();
CompletionService service = new ExecutorCompletionService<>(executor);
Future result = service.poll(20, TimeUnit.SECONDS)
result.get()
如果要处理池足够忙,开始拒绝新的任务,想要定制自己的处理方式,这时需要重写一个 RejectedExecutionHandler
public class RejectedTaskController implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {}
}
设置到线程池中:
executor.setRejectedExecutionHandler( new RejectedTaskController() )
如何测试呢? 我们知道 executor.shutdown() 之后就不会接受新的任务提交了,呵呵你懂的,此时再调用 executor.submit(task) 就会起效果了。
2) 高级知识,常用的
ForkJoinPool / FutureTask
这是一种特殊的线程池,基于 work-stealing 算法 提供以下两种主要操作:
fork : When you divide a task into smaller tasks and execute them using the framework
join : When a task waits for the finalization of the tasks it has created
推荐编程模型为:
A) new ForkJoinPool()
B) Creating a subclass of ForkJoinTask to be executed in the pool, like RecursiveTask/RecursiveAction
不需要处理返回值时,
If (problem size > default size){
tasks=divide(task);
execute(tasks);
} else {
resolve problem using another algorithm;
}
注意ForkJoinPool是不对Runnable或Callalble 的任务适用 Work-Stealing 算法的, 只认ForkJoinTask
invoke(ForkJoinTask task)
invokeAll(ForkJoinTask>... tasks) 可变参数
invokeAll(Collection tasks) : (参数可是ArrayList object, a LinkedList object, or a TreeSet object) of objects of a generic type T. This generic type T must be the ForkJoinTask class or a subclass of it.
差点忘了, invoke开头的方法都是同步调用啊。
Although the ForkJoinPool class is designed to execute an object of ForkJoinTask, you can also execute Runnable and Callable objects directly. 记住不会适用Work-Stealing 算法
You may also use the adapt() method of the ForkJoinTask class that accepts a Callable object or a Runnable object and returns a ForkJoinTask object to execute that task.
invoke(ForkJoinTask.adapt(Callable/Runnable) )
需要处理返回值时, 推荐使用编程模型如下:
If (problem size > size){
tasks=Divide(task);
execute(tasks);
groupResults()
return result;
} else {
resolve problem;
return result;
}
例如
if (end-start<100) {
result=count(line, start, end, word);
} else {
int mid=(start+end)/2;
LineTask task1=new LineTask(line, start, mid, word);
LineTask task2=new LineTask(line, mid, end, word);
invokeAll(task1, task2);
try {
result=groupResults(task1.get(),task2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
pool.shutdown();
try {
pool.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
这里有一个坑啊,只有使用invoke()之类的同步调用的时候,Work-Stealing算法才会起作用。
如果要异步来运行任务,但是同步获取结果,这样效率会稍高一点,但是只有在调用fork() / join()的时候.
if (end-start<100) {
result=count(line, start, end, word);
} else {
int mid=(start+end)/2;
LineTask task1=new LineTask(line, start, mid, word);
task1.fork()
LineTask task2=new LineTask(line, mid, end, word);
task2.fork()
try {
result=groupResults(task1.get(),task2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
另一个例子:
Class ReverseTask extends RecursiveTask>
{
@Override
protected List compute()
{
List tasks=new ArrayList<>();
for (int i = 0; i < content.length; i++) {
FolderProcessor task = new FolderProcessor(content[i].getAbsolutePath(), extension);
task.fork();
tasks.add(task);
}
for (FolderProcessor item: tasks) {
list.addAll(item.join());
}
}
}
主程序中一定要用join()来等待最终的结果。
There are two main differences between the get() and the join() methods:
1) The join() method can't be interrupted. If you interrupt the thread that called the join() method, the method throws an InterruptedException exception.
2) While the get() method will return an ExecutionException exception if the tasks throw any unchecked exception, the join() method will return a RuntimeException exception.
在 RecursiveTask / RecursiveAction 的 compute() 方法中,是不能throws checked Exception的,所以要自己处理,但是可以unchecked exception like NumberFormatException.
阅读(2841) | 评论(0) | 转发(0) |