CompletableFuture指南

评论 0 浏览 0 2016-08-17

1.绪论

本教程是关于CompletableFuture类的功能和使用案例的指南,该类是作为Java 8并发API的改进而引入的。

2.Java中的异步计算

异步计算是很难推理的。通常,我们想把任何计算看作是一系列的步骤,但在异步计算的情况下,以回调表示的操作往往要么分散在代码中,要么深深地嵌套在彼此之间。当我们需要处理在其中一个步骤中可能发生的错误时,情况就更糟糕了。

Future接口是在Java 5中添加的,以作为异步计算的结果,但它没有任何方法来组合这些计算或处理可能的错误。

Java 8引入了CompletableFuture类。除了Future接口,它还实现了CompletionStage接口。这个接口定义了一个异步计算步骤的契约,我们可以将其与其他步骤结合起来。

CompletableFuture同时也是一个构件和框架,它有大约50种不同的方法来组成、组合和执行异步计算步骤,并处理错误

这么大的一个API可能会让人不知所措,但这些大多属于几个明确的、不同的用例。

3.将CompletableFuture作为一个简单的Future

首先,CompletableFuture类实现了Future接口,这样我们就可以把它当作Future的实现来使用,但要有额外的完成逻辑

例如,我们可以用一个没有参数的构造函数来创建这个类的一个实例,以代表一些future的结果,把它交给消费者,并在未来的某个时间使用complete方法完成它。消费者可以使用get方法来阻塞当前线程,直到这个结果被提供。

在下面的例子中,我们有一个方法创建了一个CompletableFuture实例,然后在另一个线程中进行了一些计算,并立即返回了Future

当计算完成后,该方法通过向complete方法提供结果来完成Future的计算:

public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

为了分拆计算,我们使用Executor API。这种创建和完成CompletableFuture的方法可用于任何并发机制或API,包括原始线程。

请注意,calculateAsync方法返回了一个Future实例

我们简单地调用该方法,接收Future实例,并在我们准备为结果进行阻塞时调用其上的get方法。

另外,观察一下get方法抛出的一些检查过的异常,即ExecutionException(封装了计算过程中发生的异常)和InterruptedException(标志着线程在活动前或活动中被中断的异常):

Future<String> completableFuture = calculateAsync();

// ... 

String result = completableFuture.get();
assertEquals("Hello", result);

如果我们已经知道一个计算的结果,我们可以使用静态的completedFuture方法,其参数代表这个计算的结果。因此,getFuture方法将永远不会阻塞,而是立即返回这个结果:

Future<String> completableFuture = 
  CompletableFuture.completedFuture("Hello");

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

作为另一种情况,我们可能想取消一个Future的执行。

4. 封装计算逻辑的CompletableFuture

上面的代码允许我们选择任何并发执行的机制,但如果我们想跳过这个模板,异步执行一些代码,该怎么办?

静态方法runAsyncsupplyAsync允许我们从RunnableSupply功能类型中相应地创建一个CompletableFuture的实例。

RunnableSupply是功能性接口,由于Java 8的新特性,它们允许以lambda表达式的形式传递它们的实例。

Runnable接口与线程中使用的接口相同,不允许返回一个值。

Supply接口是一个通用的功能接口,它有一个没有参数的单一方法,并返回一个参数化类型的值。

这使得我们可以提供一个Supplier的实例,作为一个lambda表达式来进行计算并返回结果。它就像这样简单:

CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");

// ...

assertEquals("Hello", future.get());

5.处理异步计算的结果

处理计算结果的最通用方法是将其送入一个函数。thenApply方法正是这样做的;它接受一个Function实例,用它来处理结果,并返回一个Future,它持有一个由函数返回的值:

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");

assertEquals("Hello World", future.get());

如果我们不需要沿着Future链返回一个值,我们可以使用Consumer功能接口的一个实例。它的单一方法接受一个参数并返回void

CompletableFuture中,有一个方法用于这个用例。thenAccept方法接收一个Consumer并将计算结果传递给它。然后最后的future.get()调用返回一个Void类型的实例:

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();

最后,如果我们既不需要计算的值,也不想在链的末端返回一些值,那么我们可以将一个Runnable lambda传递给thenRun方法。在下面的例子中,我们只是在调用future.get()后在控制台中打印一行:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));

future.get();

6.6.组合Future

API的最好的部分是CompletableFuture可以在计算步骤链中组合CompletableFuture实例

这种连锁的结果本身就是一个CompletableFuture,可以进一步连锁和组合。这种方法在函数式语言中无处不在,通常被称为单体设计模式。

在下面的例子中,我们使用thenCompose方法依次连锁两个Future的方法。

请注意,这个方法需要一个函数来返回一个CompletableFuture实例。这个函数的参数是前一个计算步骤的结果。这使得我们可以在下一个CompletableFuture‘的lambda中使用这个值:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());

thenCompose方法和thenApply实现了单体模式的基本构建模块。它们与mapflatMap方法密切相关,StreamOptional类也在Java 8中可用。

这两个方法都接收一个函数并将其应用于计算结果,但thenCompose (flatMap)方法接收一个返回另一个相同类型对象的函数。这种功能结构允许将这些类的实例作为构建块进行合成。

如果我们想执行两个独立的Future并对其结果进行处理,我们可以使用thenCombine方法,该方法接受一个Future和一个有两个参数的Function来处理这两个结果:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2));

assertEquals("Hello World", completableFuture.get());

一个更简单的情况是,当我们想对两个Future结果做一些事情,但不需要将任何结果值传递到Future链。这时,thenAcceptBoth方法就可以提供帮助:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));

7.thenApply()thenCompose()之间的区别

在我们之前的章节中,我们已经展示了关于thenApply()thenCompose()的例子。这两个API都有助于连锁不同的CompletableFuture调用,但这两个函数的用法是不同的。

7.1. thenApply()

我们可以使用这个方法来处理前一个调用的结果。然而,需要记住的一个关键点是,返回的类型将是所有调用的组合。

因此,当我们想要转换一个CompletableFuture调用的结果时,这个方法是很有用的:

CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);

7.2. thenCompose()

thenCompose()thenApply()类似,都是返回一个新的CompletionStage。然而,thenCompose()使用前一个阶段作为参数。它将直接平铺并返回一个带有结果的Future,而不是像我们在thenApply()中观察到的那样返回一个嵌套的future:

CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

因此,如果想法是连锁CompletableFuture方法,那么最好是使用thenCompose()

另外,请注意这两种方法之间的区别类似于 map()flatMap() 之间的区别

8.并行运行多个Future的程序

当我们需要并行地执行多个Future时,我们通常希望等待所有的Future执行,然后处理它们的综合结果。

CompletableFuture.allOf 静态方法允许等待所有作为var-arg提供的Future的完成:

CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);

// ...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

注意,CompletableFuture.allOf()的返回类型是一个CompletableFuture<Void>。这个方法的局限性在于,它并不返回所有Future的综合结果。相反,我们必须从Future手动获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API使之变得简单:

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

CompletableFuture.join()方法与get方法类似,但它在Future没有正常完成的情况下抛出一个未检查的异常。这使得它可以在Stream.map()方法中作为方法引用使用。

9.处理错误

对于异步计算步骤链中的错误处理,我们必须以类似的方式调整throw/catch习惯用法。

CompletableFuture类允许我们在一个特殊的handle方法中处理异常,而不是在一个句法块中捕捉异常。这个方法接收两个参数:计算的结果(如果它成功完成)和抛出的异常(如果某些计算步骤没有正常完成)。

在下面的例子中,我们使用handle方法来提供一个默认值,当一个问候语的异步计算因为没有提供名字而以错误结束时:

String name = null;

// ...

CompletableFuture<String> completableFuture  
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  }).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());

作为另一种情况,假设我们想用一个值来手动完成Future,就像第一个例子中那样,但也有能力用一个异常来完成它。completeExceptionally方法就是为了这个目的。下面例子中的completableFuture.get()方法抛出了一个ExecutionException,其原因是RuntimeException

CompletableFuture<String> completableFuture = new CompletableFuture<>();

// ...

completableFuture.completeExceptionally(
  new RuntimeException("Calculation failed!"));

// ...

completableFuture.get(); // ExecutionException

在上面的例子中,我们可以用handle方法异步处理这个异常,但是通过get方法,我们可以使用更典型的同步处理异常的方法。

10.异步方法

CompletableFuture类中的大多数方法都有两个带有Async后缀的附加变体。这些方法通常是为了在另一个线程中运行相应的执行步骤

没有Async后缀的方法使用一个调用线程运行下一个执行阶段。相比之下,没有Executor参数的Async方法使用fork/join池实现Executor,只要ForkJoinPool.commonPool()被访问,就运行一个步骤parallelism > 1。最后,带有Executor参数的Async方法使用传递的Executor运行一个步骤。

这里有一个修改过的例子,它用一个Function实例来处理计算的结果。唯一可见的区别是thenApplyAsync方法,但在内部,函数的应用被包裹在一个ForkJoinTask实例中(关于fork/join框架的更多信息,见文章“Guide to the Fork/Join Framework in Java”)。这使我们能够更多地并行计算,更有效地使用系统资源:

CompletableFuture<String> completableFuture  
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApplyAsync(s -> s + " World");

assertEquals("Hello World", future.get());

11. JDK 9 CompletableFuture API

Java 9通过以下变化增强了CompletableFuture API的功能:

  • 增加了新的工厂方法
  • 支持延时和超时功能
  • 改进了对子类的支持。

和新的实例应用程序接口:

  • Executor defaultExecutor()
  • CompletableFuture<U> newIncompleteFuture()。
  • CompletableFuture<T>copy()
  • CompletionStage<T>minimalCompletionStage()。
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)。
  • CompletableFuture<T> completeAsync(Supplier<?extends T>supplier)
  • CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)。
  • CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)。

我们现在也有一些静态的实用方法:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • <U> CompletionStage<U> completedStage(U value)。
  • <U> CompletionStage<U> failedStage(Throwable ex)。
  • <U> CompletableFuture<U> failedFuture(Throwable ex)。

最后,为了解决超时问题,Java 9又引入了两个新的函数:

  • orTimeout()
  • completeOnTimeout()

这里有详细的文章供进一步阅读:Java 9 CompletableFuture API Improvements.

12.结语

在这篇文章中,我们已经描述了CompletableFuture类的方法和典型的使用案例。

文章的源代码可以在GitHub上过获得。

最后更新2023-06-28
0 个评论
标签