Java CompletableFuture 快速入门指南
1. 为什么需要 CompletableFuture?
在 CompletableFuture 出现之前,Java 5 引入了 Future 接口,用于表示一个异步计算的结果。但 Future 的能力非常有限:
- 无法主动完成:你无法手动将一个
Future标记为已完成,并设置其结果。它只能被动地等待执行它的线程完成任务。 - 阻塞式获取结果:
future.get()方法是阻塞的,调用时会暂停当前线程,直到异步任务完成。这违背了异步编程的初衷。 - 没有回调机制:你无法在
Future完成时自动触发某个动作(回调函数),只能通过循环调用isDone()来检查,或者直接阻塞在get()上。 - 无法组合:你很难将多个
Future串联起来,例如当一个Future完成后,用其结果去执行另一个异步任务。
CompletableFuture 扩展了 Future 接口,并实现了 CompletionStage 接口,彻底解决了以上痛点。它为异步编程提供了一种功能强大、非阻塞、可组合的范式,是现代 Java 异步编程的基石。
核心思想:CompletableFuture 像一个承诺(Promise),它承诺在未来的某个时刻会有一个结果。你可以基于这个承诺(无论它成功还是失败)来编排一系列后续操作,而无需阻塞等待。
2. CompletableFuture 的创建
创建 CompletableFuture 通常有两种方式:
runAsync(Runnable runnable)/runAsync(Runnable runnable, Executor executor)- 作用:执行一个没有返回值的异步任务。
- 返回值:
CompletableFuture<Void>。 - 使用场景:当你只想异步执行一个操作,不关心其返回值时。例如,异步记录日志、发送通知邮件等。
supplyAsync(Supplier<U> supplier)/supplyAsync(Supplier<U> supplier, Executor executor)- 作用:执行一个有返回值的异步任务。
Supplier<U>是一个函数式接口,代表一个提供类型为U的结果的函数。 - 返回值:
CompletableFuture<U>,其中U是supplier提供的结果类型。 - 使用场景:这是最常用的创建方式。当你需要异步获取数据(如查询数据库、调用远程API)时使用。
- 作用:执行一个有返回值的异步任务。
关于Executor参数:
- 不带
Executor的版本:默认使用ForkJoinPool.commonPool()作为其线程池。这个线程池是全局共享的,适用于CPU密集型任务。如果用于IO密集型任务(如网络请求、文件读写),可能会因为线程阻塞而耗尽commonPool的线程,影响整个应用的性能。 - 带
Executor的版本:允许你指定自定义的线程池。强烈推荐为IO密集型任务创建专用的线程池,以避免上述问题。
3. 核心API详解:回调与链式处理
这是 CompletableFuture 最强大的部分,用于处理异步任务的结果。
3.1 结果处理(消费或转换)
当一个 CompletableFuture 完成后,你可以使用以下方法来处理其结果。这些方法都有一个同步版本(如 thenApply)和一个异步版本(如 thenApplyAsync)。
- 同步版本 (e.g.,
thenApply):后续任务可能由完成上一个任务的线程执行,也可能由调用thenApply的主线程执行。它不会切换线程池。 - 异步版本 (e.g.,
thenApplyAsync):后续任务会被提交到线程池(默认是ForkJoinPool.commonPool(),或指定的Executor)中执行,确保了任务的异步性。
| 方法 | 作用 | 入参 | 返回值 | 使用场景 |
|---|---|---|---|---|
thenApply(Function<? super T,? extends U> fn) |
转换:接收上一步的结果,并返回一个新的结果。同步执行。 | Function (T -> U) |
CompletableFuture<U> |
将一个异步结果(如用户ID)转换为另一种形式(如用户信息对象)。这是最常用的链式操作。 |
thenAccept(Consumer<? super T> action) |
消费:接收上一步的结果,但没有返回值。同步执行。 | Consumer (T -> Void) |
CompletableFuture<Void> |
对异步结果进行最终处理,如打印到控制台、存入数据库,并且不需要返回任何东西。 |
thenRun(Runnable action) |
运行:不关心上一步的结果,只要上一步完成了,就执行一个 Runnable。同步执行。 |
Runnable (Void -> Void) |
CompletableFuture<Void> |
在某个异步操作(无论结果是什么)完成后,触发一个没有入参和返回值的动作。 |
示例理解:
假设有一个 CompletableFuture<Long> userIdFuture。
userIdFuture.thenApply(id -> "用户" + id)-> 返回CompletableFuture<String>userIdFuture.thenAccept(id -> System.out.println("成功获取用户ID:" + id))-> 返回CompletableFuture<Void>userIdFuture.thenRun(() -> System.out.println("获取ID的任务已完成"))-> 返回CompletableFuture<Void>
3.2 任务编排(串行化)
当一个异步任务的执行依赖于另一个异步任务的结果时,你需要 thenCompose。
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)- 作用:这是处理依赖异步任务的核心。它接收上一步的结果,然后返回另一个
CompletionStage(通常是CompletableFuture)。最终的结果由这个新的CompletableFuture决定。 - 与
thenApply的关键区别:thenApply的函数体返回一个普通值 (U),thenApply自身会把这个值包装成CompletableFuture<U>。thenCompose的函数体直接返回一个新的CompletableFuture<U>。它会将两个CompletableFuture“铺平”(flat),避免出现CompletableFuture<CompletableFuture<User>>这样的嵌套结构。
- 使用场景:当你有多个需要依次执行的异步步骤时。例如:
- 异步获取订单ID。
- 使用订单ID,异步获取订单详情。
- 使用订单详情,异步获取用户信息。
1
2
3
4// 伪代码演示
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "订单ID123") // 步骤1
.thenCompose(orderId -> supplyAsync(() -> "订单详情 for " + orderId)) // 步骤2
.thenCompose(orderDetails -> supplyAsync(() -> "用户信息 for " + orderDetails)); // 步骤3- 作用:这是处理依赖异步任务的核心。它接收上一步的结果,然后返回另一个
4. 组合多个CompletableFuture
当需要并行执行多个独立的异步任务,并在它们都完成后做一些事情时,使用组合API。
4.1 AND 关系(等待两个都完成)
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)- 作用:将两个独立的
CompletableFuture结果合并。当两个任务都成功完成后,将它们的结果作为BiFunction的参数,并返回一个新的结果。 - 使用场景:需要并行获取两个不相关的数据,然后将它们组合起来。例如,异步获取用户信息和用户权限,然后合并成一个用户视图对象。
- 作用:将两个独立的
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)- 作用:与
thenCombine类似,但用于消费结果,不返回值。
- 作用:与
runAfterBoth(CompletionStage<?> other, Runnable action)- 作用:当两个任务都完成后,执行一个
Runnable,不关心它们的结果。
- 作用:当两个任务都完成后,执行一个
4.2 OR 关系(等待任意一个完成)
applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)- 作用:两个
CompletableFuture,任意一个先完成,就将其结果作为Function的参数,并返回一个新的结果。另一个未完成的任务将被忽略。 - 使用场景:从多个数据源(如主/备数据库,不同地区的缓存)获取相同的数据,哪个快就用哪个。
- 作用:两个
acceptEither(...)和runAfterEither(...):与applyToEither类似,分别用于消费和运行。
4.3 等待所有/任意任务(静态方法)
allOf(CompletableFuture<?>... cfs)- 作用:等待所有给定的
CompletableFuture都完成。 - 返回值:
CompletableFuture<Void>。注意:这个allOf本身不提供所有结果的聚合。它只是一个完成信号。 - 如何获取所有结果? 你需要先调用
allOf(...).join()(或.get()),然后再遍历原始的CompletableFuture列表,分别调用.join()来获取它们各自的结果。因为此时可以确定它们都已完成,所以.join()不会阻塞。 - 使用场景:需要并行执行大量独立的异步任务,并等待它们全部结束后再进行下一步。例如,批量发送邮件。
- 作用:等待所有给定的
anyOf(CompletableFuture<?>... cfs)- 作用:等待任意一个给定的
CompletableFuture完成。 - 返回值:
CompletableFuture<Object>。返回的是第一个完成的任务的结果。 - 使用场景:与
applyToEither类似,但可以处理两个以上的任务。例如,向多个服务发起同一个请求,谁先响应就用谁的数据。
- 作用:等待任意一个给定的
5. 异常处理
异步代码的异常处理至关重要。CompletableFuture 提供了一套优雅的机制。
exceptionally(Function<Throwable, ? extends T> fn)- 作用:类似于
try-catch中的catch块。如果异步链中任何一步出现异常,exceptionally会捕获它,并提供一个备用/默认值作为后续链的结果。 - 返回值:
CompletableFuture<T>。它可以让一个失败的流程“恢复正常”。
- 作用:类似于
whenComplete(BiConsumer<? super T, ? super Throwable> action)- 作用:类似于
try-catch-finally中的finally块(但更像一个观察者)。无论成功还是失败,它都会被执行。BiConsumer接收两个参数:result和exception,其中一个必然为null。 - 关键:
whenComplete不能改变计算的结果。它只是一个“窥视”的机会,通常用于记录日志、资源清理等副作用操作。如果上游是成功的,它下游仍然是成功的;如果上游是失败的,它下游仍然是失败的。
- 作用:类似于
handle(BiFunction<? super T, Throwable, ? extends U> fn)- 作用:
whenComplete和exceptionally的结合体。它既能处理正常结果,也能处理异常,并且可以改变最终的结果。 BiFunction接收result和exception两个参数。你可以根据这两个参数返回一个全新的结果。- 使用场景:需要根据成功或失败返回不同类型或状态的结果时,
handle是最灵活的选择。
- 作用:
选择建议:
- 如果想在出错时提供一个默认值,让流程继续下去 ->
exceptionally。 - 如果只想记录日志或做清理,不影响结果 ->
whenComplete。 - 如果需要根据成功或失败,对结果进行复杂的转换和处理 ->
handle。
6. 手动完成 Future
complete(T value):如果 Future 尚未完成,则将其结果设置为value。如果已完成,则调用无效。completeExceptionally(Throwable ex):如果 Future 尚未完成,则使其以异常ex结束。
使用场景:将一个非异步的、基于回调的API(例如某些老旧的库)适配到 CompletableFuture 模型中。你可以先创建一个 CompletableFuture,在回调的成功方法中调用 complete(),在失败方法中调用 completeExceptionally()。
7. 代码实践
1 | import java.util.concurrent.CompletableFuture; |
- 保存代码: 将以上代码保存为 CompletableFutureQuickStartGuide.java 文件。
- 编译运行: 使用您的IDE(如IntelliJ IDEA, Eclipse)或命令行 javac 和 java 命令来编译和运行这个类。
- 观察控制台输出:
- 注意线程名称: [业务线程-xx] 和 [main] 的交替出现,直观地展示了任务是在哪个线程上执行的,帮您理解异步和并行的概念。
- 注意执行顺序: 主线程的日志总是会先打印出来,表明任务提交后主线程没有被阻塞,可以继续执行。
- 注意结果: 通过 .join() 获取并打印最终结果,验证整个异步流程的正确性。
- 逐个场景学习:
- 从 main 方法开始,它依次调用了各个场景。
- 您可以注释掉 main 方法中的部分调用,只关注某一个特定场景,仔细阅读该场景的代码和注释。
- 尝试修改 mockRpcCall 的耗时,观察对最终结果的影响,加深理解。
- 在 scene6 和 scene8 中,由于异常是随机发生的,可以多次运行程序,观察在正常和异常情况下的不同输出。
总结与最佳实践
- 为IO密集型任务提供专用线程池:不要阻塞
ForkJoinPool.commonPool(),这是最重要的实践之一。 - 理解
thenApplyvsthenCompose:thenApply用于同步转换值,thenCompose用于链接依赖的异步任务,这是区分中级和初级使用者的关键。 - 明智地选择同步与异步回调:如果回调中的任务非常快(如简单的内存操作),使用同步版本(
thenApply)可以减少线程切换开销。如果任务耗时或可能阻塞,请务必使用异步版本(thenApplyAsync)。 - 优先使用
handle进行全面的结果处理:它比exceptionally和whenComplete更通用和强大。 - 使用
join()代替get():在主线程等待最终结果时,join()抛出的是非受检异常(CompletionException),代码更简洁。get()抛出的是受检异常(InterruptedException,ExecutionException),需要显式try-catch。
掌握了以上概念和API,您就已经具备了在实际项目中熟练运用 CompletableFuture 的能力。祝您学习愉快!


