CompletableFuture 使用指南

更新于 2025-12-29

baeldung 2016-08-16

1. 引言

本教程是对 Java 8 并发 API 中引入的 CompletableFuture 类的功能和使用场景的全面介绍。

2. Java 中的异步计算

异步计算难以推理。通常,我们希望将任何计算视为一系列步骤,但在异步计算中,以回调形式表示的操作往往分散在代码各处,或彼此深度嵌套。当我们需要处理某个步骤中可能发生的错误时,情况会变得更糟。

Java 5 引入了 Future 接口,用于表示异步计算的结果,但它没有任何方法来组合这些计算或处理可能出现的错误。

Java 8 引入了 CompletableFuture 类。它不仅实现了 Future 接口,还实现了 CompletionStage 接口。该接口定义了可与其他步骤组合的异步计算步骤的契约。

CompletableFuture 既是一个构建块,也是一个框架,提供了大约 50 种不同的方法,用于组合、合并和执行异步计算步骤,并处理错误。

如此庞大的 API 可能令人望而生畏,但它们大多可以归入几个清晰明确的使用场景。

3. 将 CompletableFuture 用作简单的 Future

首先,CompletableFuture 类实现了 Future 接口,因此我们可以将其作为 Future 的实现使用,但具备额外的完成逻辑。

例如,我们可以使用无参构造函数创建一个此类的实例,以表示某个未来的计算结果,将其交给消费者,并在未来某个时间通过 complete 方法完成它。消费者可以调用 get 方法阻塞当前线程,直到获得该结果。

下面的示例中,我们有一个方法创建了一个 CompletableFuture 实例,然后在另一个线程中启动某些计算并立即返回该 Future

public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

为了启动计算,我们使用了 Executor API。这种创建和完成 CompletableFuture 的方式可以与任何并发机制或 API 一起使用,包括原始线程。

注意:calculateAsync 方法返回的是一个 Future 实例。

我们只需调用该方法,获取 Future 实例,并在准备好阻塞等待结果时调用其 get 方法。

另外请注意,get 方法会抛出一些受检异常,即 ExecutionException(封装了计算过程中发生的异常)和 InterruptedException(表示线程在活动之前或期间被中断):

Future<String> completableFuture = calculateAsync();

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

如果我们已经知道计算结果,可以使用静态方法 completedFuture,传入代表该结果的参数。这样,Futureget 方法将永远不会阻塞,而是立即返回该结果:

Future<String> completableFuture = 
  CompletableFuture.completedFuture("Hello");

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

在另一种场景中,我们可能希望取消 Future 的执行。

4. 封装计算逻辑的 CompletableFuture

上面的代码允许我们选择任意并发执行机制,但如果想跳过这些样板代码并直接异步执行某些代码呢?

静态方法 runAsyncsupplyAsync 允许我们分别从 RunnableSupplier 函数式类型创建 CompletableFuture 实例。

RunnableSupplier 是函数式接口,得益于 Java 8 的新特性,我们可以将其实例作为 Lambda 表达式传递。

  • Runnable 接口是在线程中使用的老接口,不能返回值。
  • Supplier 是一个泛型函数式接口,其唯一方法不接受参数,但返回一个参数化类型的值。

这使我们可以提供一个 Supplier 实例作为 Lambda 表达式来执行计算并返回结果。用法非常简单:

CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");

// ...

assertEquals("Hello", future.get());

5. 处理异步计算的结果

处理计算结果最通用的方式是将其传递给一个函数。thenApply 方法正是为此设计:它接收一个 Function 实例,使用它处理结果,并返回一个持有函数返回值的新 Future

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");

assertEquals("Hello World", future.get());

如果我们不需要沿 Future 链向下返回值,可以使用 Consumer 函数式接口。它的唯一方法接收一个参数,不返回值。

CompletableFuture 提供了对应的方法:thenAccept 接收一个 Consumer,并将计算结果传递给它。最终的 future.get() 调用将返回 Void 类型的实例:

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();

最后,如果我们既不需要计算值,也不想在链的末尾返回任何值,可以将 Runnable Lambda 传递给 thenRun 方法。在以下示例中,我们在调用 future.get() 后仅在控制台打印一行:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));

future.get();

6. 组合多个 Future

CompletableFuture API 最强大的部分在于能够将多个 CompletableFuture 实例组合成一系列计算步骤。

这种链式组合的结果本身就是一个 CompletableFuture,允许进一步链式操作和组合。这种模式在函数式语言中非常普遍,通常被称为 monadic(单子)设计模式

在以下示例中,我们使用 thenCompose 方法顺序地链接两个 Future

注意:此方法接收一个返回 CompletableFuture 实例的函数。该函数的参数是上一步计算的结果。这使我们可以在下一个 CompletableFuture 的 Lambda 中使用该值:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());

thenCompose 方法与 thenApply 一起构成了 monadic 模式的基石。它们与 Java 8 中 StreamOptional 类的 mapflatMap 方法密切相关。

这两个方法都接收一个函数并将其应用于计算结果,但 thenCompose(相当于 flatMap)接收的函数返回的是相同类型的另一个对象。这种函数结构允许我们将这些类的实例作为构建块进行组合。

如果我们希望执行两个独立的 Future 并对其结果进行某种操作,可以使用 thenCombine 方法。它接收一个 Future 和一个接受两个参数的 Function 来处理两个结果:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2);

assertEquals("Hello World", completableFuture.get());

更简单的情况是:我们想对两个 Future 的结果做些事情,但不需要将结果继续传递下去。这时可以使用 thenAcceptBoth 方法:

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));

7. thenApply() 与 thenCompose() 的区别

在前面的章节中,我们展示了 thenApply()thenCompose() 的示例。这两个 API 都有助于链接不同的 CompletableFuture 调用,但它们的用途不同。

7.1. thenApply()

我们可以使用此方法处理前一个调用的结果。但关键点在于:返回类型将是所有调用的组合结果。

因此,当我们想要转换 CompletableFuture 调用的结果时,该方法非常有用:

CompletableFuture<Integer> finalResult = compute().thenApply(s -> s + 1);

7.2. thenCompose()

thenCompose()thenApply() 类似,两者都返回一个新的 CompletionStage。但 thenCompose() 将前一个阶段作为参数传入,并会“扁平化”结果,直接返回包含最终结果的 Future,而不是像 thenApply() 那样返回嵌套的 Future

CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

因此,如果目标是链接 CompletableFuture 方法,最好使用 thenCompose()

此外,请注意:这两个方法的区别类似于 map()flatMap() 的区别。

8. 并行运行多个 Future

当我们需要并行执行多个 Future 时,通常希望等待它们全部完成,然后处理其组合结果。

CompletableFuture.allOf 静态方法允许我们等待作为变长参数提供的所有 Future 完成:

CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);

// ...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

注意:CompletableFuture.allOf() 的返回类型是 CompletableFuture<Void>。该方法的局限性在于它不会返回所有 Future 的组合结果。我们需要手动从各个 Future 中获取结果。幸运的是,CompletableFuture.join() 方法结合 Java 8 的 Stream API 使得这一操作变得简单:

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

CompletableFuture.join() 方法类似于 get 方法,但如果 Future 未正常完成,它会抛出非受检异常。这使得它可以在 Stream.map() 方法中作为方法引用使用。

9. 错误处理

在异步计算步骤链中进行错误处理时,我们需要以类似 throw/catch 的方式适配异常处理逻辑。

CompletableFuture 类没有使用语法块捕获异常,而是提供了特殊的 handle 方法。该方法接收两个参数:计算结果(如果成功完成)和抛出的异常(如果某计算步骤未正常完成)。

在以下示例中,我们使用 handle 方法在异步计算问候语因未提供名称而出错时提供默认值:

String name = null;

// ...

CompletableFuture<String> completableFuture  
  = CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  }).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());

另一种场景:假设我们希望像第一个示例那样手动用值完成 Future,但也希望有能力用异常完成它。completeExceptionally 方法正是为此设计。以下示例中的 completableFuture.get() 方法将抛出一个 ExecutionException,其原因为 RuntimeException

CompletableFuture<String> completableFuture = new CompletableFuture<>();

// ...

completableFuture.completeExceptionally(
  new RuntimeException("Calculation failed!"));

// ...

completableFuture.get(); // ExecutionException

在上面的例子中,我们也可以使用 handle 方法异步处理异常,但使用 get 方法则可以采用更典型的同步异常处理方式。

10. 异步方法(Async Methods)

CompletableFuture 类的流式 API 中,大多数方法都有两个带 Async 后缀的额外变体。这些方法通常用于在另一个线程中运行对应的执行步骤。

  • 不带 Async 后缀的方法使用调用线程执行下一个阶段。
  • Async 后缀但不带 Executor 参数的方法使用公共的 Fork/Join 池(即 ForkJoinPool.commonPool())执行步骤(前提是并行度 > 1)。
  • Async 后缀且带 Executor 参数的方法使用传入的 Executor 执行步骤。

以下是一个修改后的示例,它使用 Function 实例处理计算结果。唯一的可见区别是使用了 thenApplyAsync 方法,但在底层,函数的应用被包装为 ForkJoinTask 实例(有关 Fork/Join 框架的更多信息,请参阅《Java Fork/Join 框架指南》)。这使我们能够进一步并行化计算,更高效地利用系统资源:

CompletableFuture<String> completableFuture  
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApplyAsync(s -> s + " World");

assertEquals("Hello World", future.get());

11. JDK 9 中的 CompletableFuture API 增强

Java 9 引入了新的实例方法,提高了异步计算的灵活性和易用性:

  • Executor defaultExecutor()
  • CompletableFuture<U> newIncompleteFuture()
  • CompletableFuture<T> copy()
  • CompletionStage<T> minimalCompletionStage()
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
  • CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)

Java 9 还新增了用于创建和管理 CompletableFuture 实例的静态工具方法:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • <U> CompletionStage<U> completedStage(U value)
  • <U> CompletionStage<U> failedStage(Throwable ex)
  • <U> CompletableFuture<U> failedFuture(Throwable ex)

11.1. defaultExecutor()

defaultExecutor() 返回用于未指定 Executor 的异步方法的默认执行器:

new CompletableFuture().defaultExecutor()

子类可以重写此方法,返回至少提供一个独立线程的执行器。

11.2. newIncompleteFuture()

newIncompleteFuture()(也称为“虚拟构造函数”)用于获取相同类型的新的未完成 CompletableFuture 实例:

new CompletableFuture().newIncompleteFuture()

当子类化 CompletableFuture 时,此方法特别有用,因为它在几乎所有返回新 CompletionStage 的方法内部都被使用,允许子类控制这些方法返回的具体子类型。

11.3. copy()

copy() 方法创建一个新的 CompletableFuture,其完成状态反映原始 CompletableFuture 的状态:

new CompletableFuture().copy()

该方法适用于“防御性复制”场景,可防止客户端完成该 Future,同时仍允许在其上安排依赖操作。

新创建的 CompletableFuture 在原始 Future 正常完成时也正常完成;若原始 Future 异常完成,则新 Future 也会以异常完成,且包装为 CompletionException,其 cause 为原始异常。

11.4. minimalCompletionStage()

minimalCompletionStage() 方法返回一个新的 CompletionStage,其行为与 copy() 描述的一致,但该新实例在任何尝试获取或设置解析值的操作中都会抛出 UnsupportedOperationException

new CompletableFuture().minimalCompletionStage()

该方法适用于需要暴露 CompletableFuture 的受限视图的场景,防止客户端修改其解析值,但仍允许安排依赖操作。

11.5. completeAsync()

completeAsync() 方法用于使用 Supplier 提供的值异步完成 CompletableFuture

CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)

两个重载方法的区别在于是否指定 Executor。若未指定,则使用默认执行器(由 defaultExecutor() 返回)。

11.6. orTimeout()

orTimeout() 方法用于在指定超时时间内未完成时,自动以 TimeoutException 异常完成 CompletableFuture

new CompletableFuture().orTimeout(1, TimeUnit.SECONDS)

11.7. completeOnTimeout()

completeOnTimeout() 方法在指定超时前未完成时,以指定值正常完成 CompletableFuture

new CompletableFuture().completeOnTimeout(value, 1, TimeUnit.SECONDS)

11.8. delayedExecutor()

delayedExecutor() 返回一个新的 Executor,它会在给定延迟后(若延迟非正数则无延迟)将任务提交给指定的基础执行器:

Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
Executor delayedExecutor(long delay, TimeUnit unit)

每次调用返回的执行器的 execute 方法时,延迟才开始计时。若未指定执行器,则使用默认执行器(ForkJoinPool.commonPool())。

11.9. completedStage() 与 failedStage()

completedStage()failedStage() 工具方法返回已解析的 CompletionStage 实例:

<U> CompletionStage<U> completedStage(U value)
<U> CompletionStage<U> failedStage(Throwable ex)
  • completedStage() 返回正常完成的阶段;
  • failedStage() 返回异常完成的阶段。

11.10. failedFuture()

failedFuture() 方法用于创建一个已异常完成的 CompletableFuture 实例:

<U> CompletableFuture<U> failedFuture(Throwable ex)

该方法在测试或模拟异步工作流中的失败条件时非常有用。

12. 结论

本文详细介绍了 CompletableFuture 类的方法及其典型使用场景,并讨论了 Java 9 中新增的方法增强。