> 文档中心 > 【java之juc并发包系列教程】一文看懂ThreadPoolExecutor线程池提交线程执行流程,源码解析

【java之juc并发包系列教程】一文看懂ThreadPoolExecutor线程池提交线程执行流程,源码解析

文章目录

  • 前言
  • 一、ThreadPoolExecutor
  • 二、threadPoolExecutor.submit(Runnable task)

前言

一、ThreadPoolExecutor

创建线程池有几种方式,可以通过工厂类创建,也可以直接创建对象。本质都是创建出了ThreadPoolExecutor对象,
简单弄个例子看看创建和执行都做了什么
【java之juc并发包系列教程】一文看懂ThreadPoolExecutor线程池提交线程执行流程,源码解析
new ThreadPoolExecutor构造函数
【java之juc并发包系列教程】一文看懂ThreadPoolExecutor线程池提交线程执行流程,源码解析
可以看到有四个构造函数,具体参数意义这里不做解析,用过的都知道

【java之juc并发包系列教程】一文看懂ThreadPoolExecutor线程池提交线程执行流程,源码解析
发现调用了Executors.defaultThreadFactory()。
【java之juc并发包系列教程】一文看懂ThreadPoolExecutor线程池提交线程执行流程,源码解析
也就是创建了DefaultThreadFactory默认线程工厂
在这里插入图片描述
往下会发现构造函数最终都调用了这个构造方法

    public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue,  ThreadFactory threadFactory,  RejectedExecutionHandler handler) { if (corePoolSize < 0 ||     maximumPoolSize <= 0 ||     maximumPoolSize < corePoolSize ||     keepAliveTime < 0)     throw new IllegalArgumentException();//参数合法校验 if (workQueue == null || threadFactory == null || handler == null)     throw new NullPointerException();//空指针校验 //下面都是属性赋值 this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;    }

ThreadPoolExecutor创建就这么多
ThreadPoolExecutor的一些属性
在这里插入图片描述
ctl属性记录了有效线程数和线程池的状态,下面是官方解释
主池控制状态ctl是一个原子整数,包装了两个概念字段workerCount,表示有效线程数runState,表示是否正在运行,关闭等为了将它们打包成一个int,我们限制workerCount为(2^29 )-1(约 5 亿)个线程,而不是 (2^31)-1(20 亿)个其他可表示的线程。如果这在将来成为问题,可以将变量更改为 AtomicLong,并调整下面的 shiftmask 常量。但是在需要之前,这段代码使用 int 会更快更简单一些。 workerCount 是允许启动和不允许停止的工作程序的数量。该值可能会暂时不同于活动线程的实际数量,例如,当 ThreadFactory 在被询问时未能创建线程时,以及退出线程在终止前仍在执行簿记时。用户可见的池大小报告为工作集的当前大小。 runState 提供主要的生命周期控制,取值: RUNNING:接受新任务并处理排队任务 SHUTDOWN:不接受新任务,但处理排队任务 STOP:不接受新任务,不处理排队任务,并中断正在进行的任务 TIDYING:所有任务都已终止,workerCount 为零,转换到状态 TIDYING 的线程将运行 terminate() 钩子方法. runState 随着时间的推移单调增加,但不需要达到每个状态。转换是: RUNNING -> SHUTDOWN 调用 shutdown()(RUNNING 或 SHUTDOWN) -> STOP 调用 shutdownNow() SHUTDOWN -> TIDYING 当队列和池都为空时 STOP -> TIDYING 当池为空时 TIDYING -> TERMINATED 当 terminate() 钩子方法完成时,在 awaitTermination() 中等待的线程将在状态达到 TERMINATED 时返回。检测从 SHUTDOWN 到 TIDYING 的转换并不像您想要的那么简单,因为队列可能在非空后变为空,在 SHUTDOWN 状态下反之亦然,但我们只能在看到它为空之后终止,我们看到 workerCount为 0

【java之juc并发包系列教程】一文看懂ThreadPoolExecutor线程池提交线程执行流程,源码解析
属性还有一个可重入锁,官方解释
锁定访问工人集和相关簿记。虽然我们可以使用某种并发集合,但事实证明通常最好使用锁。其中一个原因是这个序列化了interruptIdleWorkers,这避免了不必要的中断风暴,尤其是在关机期间。否则退出线程将同时中断那些尚未中断的线程。它还简化了一些相关的最大池大小等统计簿记。我们还在shutdown和shutdownNow上持有mainLock,以确保worker集稳定,同时分别检查中断和实际中断的权限。
【java之juc并发包系列教程】一文看懂ThreadPoolExecutor线程池提交线程执行流程,源码解析
存放Worker的HashSet, 包含池中所有工作线程的集合。仅在持有 mainLock 时访问。
【java之juc并发包系列教程】一文看懂ThreadPoolExecutor线程池提交线程执行流程,源码解析
可重入锁的Condition对象, 等待条件以支持 awaitTermination

至此ThreadPoolExecutor线程池对象已经创建完毕

二、threadPoolExecutor.submit(Runnable task)

这个是提交线程到线程池的方法。我们来看看都做了啥
进入该方法
发现实际上是调用父类AbstractExecutorService的方法

public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); //判空 RunnableFuture<Void> ftask = newTaskFor(task, null);//封装成RunnableFuture对象 execute(ftask);//调用方法 return ftask;    }

再来看看execute(ftask)方法,其实调用的threadPoolExecutor.execute(ftask)方法,传入的RunnableFuture对象

/*在未来的某个时间执行给定的任务。任务      可以在新线程或现有池线程中执行。    如果任务无法提交执行,要么是因为这个      执行程序已关闭或因为已达到其容量,      该任务由当前的 {@link RejectedExecutionHandler} 处理。    @param 命令要执行的任务      @throws RejectedExecutionException 自行决定{@code RejectedExecutionHandler},如果任务不能接受执行      如果 {@code command} 为空,则 @throws NullPointerException*/public void execute(Runnable command) { if (command == null)     throw new NullPointerException(); /*  分 3 个步骤进行:   1.如果少于corePoolSize线程正在运行,尝试   以给定命令作为第一个启动一个新线程   任务。对 addWorker 的调用以原子方式检查 runState 和   workerCount,因此可以防止误报   在不应该的时候线程,通过返回 false。     2.如果一个任务可以成功入队,那么我们还需要   仔细检查我们是否应该添加一个线程   (因为自上次检查以来现有的已经死了)或者那个   进入此方法后,池已关闭。所以我们   重新检查状态,并在必要时回滚入队   停止,或者如果没有,则启动一个新线程。     3.如果我们不能排队任务,那么我们尝试添加一个新的   线。如果它失败了,我们知道我们已经关闭或饱和   所以拒绝任务。  */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {//如果worker数量小于corePoolSize就创建新woker     if (addWorker(command, true))   return;     c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {      int recheck = ctl.get();     if (! isRunning(recheck) && remove(command))  reject(command);     else if (workerCountOf(recheck) == 0)  addWorker(null, false); } else if (!addWorker(command, false))     reject(command);    }

然后看看addWorker(command, false)方法

/*检查是否可以相对于当前添加新工人      池状态和给定的界限(核心或最大值)。如果是这样的话,      工人人数会相应调整,如果可能的话,一个      新工人被创建并启动,运行 firstTask 作为它的      第一个任务。如果池已停止,则此方法返回 false 或      有资格关闭。如果线程也返回 false      当被询问时,工厂无法创建线程。如果线程      创建失败,或者是由于线程工厂返回      null,或由于异常(通常是 OutOfMemoryError 在      Thread.start()),我们干净利落地回滚。    @param firstTask 新线程应该首先运行的任务(或      如果没有,则为空)。工人是用最初的第一个任务创建的      (在方法execute()中)在较少时绕过排队      比 corePoolSize 线程(在这种情况下我们总是启动一个),      或者当队列已满时(在这种情况下我们必须绕过队列)。      最初空闲的线程通常是通过      prestartCoreThread 或替换其他垂死的工人。    @param core if true 使用 corePoolSize 作为绑定,否则      最大池大小。 (这里使用布尔指示符而不是      值以确保在检查其他池后读取新值      状态)。      @return 如果成功则返回 true*/private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) {     // 检查线程池相关状态,不符合直接返回     if (runStateAtLeast(c, SHUTDOWN)  && (runStateAtLeast(c, STOP)      || firstTask != null      || workQueue.isEmpty()))  return false;     for (;;) {//循环  if (workerCountOf(c)      >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))      return false;  if (compareAndIncrementWorkerCount(c))//尝试对 ctl 的 workerCount 字段进行 CAS 递增。  //如果递增成功则跳出循环      break retry;  c = ctl.get();  // Re-read ctl  if (runStateAtLeast(c, SHUTDOWN))      continue retry;  //否则 CAS 由于 workerCount 变化而失败;重试内循环     } } //两个标志位 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try {     w = new Worker(firstTask);//创建Worker     final Thread t = w.thread;//获取Worker的线程对象     if (t != null) {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();//上锁  try {      // Recheck while holding lock.      // Back out on ThreadFactory failure or if      // shut down before lock acquired.      int c = ctl.get();      if (isRunning(c) ||   (runStateLessThan(c, STOP) && firstTask == null)) {   if (t.getState() != Thread.State.NEW)//检查状态throw new IllegalThreadStateException();   workers.add(w);//将Worker加入Set集合   workerAdded = true; //修改标志位为成功   int s = workers.size();   if (s > largestPoolSize) //修改获得的最大池大小largestPoolSize = s;      }  } finally {//解锁      mainLock.unlock();  }  if (workerAdded) {      t.start();//开启worker的线程      workerStarted = true;  }     } } finally {     if (! workerStarted)  addWorkerFailed(w);//添加worker失败 } return workerStarted; //返回标志位成功或失败    }

之歌方法执行完,Set集合添加了一个worker,并且worker的线程执行start方法也被调用
线面我们来看看Worker的源码,他也是ThreadPoolExecutor内部类,下面注释相关解释

/**Class Worker 主要维护线程运行任务的中断控制状态,以及其他次要的簿记。此类机会主义地扩展 AbstractQueuedSynchronizer 以简化获取和释放围绕每个任务执行的锁。这可以防止旨在唤醒等待任务的工作线程而不是中断正在运行的任务的中断。我们实现了一个简单的不可重入互斥锁而不是使用 ReentrantLock,因为我们不希望工作任务在调用 setCorePoolSize 等池控制方法时能够重新获取锁。此外,为了在线程真正开始运行任务之前抑制中断,我们将锁定状态初始化为负值,并在启动时将其清除(在 runWorker 中)*/private final class Worker extends AbstractQueuedSynchronizer implements Runnable    { /**  * This class will never be serialized, but we provide a  * serialVersionUID to suppress a javac warning.  */ private static final long serialVersionUID = 6138294804551838833L; @SuppressWarnings("serial") // Unlikely to be serializable final Thread thread;   //持有的线程对象 @SuppressWarnings("serial") // Not statically typed as Serializable Runnable firstTask; //传进来的线程Runnable对象 volatile long completedTasks;//每线程任务计数器 // TODO: switch to AbstractQueuedLongSynchronizer and move // completedTasks into the lock word. /**      使用 ThreadFactory 中给定的第一个任务和线程创建。      参数:firstTask – 第一个任务(如果没有,则为 null)  */ Worker(Runnable firstTask) {     setState(-1); // 禁止中断直到 runWorker执行修改其状态     this.firstTask = firstTask;     this.thread = getThreadFactory().newThread(this); //创建了线程对象并赋值给thread属性,下面具体说说 } /** 将主运行循环委托给外部 runWorker方法 */ public void run() {     runWorker(this); }/*锁定方法  值 0 表示解锁状态。  值 1 表示锁定状态。*/ protected boolean isHeldExclusively() {     return getState() != 0; } protected boolean tryAcquire(int unused) {     if (compareAndSetState(0, 1)) {  setExclusiveOwnerThread(Thread.currentThread());  return true;     }     return false; } protected boolean tryRelease(int unused) {     setExclusiveOwnerThread(null);     setState(0);     return true; } public void lock() { acquire(1); } public boolean tryLock()  { return tryAcquire(1); } public void unlock()      { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() {     Thread t;     if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {  try {      t.interrupt();  } catch (SecurityException ignore) {  }     } }    }

Worker也实现了Runnable接口,他肯定也有run方法,worker.start()创建了线程执行了run方法,
在这里插入图片描述
getThreadFactory()返回了默认的工厂对象,然后调用它的newThread(this);。将woker传过去

 public Thread newThread(Runnable r) {     Thread t = new Thread(group, r,      namePrefix + threadNumber.getAndIncrement(),      0);//创建线程     if (t.isDaemon())  t.setDaemon(false); //是否守护进程     if (t.getPriority() != Thread.NORM_PRIORITY)  t.setPriority(Thread.NORM_PRIORITY); //设置优先级     return t; //返回线程对象 }

到这里真正的线程创建完成,线面看看runWorker(this);方法怎么执行的

/*主要工作人员运行循环。反复从队列中获取任务并      执行它们,同时处理许多问题:    1. 我们可以从一个初始任务开始,在这种情况下,我们      不需要拿到第一个。否则,只要池是      运行,我们从 getTask 获取任务。如果它返回 null 然后      由于池状态或配置更改,工作人员退出      参数。其他退出是由异常引发引起的      外部代码,在这种情况下 completedAbruptly 成立,其中      通常会导致 processWorkerExit 替换这个线程。    2.在运行任何任务之前,获取锁以防止      任务执行时其他池中断,然后我们      确保除非池正在停止,否则该线程没有      它的中断集。    3. 每个任务运行之前都会调用 beforeExecute,它      可能会抛出异常,在这种情况下我们会导致线程死亡      (用completedAbruptly true 中断循环)没有处理      任务。    4.假设beforeExecute正常完成,我们运行任务,      收集其抛出的任何异常以发送到 afterExecute。      我们分别处理 RuntimeException、Error(这两个      规范保证我们捕获)和任意 Throwables。      因为我们不能在 Runnable.run 中重新抛出 Throwables,所以我们      在出路时将它们包装在错误中(到线程的      未捕获异常处理程序)。任何抛出的异常也      保守地导致线程死亡。    5、task.run完成后,我们调用afterExecute,这可能      也会抛出异常,这也会导致线程      死。根据 JLS Sec 14.20,这个例外是      即使 task.run 抛出,也会生效。    异常机制的最终效果是 afterExecute      和线程的 UncaughtExceptionHandler 一样准确      我们可以提供的关于遇到的任何问题的信息      用户代码。*/final void runWorker(Worker w) { Thread wt = Thread.currentThread();//获取当前线程 Runnable task = w.firstTask;//获取Runnable实现对象 w.firstTask = null; w.unlock(); // 允许中断 boolean completedAbruptly = true; try {     while (task != null || (task = getTask()) != null) {      //循环判断如果task 不为空或者从队列中获取到了task   w.lock();//上锁  /*  如果池正在停止,请确保线程被中断;   如果没有,请确保线程不被中断。这   需要在第二种情况下重新检查才能处理   shutdownNow 在清除中断时进行比赛  */  if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&      !wt.isInterrupted())      wt.interrupt();  try {      beforeExecute(wt, task);//执行前      try {   task.run();//真正的执行,执行实际的run方法   afterExecute(task, null);//执行后      } catch (Throwable ex) {//执行后   afterExecute(task, ex);   throw ex;      }  } finally {      task = null;      w.completedTasks++; //完成数量加一      w.unlock();//解锁  }     }     completedAbruptly = false; } finally {     processWorkerExit(w, completedAbruptly); //处理woker推出 }    }

整体流程走完,再来看看getTask()方法,也就是从队列中拉runable

/*执行阻塞或定时等待任务,具体取决于当前配置设置,或者如果此工作人员由于以下任何原因必须退出,则返回 null: 1. 工作人员超过 maximumPoolSize(由于调用 setMaximumPoolSize)。  2. 池停止。  3. 池关闭,队列为空。 4. 、这个worker超时等待一个任务,超时worker在定时等待前后都会被终止(即allowCoreThreadTimeOut || workerCount > corePoolSize), 如果队列非空,这个worker不是池中的最后一个线程。任务,如果工作人员必须退出,则为 null,在这种情况下,workerCount 递减*/private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) {//循环     int c = ctl.get();     // 仅在必要时检查队列是否为空。     if (runStateAtLeast(c, SHUTDOWN)  && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {  decrementWorkerCount();  return null;     }     int wc = workerCountOf(c);     // Are workers subject to culling?     boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;     if ((wc > maximumPoolSize || (timed && timedOut))  && (wc > 1 || workQueue.isEmpty())) {  if (compareAndDecrementWorkerCount(c))      return null;  continue;     }     try {     //拉取Runnable  Runnable r = timed ?      workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :      workQueue.take();  if (r != null)      return r;  timedOut = true;     } catch (InterruptedException retry) {  timedOut = false;     } }    }

最后再来看看processWorkerExit(w, completedAbruptly)方法,woker怎么退出结束的

/*为垂死的工人进行清理和记账。仅从工作线程调用。除非设置了 completedAbruptly,否则假定 workerCount 已经被调整以考虑退出。此方法从工作程序集中删除线程,并且如果由于用户任务异常而退出或如果少于 corePoolSize 工作程序正在运行或队列非空但没有工作程序,则可能终止池或替换工作程序。参数:w——工人completedAbruptly – 如果工作人员因用户异常而死亡*/private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) //如果突然,则没有调整 workerCount     decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //上锁 try {     completedTaskCount += w.completedTasks;  //完成的任务计数加上woker完成的数量     workers.remove(w); //set集合移除worker } finally {//解锁     mainLock.unlock(); }/*如果(SHUTDOWN 并且池和队列为空)或(STOP 并且池为空),则转换到 TERMINATED 状态。如果有其他条件终止但 workerCount 非零,则中断空闲的工作程序以确保关闭信号传播。必须在任何可能导致终止的操作之后调用此方法——减少工作人员数量或在关闭期间从队列中删除任务。该方法是非私有的,允许从 ScheduledThreadPoolExecutor 访问。*/ tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) {     if (!completedAbruptly) {     //如果allowCoreThreadTimeOut 为 false(默认),核心线程即使在空闲时也保持活动状态,min为0。如果allowCoreThreadTimeOut为真,核心线程使用 keepAliveTime 超时等待工作,min=corePoolSize。  int min = allowCoreThreadTimeOut ? 0 : corePoolSize;  if (min == 0 && ! workQueue.isEmpty())  //如果(min == 0并且workQueue不为空,min变成1      min = 1;  if (workerCountOf(c) >= min)//如果workerCount大于等于llowCoreThread就返回      return; // replacement not needed     }     //如果workerCount小于llowCoreThread就添加新的worker     addWorker(null, false); }    }

到这里整体的提交线程执行流程就讲完了,后续再出其他文章简介线程池的其他方法