> 技术文档 > Java并发编程第十篇(ThreadPoolExecutor线程池组件分析)

Java并发编程第十篇(ThreadPoolExecutor线程池组件分析)


Java并发系列: JUC.ThreadPoolExecutor

  • 一,JUC.ThreadPoolExecutor概况
    • 1.1 线程池的意义
    • 1.2 继承关系
  • 二,源码解析
    • 2.1 静态变量
    • 2.2 属性
    • 2.3 内部类 Worker
  • 三,方法调用
    • 3.1 runWorker
    • 3.2 getTask
    • 3.3 execute
    • 3.4 addWorker

一,JUC.ThreadPoolExecutor概况

ThreadPoolExecutor作为开发中最常用的线程池, 也作为面试中被问到的最高频的并发组件之一, 我们有必要来聊聊它的作用以及内部构造。

1.1 线程池的意义

在讲解线程池之前, 有些读者可能存在这样的疑惑: 为什么需要线程池, 线程池有什么优越性?

关于这个问题, 主要从两个角度来进行解答:

  • 减少开销
    在大部分JVM上, 用户线程与操作系统内核线程是1:1的关系, 也就是说每次创建回收线程都要进行内核调用, 开销较大。那么有了线程池, 就可以重复使用线程资源, 大幅降低创建和回收的频率。此外, 也能一定程度上避免有人在写BUG时, 大量创建线程导致资源耗尽。

  • 便于管理
    线程池可以帮你维护线程ID、线程状态等信息, 也可以帮你统计任务执行状态等信息。

理解了线程池的意义, 那么本文的主角便是JUC提供的线程池组件: ThreadPoolExecutor.

请注意, 有人会将JUC中的ThreadPoolExecutor与Spring Framework中的ThreadPoolTaskExecutor混淆。这是两个不同的组件, ThreadPoolTaskExecutor可以理解为对ThreadPoolExecutor做的一层封装, 主要就是为了支持线程池的Bean化, 将其交给Spring Context来管理, 防止滥用线程池。而内部的核心逻辑还是由ThreadPoolExecutor处理。关于这一点, 简单了解即可。

从宏观上看, 开发者将任务提交给ThreadPoolExecutor, ThreadPoolExecutor分配工作线程(Worker)来执行任务, 任务完成后, 工作线程回到ThreadPoolExecutor, 等待后续任务。

根据这段描述, 产生了三个比较值得探究的问题:

  1. ThreadPoolExecutor自身有哪些状态, 如何维护这些状态?
  2. ThreadPoolExecutor如何维护内部的工作线程?
  3. ThreadPoolExecutor处理任务的整体逻辑是什么样的?

源码之前, 了无秘密。在读源码的过程中, 脑海中带着这三个问题。

1.2 继承关系

ThreadPoolExecutor继承了AbstractExecutorService, AbstractExecutorService实现了ExecutorService接口。ExecutorService接口继承了Executor接口, 整体呈现出了这样的关系:

ThreadPoolExecutor → AbstractExecutorService → ExecutorService → Executor

从上往下来看:

public interface Executor { void execute(Runnable command);}

Executor 接口中只声明了一个execute方法, 用于执行提交的任务。

ExecutorService扩展了Executor的语义, 增加了多种多样的操作。

public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}

而AbstractExecutorService则是对ExecutorService中声明的方法进行默认实现, 方便子类进行调用。比如ThreadPoolExecutor就直接使用了AbstractExecutorService的submit方法。 AbstractExecutorService也是一个比较核心的类, 但它不是本文的重点, 所以不会详细讲解。

二,源码解析

2.1 静态变量

private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;

这几个变量很关键, 在注释中也已经有了比较详细的解释。我这里就以更直白的方式加以介绍, 顺便帮你温习一些计算机基础知识。

先看下半部分的五个变量, 从命名上可以判定这些值代表了ThreadPoolExecutor的状态。

这些状态值涉及到了二进制移位操作, 我们知道int类型在Java中的二进制表示是以补码存储的。-1的二进制表示是32个1的序列, COUNT_BITS值是常数, 为32-3=29。因此RUNNING的二进制表示是高三位为111, 低29位都为0的序列。

我们用同样的方式表示出其余四个状态:

  • RUNNING: 11100000 00000000 00000000 00000000
  • SHUTDOWN: 00000000 00000000 00000000 00000000
  • STOP: 00100000 00000000 00000000 00000000
  • TIDYING: 01000000 00000000 00000000 00000000
  • TERMINATED: 01100000 00000000 00000000 00000000

不难发现, 这五个状态可以理解为目前只用到了高三位, 这是因为ThreadPoolExecutor只用一个int变量来同时保存线程池状态以及工作线程数这两个信息, 线程状态使用高三位, 工作线程数使用低29位。CAPACITY这个变量就表示为工作线程的最大数量 (2^29 - 1)。

这种将两种状态存储在一个二进制序列中的做法, 在业务代码中相对比较少见, 在底层源码中很常见。比如ReentrantReadWriteLock中, 用一个int来组合表示读锁和写锁的个数, 比如在ZooKeeper中, 用一个long来组合表示epoch和事务个数。

这几种状态的含义是:

  • RUNNING: 接受新任务, 也能处理阻塞队列里的任务。
  • SHUTDOWN: 不接受新任务, 但是处理阻塞队列里的任务。
  • STOP: 不接受新任务, 不处理阻塞队列里的任务, 中断处理过程中的任务。
  • TIDYING: 当所有的任务都执行完了, 当前线程池已经没有工作线程, 这时线程池将会转换为TIDYING状态, 并且将要调用terminated方法。
  • TERMINATED: terminated方法调用完成。

这几个状态之间的变化如图所示:

Java并发编程第十篇(ThreadPoolExecutor线程池组件分析)

2.2 属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 相关方法private static int ctlOf(int rs, int wc) { return rs | wc; }

首先着重介绍的是AtomicInteger类型的属性ctl

ctl就是上文所说的, 组合了线程池状态以及池中工作线程数两个信息的变量。它初始化时调用了ctlOf方法, 可以看到ctlOf只是一个或操作。这就说明, 线程池在初始化时, 状态被标记为RUNNING, 工作线程数为0。

读到这里, 有一些读者可能会存在疑惑: 为啥非要用一个int值来组合表示两种状态? 用两个值表示, 清清楚楚不行吗?

可以, 当然可以。但使用一个变量的好处是: 如果需要对两个状态值进行同步修改, 直接通过位操作就可以了, 省去了加锁操作。因为在操作系统级别, 对int的修改本身就是原子的。

再来看看其他属性, 属性往往会透露出这个类是如何组织的。

private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock();private final HashSet<Worker> workers = new HashSet<Worker>();private final Condition termination = mainLock.newCondition();private int largestPoolSize;private long completedTaskCount;private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private volatile long keepAliveTime;private volatile boolean allowCoreThreadTimeOut;private volatile int corePoolSize;private volatile int maximumPoolSize;

一个个属性来看:

  • workQueue: BlockingQueue类型, 用来存储积压任务的阻塞队列。Callable类型的任务会被父类AbstractExecutorService转化为FutureTaskRunnable的子类)。
  • mainLock: ReentrantLock类型, 对线程池的一些操作需要状态同步, 所以需要用到锁。
  • workers: HashSet类型, Worker是对工作线程以及一些状态的封装, workers是用来存储所有Worker的集合。
  • termination: 由mainLock创建的Condition, 用于awaitTermination调用时的线程同步。
  • largestPoolSize: 线程池中最多有过多少个活跃线程。
  • completedTaskCount: 线程池总共处理了多少任务。
  • threadFactory: ThreadFactory接口, 用户可以自定义创建工作线程的工厂。
  • handler: 拒绝策略, 当workQueue满载时将会触发。
  • keepAliveTime: 工作线程空闲时则保持存活的时间。
  • allowCoreThreadTimeOut: 布尔类型, 是否需要保持核心线程始终处于存活。
  • corePoolSize: 核心线程数。当阻塞队列还未满载时, 线程池将保持核心线程数。
  • maximumPoolSize: 最大线程数。当阻塞队列满载时, 线程池将会在核心线程数的基础上创建新线程来处理任务, 直到最大线程数。

2.3 内部类 Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }}

继承关系

  • 继承AQS: 说明Worker内部存在同步需求。当某个WorkerworkQueue中获取一个任务后, 便持有锁, 直到将任务在当前线程内执行完后, 再释放锁。这里加锁的作用是表示Worker是否处于工作中, 不接收中断信号。
  • 实现Runnable: Worker本身就是一个异步的任务调度者。

在构造函数中, 将AQS的state初始化为-1, 是为了在初始化期间不接受中断信号, 直到runWorker方法开始运行, state将会被修改为0, 此时相当于锁被释放的状态, 可以接受中断信号。这部分逻辑可以从interruptIfStarted方法中理解。

三,方法调用

3.1 runWorker

Worker中最主要的run方法, 也就是调用runWorker方法。

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) ||  (Thread.interrupted() &&  runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try {  task.run(); } catch (RuntimeException x) {  thrown = x; throw x; } catch (Error x) {  thrown = x; throw x; } catch (Throwable x) {  thrown = x; throw new Error(x); } finally {  afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}
  • 若当前线程从workQueue中获取任务, 首先加锁, 标记当前工作线程正在执行任务, 不接收中断信号。
  • 接下来判断线程池状态, 若线程池状态>= STOP, 则对当前线程调用中断。
  • 任务处理有两个细节:
    • 调用了task的run方法而不是start方法, 表示依然在当前线程中处理, 而非新启线程。
    • task.run()方法的前后, 有beforeExecuteafterExecute这两个钩子方法。
  • getTask方法返回了null, 那么将会跳出循环, 调用processWorkerExit方法来对Worker进行回收。

3.2 getTask

private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}
  • 5-12行: 获取线程池状态, 若是STOP, TIDYING, TERMINATED, 或是SHUTDOWN且工作队列为空, 那么返回null, 代表当前worker可以回收。
  • 14-24行: 如果当前工作线程数超过了最大线程数, 或达到了核心线程数的回收条件, 则开始尝试回收当前worker。
  • 26-35行: 从阻塞队列里获取任务。timedtrue时使用poll超时获取, 否则使用take阻塞等待。poll超时也会导致新一轮的回收判断。

getTask方法是线程池动态维护工作线程的核心。

3.3 execute

最重要的execute方法。

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);}

这段逻辑可以用一张流程图辅助理解。

Java并发编程第十篇(ThreadPoolExecutor线程池组件分析)

简单来说: 当向线程池提交一个任务, 如果当前线程数小于核心线程数, 那么就新增worker。如果新增失败, 则进入下面的流程。向阻塞队列offer一个任务, 如果阻塞队列已满, 那么继续尝试创建worker(奔着最大线程数去), 如果已经达到了最大线程数, 那么触发拒绝策略。而如果成功提交到了阻塞队列, 这时再判断线程池的状态, 如果处于非RUNNING状态, 那么尝试移除任务, 如果成功移除该任务, 就触发拒绝策略。

3.4 addWorker

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&  firstTask == null &&  ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN ||  (rs == SHUTDOWN && firstTask == null)) {  if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException();  workers.add(w);  int s = workers.size();  if (s > largestPoolSize) largestPoolSize = s;  workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;}
  • 外层retry循环和内层循环通过CAS操作compareAndIncrementWorkerCount保证了只有一个线程能成功增加worker计数并跳出循环去创建Worker, 这保证了worker数量不会超限。
  • 既然CAS已经保证了线程安全, 为什么还要获取mainLock呢? 这是为了同步对workers这个HashSet的操作, 因为HashSet本身不是线程安全的, 需要防止在add时其他地方发生remove操作。