添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

最近在学习Java线程池,在阅读ThreadPoolExecutor的源码过程中,一直有个疑问,即线程池等待队列中的任务是如何被调起执行的呢?

接下来从 ThreadPoolExecutor execute() 方法开始一步步分析其原理。

execute() 方法

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int workerCountOf(int c) {
    return c & CAPACITY;
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
    // 如果任务为null,则抛出异常。
    if (command == null)
        throw new NullPointerException();
    // ctl 中保存的线程池当前的一些状态信息
    int c = ctl.get();
    //  下面会涉及到 3 步 操作
    // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
    // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
    // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
        if (!isRunning(recheck) && remove(command))
            reject(command);
            // 如果当前线程池为空就新创建一个线程并执行。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
    else if (!addWorker(command, false))
        reject(command);

上述流程如下图:

那么被加入队列的任务如何被执行呢?
  1. 首先从execute() 方法中最核心的方法 addWorker()分析:
private boolean addWorker(Runnable firstTask, boolean core) {  
	// ... 省略一些对线程池状态的判断
     boolean workerStarted = false;  
	 boolean workerAdded = false;  
	 Worker w = null;  
	 try {
		// 1. 在此处通过ThreadPoolExecutor的内部类Worker创建一个新线程,并将当前command作为该线程的第一个任务
		w = new Worker(firstTask); 
		// 2. 此处变量t为Worker中的线程变量,即为创建好的新线程 
	 	final Thread t = w.thread;  
	 	if (t != null) {  
			final ReentrantLock mainLock = this.mainLock;  
			mainLock.lock();  
			try {  
				int rs = runStateOf(ctl.get());  
				if (rs < SHUTDOWN ||  (rs == SHUTDOWN && firstTask == null)) {  
					if (t.isAlive()) // precheck that t is startable  
						throw new IllegalThreadStateException();  
					workers.add(w);  
					int s = workers.size();  
					if (s > largestPoolSize)  
						largestPoolSize = s;  
				 workerAdded = true;  
			} finally {  
				mainLock.unlock();  
			if (workerAdded) {  
				// 3.调用线程的start()方法,开始执行对应的run()方法
				t.start();  
				workerStarted = true;  
	} finally {  
		if (! workerStarted)  
			addWorkerFailed(w);  
	return workerStarted;  
  1. Worker类是什么?

Worker类为ThreadPoolExecutor类的一个内部类,对线程池中的线程进行了包装,并提供了调度等待队列中任务的能力。

Worker类的成员变量

private final class Worker  
    extends AbstractQueuedSynchronizer  
    implements Runnable  
 	 * This class will never be serialized, but we provide a 
	 * serialVersionUID to suppress a javac warning. 
 private static final long serialVersionUID = 6138294804551838833L;  
 // 此处就是线程池中真正的线程了!!
 final Thread thread;  
 // 线程的第一个任务 
 Runnable firstTask;  
 // 执行完成任务计数器
 volatile long completedTasks;
 

Worker的构造方法

Worker(Runnable firstTask) {  
    setState(-1); // inhibit interrupts until runWorker  
 	this.firstTask = firstTask;  
	// 通过ThreadFactory创建新线程,此处默认为DefaultThreadFactory
 	this.thread = getThreadFactory().newThread(this);  

可以看到 Worker集成了AQS, 同时实现了Runnable接口,在构造方法中已经完成了Thread线程的创建,那么其run()方法干了什么呢?

Worker的run()方法

public void run() {  
    runWorker(this);  

可以看到其run方法中调用了runWorker()方法

final void runWorker(Worker w) {  
	// 获取当前线程
    Thread wt = Thread.currentThread();  
	// 获取第一个任务,第一个任务在Worker()的构造方法中已经传入
	Runnable task = w.firstTask;
	// 得到第一个任务后将task任务变量置为null,使其能够在while的下次循环中能够通过getTask()方法从等待队列中获取未执行的任务
	w.firstTask = null;  
	w.unlock(); // allow interrupts  
	boolean completedAbruptly = true;  
 	try {  
		// task执行完之后通过此处循环获取下一个任务,核心方法为getTask()
        while (task != null || (task = getTask()) != null) {  
            w.lock();  
		// ...省略一些在此处不太关键的代码
 		try {  
            beforeExecute(wt, task);  
 			Throwable thrown = null;  
 			try {  
				// 执行任务
                task.run();  
 			} catch (RuntimeException x) {  
                thrown = x; throw x;  
 		} catch (Error x) {  
             thrown = x; throw x;  
 	 } finally {  
		 // 任务执行完之后将task置为null,以便无论是否发生异常都可以通过getTask()获得下个任务
         task = null;  
		 // 将Worker实例的任务计数器+1
 		 w.completedTasks++;  
 		 w.unlock();  
	// ...
 

getTask()方法

private Runnable getTask() {
	// ...省略
	try {  
		// 此处通过任务队列获取下一个任务并返回
    	Runnable r = timed ?  
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  
        workQueue.take();  
 		if (r != null)  
        	return r;  
 		timedOut = true;  
	} catch (InterruptedException retry) {  
    	timedOut = false;  
	// ....省略

至此,线程池在何时通过何种方式调起任务队列中等待的任务就已经大致清晰了,其流程大致如下:

  1. 当调用ThreadPoolExecutorexecute()方法时,会调用addWorker()方法;
  2. addWorker()中会创建Worker类的实例,该实例对线程池中的线程进行了包装,其实现了Runnable接口,Worker实例创建成功后会调用线程的start()方法;
  3. Worker的run方法中通过runTask()方法执行任务,并循环通过getTask()方法获取队列中等待的任务去执行。
任务调度 任务调度是线程池的主要入口(execute(Runnable r)),接下来任务如何执行都是有这个阶段决定的; 所有的任务调度都是有execute方法完成,检查现在线程池的运行状态,运行的线程数,运行策略,决定接下来执行的流程,是直接申请线程执行,还是缓冲到队列执行,亦或是直接拒绝该任务执行过程如下: 1.首先检查线程池的运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING状态下执行任务 2.如果workCount < corePoolSiz 要执行任务数 >核心线程数,并且 < 最大连接数 并且 等待队列已满,则创建新线程; 要执行任务数 >核心线程数,并且>最大连接数 并且 等待队列已满,则调用拒绝策略来处理该任务(抛异常) 线程池线程池线程池可以看做是。它的工作主要是控制运行的线程的数量,处理过程任务放入队列,然后在线程创建后 启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕, 再从队列取出任务执行。他的主要特点为:线程复用;控制最大并发数;管理线程。每一个 Thread 的类都有一个 start 方法。当调用 start 启动线程时 Java 虚拟机调用该类的 run 方法。那么该类的 run() 方法就是调用了 Runnable 对象的 run() 方法。 先说使用: ThreadPoolExecutor类参数,核心线程数,最大线程数(连核心数算在里面),非核心线程销毁之前等待任务的最长时间,前面时间的单位,等待队列大小。 提交优先级:核心线程>等待队列>非核心线程 执行优先级:核心线程>非核心线程>等待队列 如果核心线程10个,最大线程数20,等待队列10,在执行30个线程的时候。 执行任务1-10,然后执行任务21-30,最后执行任务11-20. 首先任务1-10直接给核心线程执行,核心线程满了之后,任务被放到等待队列;等待 首先检测线程池运行状态,如果不是running,则直接拒绝。 如果workCount < corePoolSize,则创建并启动一个线程线程来执行提交任务。 如果workCoun 要了解线程池的执行流程,我们首先就要知道什么是线程池?线程池就是里面含有若干线程的容器,没有任务时,线程池里面的这些线程都处于等待空闲状态。如果有新的线程任务,就分配一个空闲线程执行任务。如果所有的线程都处于忙碌状态,线程池就创建一个新的线程,亦或者是将任务放到工作队列等待。 大部分并发应用程序都是围绕“任务执行”来构造的:任务通常是一些抽象的且离散的工作单元。 一、在线程执行任务 当围绕“任务执行”来设计应用程序结构时,第一步就是要找出清晰的任务边界。在理想情况下,各个任务之间是相互独立的,独立性有助于实现并发。 在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。应用程序提供商希望程序尽可能支持多的用户,从而降低每个用户的服务成本,而...