为什么需要线程池
当我们需要异步处理任务时,最常用也是最简单的方式就是新开一个线程去做。而线程的创建和销毁是需要消耗 CPU 资源的,当异步任务越来越多时,如果一味的新开线程去处理,那么我们可能会无法控制 CPU 的资源。所以首先我们需要控制线程的开启数量。
类比数据库连接池,想一想之前使用 JDBC 访问数据库的时候是不是要先建立连接,也就是创建一个 Connection 对象。在 web 项目中,通常会有很多请求访问数据库,在使用框架 Spring+Mybatis 时,如果当前上下文没有事务的话那么每一个数据库操作方法都需要创建一个 Connection,这样频繁的创建 Connection 对象无疑是资源的浪费,也会拉低接口吞吐量。所以我们需要一个池子来维护数据库连接,也就是数据库连接池。同样我们也需要线程池。
CompletableFuture介绍
JDK1.8 中提供的 CompletableFuture 提供了异步函数式编程。可以帮助我们简化异步编程的复杂性,通过回调的方式处理计算结果,并且提供了转换和组合的方法。
CompletableFuture是JDK1.8版本新引入的类,使用completionStage接口去支持完成时触发的函数和操作。
一个completetableFuture就代表了一个任务。他能用Future的方法。还能做一些之前说的executorService配合futures做不了的。
之前future需要等待isDone为true才能知道任务跑完了。或者就是用get方法调用的时候会出现阻塞。而使用completableFuture的使用就可以用then,when等等操作来防止以上的阻塞和轮询isDone的现象出现。
CompletableFuture 的使用
(1)创建 CompletableFuture 对象
提供了四个静态方法来创建:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
-
supply开头:这种方法,可以返回异步线程执行之后的结果
-
run开头:这种不会返回结果,就只是执行线程任务
默认使用
ForkJoinPool.commonPool()
,
commonPool
是一个会被很多任务共享的线程池,比如同一
JVM
上的所有
CompletableFuture
、并行
Stream
都将共享
commonPool
,
commonPool
设计时的目标场景是运行非阻塞的
CPU
密集型任务,为最大利用
CPU
,其线程数默认为
CPU数量-1。
(2)阻塞获取
以下四个方法用于获取结果:
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
-
getNow() 代表计算完,如果返回结果或抛出异常就正常get,否则就返回给定的 valueIfAbsent 值
-
join() 返回计算的结果或者抛出一个 unchecked 异常(CompletionException)
-
get() 方法调用的时候会出现阻塞
(3)计算完成时处理
这四个方法是计算阶段结束的时候触发
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
常用方法
1、thenCompose
thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例
-
如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;
-
如果该CompletableFuture实例为null,然后就执行这个新任务。
public static void main(String[] args) {
SmallTool.printTimeAndThread("小白进入餐厅");
SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("厨师炒菜");
SmallTool.sleepMillis(200);
return "番茄炒蛋";
}).thenCompose(dish -> CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("服务员打饭");
SmallTool.sleepMillis(100);
return dish + "米饭";
}));
SmallTool.printTimeAndThread("小白在打王者");
SmallTool.printTimeAndThread(String.format("%s ,小白开吃 ", cf1.join()));
}
结果输出:
2、thenCombine
会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值。
public static void main(String[] args) {
SmallTool.printTimeAndThread("小白进入餐厅");
SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("厨师炒菜");
SmallTool.sleepMillis(200);
return "番茄炒蛋";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("服务器蒸饭");
SmallTool.sleepMillis(300);
return "米饭";
}), (dish, rice) -> {
SmallTool.printTimeAndThread("服务器打饭");
SmallTool.sleepMillis(100);
return String.format("%s + %s 好了", dish, rice);
});
SmallTool.printTimeAndThread("小白在打王者");
SmallTool.printTimeAndThread(String.format("%s ,小白开吃", cf1.join()));
}
结果输出:
参考文档:
1、CompletableFuture 使用及应用场景_再见丶孙悟空的博客
2、JDK1.8新特性CompletableFuture总结_finalheart的博客
学习视频推荐:
2、B站 CompletableFuture学习 :https://www.bilibili.com/video/BV1ui4y1T7xf/?spm_id_from=pageDriver&vd_source=7c9c5325e34e359ba5671d7a733fcfd3