Java开发中多线程编程司空见惯,从开始的Thread、Runnable到Future再到CompletableFuture,JDK为我们使用多线程不断扩展功能。
关于CompletableFuture的介绍、教程一搜一大堆,那为什么还要写这篇文章呢?教程倒是不少,但是复制粘贴的一大堆,要不就是API的堆叠。难以完整地看下去。
因此本文主要从自己学习的角度来谈谈CompletableFuture,从三个主线来对CompletableFuture进行展开,让你从一个清晰的层次来了解CompletableFuture有一个清晰的层次。
二、从Future机制说起
Future机制是Java 1.5中的引入的,代表着一个异步计算的结果。关于Future/Callable可以参考
juejin.cn/post/684490…
,保证你瞬间有了层次感。不至于什么都会,但是好像又挺糊涂。
Future解决了Runnable无返回值的问题,提供了2种get()来获取结果。
public interface Future<V>{
V get() throws InterruptedException,ExecutionException;
V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException;
复制代码
三、CompleteFuture诞生
堵塞获取结果的方式显然与异步编程相违背,timeout轮询的方式既不优雅也无法及时得到计算结果。很多语言提供了回调实现异步编程,如Node.js。Java的一些类库如netty、guava等也扩展了Future来改善异步编程的体验。官方显然也不甘落后,在Java 8中新增了CompleteFuture及相关API,大大扩展了Future的能力。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
可以看到CompleteFuture实现了Future、CompletionStage2个接口。Future大家都了解,主要是用于实现一个未开始的异步事件。
3.1、新建一个CompletableFuture
runAsync无返回值
supplyAsync有返回值
3.2、CompletionStage
Stage是阶段的意思。顾名思义,CompletionStage它代表了某个同步或者异步计算的一个阶段,最终结果上的一个环节。多个CompletionStage构成了一条流水线,一个环节组装完成了可以移交给下一个环节。一个结果可能需要流转了多个CompletionStage。
如很常见的一个兑换场景,先扣积分、然后发短信、然后设置个人主页标识,这其中扣积分、发短信、设置标识分别是一个Stage。
CompletableFuture链式调用背后就是CompletionStage来提供支持,CompletionStage的thenApply().thenApply()...一环扣一环。
简单的CompletionStage,supplyAsync()异步任务执行完后使用thenApply()将结果传递给下一个stage。
四、三个角度来剖析CompletableFuture
有了CompletableFuture、CompletionStage这2个前置概念的介绍,下面我们可以正式从三个角度来认识CompletableFuture。
4.1、串行关系
thenApply
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("hello");
}catch (Exception e){
e.printStackTrace();
}).thenApply(s1 -> {
System.out.println(" big");
return s1 + "big";
}).thenApply(s2 -> " world");
thenRun
计算完成时候执行一个Runnable,不使用CompletableFuture计算结果,此前的CompletableFuture计算结果也会被忽略,返回CompletableFuture类型。
thenAccept
thenApply、thenAccept类似,区别在于thenAccept纯消费,无返回值,支持链式调用。
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(5);
return "success";
} catch (Exception e) {
e.printStackTrace();
return "error";
}).thenAcceptAsync(s->{
if ("success".equals(s)){
System.out.println(s);
}else {
System.err.println(s);
thenCompose
都是接受一个Function,输入是当前CompleteableFuture计算值,返回一个新CompletableFuture。即这个新的CompleteableFuture组合原来的CompleteableFuture和函数返回的CompleteableFuture。
通常使用在第二个CompleteableFuture需要使用第一个CompleteableFuture结果作为输入情况下。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
复制代码
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1)
.thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1))
复制代码
thenApply、thenCompose都接受一个Function的函数式接口,那么区别呢?
1.thenApply使用在同步mapping方法。
2.thenCompose使用在异步mapping方法。
thenApply(Function<? super T,? extends U> fn)
thenCompose(Function<? super T,? extends CompletableFuture<U>> fn)
复制代码
4.2、AND 关系
thenCombine
combine、compose依我六级的水平来看,好像都有组合、结合的意思,那么区别呢?
主要区别在于thenCombine结合的2个CompletableFuture没有依赖关系,且第二个CompletableFuture不需要等第一个CompletableFuture执行完成才开始。
thenCombine组合的多个CompletableFuture虽然是独立的,但是整个流水线是同步的。
thenAcceptBoth
runAfterBoth
2个CompletableFuture都完成之后,会执行一个Runnable。这点与thenAcceptBoth、thenCombine都类似,但是不同点在于runAfterBoth毫不关心任意一个CompletableFuture的返回值,只要CompletableFuture都执行完成它就run,同样它也没有返回值。
allOf
只是单纯等待所有的CompletableFuture执行完成,返回一个 CompletableFuture。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
复制代码
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis()
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1)
System.out.println("i am sleep 1")
} catch (Exception e) {
e.printStackTrace()
return "service 1"
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2)
System.out.println("i am sleep 2")
} catch (Exception e) {
e.printStackTrace()
return "service 2"
CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3)
System.out.println("i am sleep 3")
} catch (Exception e) {
e.printStackTrace()
return "service 3"
CompletableFuture<Void> completableFuture = CompletableFuture
.allOf(completableFuture1, completableFuture2, completableFuture3)
completableFuture.join()
System.out.println(System.currentTimeMillis() - start)
复制代码
4.3、OR关系
applyToEither
CompletableFuture最快的执行完成的结果作为下一个stage的输入结果
acceptEither
最快CompletableFuture执行完成的时候,action消费就会得到执行。
runAfterEither
任何一个CompletableFuture完成之后,就会执行下一步的Runnable。
anyOf
任何一个CompletableFuture完成,anyOf函数就会返回。
CompletableFuture大大扩展了Future能力,简化异步编程的复杂性。
本文主要从AND、OR、串行化三个角度来了解CompletableFuture之间的stage。不同的场景需要叠加不同的stage。