从此
文章
📄文章 #️⃣专题 🌐上网 📺 🛒 📱

Java CompletableFuture 概念和用法

🕗2024-05-10

简介

CompletableFuture结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。

CompletableFuture是由Java8引入的,在Java8之前我们一般通过Future实现异步。
Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法,Java8之前若要设置回调一般会使用guava的ListenableFuture。 CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。

核心概念

CompletableFuture 是一个非常强大的并发工具类,它实现了 Future 和 CompletionStage 接口,用于表示某个异步计算的结果,与传统的 Future 不同,CompletableFuture 提供了函数式编程的方法,可以更容易地组织异步代码,处理回调和组合多个异步操作。

假设,有一个电商网站,用户浏览产品详情页时,需要展示产品的基本信息、价格、库存、用户评价等多个方面的数据,这些数据可能来自不同的数据源或服务,比如:

  1. 产品基本信息可能来自一个主数据库。
  2. 价格库存 可能需要实时从另一个库存服务获取。
  3. 用户评价可能存储在另一个专门用于用户反馈的系统中。

为了提升用户体验,希望这些数据的获取能够并行进行,而不是一个接一个地串行获取,这就是 CompletableFuture 的经典场景。

CompletableFuture 类在主要用来解决异步编程和并发执行的问题,在传统的同步编程模型中,代码的执行通常是阻塞的,即一行代码执行完成后,下一行代码才能开始执行,这种模型在处理耗时操作时,如 I/O 操作、数据库访问或网络请求,会导致线程长时间闲置,等待操作完成,从而降低系统的吞吐量和响应能力。

因此,CompletableFuture 类提供了一种非阻塞的、基于回调的编程方式,可以在等待某个长时间运行的任务完成时,同时执行其他任务,这样,就可以更充分地利用系统资源,提高程序的并发性和响应速度。

使用CompletableFuture通常用于解决以下类似场景的问题:

  1. 发起异步请求:当用户请求一个产品详情页时,后端服务可以同时发起对三个数据源的异步请求,这可以通过创建三个 CompletableFuture 实例来实现,每个实例负责一个数据源的请求。
  2. 处理异步结果:一旦这些异步请求发出,它们就可以独立地执行,主线程可以继续处理其他任务,当某个 CompletableFuture 完成时,它会包含一个结果(或者是执行过程中的异常)。
  3. 组合异步结果:使用 CompletableFuture 的组合方法(如 thenCombinethenAcceptBoth 或 allOf),可以等待所有异步操作完成,并将它们的结果组合在一起,比如,可以等待产品基本信息、价格和库存以及用户评价都返回后,再将这些数据整合到一个响应对象中,返回给前端。
  4. 异常处理:如果在获取某个数据源时发生异常,CompletableFuture 允许以异步的方式处理这些异常,比如通过 exceptionally 方法提供一个默认的备选结果或执行一些清理操作。
  5. 最终响应:一旦所有数据源的数据都成功获取并组合在一起,或者某个数据源发生异常并得到了妥善处理,服务就可以将最终的产品详情页响应发送给前端用户。

使用CompletableFuture 可以高效的并发数据获取,提升系统的响应速度和整体性能。

核心API

CompletableFuture 列用于表示某个异步计算的结果,它提供了函数式编程的方法来处理异步计算,允许以非阻塞的方式编写并发代码,并且可以链接多个异步操作,以下是一些常用方法的含义:

1、静态工厂方法

  • CompletableFuture.supplyAsync(Supplier<? extends U> supplier): 异步执行给定的 Supplier,并返回一个表示结果的新 CompletableFuture
  • CompletableFuture.supplyAsync(Supplier<? extends U> supplier, Executor executor): 使用指定的执行器异步执行给定的 Supplier
  • CompletableFuture.runAsync(Runnable runnable): 异步执行给定的 Runnable,并返回一个表示其完成的新 CompletableFuture
  • CompletableFuture.runAsync(Runnable runnable, Executor executor): 使用指定的执行器异步执行给定的 Runnable

2、完成时的处理

  • thenApply(Function<? super T,? extends U> fn): 当此 CompletableFuture 完成时,对其结果应用给定的函数。
  • thenAccept(Consumer<? super T> action): 当此 CompletableFuture 完成时,执行给定的操作。
  • thenRun(Runnable action): 当此 CompletableFuture 完成时,执行给定的无参数操作。

3、异常处理

  • exceptionally(Function<Throwable,? extends T> fn): 当此 CompletableFuture 异常完成时,对其异常应用给定的函数。

4、组合多个 CompletableFuture

  • thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn): 当此 CompletableFuture 和另一个都完成时,使用给定的函数组合它们的结果。
  • thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> action): 当此 CompletableFuture 和另一个都完成时,对它们的结果执行给定的操作。
  • runAfterBoth(CompletableFuture<?> other, Runnable action): 当此 CompletableFuture 和另一个都完成时,执行给定的操作。
  • applyToEither(CompletableFuture<? extends T> other, Function<? super T, U> fn): 当此 CompletableFuture 或另一个完成时(哪个先完成),对其结果应用给定的函数。
  • acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> action): 当此 CompletableFuture 或另一个完成时(哪个先完成),对其结果执行给定的操作。
  • runAfterEither(CompletableFuture<?> other, Runnable action): 当此 CompletableFuture 或另一个完成时(哪个先完成),执行给定的操作。

5、等待和获取结果

  • get(): 等待计算完成,然后获取其结果。
  • get(long timeout, TimeUnit unit): 等待计算在给定的时间内完成,并获取其结果。
  • join(): 类似于 get(),但是会在计算未完成时抛出未检查的异常。
  • complete(T value): 如果尚未完成,则设置此 CompletableFuture 的结果。
  • completeExceptionally(Throwable ex): 如果尚未完成,则使此 CompletableFuture 异常完成。

6、取消

  • cancel(boolean mayInterruptIfRunning): 尝试取消此 CompletableFuture
  • isCancelled(): 如果此 CompletableFuture 被取消,则返回 true

7、查询

  • isDone(): 如果此 CompletableFuture 完成(无论是正常完成还是异常完成),则返回 true

封装计算逻辑的CompletableFuture

上面的代码允许我们选择任何并发执行的机制,但是如果我们想跳过这个样板文件,简单地异步执行一些代码呢?

静态方法runAsync和supplyAsync允许我们相应地使用Runnable和Supplier函数类型创建一个可完成的未来实例。

Runnable和Supplier都是函数接口,由于新的java8特性,它们允许将实例作为lambda表达式传递。

Runnable接口与线程中使用的旧接口相同,不允许返回值。

Supplier接口是一个通用函数接口,它有一个方法,该方法没有参数,并且返回一个参数化类型的值。

这允许我们提供一个供应商实例作为lambda表达式来执行计算并返回结果。简单到:

CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");

// ...

assertEquals("Hello", future.get());

异步计算的处理结果

处理计算结果的最通用的方法是将其提供给函数。thenApply方法正是这样做的;它接受一个函数实例,用它来处理结果,并返回一个包含函数返回值的Future:

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");

assertEquals("Hello World", future.get());

如果我们不需要在Future中返回值,我们可以使用Consumer函数接口的实例。它的单个方法接受一个参数并返回void。

在可完成的将来,有一种方法可以解决这个用例。thenAccept方法接收使用者并将计算结果传递给它。最后一个future.get()调用返回Void类型的实例:

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();

最后,如果我们既不需要计算的值,也不想返回值,那么我们可以将一个可运行的lambda传递给thenRun方法。在下面的示例中,我们只需在调用future.get()后在控制台中打印一行:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));

future.get();

组合CompletableFuture

CompletableFuture API最好的部分是能够在一系列计算步骤中组合CompletableFuture实例。

这种链接的结果本身就是一个完整的Future,允许进一步的链接和组合。这种方法在函数语言中普遍存在,通常被称为享元模式。

在下面的示例中,我们使用thenCompose方法按顺序链接两个Future。

请注意,此方法接受一个返回CompletableFuture实例的函数。此函数的参数是上一计算步骤的结果。这允许我们在下一个CompletableFuture的lambda中使用此值:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());

thenCompose方法与thenApply一起实现了享元模式的基本构建块。它们与流的map和flatMap方法以及java8中的可选类密切相关。

两个方法都接收一个函数并将其应用于计算结果,但是thencomose(flatMap)方法接收一个返回另一个相同类型对象的函数。这种功能结构允许将这些类的实例组合为构建块。

如果我们想执行两个独立的未来,并对它们的结果进行处理,我们可以使用thenCombine方法,该方法接受一个未来和一个具有两个参数的函数来处理这两个结果:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2));

assertEquals("Hello World", completableFuture.get());

一个简单的例子是,当我们想处理两个CompletableFuture的结果时,但不需要将任何结果值传递给CompletableFuture的链。thenAcceptBoth方法可以帮助:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));

thenApply()和thenCompose()方法之间的区别

在前面的部分中,我们展示了有关thenApply()和thenCompose()的示例。两个api都有助于链接不同的CompletableFuture调用,但这两个函数的用法不同。

thenApply()

我们可以使用此方法处理上一次调用的结果。但是,需要记住的一点是,返回类型将由所有调用组合而成。

因此,当我们要转换CompletableFuture调用的结果时,此方法非常有用:

CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);

thenCompose()

thenCompose()方法与thenApply()类似,因为两者都返回一个新的完成阶段。但是,thencose()使用前一阶段作为参数。它将展平并直接返回一个带有结果的CompletableFuture,而不是我们在thenApply()中观察到的嵌套CompletableFuture:

CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

因此,如果要链接可完成的CompletableFuture方法,那么最好使用thenCompose()。

另外,请注意,这两个方法之间的差异类似于map()和flatMap()之间的差异。

并行运行多个CompletableFuture

当我们需要并行执行多个期货时,我们通常希望等待所有Supplier执行,然后处理它们的组合结果。

CompletableFuture.allOf静态方法允许等待的所有Supplier的完成:

CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);

// ...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

注意CompletableFuture.allOf()的返回类型是CompletableFuture。这种方法的局限性在于它不能返回所有Supplier的组合结果。相反,我们必须从未来手动获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API使它变得简单:

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

join()方法类似于get方法,但是如果Future不能正常完成,它会抛出一个未检查的异常。这样就可以将其用作Stream.map()方法中的方法引用。

具体使用简单demo案例

/**
 * 获取统计指标信息
 * @return  统计指标值
 */
@GetMapping("/statistics")
public ResultVO statistics(){
    ResultVO resultVO = new ResultVO();
    try {
        // 获取企业所需要统计的数量
        CompletableFuture<Integer> unitNum = CompletableFuture.supplyAsync(() ->  {
            log.info("执行[获取企业所需要统计的数量]任务:"+"线程id:"+Thread.currentThread().getId()+"线程名称:"+
        Thread.currentThread().getName());
            return enterpriseService.list().size();
        });
        // 获取供需所需要统计的数量
        CompletableFuture<Integer> supplyDemandNum = CompletableFuture.supplyAsync(() -> {
            log.info("执行[获取供需所需要统计的数量]任务:"+"线程id:"+Thread.currentThread().getId()+"线程名称:"+
            Thread.currentThread().getName());
            return supplyDemandService.list().size();
        });
        // 等待所有异步任务执行完成
        CompletableFuture.allOf(unitNum,supplyDemandNum).join();

        resultVO.setUnitNum(unitNum.get());
        resultVO.setSupplyDemandNum(supplyDemandNum.get());
    } catch (Exception e) {
        log.error("任务执行异常!");
        throw new RuntimeException("获取统计指标异常!");
    }
    return resultVO;
}
 

 返回结果:

{
"unitNum": 109,
"supplyDemandNum": 2
}