CompletableFuture

CompletableFuture是由并发大师Doug Lea编写于JDK8中,可以显式的主动控制任务完成并设置结果和状态,并在任务完成后可以结合CompletionStage提供的接续方法完成相关回调处理。

Completable概述

JDK8之前我们使用异步调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ExecutorService executorService = new ThreadPoolExecutor(4, 10, 2000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1000));
Future<?> result = executorService.submit(() -> {
System.out.println("start to make cake...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finished cake...");
return "cake";
});

try {
System.out.println(result.get());
System.out.println("I go to drink milk.");
} catch (Exception e) {
e.printStackTrace();
}

JDK8我们使用CompletableFuture:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CompletableFuture.supplyAsync(() -> {
System.out.println("start to make cake...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finished cake...");
return "cake";
}).thenAccept(cake -> {
System.out.println("I eat " + cake);
});

System.out.println("I go to drink milk.");
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}

代码对比可以看出,JDK5提供的Future提供异步计算结果的能力,通过isDone()方法检查计算是否完成或者get()阻塞调用线程等待计算结果返回,又或者使用cancel()取消任务的执行,虽然Future以及相关使用方法提供了异步执行的能力,但是对于计算结果的获取却
很不便利,只能通过轮询或者阻塞方式得到,阻塞的方式显然违背了异步编程的初衷,轮询的方式又会浪费无畏的CPU资源,而且不能及时得到返回结果。
JDK8提供了CompletableFuture类,实现了java.util.concurrent.Futurejava.util.concurrent.CompletionStage接口,所以我们还可以像以前那样通过阻塞或者轮询的方式获得结果,尽管这种方式不推荐使用。
CompletionStage定义了任务执行完成后不同阶段,提供任务执行完成后进行操作的回调,CompletableFuture类中包含了大概50多个方法,但从命名是有规律可循的

  1. 用于创建CompletableFuture对象
1
2
3
4
5
6
7
public CompletableFuture()
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  1. 用于判断当前状态和同步等待取值
1
2
3
4
5
6
public T join()
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
public T getNow(T valueIfAbsent)
public boolean isCancelled()
public boolean isCompletedExceptionally()
public boolean isDone()
  1. 用于显示得控制CompletableFuture完成状态

    1
    2
    3
    public boolean complete(T value)
    public boolean completeExceptionally(Throwable ex)
    public boolean cancel(boolean mayInterruptIfRunning)
  2. CompletableFutre的接续方法,任务完成后的回调

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    thenApply, thenApplyAsync
    thenAccept, thenAcceptAsync
    thenRun, thenRunAsync
    thenCombine, thenCombineAsync
    thenAcceptBoth, thenAcceptBothAsync
    runAfterBoth, runAfterBothAsync
    applyToEither, applyToEitherAsync
    acceptEither, acceptEitherAsync
    runAfterEither, runAfterEitherAsync
    thenCompose, thenComposeAsync
    whenComplete, whenCompleteAsync
    handle, handleAsync
    exceptionally

Async结尾的都是异步方法,如果指定了线程池则使用指定的线程池,否则在默认的ForkJoinPool.commonPool()中执行
run开头的方法,其方法入参的lambda表达式一定是无参数 并且无返回值Runnable接口
supply开头的方法,其方法入参的lambda表达式一定是无参数有返回值Supplier接口
accepet开头或结尾的方法,其方法入参的lambda表达式一定是有参数无返回值Comsumer接口
apply开头或结尾的方法,其方法入参的lambda表达式一定是有参数有返回值Function接口
带有either后缀的表示谁先完成则消费谁

源码分析

了解了CompletableFuture的大概用法,我们从runAsync入手看看其执行过程

1
2
3
 public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}

这里的asyncPool通过判断是否支持并行化 默认使用ForkJoinPool.commonPool()

private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
e.execute(new AsyncRun(d, f));
return d;
}
@SuppressWarnings("serial")
static final class AsyncRun extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<Void> dep; Runnable fn;
AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
this.dep = dep; this.fn = fn;
}

public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }

public void run() {
CompletableFuture<Void> d; Runnable f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
f.run();
d.completeNull();
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
坚持原创技术分享,您的支持将鼓励我继续创作!