【Java源码阅读系列27】深度解读Java ThreadPoolExecutor 源码
Java的ThreadPoolExecutor
是并发编程中处理任务执行的核心类,广泛应用于异步任务调度、批量数据处理等场景。本文将从源码层面解析其核心机制,提炼设计模式,并结合实际场景给出使用示例。
一、线程池核心架构:状态管理与核心参数
1.1 状态压缩与原子控制:ctl
变量
ThreadPoolExecutor
通过一个原子整数ctl
(类型为AtomicInteger
)同时管理线程池状态(runState
)和有效线程数(workerCount
),这是其最精妙的设计之一。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3; // 32位中留3位给状态private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 最大线程数(约5亿)// 线程池状态(高位存储)private static final int RUNNING = -1 << COUNT_BITS; // 接收新任务+处理队列任务private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接收新任务,但处理队列任务private static final int STOP = 1 << COUNT_BITS; // 不接收新任务+不处理队列任务+中断执行中任务private static final int TIDYING = 2 << COUNT_BITS; // 所有任务终止,workerCount=0,准备执行terminated()private static final int TERMINATED = 3 << COUNT_BITS; // terminated()执行完成// 状态与线程数的解包/打包private static int runStateOf(int c) { return c & ~CAPACITY; } // 取高位状态private static int workerCountOf(int c) { return c & CAPACITY; } // 取低位线程数private static int ctlOf(int rs, int wc) { return rs | wc; } // 合并状态与线程数
通过位运算将两个关键指标压缩到一个整数中,既保证了原子性(通过AtomicInteger
的CAS
操作),又减少了锁竞争,是典型的“空间换时间”优化。
1.2 核心参数与动态调整
线程池的行为由以下参数控制(均为volatile
变量保证可见性):
corePoolSize
:核心线程数(即使空闲也保留)maximumPoolSize
:最大线程数(超过核心数的线程会因空闲超时被回收)keepAliveTime
:非核心线程的空闲超时时间workQueue
:任务队列(存储待执行的Runnable
)threadFactory
:线程工厂(创建工作线程)handler
:拒绝策略(任务无法处理时的allback
)
二、任务执行主流程:从execute()
到任务落地
任务提交的入口是execute(Runnable command)
方法,其核心逻辑可分为三步:
2.1 步骤一:核心线程未满则直接创建新线程
public void execute(Runnable command) { int c = ctl.get(); // 1. 核心线程未满时,创建新线程执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) // true表示使用corePoolSize作为上限 return; c = ctl.get(); // 失败后重新获取状态(可能被其他线程修改) } // ...}
若当前线程数小于corePoolSize
,则通过addWorker(command, true)
创建新线程,并将任务作为该线程的第一个任务。
2.2 步骤二:核心线程已满则尝试入队
// 2. 核心线程已满,尝试将任务加入队列if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 二次检查:若入队后线程池已关闭,则移除任务并拒绝 if (!isRunning(recheck) && remove(command)) reject(command); // 若线程池存活但没有工作线程(可能核心线程被回收),则创建新线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); // 无初始任务,线程从队列取任务}
若任务成功入队,需二次检查线程池状态(避免在入队过程中线程池被关闭)。若线程池已关闭,需将任务从队列移除并触发拒绝策略;若线程池存活但无工作线程(如核心线程被允许超时回收),则创建新线程处理队列任务。
2.3 步骤三:队列已满则创建最大线程或拒绝任务
// 3. 队列已满,尝试创建最大线程(使用maximumPoolSize)else if (!addWorker(command, false)) reject(command); // 创建失败则触发拒绝策略
若任务无法入队(队列已满),则尝试创建新线程(上限为maximumPoolSize
)。若创建失败(线程数已达上限或线程池已关闭),则调用reject(command)
触发拒绝策略。
三、工作线程管理:Worker
类与任务执行循环
3.1 Worker
类:线程的“包装器”
Worker
是ThreadPoolExecutor
的内部类,继承自AbstractQueuedSynchronizer
(AQS
)并实现Runnable
,负责封装工作线程和任务执行逻辑:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; // 实际执行任务的线程 Runnable firstTask; // 初始任务(可能为null) volatile long completedTasks; // 该线程完成的任务数 Worker(Runnable firstTask) { setState(-1); // 初始状态为-1,禁止中断(直到runWorker开始) this.firstTask = firstTask; this.thread = threadFactory.newThread(this); // 使用线程工厂创建线程 } public void run() { runWorker(this); // 委托给外部runWorker方法 } // AQS方法:实现非重入锁(避免任务执行时被中断) protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }}
AQS
的作用:Worker
通过AQS
实现了一个非重入锁。任务执行时(runWorker
中)会调用lock()
加锁,确保线程在执行任务期间不会被中断(中断仅发生在任务空闲等待时)。- 线程创建:通过
threadFactory
创建线程,默认使用Executors.defaultThreadFactory
(创建非守护线程,优先级为NORM_PRIORITY
)。
3.2 任务执行循环:runWorker()
runWorker
是工作线程的核心执行逻辑,负责循环获取并执行任务:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 允许中断(初始状态为-1,解锁后状态变为0) boolean completedAbruptly = true; try { // 循环获取任务(从队列或初始任务) while (task != null || (task = getTask()) != null) { w.lock(); // 加锁,防止执行任务时被中断 // 检查线程池状态:若已STOP,强制中断当前线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); // 钩子方法(子类扩展) Throwable thrown = null; try { task.run(); // 执行任务 } catch (RuntimeException | Error x) { thrown = x; throw x; } finally { afterExecute(task, thrown); // 钩子方法(子类扩展) } } finally { task = null; w.completedTasks++; w.unlock(); // 解锁 } } completedAbruptly = false; // 正常退出循环(无任务可执行) } finally { processWorkerExit(w, completedAbruptly); // 处理线程退出 }}
- 任务获取:通过
getTask()
从队列中获取任务(可能阻塞或超时)。若线程池状态为SHUTDOWN
且队列为空,或状态为STOP
,则getTask()
返回null
,线程退出。 - 钩子方法:
beforeExecute
和afterExecute
是受保护的方法,允许子类重写以实现任务执行前后的扩展逻辑(如日志记录、ThreadLocal
清理)。 - 异常处理:任务执行中抛出的异常会被捕获,并通过
afterExecute
传递,最终可能导致线程异常退出(completedAbruptly
为true
),触发processWorkerExit
替换线程。
四、设计模式深度解析
ThreadPoolExecutor
的源码中巧妙运用了多种设计模式,使其具备高度的灵活性和可扩展性。
4.1 工厂模式(Factory Pattern)
应用点:ThreadFactory
接口用于创建工作线程。
线程池通过ThreadFactory
解耦线程的创建逻辑,用户可自定义工厂(如设置线程名称、优先级、守护状态)。默认实现为Executors.defaultThreadFactory
:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory();}private static class DefaultThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread t = new Thread(group, r, \"pool-\" + poolNumber.getAndIncrement() + \"-thread-\" + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }}
作用:将线程创建的细节封装,允许用户通过替换ThreadFactory
实现定制化线程管理(如日志线程、监控线程)。
4.2 策略模式(Strategy Pattern)
应用点:RejectedExecutionHandler
接口定义拒绝策略,支持多种实现。
当任务无法被处理(线程池已满且队列已满)时,通过RejectedExecutionHandler
的实现类决定如何处理任务。ThreadPoolExecutor
内置了4种策略:
// 1. 直接抛出异常(默认策略)public static class AbortPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(\"Task \" + r + \" rejected from \" + e); }}// 2. 调用者线程直接执行任务(降低提交速率)public static class CallerRunsPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) r.run(); }}// 3. 静默丢弃任务public static class DiscardPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}// 4. 丢弃队列中最旧的任务,重新尝试提交当前任务public static class DiscardOldestPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }}
作用:将拒绝逻辑抽象为策略接口,允许运行时动态切换(通过setRejectedExecutionHandler
),满足不同场景需求(如严格拒绝、降级执行)。
4.3 模板方法模式(Template Method Pattern)
应用点:beforeExecute
、afterExecute
、terminated
等钩子方法。ThreadPoolExecutor
定义了这些受保护的方法,子类可通过重写扩展行为:
// 任务执行前调用(子类扩展)protected void beforeExecute(Thread t, Runnable r) { }// 任务执行后调用(子类扩展)protected void afterExecute(Runnable r, Throwable t) { }// 线程池完全终止后调用(子类扩展)protected void terminated() { }
示例:用户可通过重写beforeExecute
记录任务开始时间,afterExecute记录任务耗时,实现任务执行监控。
4.4 观察者模式(Observer Pattern)
应用点:termination
条件变量监听线程池终止状态。
当线程池状态变为TERMINATED
时,通过termination.signalAll()
唤醒等待线程(如调用awaitTermination
的线程):
// tryTerminate方法中触发ctl.set(ctlOf(TERMINATED, 0));termination.signalAll(); // 唤醒所有等待终止的线程
作用:允许外部线程等待线程池完全终止(如资源清理场景),通过awaitTermination
方法实现。
五、关键机制总结
- 状态压缩:通过
ctl
变量原子化管理线程数和状态,减少锁竞争。 - 动态调参:
corePoolSize
、maximumPoolSize
等参数支持动态调整(通过setCorePoolSize
等方法),适应负载变化。 - 任务队列:支持
SynchronousQueue
(直接交接)、LinkedBlockingQueue
(无界队列)、ArrayBlockingQueue
(有界队列)等,影响线程池扩容策略。 - 线程回收:非核心线程在空闲超时后被回收(
keepAliveTime
控制),核心线程可通过allowCoreThreadTimeOut
配置是否允许回收。
六、常见使用场景与代码示例
6.1 常见使用场景
- Web服务器请求处理:如Tomcat的
Connector
线程池,处理HTTP请求。 - 批量数据处理:如读取文件后并行计算,使用线程池加速处理。
- 异步任务调度:如日志异步写入、消息队列消费。
- 定时任务扩展:
ScheduledThreadPoolExecutor
(ThreadPoolExecutor
子类)用于定时任务。
6.2 代码示例
示例1:固定大小线程池(FixedThreadPool
)
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class FixedThreadPoolDemo { public static void main(String[] args) { // 创建固定大小为3的线程池 ExecutorService pool = Executors.newFixedThreadPool(3); // 提交10个任务 for (int i = 0; i < 10; i++) { int taskId = i; pool.execute(() -> { System.out.println(\"Task \" + taskId + \" executed by \" + Thread.currentThread().getName()); try { Thread.sleep(1000); // 模拟任务耗时 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } pool.shutdown(); // 关闭线程池(等待任务执行完成) }}
示例2:自定义拒绝策略
import java.util.concurrent.*;public class CustomRejectPolicyDemo { public static void main(String[] args) { // 创建核心线程数2,最大线程数3,队列大小2的线程池 ThreadPoolExecutor pool = new ThreadPoolExecutor( 2, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new RejectedExecutionHandler() { // 自定义拒绝策略:记录日志并丢弃任务 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.err.println(\"Task \" + r + \" rejected. Pool stats: \" + \"active=\" + executor.getActiveCount() + \", queueSize=\" + executor.getQueue().size()); } } ); // 提交7个任务(核心2+队列2+最大1=5,第6、7个任务触发拒绝) for (int i = 0; i < 7; i++) { int taskId = i; pool.execute(() -> { try { Thread.sleep(2000); // 模拟长耗时任务 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } pool.shutdown(); }}
示例3:扩展钩子方法(记录任务耗时)
import java.util.concurrent.*;public class HookMethodDemo extends ThreadPoolExecutor { public HookMethodDemo(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println(\"Task \" + r + \" starts at \" + System.currentTimeMillis()); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println(\"Task \" + r + \" ends at \" + System.currentTimeMillis()); } public static void main(String[] args) { HookMethodDemo pool = new HookMethodDemo( 2, 3, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>() ); pool.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); pool.shutdown(); }}
七、总结
ThreadPoolExecutor
通过精细的状态管理(ctl
变量)、任务队列协调(workQueue
)和线程生命周期控制(Worker
类),实现了高效的任务并发执行。其设计中融合了工厂模式、策略模式、模板方法模式等经典设计模式,既保证了核心逻辑的稳定性,又通过扩展点(钩子方法、自定义工厂/策略)提供了强大的灵活性。实际开发中,需根据场景合理配置线程池参数(如核心线程数、队列类型),并结合拒绝策略和钩子方法优化任务执行效率。