添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
详细讲解Java线程池(二)

详细讲解Java线程池(二)

作者: 佳瑞Jarrett

著作权归作者所有。商业转载请联系作者进行授权,非商业转载请注明出处。


八. 线程的异常捕获

1 . 常见范式

1)如果我们使用 execute()提交任务,我们一般要在 Runable 任务的代码加上try-catch 进行异常处理。

2)如果我们使用 submit()提交任务,我们一般要在主线程中,对Future.get()进行 try-catch 进行异常处理。

submit()底层实现依赖 execute(),两者应该统一呀,为什么有差异呢? 下面再扒一扒 submit()的源码。

ThreadPoolExecutor 中没有submit的代码,而是在父类 AbstractExecutorService 中,有三个submit的重载方法,关键就两行newTaskFor()。

 /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

2.newTaskFor()方法

正是因为这三个重载方法,都调用了 execute,所以 submit 底层依赖execute。 通过查看这里 execute 的实现,不难发现,它就是 ThreadPoolExecutor中的实现。所以,造成 submit 和 execute 的差异化的代码,不在这里。

那么造成差异的一定在 newTaskFor 方法中。 这个方法也就 new 了一个 FutureTask 而已, FutureTask实现 RunnableFuture 接口, RunnableFuture 接口继承 Runnable 接口和 Future 接口。而 Callable 只是FutureTask 的一个成员变量。

3.get()方法

另一个 Java 基础知识点: Call able 和 Future 的关系。 我们一般用 Callable 编 写任务代码, Future 是异步返回对象,通过它的 get 方法,阻塞式地获取结果。 FutureTask 的核心代码就是实现了 Future 接口,也就是 get 方法的实现:

/**
     * @throws CancellationException {@inheritDoc}
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            // 核心代码
            s = awaitDone(false, 0L);
        return report(s);
    }

其中awaitDone()方法如下:

 /**
     * Awaits completion or aborts on interrupt or timeout.
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        // 死循环
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            int s = state;
            // 只有任务状态是已完成,才会跳出死循环
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                LockSupport.parkNanos(this, nanos);
                LockSupport.park(this);
    }

get 的核心实现是有个 awaitDone 方法,这是一个死循环,只有任务的状态是“已完成”,才会跳出死循环;否则会依赖 UNSAFE 包下的 LockSupport.park 原语进行阻塞,等待LockSupport.unpark 信号量。 而这个信号量只有当运行结束获得结果、或者出现异常的情况下,才会发出来。 分别对应方法 set 和setException。

为什么 submit 之后,通过get 方法可以获取到异常?原因是FutureTask 有一个 Object 类型的 outcome 成员 变量,用来记录执行结果。 这个结果可以是传入的泛型,也可以是 Throwable 异常:

FutureTask中的run()方法:

 public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                if (ran)
                    set(result);
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
    }

get()方法依赖的report()方法

 /**
     * Returns result or throws exception for completed task.
     * @param s completed state value
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        // outcome中记录了执行结果
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

FutureTask 的另一个巧妙的地方就是借用 RunnableAdapter 内部类,将submit 的Runnable 封装成 Callable。 所以就算你 submit 的是 Runnable,一样可以用 get 获取到异常。

4.异常处理

ass="nolink">1)不论是用 execute 还是 submit,都可以自己在业务代码上加 try-catch 进行异常处理。 我一般喜欢使用这种方式,因为我喜欢对不同业务场景的异常进行差异化处理,至少打不一样的日志吧。

lass="nolink">2)如果是 execute,还可以自定义线程池,继承ThreadPoolExecutor 并复写其afterExecute(Runnable r, Throwable t)方法。或者实现 Thread.UncaughtExceptionHandler 接口,实现void u ncaughtExcept ion(Thread t, Throwable e);方法,并将该 handler 传递给线程池的 ThreadFactory。

3)但是注意, afterExecute 和 UncaughtExceptionHandler 都不适用 submit 。 因为通过上面的 FutureTask.run()不难发现,它自己对 Throwable 进行了 try-catch,封装到了 outcome 属性,所以底层方法 execute 的 Worker 是拿不到异常信息的。

九. corePoolSize = 0

1)线程池提交任务后,首先判断当前池中线程数是否小于 corePoolSize。

2)如果小于则尝试创建新的线程执行该任务;否则尝试添加到等待队列。

3)如果添加队列成功,判断当前池内线程数是否为0,如果是则创建一个 firstTask 为null 的 worker,这个worker 会从等待队列中获取任务并执行。

4)如果添加到等待队列失败,一般是队列已满,才会再尝试创建新的线程。

5)但在创建之前需要与 maximumPoolSize 比较,如果小于则创建成功。

6)否则执行拒绝策略。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
         * Proceed in 3 steps:
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 添加到等待队列成功后, 判断当前池内线程数是否为 0, 如果
                // 是则创建一个 firstTask 为 null 的 worker, 这个 worker 会从等待队列中获取任务并执行。
                addWorker(null, false);
        else if (!addWorker(command, false))
            reject(command);
    }

十. 自行实现一个线程池代码

参考代码: jianshu.com/p/9b7dfb407

1.线程池的变量

 /**存放线程的集合*/
private ArrayList<MyThead> threads;
/**任务队列*/
private ArrayBlockingQueue<Runnable> taskQueue;
/**线程池初始限定大小*/
private int threadNum;
/**已经工作的线程数目*/
private int workThreadNum;

2.线程池的核心方法

public void execute(Runnable runnable) {
        try {
            mainLock.lock();
            //线程池未满,每加入一个任务则开启一个线程
            if(workThreadNum < threadNum) {
                MyThead myThead = new MyThead(runnable);
                myThead.start();
                threads.add(myThead);
                workThreadNum++;
            //线程池已满,放入任务队列,等待有空闲线程时执行
            else {
                //队列已满,无法添加时,拒绝任务
                if(!taskQueue.offer(runnable)) {
                    rejectTask();
        } finally {
            mainLock.unlock();
    }

到这里,一个线程池已经实现的差不多了,我们还有最后一个难点要解决:从任务队列中取出任务,分配给线程池中 “空闲”的线程完成。

分配任务给线程的第一种思路

很容易想到一种解决思路:额外开启一个线程,时刻监控线程池的线程空余情况,一旦有线程空余,则马上从任务队列取出任务,交付给空余线程完成。

这种思路理解起来很容易,但仔细思考,实现起来很麻烦(1. 如何检测到线程池中的空闲线程 2. 如何将任务交付给一个.start()运行状态中的空闲线程)。而且使线程池的架构变的更复杂和不优雅。

分配任务给线程的第二种思路

现在我们来讲第二种解决思路:线程池中的所有线程一直都是运行状态的,线程的空闲只是代表此刻它没有在执行任务而已;我们可以让运行中的线程,一旦没有执行任务时,就自己从队列中取任务来执行。

为了达到这种效果,我们要重写run方法,所以要写一个自定义Thread类,然后让线程池都放这个自定义线程类

 class MyThead extends Thread{
        private Runnable task;
        public MyThead(Runnable runnable) {
            this.task = runnable;
        @Override
        public void run() {
            //该线程一直启动着,不断从任务队列取出任务执行
            while (true) {
                //如果初始化任务不为空,则执行初始化任务
                if(task != null) {
                    task.run();
                    task = null;
                //否则去任务队列取任务并执行
                else {
                    Runnable queueTask = taskQueue.poll();
                    if(queueTask != null)
                        queueTask.run();    
    }

3.简单线程池

/**
 * 自定义简单线程池
public class MyThreadPool{
    /**存放线程的集合*/
    private ArrayList<MyThead> threads;
    /**任务队列*/
    private ArrayBlockingQueue<Runnable> taskQueue;
    /**线程池初始限定大小*/
    private int threadNum;
    /**已经工作的线程数目*/
    private int workThreadNum;
    private final ReentrantLock mainLock = new ReentrantLock();
    public MyThreadPool(int initPoolNum) {
        threadNum = initPoolNum;
        threads = new ArrayList<>(initPoolNum);
        //任务队列初始化为线程池线程数的四倍
        taskQueue = new ArrayBlockingQueue<>(initPoolNum*4);
        threadNum = initPoolNum;
        workThreadNum = 0;
    public void execute(Runnable runnable) {
        try {
            mainLock.lock();
            //线程池未满,每加入一个任务则开启一个线程
            if(workThreadNum < threadNum) {
                MyThead myThead = new MyThead(runnable);
                myThead.start();
                threads.add(myThead);
                workThreadNum++;
            //线程池已满,放入任务队列,等待有空闲线程时执行
            else {
                //队列已满,无法添加时,拒绝任务
                if(!taskQueue.offer(runnable)) {
                    rejectTask();
        } finally {
            mainLock.unlock();
    private void rejectTask() {
        System.out.println("任务队列已满,无法继续添加,请扩大您的初始化线程池!");
    public static void main(String[] args) {
        MyThreadPool myThreadPool = new MyThreadPool(5);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"执行中");
        for (int i = 0; i < 20; i++) {
            myThreadPool.execute(task);
    class MyThead extends Thread{
        private Runnable task;
        public MyThead(Runnable runnable) {
            this.task = runnable;
        @Override
        public void run() {
            //该线程一直启动着,不断从任务队列取出任务执行
            while (true) {
                //如果初始化任务不为空,则执行初始化任务
                if(task != null) {
                    task.run();
                    task = null;
                //否则去任务队列取任务并执行
                else {
                    Runnable queueTask = taskQueue.poll();
                    if(queueTask != null)
                        queueTask.run();    
}

4. 总结

自定义线程池的整个工作过程:

1.初始化线程池,指定线程池的大小。

2.向线程池中放入任务执行。

3.如果线程池中创建的线程数目未到指定大小,则创建我们自定义的线程类放入线程池集合,并执行任务。执行完了后该线程会一直监听队列

4.如果线程池中创建的线程数目已满,则将任务放入缓冲任务队列

5.线程池中所有创建的线程,都会一直从缓存任务队列中取任务,取到任务马上执行

十一. 线程池的监控与评估

对于资源紧张的应用,如果担心线程池资源使用不当,可以利用 ThreadPoolExecutor 的 API 实现简单的监控,然后进行分析和优化。

ThreadPoolExecutor类的getActiveCount()方法:该方法可以获取线程池中当前正在执行任务的线程数量。

ThreadPoolExecutor类的getPoolSize()方法:该方法可以获取线程池中当前的线程数量。

ThreadPoolExecutor类的getQueue()方法:该方法可以获取线程池中的任务队列。

ThreadPoolExecutor类的getCompletedTaskCount()方法:该方法可以获取线程池中已经完成的任务数量。

多线程的情况下数据如何线程间同步? 使用了hashMap会报错吗?

  • JConsole 工具:JConsole是Java自带的监控工具,可以用于监控Java应用程序的运行情况,包括线程池的状态和行为。
  • VisualVM 工具:VisualVM是一个免费的Java性能分析工具,可以用于监控Java应用程序的运行情况,包括线程池的状态和行为。

十二. 使用线程池一些建议

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

说明:Executors 返回的线程池对象的弊端如下:

1) Excutors.fixedThreadPool(fixedPoolSiz e)和 Si ngleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

2 ) Excutors.cachedThreadPool()允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而 导致 OOM

【建议】创建线程池的时候应该尽量给线程给个具体的业务名字前缀,方便定位问题

// 1. 使用guava的ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
// 2. 自己实现 ThreadFactory接口
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
 * 线程工厂,它设置线程名称,有利于我们定位问题。
public final class NamingThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;
     * 创建一个带名字的线程池生产工厂
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name;
    @Override