public class TestCompletionService {
private static final String commandstr01 = "hahah";
private static final String commandstr02 = "hahah";
public static void main(String[] args) throws InterruptedException, ExecutionException {
//1、创建一个线程池
ExecutorService executorService = Executors.newCachedThreadPool();
CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
completionService.submit(new MyThreadt33(commandstr01));
completionService.submit(new MyThreadt44(commandstr01));
executorService.shutdown();
System.out.println(completionService.take().get());
System.out.println(completionService.take().get());
class MyThreadt33 implements Callable<String>{
private String commandstr; // 要运行的mingling
public MyThreadt33(String commandstr) {
this.commandstr = commandstr;
@Override
public String call() throws Exception {
int sum = 0;
for (int i = 0; i < 100; i++) {
Thread.sleep(200);
sum += i;
System.out.println("Mythread3: "+i);
return String.valueOf(sum+300000);
class MyThreadt44 implements Callable<String>{
private String commandstr; // 要运行的mingling
public MyThreadt44(String commandstr) {
this.commandstr = commandstr;
@Override
public String call() throws Exception {
int sum = 0;
for (int i = 0; i < 50; i++) {
Thread.sleep(200);
sum += i;
System.out.println("Mythread4: "+i);
return String.valueOf(sum+400000);
CompletionService方法可以通过completionService.take().get()方法获取最快完成的线程的返回结果(若当前没有线程执行完成则阻塞直到最快的线程执行结束),第二次调用则返回第二快完成的线程的返回结果。
3、CompletableFuture接口
所谓异步调用其实就是实现一个可无需等待被调函数的返回值而让操作继续运行的方法。简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。
JDK1.5新增了Future接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。
JDK1.8后提出了CompletableFuture接口实现了Future和CompletionStage两个接口,CompletionStage可以看做是一个异步任务执行过程的抽象(CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println()))
我们可以基于CompletableFuture创建任务和链式处理多个任务,并实现按照任务完成的先后顺序获取任务的结果。
(1)创建任务
##使用runAsync方法新建一个线程来运行Runnable对象(无返回值);
##使用supplyAysnc方法新建线程来运行Supplier<T>对象(有返回值);
##基于线程池创建
(2)任务的异步处理
不论Future.get()方法还是CompletableFuture.get()方法都是阻塞的,为了获取任务的结果同时不阻塞当前线程的执行,我们可以使用CompletionStage提供的方法结合callback来实现任务的异步处理。
##whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
##whenCompleteAsync:把 whenCompleteAsync 这个任务继续提交给线程池来进行执行,也就是并行执行。
##thenApply:当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化
##thenAccept:thenAccept接收上一阶段的输出作为本阶段的输入,并消费处理,无返回结果。
##thenRun:不关心前一阶段的计算结果,因为它不需要输入参数,进行消费处理,无返回结果。
## thenCombine:会把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
## applyToEither :两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。
##acceptEither 方法:两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作
public class TestCompletableFuture {
private static final String commandstr01 = "hahah";
private static final String commandstr02 = "hahah";
private static final String commandstr03 = "hahah";
private static final String commandstr04 = "hahah";
public static void main(String[] args) throws InterruptedException, ExecutionException{
ExecutorService executorService = Executors.newCachedThreadPool();
CompletableFuture.supplyAsync(new MyThreadt444(commandstr02),executorService).whenComplete((result, e) -> {
//执行线程执行完以后的操作。
System.out.println(result + " " + e);
}).exceptionally((e) -> {
//抛出异常
System.out.println("exception " + e);
return "exception";
CompletableFuture.supplyAsync(new MyThreadt333(commandstr02),executorService).whenComplete((result, e) -> {
//执行线程执行完以后的操作。
System.out.println(result + " " + e);
}).exceptionally((e) -> {
System.out.println("exception " + e);
return "exception";
class MyThreadt333 implements Supplier<String>{
private String commandstr; // 要运行的mingling
public MyThreadt333(String commandstr) {
this.commandstr = commandstr;
@Override
public String get() {
int sum = 0;
for (int i = 0; i < 30; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
sum += i;
System.out.println("Mythread333: "+i);
return String.valueOf(sum+300000);
class MyThreadt444 implements Supplier<String>{
private String commandstr; // 要运行的mingling
public MyThreadt444(String commandstr) {
this.commandstr = commandstr;
@Override
public String get() {
int sum = 0;
for (int i = 0; i < 40; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
sum += i;
System.out.println("Mythread444: "+i);
return String.valueOf(sum+400000);
在CompletableFuture接口中除了使用whenComplete还可以使用handle等方法能实现按照任务完成的先后顺序获取任务的结果。
4、几种多线程并发取结果方式的总结
下图参考自:https://blog.csdn.net/u011726984/article/details/79320004
先说说Future。
他是一个interface 也可以理解为一种设计模式。
大致如下:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。就是我之前所理解的异步。
Future提供了三个功能:
判断任务是否完成
isDone(): get results
not done: keep waiting?kill it?
Future里面有个方法 get 是个阻塞方法。当线程池一次性submit多个任务的时候。
为了保证系统响应迅速,需要寻找一种方法能够使调取接口能够异步执行,而Java正好提供了类似的方法,在java.util.concurrent中包含了Future相关的类,运用其中的一些类可以进行异步计算,以减少主线程的等待时间。比如启动一个main方法,main中又包含了若干个其它任务,在不使用Java future的情况下,main方法中的任务会同步阻塞执行,一个执行完成后,才能去执行另一个;如...
使用 CompletableFuture 中获取执行结果的方法时应该避免阻塞主线程,因为它们都可能会导致线程阻塞,影响程序的整体性能和响应能力。因此,在实际使用中我们应该结合 CompletableFuture 的回调函数或者连续的异步操作等方法,来实现非阻塞的异步计算和结果处理。
1、Future
Future模式是多线程设计常用的一种设计模式。Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。
Future提供了三种功能:
判断任务是否完成
能够中断任务
能够获取任务执行的结果
向线程池中提交任务的submit方法不是阻塞方法,而Future.get方法是一个阻塞方法,当submit提交多个任务时,只有所有任务都完成后,才能使用get按照任务的提交顺序
线程池 future.get() 方法对多线程并发的影响
使用ThreadPoolExecutor做多线程并发测试的时候,用for循环给线程池加任务,发现最终执行的时候所有线程都按顺序依次执行,没有并发执行
Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很相似,但它可以返回一个对象或者抛出一个异常。
Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的,我们必须等待它返回的结果。而线程是属于异步计算模型,所以不可能直接从别的线程中得到函数返回值。
java.util.concurrent.Future对象
在Java中,如果需要设定代码执行的最长时间,即超时,可以用Java线程池ExecutorService类配合Future接口来实现。 Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口是Java线程Future模式的实现,可以来进行异步计算。Future模式可以这样来描述:我有一个任务,提交给了Future,Future替我完成这个任务...
//先新建一个 ExecutorService
ExecutorService exec = Executors.newSingleThreadExecutor();
//新建future,callable,call //返回值为String,任何类型都可以
Future...
future机制是:在通过线程去执行某个任务的时候,如果比较耗时,我们可以通过futureTask机制,异步返回,继续去执行其他的任务,在需要获取执行结果的时候,通过future.get()方法去获取,如果任务没有执行完毕,会通过lockSupport.park()方法去阻塞主线程,直到run()方法执行完毕之后,会通过lockSupport.unpark()方法去唤醒线程
public class FutureTest {
public static void main(String[] a
submit需要接受一个Callable参数,Callable需要实现一个call方法,并返回结果。如果在等待时间结束的时候,Future还有返回,则会抛出一个TimeoutException。一般来说,当我们执行一个长时间运行的任务时,使用Future就可以让我们暂时去处理其他的任务,等长任务执行完毕再返回其结果。Future代表的是异步执行的结果,意思是当异步执行结束之后,返回的结果将会保存在Future中。这里futureOne.get()是一个阻塞操作,会一直等待异步执行完毕才返回结果。
callable多线程future.get()方法能获取到当前线程的执行结果,但是会阻塞当前线程,即当前线程执行结束获取到结果后才会继续执行下一个线程,解决方法:
创建一个List数组存储funture,在所有线程执行以后遍历list获取结果。
int count = 0;
List<Future<Integer>> res = new ArrayList<>();
ExecutorService executorService = Executors.newCached
最近在看 effective java 里面有一个关于死锁的例子其中有一处代码是调用 ExecutorService.submit() 方法返回后的 Future 对象的 get 方法现在问题是如果不调用 get 方法就不会有死锁问题。刚开始还理解为并不是死锁,是 get 导致线程挂起,但是把 get 方法去掉就又不会出现程序挂起这个现象。想请教下这个 get 方法是什么机制?为什么调用后才会出现...
很多 Java 工程师在准备面试时,会刷很多八股文,线程和线程池这一块通常会准备线程的状态、线程的创建方式,Executors 里面的一些工厂方法和为什么不推荐使用这些工厂方法, 构造方法的一些参数和执行过程等。工作中,很多人会使用线程池的 方法 获取 Future 类型的返回值,然后使用 实现“最多等多久”的效果。但很多人对此的理解只停留在表面上,稍微问深一点点可能就懵逼了。比如, 超时之后,当前线程会怎样?线程池里执行对应任务的线程会有怎样的表现?如果你对这个问题没有很大的把握,说明你掌握的还不