> 文档中心 > Java线程池实现原理

Java线程池实现原理

Java线程池实现原理

  • 线程池是什么
    • 池化思想
    • 线程池解决了什么
  • 线程池核心设计与实现
    • 总体设计
    • 生命周期管理
    • 任务执行机制
      • 任务调度
      • 任务缓冲
      • 任务申请
      • 任务拒绝
    • Worker线程管理
      • Worker线程
      • Worker线程增加
      • Worker线程回收
      • Worker线程执行任务

线程池是什么

线程池(Thread Pool)是一种基于池化思想管理线程的工具。

池化思想

池化,是为了最大化收益并最小化风险,而将资源统一在一起进行管理的一种思想。

Pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes if maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management. --wikipedia

“池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等地方也有相关的应用。

在计算机领域中的表现为:统一管理IT资源,包括服务器、存储和网络资源等。通过共享资源,使用户在低投入中获益。出去线程池,还有其他几种比较典型的使用策略:

  1. 内存池(Memory Pooling):预先申请内存,提升申请内存的速度,减少内存碎片。
  2. 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
  3. 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。

线程池解决了什么

线程过多会带来额外的开销,其中包括创建、销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。

线程池维护多个线程,等待监督、管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建、销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保障了对内核的充分利用。

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能确定在任意时刻中,有多少任务需要执行、有多少资源需要投入。这种不确定性将带来以下问题:

  1. 频繁申请 / 销毁资源和调度资源,将带来额外的性能损耗,可能会非常巨大。
  2. 对资源无线申请缺少抑制手段,易引发系统资源耗尽的风险。
  3. 系统无法合理管理内部的资源分布,会降低系统稳定性。

使用线程池还能带来以下好处:

  1. 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的课管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  4. 提供更强大的功能:线程池具备可扩展性,允许开发人员向其中增加更多的功能。比如延时、定时线程池ScheduledThreadPoolExecutor,就允许延期执行或定期执行。

线程池核心设计与实现

总体设计

Java中的线程池核心实现类是ThreadPoolExecutor,下图为ThreadPoolExecutor的UML类图。
Java线程池实现原理
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关心如何创建线程如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executo框架完成线程的调配和任务的执行。

ExecutorService接口增加了一些能力:

  1. 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
  2. 提供了监控线程池的方法,例如停止线程池的运行。

AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

最下层的实现类ThreadPoolExecutor实现最为复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同事管理线程和任务,使两者良好的结合,从而执行秉性任务。

ThreadPoolExecutor同时维护线程和执行任务的运行机制如下图所示:
Java线程池实现原理
线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好地缓冲任务,复用线程。线程池的运行主要分为两个部分:任务管理、线程管理。

任务管理管理部分充当生产者的角色,提交任务后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。

线程管理部分是消费者,他们被维护在线程池内,根据任务请求进行线程分配,当线程执行完任务后,则会陆续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

接下来,按照下述三个部分去详细描述线程池的运行机制:

  1. 线程池如何维护自身状态;
  2. 线程池如何管理任务;
  3. 线程池如何管理线程。

生命周期管理

线程池运行的状态,并不是用户显示设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量(workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量(workerCount)两个关键参数维护放到了一起,如下代码所示:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程数量进行控制的一个手段,他同时包含两部分的信息:线程池的运行状态(runState)和线程池内有效线程的数量(workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,课避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源码可以发现,经常出现需要同事判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得当前线程池的运行状态和线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如下:

// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~CAPACITY; }// 计算当前运行状态private static int workerCountOf(int c)  { return c & CAPACITY; }// 计算当前线程数量private static int ctlOf(int rs, int wc) { return rs | wc; }// 通过状态和线程数生成ctl

ThreadPoolExecutor的运行状态由5种,分别为:

运行状态 状态描述
RUNNING 能接受新提交的任务,并且也能处理阻塞队列中的任务。
SHUTDOWN 关闭状态,不能接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
STOP 不能接受新任务,也不能处理队列中的任务,会中断正在处理任务的线程。
TIDYING 所有的任务都已终止了,workerCount(有效线程数)为0。
TERMINATED 在terminated( )方法执行完毕后进入该状态。

其生命周期转换如下图所示:

Java线程池实现原理

任务执行机制

任务调度

任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。

首先,所有任务的调度都是由execute( )这个方法完成的,这部分完成的工作是:检查当前线程池的运行状态、运行线程数、运行策略,决定接下来的执行流程,是直接申请线程执行、或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

  1. 首先检查线程池的运行状态,如果不是RUNNING,则直接拒绝,线程池要确保在RUNNING状态写执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认的处理方式是直接抛弃。

其执行流程如下图所示:
Java线程池实现原理

任务缓冲

任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓冲任务,工作线程从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加操作是:

  1. 在队列为空时,获取元素的线程会等待队列变为非空;
  2. 在队列为满时,存储元素的线程会等待队列可用。

阻塞队列常用于生产者和消费者的场景。生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

下图中展示了线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素:

Java线程池实现原理

使用不同的队列可以实现不同的任务存取策略。在这里,我们介绍以下存储队列的成员:

名称 描述
ArrayBlockingQueue 一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。
LinkedBlockingQueue 一个由链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序。此队列的默认长度为Integer.MAX_VALUE,所以默认创建该队列有容量危险。
PriorityBlockingQueue 一个支持线程优先级排序的无界队列,默认自然序进行排列,也可以自定义实现compareTo( )方法来指定元素排列规则,不能保证同优先级元素的顺序。
DelayQueue 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。是有延时期满后才能从队列中获取元素。
SynchronousQueue 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里,Executor.newCachedThreadPool( )就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲60秒后会被回收。
LinkedTransferQueue 一个由链表结构组成的无界队列,相当于其他队列,LinkedTransferQueue多了transfer和tryTransfer方法。
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多减少到一半。

任务申请

由上文的任务分配可知,任务的执行有两种情况:一种是任务直接由新创建的线程执行;另一种是线程从任务队列中获取任务然后执行,执行完任务的线程会再次从队列中申请任务再去执行。第一种情况仅出现在线程初次创建的时候,第二种情况是线程获取任务的绝大多数情况。

线程需从任务缓存模块中不断地取任务,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现,其执行流程如下图所示:
Java线程池实现原理

getTask这部分进行了多次判断,为的是控制线程数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null。工作线程Worker会不断接收新任务去执行,二挡工作线程Worker接收不到任务的时候,就会开始被回收。

任务拒绝

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolISize时,就需要拒绝该任务,采用任务拒绝策略,保护线程池。

拒绝策略是一个接口,其设计如下:

public interface RejectedExecutionHandler {    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}

用户可以通过实现这个接口去定制拒绝策略,也可以通过JDK提供的4种已有拒绝策略,其特点如下:

名称 描述
ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常。这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承受更大的并发量时,能够及时的通过异常发现。
ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常。使用此策略,可能会导致我们无法发现系统的异常状态。建议是一些无关紧要的业务采取此策略。
ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新提交被拒绝的任务。是否采用此拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量。
ThreadPoolExecutor.CallerRunsPolicy 由调用线程(提交任务的线程)处理该任务。这种情况是需要让所有任务都执行完毕,那么就适合大量计算的任务类型去执行,多线程仅仅是增大吞吐量的手段,最终必须要让每个任务都执行完毕。

Worker线程管理

Worker线程

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内部的工作线程Worker。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{  final Thread thread;// Worker持有的线程  Runnable firstTask;// 初始化的任务,可以为null}

Worker这个工作线程,实现了Runnable接口,并且有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以为null。如果这个线程是非空的,那么线程就会在启动时立即去执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就是需要创建一个线程去执行任务李彪(workQueue)中的任务,也就是非核心线程的创建。

Worker执行任务的模型如下图所示:

Java线程池实现原理

线程池需要管理线程的生命周期,需要在线程长时间不运行的情况下进行回收。线程池有一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制吸纳城池的生命周期。这个时候最重要的就是如何判断线程是否在运行。

Worker是用过集成AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反映线程现在的执行状态。

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  2. 如果正在执行任务,则不应该中断线程。
  3. 如果该线程现在不是独占锁的状态,也就是空闲状态,说明他没有在处理任务,这个时候可以对该线程进行中断。
  4. 线程池在执行shutdown方法或者tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

在线程池回收过程中就使用了这种特性,回收过程如下图所示:

Java线程池实现原理

Worker线程增加

增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程时会判断当前活动线程数是否少于maximumPoolSize,其执行流程如下图所示:

Java线程池实现原理

Worker线程回收

线程池中线程的销毁以来JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要会收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无线等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环就会结束,Worker主动消除自身在线程池内的引用。

while (task != null || (task = getTask()) != null) {  // 执行任务} finally {  processWorkerExit(w, completedAbruptly);// 获取不到任务时,主动回收自己}

线程回收的工作是在processWorkerExit方法中完成的。
Java线程池实现原理

事实上,在这个方法中,将线程引用移除线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还有要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。

Worker线程执行任务

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

  1. while循环不断地通过getTask方法获取任务。
  2. getTask方法从阻塞队列中获取任务。
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  4. 执行任务。
  5. 如果getTask结果为null则跳出循环,执行processWorkerExit方法,销毁线程。

执行流程如下图所示:
Java线程池实现原理