CompletableFuture 详解

JAVA8之前的Future

public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            return "callable finished";
        //do something else
        Thread.sleep(2000);
        String callableResult = future.get();
        System.out.println(callableResult);

CompletableFuture的优势

  • 提供了异步程序执行的另一种方式:回调,不需要像future.get()通过阻塞线程来获取异步结果或者通过isDone来检测异步线程是否完成来执行后续程序。
  • 能够管理多个异步流程,并根据需要选择已经结束的异步流程返回结果。
  • 构建CompletableFuture

    * Creates a new incomplete CompletableFuture. public CompletableFuture() {

    纯翻译:构建一个不完整的CompletableFuture,为什么说不完整呢,请往下看

    public static class test{
            public static String getTestResult()
                int i = 10/0;
                return "test";
        public static void main(String[] args) {
            CompletableFuture<String> completableFuture  = new CompletableFuture();
            new Thread(()->{
                try {
                    completableFuture.complete(test.getTestResult());
                } catch (Exception e) {
                    System.out.println("get exception in side");
                    completableFuture.completeExceptionally(e);
            }).start();
            try {
                String result = completableFuture.get();
                System.out.println(result);
            } catch (Exception e) {
                System.out.println("get exception out side");
                e.printStackTrace();
    

    一般需要complete() 设置异步线程可能返回的值,以及completeExceptionally() 向上抛出异常

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                           Executor executor)
    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                       Executor executor)
    

    从入参可以看到,CompletableFuture 允许我们自定义执行器,在实际项目中我们可以选择合适的线程池来提高异步程序的效率。

        CompletableFuture completableFuture = CompletableFuture.supplyAsync(() ->
                        try {Thread.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        return "test";
    

    CompletableFuture 流

    java8 流式处理在CompletableFuture 也得到了完美的体现,流处理包含的中间操作,终端操作分别对应CompletableFuture 中以thenAccept开头返回CompletableFuture <Void>(也就是回调)的实例方法。中间操作对应thenApply,thenCompose等等返回非CompletableFuture <Void>的实例方法。

     CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                return "test ";
            }).thenApply(u -> {
                System.out.println(Thread.currentThread().getName());
                return u + "in thenApply first";
                    .thenCompose(u -> CompletableFuture.supplyAsync(() -> {
                                System.out.println(Thread.currentThread().getName());
                                return u + "in thenCompose second";
                    ).thenAccept(u -> {
                System.out.println(Thread.currentThread().getName());
                System.out.println(u + "in thenAccept last");
    
    ForkJoinPool.commonPool-worker-1
    ForkJoinPool.commonPool-worker-1
    test in thenApply firstin thenCompose secondin thenAccept last
    

    可以看到默认的异步线程池都是ForkJoinPool.commonPool,同步操作都在main线程中处理。
    多说一句thenApply 和thenCompose的区别,thenCompose在调用外部接口返回CompletableFuture<>类型时更方便。

    多个CompletableFuture任务的管理

    现实应用中可能同时存在多个异步任务,有时候我们需要他们一起完成才能进行下面的操作,有时候我们又只需要在存在一个结果的情况下就返回。

       private static  final Random random = new Random();
        public static  String randomDelay()
            int delay = 500 + random.nextInt(2000);
            try {
                System.out.println(String.format("%s sleep in %d",Thread.currentThread().getName(),delay));
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                e.printStackTrace();
            System.out.println(String.format("%s sleep in %s",Thread.currentThread().getName(),"end"));
            return Thread.currentThread().getName()+" return";
        public static void main(String[] args) {
             CompletableFuture [] futures = {CompletableFuture.supplyAsync(()->randomDelay()),
                     CompletableFuture.supplyAsync(()->randomDelay()),CompletableFuture.supplyAsync(()->randomDelay())};
            CompletableFuture.allOf(futures).join();
            System.out.println("all timeout process end");
    ForkJoinPool.commonPool-worker-2 sleep in 1957
    ForkJoinPool.commonPool-worker-3 sleep in 2097
    ForkJoinPool.commonPool-worker-1 sleep in 2422
    ForkJoinPool.commonPool-worker-2 sleep in end
    ForkJoinPool.commonPool-worker-3 sleep in end
    ForkJoinPool.commonPool-worker-1 sleep in end
    all timeout process end
    

    上段代码展示了 CompletableFuture.allOf 的用法,可以看到所有的线程结束后打印了"all timeout process end",注意 allOf 接受的是数组类对象。如果把allOf改为 anyOf

    CompletableFuture [] futures = {CompletableFuture.supplyAsync(()->randomDelay()),
                     CompletableFuture.supplyAsync(()->randomDelay()),CompletableFuture.supplyAsync(()->randomDelay())};
            System.out.println(CompletableFuture.anyOf(futures).get());
            System.out.println("all timeout process end");
    
    ForkJoinPool.commonPool-worker-2 sleep in 529
    ForkJoinPool.commonPool-worker-3 sleep in 759
    ForkJoinPool.commonPool-worker-1 sleep in 1750
    ForkJoinPool.commonPool-worker-2 sleep in end
    ForkJoinPool.commonPool-worker-2 return
    all timeout process end
    

    可以看到只有一个线程结束时结果已经返回,另外CompletableFuture还提供了专为两个任务处理的方法
    acceptEither

     CompletableFuture<String> completableFuture  = CompletableFuture.supplyAsync(()->randomDelay());
            completableFuture.acceptEither(completableFuture.supplyAsync(()->randomDelay()),u-> System.out.println(u)).join();
    
    ForkJoinPool.commonPool-worker-2 sleep in 935
    ForkJoinPool.commonPool-worker-1 sleep in 2422
    ForkJoinPool.commonPool-worker-2 sleep in end
    ForkJoinPool.commonPool-worker-2 return
    

    CompletableFuture 异常处理

    除了在get的时候通过 try catch 处理异常,CompletableFuture 提供了更优雅的方式 exceptionally()和 handle()。handle处理方法类似,都是把异常对象转为我们所需要的其他类型对象,然后处理。

      public static String getTestResult()
            int i = 10/0;
            return "test";
        public static void main(String[] args) {
            CompletableFuture<String> completableFuture  = new CompletableFuture();
            new Thread(()->{
                try {
                    completableFuture.complete(getTestResult());
                } catch (Exception e) {
                    System.out.println("get exception in side");
                    completableFuture.completeExceptionally(e);
            }).start();
            completableFuture.exceptionally(e->"we hava a exception"+e.getMessage())
                    .thenAccept(u-> System.out.println(u));