baeldung 2016-08-16
1. 引言
本教程是对 Java 8 并发 API 中引入的 CompletableFuture 类的功能和使用场景的全面介绍。
2. Java 中的异步计算
异步计算难以推理。通常,我们希望将任何计算视为一系列步骤,但在异步计算中,以回调形式表示的操作往往分散在代码各处,或彼此深度嵌套。当我们需要处理某个步骤中可能发生的错误时,情况会变得更糟。
Java 5 引入了 Future 接口,用于表示异步计算的结果,但它没有任何方法来组合这些计算或处理可能出现的错误。
Java 8 引入了 CompletableFuture 类。它不仅实现了 Future 接口,还实现了 CompletionStage 接口。该接口定义了可与其他步骤组合的异步计算步骤的契约。
CompletableFuture 既是一个构建块,也是一个框架,提供了大约 50 种不同的方法,用于组合、合并和执行异步计算步骤,并处理错误。
如此庞大的 API 可能令人望而生畏,但它们大多可以归入几个清晰明确的使用场景。
3. 将 CompletableFuture 用作简单的 Future
首先,CompletableFuture 类实现了 Future 接口,因此我们可以将其作为 Future 的实现使用,但具备额外的完成逻辑。
例如,我们可以使用无参构造函数创建一个此类的实例,以表示某个未来的计算结果,将其交给消费者,并在未来某个时间通过 complete 方法完成它。消费者可以调用 get 方法阻塞当前线程,直到获得该结果。
下面的示例中,我们有一个方法创建了一个 CompletableFuture 实例,然后在另一个线程中启动某些计算并立即返回该 Future:
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
为了启动计算,我们使用了 Executor API。这种创建和完成 CompletableFuture 的方式可以与任何并发机制或 API 一起使用,包括原始线程。
注意:calculateAsync 方法返回的是一个 Future 实例。
我们只需调用该方法,获取 Future 实例,并在准备好阻塞等待结果时调用其 get 方法。
另外请注意,get 方法会抛出一些受检异常,即 ExecutionException(封装了计算过程中发生的异常)和 InterruptedException(表示线程在活动之前或期间被中断):
Future<String> completableFuture = calculateAsync();
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
如果我们已经知道计算结果,可以使用静态方法 completedFuture,传入代表该结果的参数。这样,Future 的 get 方法将永远不会阻塞,而是立即返回该结果:
Future<String> completableFuture =
CompletableFuture.completedFuture("Hello");
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
在另一种场景中,我们可能希望取消 Future 的执行。
4. 封装计算逻辑的 CompletableFuture
上面的代码允许我们选择任意并发执行机制,但如果想跳过这些样板代码并直接异步执行某些代码呢?
静态方法 runAsync 和 supplyAsync 允许我们分别从 Runnable 和 Supplier 函数式类型创建 CompletableFuture 实例。
Runnable 和 Supplier 是函数式接口,得益于 Java 8 的新特性,我们可以将其实例作为 Lambda 表达式传递。
Runnable接口是在线程中使用的老接口,不能返回值。Supplier是一个泛型函数式接口,其唯一方法不接受参数,但返回一个参数化类型的值。
这使我们可以提供一个 Supplier 实例作为 Lambda 表达式来执行计算并返回结果。用法非常简单:
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");
// ...
assertEquals("Hello", future.get());
5. 处理异步计算的结果
处理计算结果最通用的方式是将其传递给一个函数。thenApply 方法正是为此设计:它接收一个 Function 实例,使用它处理结果,并返回一个持有函数返回值的新 Future:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
如果我们不需要沿 Future 链向下返回值,可以使用 Consumer 函数式接口。它的唯一方法接收一个参数,不返回值。
CompletableFuture 提供了对应的方法:thenAccept 接收一个 Consumer,并将计算结果传递给它。最终的 future.get() 调用将返回 Void 类型的实例:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenAccept(s -> System.out.println("Computation returned: " + s));
future.get();
最后,如果我们既不需要计算值,也不想在链的末尾返回任何值,可以将 Runnable Lambda 传递给 thenRun 方法。在以下示例中,我们在调用 future.get() 后仅在控制台打印一行:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenRun(() -> System.out.println("Computation finished."));
future.get();
6. 组合多个 Future
CompletableFuture API 最强大的部分在于能够将多个 CompletableFuture 实例组合成一系列计算步骤。
这种链式组合的结果本身就是一个 CompletableFuture,允许进一步链式操作和组合。这种模式在函数式语言中非常普遍,通常被称为 monadic(单子)设计模式。
在以下示例中,我们使用 thenCompose 方法顺序地链接两个 Future。
注意:此方法接收一个返回 CompletableFuture 实例的函数。该函数的参数是上一步计算的结果。这使我们可以在下一个 CompletableFuture 的 Lambda 中使用该值:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
thenCompose 方法与 thenApply 一起构成了 monadic 模式的基石。它们与 Java 8 中 Stream 和 Optional 类的 map 与 flatMap 方法密切相关。
这两个方法都接收一个函数并将其应用于计算结果,但 thenCompose(相当于 flatMap)接收的函数返回的是相同类型的另一个对象。这种函数结构允许我们将这些类的实例作为构建块进行组合。
如果我们希望执行两个独立的 Future 并对其结果进行某种操作,可以使用 thenCombine 方法。它接收一个 Future 和一个接受两个参数的 Function 来处理两个结果:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2);
assertEquals("Hello World", completableFuture.get());
更简单的情况是:我们想对两个 Future 的结果做些事情,但不需要将结果继续传递下去。这时可以使用 thenAcceptBoth 方法:
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));
7. thenApply() 与 thenCompose() 的区别
在前面的章节中,我们展示了 thenApply() 和 thenCompose() 的示例。这两个 API 都有助于链接不同的 CompletableFuture 调用,但它们的用途不同。
7.1. thenApply()
我们可以使用此方法处理前一个调用的结果。但关键点在于:返回类型将是所有调用的组合结果。
因此,当我们想要转换 CompletableFuture 调用的结果时,该方法非常有用:
CompletableFuture<Integer> finalResult = compute().thenApply(s -> s + 1);
7.2. thenCompose()
thenCompose() 与 thenApply() 类似,两者都返回一个新的 CompletionStage。但 thenCompose() 将前一个阶段作为参数传入,并会“扁平化”结果,直接返回包含最终结果的 Future,而不是像 thenApply() 那样返回嵌套的 Future:
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
因此,如果目标是链接 CompletableFuture 方法,最好使用 thenCompose()。
此外,请注意:这两个方法的区别类似于 map() 与 flatMap() 的区别。
8. 并行运行多个 Future
当我们需要并行执行多个 Future 时,通常希望等待它们全部完成,然后处理其组合结果。
CompletableFuture.allOf 静态方法允许我们等待作为变长参数提供的所有 Future 完成:
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
// ...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
注意:CompletableFuture.allOf() 的返回类型是 CompletableFuture<Void>。该方法的局限性在于它不会返回所有 Future 的组合结果。我们需要手动从各个 Future 中获取结果。幸运的是,CompletableFuture.join() 方法结合 Java 8 的 Stream API 使得这一操作变得简单:
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
CompletableFuture.join() 方法类似于 get 方法,但如果 Future 未正常完成,它会抛出非受检异常。这使得它可以在 Stream.map() 方法中作为方法引用使用。
9. 错误处理
在异步计算步骤链中进行错误处理时,我们需要以类似 throw/catch 的方式适配异常处理逻辑。
CompletableFuture 类没有使用语法块捕获异常,而是提供了特殊的 handle 方法。该方法接收两个参数:计算结果(如果成功完成)和抛出的异常(如果某计算步骤未正常完成)。
在以下示例中,我们使用 handle 方法在异步计算问候语因未提供名称而出错时提供默认值:
String name = null;
// ...
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", completableFuture.get());
另一种场景:假设我们希望像第一个示例那样手动用值完成 Future,但也希望有能力用异常完成它。completeExceptionally 方法正是为此设计。以下示例中的 completableFuture.get() 方法将抛出一个 ExecutionException,其原因为 RuntimeException:
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// ...
completableFuture.completeExceptionally(
new RuntimeException("Calculation failed!"));
// ...
completableFuture.get(); // ExecutionException
在上面的例子中,我们也可以使用 handle 方法异步处理异常,但使用 get 方法则可以采用更典型的同步异常处理方式。
10. 异步方法(Async Methods)
CompletableFuture 类的流式 API 中,大多数方法都有两个带 Async 后缀的额外变体。这些方法通常用于在另一个线程中运行对应的执行步骤。
- 不带
Async后缀的方法使用调用线程执行下一个阶段。 - 带
Async后缀但不带 Executor 参数的方法使用公共的 Fork/Join 池(即ForkJoinPool.commonPool())执行步骤(前提是并行度 > 1)。 - 带
Async后缀且带 Executor 参数的方法使用传入的Executor执行步骤。
以下是一个修改后的示例,它使用 Function 实例处理计算结果。唯一的可见区别是使用了 thenApplyAsync 方法,但在底层,函数的应用被包装为 ForkJoinTask 实例(有关 Fork/Join 框架的更多信息,请参阅《Java Fork/Join 框架指南》)。这使我们能够进一步并行化计算,更高效地利用系统资源:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", future.get());
11. JDK 9 中的 CompletableFuture API 增强
Java 9 引入了新的实例方法,提高了异步计算的灵活性和易用性:
Executor defaultExecutor()CompletableFuture<U> newIncompleteFuture()CompletableFuture<T> copy()CompletionStage<T> minimalCompletionStage()CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)
Java 9 还新增了用于创建和管理 CompletableFuture 实例的静态工具方法:
Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)Executor delayedExecutor(long delay, TimeUnit unit)<U> CompletionStage<U> completedStage(U value)<U> CompletionStage<U> failedStage(Throwable ex)<U> CompletableFuture<U> failedFuture(Throwable ex)
11.1. defaultExecutor()
defaultExecutor() 返回用于未指定 Executor 的异步方法的默认执行器:
new CompletableFuture().defaultExecutor()
子类可以重写此方法,返回至少提供一个独立线程的执行器。
11.2. newIncompleteFuture()
newIncompleteFuture()(也称为“虚拟构造函数”)用于获取相同类型的新的未完成 CompletableFuture 实例:
new CompletableFuture().newIncompleteFuture()
当子类化 CompletableFuture 时,此方法特别有用,因为它在几乎所有返回新 CompletionStage 的方法内部都被使用,允许子类控制这些方法返回的具体子类型。
11.3. copy()
copy() 方法创建一个新的 CompletableFuture,其完成状态反映原始 CompletableFuture 的状态:
new CompletableFuture().copy()
该方法适用于“防御性复制”场景,可防止客户端完成该 Future,同时仍允许在其上安排依赖操作。
新创建的 CompletableFuture 在原始 Future 正常完成时也正常完成;若原始 Future 异常完成,则新 Future 也会以异常完成,且包装为 CompletionException,其 cause 为原始异常。
11.4. minimalCompletionStage()
minimalCompletionStage() 方法返回一个新的 CompletionStage,其行为与 copy() 描述的一致,但该新实例在任何尝试获取或设置解析值的操作中都会抛出 UnsupportedOperationException:
new CompletableFuture().minimalCompletionStage()
该方法适用于需要暴露 CompletableFuture 的受限视图的场景,防止客户端修改其解析值,但仍允许安排依赖操作。
11.5. completeAsync()
completeAsync() 方法用于使用 Supplier 提供的值异步完成 CompletableFuture:
CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
两个重载方法的区别在于是否指定 Executor。若未指定,则使用默认执行器(由 defaultExecutor() 返回)。
11.6. orTimeout()
orTimeout() 方法用于在指定超时时间内未完成时,自动以 TimeoutException 异常完成 CompletableFuture:
new CompletableFuture().orTimeout(1, TimeUnit.SECONDS)
11.7. completeOnTimeout()
completeOnTimeout() 方法在指定超时前未完成时,以指定值正常完成 CompletableFuture:
new CompletableFuture().completeOnTimeout(value, 1, TimeUnit.SECONDS)
11.8. delayedExecutor()
delayedExecutor() 返回一个新的 Executor,它会在给定延迟后(若延迟非正数则无延迟)将任务提交给指定的基础执行器:
Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
Executor delayedExecutor(long delay, TimeUnit unit)
每次调用返回的执行器的 execute 方法时,延迟才开始计时。若未指定执行器,则使用默认执行器(ForkJoinPool.commonPool())。
11.9. completedStage() 与 failedStage()
completedStage() 和 failedStage() 工具方法返回已解析的 CompletionStage 实例:
<U> CompletionStage<U> completedStage(U value)
<U> CompletionStage<U> failedStage(Throwable ex)
completedStage()返回正常完成的阶段;failedStage()返回异常完成的阶段。
11.10. failedFuture()
failedFuture() 方法用于创建一个已异常完成的 CompletableFuture 实例:
<U> CompletableFuture<U> failedFuture(Throwable ex)
该方法在测试或模拟异步工作流中的失败条件时非常有用。
12. 结论
本文详细介绍了 CompletableFuture 类的方法及其典型使用场景,并讨论了 Java 9 中新增的方法增强。