CompletableFuture
是由并发大师Doug Lea
编写于JDK8中,可以显式的主动控制任务完成并设置结果和状态,并在任务完成后可以结合CompletionStage
提供的接续方法完成相关回调处理。
Completable概述
JDK8之前我们使用异步调用:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18ExecutorService executorService = new ThreadPoolExecutor(4, 10, 2000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1000));
Future<?> result = executorService.submit(() -> {
System.out.println("start to make cake...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finished cake...");
return "cake";
});
try {
System.out.println(result.get());
System.out.println("I go to drink milk.");
} catch (Exception e) {
e.printStackTrace();
}
JDK8我们使用CompletableFuture
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19CompletableFuture.supplyAsync(() -> {
System.out.println("start to make cake...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finished cake...");
return "cake";
}).thenAccept(cake -> {
System.out.println("I eat " + cake);
});
System.out.println("I go to drink milk.");
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
代码对比可以看出,JDK5提供的Future
提供异步计算结果的能力,通过isDone()
方法检查计算是否完成或者get()
阻塞调用线程等待计算结果返回,又或者使用cancel()
取消任务的执行,虽然Future
以及相关使用方法提供了异步执行的能力,但是对于计算结果的获取却
很不便利,只能通过轮询或者阻塞方式得到,阻塞的方式显然违背了异步编程的初衷,轮询的方式又会浪费无畏的CPU资源,而且不能及时得到返回结果。
JDK8提供了CompletableFuture
类,实现了java.util.concurrent.Future
和java.util.concurrent.CompletionStage
接口,所以我们还可以像以前那样通过阻塞或者轮询的方式获得结果,尽管这种方式不推荐使用。CompletionStage
定义了任务执行完成后不同阶段,提供任务执行完成后进行操作的回调,CompletableFuture
类中包含了大概50多个方法,但从命名是有规律可循的
- 用于创建
CompletableFuture
对象
1 | public CompletableFuture() |
- 用于判断当前状态和同步等待取值
1 | public T join() |
用于显示得控制
CompletableFuture
完成状态1
2
3public boolean complete(T value)
public boolean completeExceptionally(Throwable ex)
public boolean cancel(boolean mayInterruptIfRunning)CompletableFutre
的接续方法,任务完成后的回调1
2
3
4
5
6
7
8
9
10
11
12
13thenApply, thenApplyAsync
thenAccept, thenAcceptAsync
thenRun, thenRunAsync
thenCombine, thenCombineAsync
thenAcceptBoth, thenAcceptBothAsync
runAfterBoth, runAfterBothAsync
applyToEither, applyToEitherAsync
acceptEither, acceptEitherAsync
runAfterEither, runAfterEitherAsync
thenCompose, thenComposeAsync
whenComplete, whenCompleteAsync
handle, handleAsync
exceptionally
以
Async
结尾的都是异步方法,如果指定了线程池则使用指定的线程池,否则在默认的ForkJoinPool.commonPool()
中执行
以run
开头的方法,其方法入参的lambda表达式一定是无参数
并且无返回值
即Runnable
接口
以supply
开头的方法,其方法入参的lambda表达式一定是无参数
并有返回值
即Supplier
接口
以accepet
开头或结尾的方法,其方法入参的lambda表达式一定是有参数
并无返回值
即Comsumer
接口
以apply
开头或结尾的方法,其方法入参的lambda表达式一定是有参数
并有返回值
即Function
接口
带有either
后缀的表示谁先完成则消费谁
源码分析
了解了CompletableFuture
的大概用法,我们从runAsync
入手看看其执行过程1
2
3 public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
这里的asyncPool
通过判断是否支持并行化 默认使用ForkJoinPool.commonPool()
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
1 | static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { |