> 文档中心 > 线程池原理

线程池原理


线程池原理

线程池的实现原理

线程池的实现原理其实可以看成一个生产者消费者模型。

在这里插入图片描述
调用方线程(生产者)往线程池里提交任务,线程池里的工作线程(消费者)去消费任务,另外还有一些策略用来保证线程池和系统的安全,如拒绝策略、线程数量的控制、阻塞队列的大小。

JDK中线程池的继承图如下
在这里插入图片描述

可以看比较详细的具体的继承图
在这里插入图片描述

顶级接口
只有一个执行任务的方法

/**
 * Root interface of the executor hierarchy. It declares a single operation:
 * accepting a task for execution. How and when the task runs is left to the
 * implementing class.
 */
public interface Executor {

    /**
     * Accepts the given task for execution.
     *
     * @param command the task to run
     */
    void execute(Runnable command);
}

ExecutorService 接口
在 Executor 的基础上,额外添加了关闭线程池的方法和多种提交任务的方法,包括单个提交(有返回值的、无返回值的)以及批量提交任务的方法等。

/**
 * An {@link Executor} extended with lifecycle control (shutdown and
 * termination queries) and with submission methods that return
 * {@link Future} handles, for single tasks as well as for batches.
 */
public interface ExecutorService extends Executor {

    /** Initiates a shutdown of this executor. */
    void shutdown();

    /**
     * Shuts this executor down and hands back the tasks that were never started.
     *
     * @return the tasks that were still waiting to be executed
     */
    List<Runnable> shutdownNow();

    /**
     * @return {@code true} if this executor has been shut down
     */
    boolean isShutdown();

    /**
     * @return {@code true} if this executor has fully terminated
     */
    boolean isTerminated();

    /**
     * Waits for this executor to terminate, for at most the given time.
     * This only waits; it does not itself shut the executor down.
     *
     * @param timeout the maximum time to wait
     * @param unit    the unit of {@code timeout}
     * @return {@code true} if the executor terminated, {@code false} if the wait timed out
     * @throws InterruptedException if the waiting thread is interrupted
     */
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Submits a task that produces a result.
     *
     * @param task the task to run
     * @return a handle to the pending result
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * Submits a {@link Runnable} together with the value its {@link Future}
     * should yield once the task has completed.
     *
     * @param task   the task to run
     * @param result the value to hand out on completion
     * @return a handle to the pending result
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * Submits a task without a result. The returned {@link Future} can still
     * be used to block until the task has finished.
     *
     * @param task the task to run
     * @return a handle that completes when the task is done
     */
    Future<?> submit(Runnable task);

    /**
     * Submits a batch of tasks.
     *
     * @param tasks the tasks to run
     * @return one {@link Future} per task
     * @throws InterruptedException if interrupted while waiting
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * Submits a batch of tasks, waiting at most the given time.
     *
     * @param tasks   the tasks to run
     * @param timeout the maximum time to wait
     * @param unit    the unit of {@code timeout}
     * @return one {@link Future} per task
     * @throws InterruptedException if interrupted while waiting
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Runs the given tasks and returns the result of one of them.
     *
     * @param tasks the tasks to run
     * @return the result of one task
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException   if no result could be obtained
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * Runs the given tasks and returns the result of one of them, waiting at
     * most the given time.
     *
     * @param tasks   the tasks to run
     * @param timeout the maximum time to wait
     * @param unit    the unit of {@code timeout}
     * @return the result of one task
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException   if no result could be obtained
     * @throws TimeoutException     if the wait timed out
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService 的抽象类

/**
 * Skeleton implementation of {@link ExecutorService}. Each {@code submit}
 * variant wraps the task in a {@link RunnableFuture} and passes it to
 * {@link #execute(Runnable)}, so subclasses only need to supply
 * {@code execute} to get all submission variants.
 */
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * Wraps a {@link Runnable} and a fixed result value in a {@link FutureTask}.
     *
     * @param runnable the task to wrap
     * @param value    the value the future yields once the task has run
     * @return the wrapping future
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Wraps a {@link Callable} in a {@link FutureTask}. Read this together
     * with {@link Future}: the FutureTask carries the asynchronous result.
     *
     * @param callable the task to wrap
     * @return the wrapping future
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * Submits a task without a result value.
     *
     * @param task the task to run
     * @return a future that yields {@code null} on completion
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * Submits a task together with the value its future should yield.
     *
     * @param task   the task to run
     * @param result the value to yield on completion
     * @return a future that yields {@code result} on completion
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Submits a task that computes a result.
     *
     * @param task the task to run
     * @return a future that yields the task's result
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    // The remaining methods are omitted from this excerpt.
}

可以看到,各个 submit 方法最终都是调用 execute(ftask) 来提交任务,而 execute 正是顶级接口 Executor 中唯一的方法,这也就明白了为什么顶级接口只需要这一个方法。

返回值可以在任务执行完成后,由包装任务的 FutureTask 在内部设置,调用方再通过 Future 获取返回结果。

抽象类的具体实现,也就是线程池 ThreadPoolExecutor 及其构造方法

package java.util.concurrent;import java.security.AccessControlContext;import java.security.AccessController;import java.security.PrivilegedAction;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;import java.util.concurrent.atomic.AtomicInteger;import java.util.*;public class ThreadPoolExecutor extends AbstractExecutorService {private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    private static final int COUNT_BITS = Integer.SIZE - 3;    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    // runState is stored in the high-order bits    private static final int RUNNING    = -1 << COUNT_BITS;    private static final int SHUTDOWN   =  0 << COUNT_BITS;    private static final int STOP=  1 << COUNT_BITS;    private static final int TIDYING    =  2 << COUNT_BITS;    private static final int TERMINATED =  3 << COUNT_BITS;    // Packing and unpacking ctl    private static int runStateOf(int c)     { return c & ~CAPACITY; }    private static int workerCountOf(int c)  { return c & CAPACITY; }    private static int ctlOf(int rs, int wc) { return rs | wc; }// 存放任务的阻塞队列    private final BlockingQueue<Runnable> workQueue;    // 对线程池内部各种变量进行互斥访问控制    private final ReentrantLock mainLock = new ReentrantLock();    // 线程集合    private final HashSet<Worker> workers = new HashSet<Worker>();    /**     * Wait condition to support awaitTermination     */    private final Condition termination = mainLock.newCondition();    /**     * Tracks largest attained pool size. Accessed only under     * mainLock.     */    private int largestPoolSize;    /**     * Counter for completed tasks. Updated only on termination of     * worker threads. Accessed only under mainLock.     
*/    private long completedTaskCount;    private volatile ThreadFactory threadFactory;    /**     * 拒绝策略      */    private volatile RejectedExecutionHandler handler;    // 活跃时间    private volatile long keepAliveTime;    /// 是否允许核心线程超时    private volatile boolean allowCoreThreadTimeOut;    ///核心线程数    private volatile int corePoolSize;    //最大线程数    private volatile int maximumPoolSize;    //默认拒绝策略    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();      // 每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类    private final class Worker extends AbstractQueuedSynchronizer implements Runnable    { /**  * This class will never be serialized, but we provide a  * serialVersionUID to suppress a javac warning.  */ private static final long serialVersionUID = 6138294804551838833L; //worker 线程 final Thread thread; //worker 接收到的第一个任务 Runnable firstTask; //执行完毕的任务个数 volatile long completedTasks; /**  * Creates with given first task and thread from ThreadFactory.  * @param firstTask the first task (null if none)  */ Worker(Runnable firstTask) {     setState(-1); // inhibit interrupts until runWorker     this.firstTask = firstTask;     this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker  */ public void run() {     runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. 
protected boolean isHeldExclusively() {     return getState() != 0; } protected boolean tryAcquire(int unused) {     if (compareAndSetState(0, 1)) {  setExclusiveOwnerThread(Thread.currentThread());  return true;     }     return false; } protected boolean tryRelease(int unused) {     setExclusiveOwnerThread(null);     setState(0);     return true; } public void lock() { acquire(1); } public boolean tryLock()  { return tryAcquire(1); } public void unlock()      { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() {     Thread t;     if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {  try {      t.interrupt();  } catch (SecurityException ignore) {  }     } }    }    /*     * Methods for setting control state     */    /**     * Transitions runState to given target, or leaves it alone if     * already at least the given target.     *     * @param targetState the desired state, either SHUTDOWN or STOP     * (but not TIDYING or TERMINATED -- use tryTerminate for that)     */    private void advanceRunState(int targetState) { for (;;) {     int c = ctl.get();     if (runStateAtLeast(c, targetState) ||  ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))  break; }    }    /**     * Transitions to TERMINATED state if either (SHUTDOWN and pool     * and queue empty) or (STOP and pool empty).  If otherwise     * eligible to terminate but workerCount is nonzero, interrupts an     * idle worker to ensure that shutdown signals propagate. This     * method must be called following any action that might make     * termination possible -- reducing worker count or removing tasks     * from the queue during shutdown. The method is non-private to     * allow access from ScheduledThreadPoolExecutor.     */    final void tryTerminate() { for (;;) {     int c = ctl.get();     if (isRunning(c) ||  runStateAtLeast(c, TIDYING) ||  (runStateOf(c) == SHUTDOWN && ! 
workQueue.isEmpty()))  return;     if (workerCountOf(c) != 0) { // Eligible to terminate  interruptIdleWorkers(ONLY_ONE);  return;     }     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {  if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {      try {   terminated();      } finally {   ctl.set(ctlOf(TERMINATED, 0));   termination.signalAll();      }      return;  }     } finally {  mainLock.unlock();     }     // else retry on failed CAS }    }    /*     * Methods for controlling interrupts to worker threads.     */    /**     * If there is a security manager, makes sure caller has     * permission to shut down threads in general (see shutdownPerm).     * If this passes, additionally makes sure the caller is allowed     * to interrupt each worker thread. This might not be true even if     * first check passed, if the SecurityManager treats some threads     * specially.     */    private void checkShutdownAccess() { SecurityManager security = System.getSecurityManager(); if (security != null) {     security.checkPermission(shutdownPerm);     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {  for (Worker w : workers)      security.checkAccess(w.thread);     } finally {  mainLock.unlock();     } }    }    /**     * Interrupts all threads, even if active. Ignores SecurityExceptions     * (in which case some threads may remain uninterrupted).     
*/    private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     for (Worker w : workers)  w.interruptIfStarted(); } finally {     mainLock.unlock(); }    } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     for (Worker w : workers) {  Thread t = w.thread;  if (!t.isInterrupted() && w.tryLock()) {      try {   t.interrupt();      } catch (SecurityException ignore) {      } finally {   w.unlock();      }  }  if (onlyOne)      break;     } } finally {     mainLock.unlock(); }    }    /**     * Common form of interruptIdleWorkers, to avoid having to     * remember what the boolean argument means.     */    private void interruptIdleWorkers() { interruptIdleWorkers(false);    }    private static final boolean ONLY_ONE = true;final void reject(Runnable command) { handler.rejectedExecution(command, this);    }    /**     * Performs any further cleanup following run state transition on     * invocation of shutdown.  A no-op here, but used by     * ScheduledThreadPoolExecutor to cancel delayed tasks.     */    void onShutdown() {    }    /**     * State check needed by ScheduledThreadPoolExecutor to     * enable running tasks during shutdown.     *     * @param shutdownOK true if should return true if SHUTDOWN     */    final boolean isRunningOrShutdown(boolean shutdownOK) { int rs = runStateOf(ctl.get()); return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);    }    /**     * Drains the task queue into a new list, normally using     * drainTo. But if the queue is a DelayQueue or any other kind of     * queue for which poll or drainTo may fail to remove some     * elements, it deletes them one by one.     
*/    private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) {     for (Runnable r : q.toArray(new Runnable[0])) {  if (q.remove(r))      taskList.add(r);     } } return taskList;    } private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) {     int c = ctl.get();     int rs = runStateOf(c);     // Check if queue empty only if necessary.     if (rs >= SHUTDOWN &&  ! (rs == SHUTDOWN &&     firstTask == null &&     ! workQueue.isEmpty()))  return false;     for (;;) {  int wc = workerCountOf(c);  if (wc >= CAPACITY ||      wc >= (core ? corePoolSize : maximumPoolSize))      return false;  if (compareAndIncrementWorkerCount(c))      break retry;  c = ctl.get();  // Re-read ctl  if (runStateOf(c) != rs)      continue retry;  // else CAS failed due to workerCount change; retry inner loop     } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try {     w = new Worker(firstTask);     final Thread t = w.thread;     if (t != null) {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {      // Recheck while holding lock.      // Back out on ThreadFactory failure or if      // shut down before lock acquired.      int rs = runStateOf(ctl.get());      if (rs < SHUTDOWN ||   (rs == SHUTDOWN && firstTask == null)) {   if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();   workers.add(w);   int s = workers.size();   if (s > largestPoolSize)largestPoolSize = s;   workerAdded = true;      }  } finally {      mainLock.unlock();  }  if (workerAdded) {      t.start();      workerStarted = true;  }     } } finally {     if (! workerStarted)  addWorkerFailed(w); } return workerStarted;    }    /**     * Rolls back the worker thread creation.     
* - removes worker from workers, if present     * - decrements worker count     * - rechecks for termination, in case the existence of this     *   worker was holding up termination     */    private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     if (w != null)  workers.remove(w);     decrementWorkerCount();     tryTerminate(); } finally {     mainLock.unlock(); }    } private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted     decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     completedTaskCount += w.completedTasks;     workers.remove(w); } finally {     mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) {     if (!completedAbruptly) {  int min = allowCoreThreadTimeOut ? 0 : corePoolSize;  if (min == 0 && ! workQueue.isEmpty())      min = 1;  if (workerCountOf(c) >= min)      return; // replacement not needed     }     addWorker(null, false); }    }private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) {     int c = ctl.get();     int rs = runStateOf(c);     // Check if queue empty only if necessary.     if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {  decrementWorkerCount();  return null;     }     int wc = workerCountOf(c);     // Are workers subject to culling?     boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;     if ((wc > maximumPoolSize || (timed && timedOut))  && (wc > 1 || workQueue.isEmpty())) {  if (compareAndDecrementWorkerCount(c))      return null;  continue;     }     try {  Runnable r = timed ?      
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :      workQueue.take();  if (r != null)      return r;  timedOut = true;     } catch (InterruptedException retry) {  timedOut = false;     } }    } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try {     while (task != null || (task = getTask()) != null) {  w.lock();  // If pool is stopping, ensure thread is interrupted;  // if not, ensure thread is not interrupted.  This  // requires a recheck in second case to deal with  // shutdownNow race while clearing interrupt  if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&      !wt.isInterrupted())      wt.interrupt();  try {      beforeExecute(wt, task);      Throwable thrown = null;      try {   task.run();      } catch (RuntimeException x) {   thrown = x; throw x;      } catch (Error x) {   thrown = x; throw x;      } catch (Throwable x) {   thrown = x; throw new Error(x);      } finally {   afterExecute(task, thrown);      }  } finally {      task = null;      w.completedTasks++;      w.unlock();  }     }     completedAbruptly = false; } finally {     processWorkerExit(w, completedAbruptly); }    }    public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,      Executors.defaultThreadFactory(), defaultHandler);    }   //默认 线程工厂  默认 拒绝策略    public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue,  ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,      threadFactory, defaultHandler);    }   //默认 线程工厂  可以指定拒绝策略    public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  
TimeUnit unit,  BlockingQueue<Runnable> workQueue,  RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,      Executors.defaultThreadFactory(), handler);    }      //这个最全构造    线程工厂  可以指定拒绝策略      // corePoolSize:在线程池中始终维护的线程个数。// 在corePooSize 已满、队列也满的情况下,扩充线程至此值。// keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。// workQueue  :线程池所用的队列类型。// threadFactory  :线程创建工厂,可以自定义,有默认值 如上面//  handler : corePoolSize已满,队列已满,maxPoolSize 已满,最后的拒绝策略。////    public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue,  ThreadFactory threadFactory,  RejectedExecutionHandler handler) { if (corePoolSize < 0 ||     maximumPoolSize <= 0 ||     maximumPoolSize < corePoolSize ||     keepAliveTime < 0)     throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null)     throw new NullPointerException(); this.acc = System.getSecurityManager() == null ?  null :  AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;    } public void execute(Runnable command) { if (command == null)     throw new NullPointerException();int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {     if (addWorker(command, true))  return;     c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {     int recheck = ctl.get();     if (! 
isRunning(recheck) && remove(command))  reject(command);     else if (workerCountOf(recheck) == 0)  addWorker(null, false); } else if (!addWorker(command, false))     reject(command);    }public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     checkShutdownAccess();     advanceRunState(SHUTDOWN);     interruptIdleWorkers();     onShutdown(); // hook for ScheduledThreadPoolExecutor } finally {     mainLock.unlock(); } tryTerminate();    }public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     checkShutdownAccess();     advanceRunState(STOP);     interruptWorkers();     tasks = drainQueue(); } finally {     mainLock.unlock(); } tryTerminate(); return tasks;    }    public boolean isShutdown() { return ! isRunning(ctl.get());    }public boolean isTerminating() { int c = ctl.get(); return ! isRunning(c) && runStateLessThan(c, TERMINATED);    }    public boolean isTerminated() { return runStateAtLeast(ctl.get(), TERMINATED);    }    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     for (;;) {  if (runStateAtLeast(ctl.get(), TERMINATED))      return true;  if (nanos <= 0)      return false;  nanos = termination.awaitNanos(nanos);     } } finally {     mainLock.unlock(); }    }protected void finalize() { SecurityManager sm = System.getSecurityManager(); if (sm == null || acc == null) {     shutdown(); } else {     PrivilegedAction<Void> pa = () -> { shutdown(); return null; };     AccessController.doPrivileged(pa, acc); }    }    /**     * Sets the thread factory used to create new threads.     
*     * @param threadFactory the new thread factory     * @throws NullPointerException if threadFactory is null     * @see #getThreadFactory     */    public void setThreadFactory(ThreadFactory threadFactory) { if (threadFactory == null)     throw new NullPointerException(); this.threadFactory = threadFactory;    }    //获取默认线程池工厂    public ThreadFactory getThreadFactory() { return threadFactory;    }    ///设置拒绝策略    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { if (handler == null)     throw new NullPointerException(); this.handler = handler;    }   //获取拒绝策略    public RejectedExecutionHandler getRejectedExecutionHandler() { return handler;    }      public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0)     throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize)     interruptIdleWorkers(); else if (delta > 0) {     // We don't really know how many new threads are "needed".     // As a heuristic, prestart enough new workers (up to new     // core size) to handle the current number of tasks in     // queue, but stop if queue becomes empty while doing so.     int k = Math.min(delta, workQueue.size());     while (k-- > 0 && addWorker(null, true)) {  if (workQueue.isEmpty())      break;     } }    }    /**     * Returns the core number of threads.     
*     * @return the core number of threads     * @see #setCorePoolSize     */    public int getCorePoolSize() { return corePoolSize;    }     public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize &&     addWorker(null, true);    }    void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize)     addWorker(null, true); else if (wc == 0)     addWorker(null, false);    }    public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true))     ++n; return n;    } public boolean allowsCoreThreadTimeOut() { return allowCoreThreadTimeOut;    }public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0)     throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); if (value != allowCoreThreadTimeOut) {     allowCoreThreadTimeOut = value;     if (value)  interruptIdleWorkers(); }    }      public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)     throw new IllegalArgumentException(); this.maximumPoolSize = maximumPoolSize; if (workerCountOf(ctl.get()) > maximumPoolSize)     interruptIdleWorkers();    }      public int getMaximumPoolSize() { return maximumPoolSize;    }    //设置活跃时间    public void setKeepAliveTime(long time, TimeUnit unit) { if (time < 0)     throw new IllegalArgumentException(); if (time == 0 && allowsCoreThreadTimeOut())     throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); long keepAliveTime = unit.toNanos(time); long delta = keepAliveTime - this.keepAliveTime; this.keepAliveTime = keepAliveTime; if (delta < 0)     interruptIdleWorkers();    }    /**     * Returns the thread keep-alive time, which is the amount of time     * that threads in excess of the core pool size may remain     * idle before being terminated.     
*     * @param unit the desired time unit of the result     * @return the time limit     * @see #setKeepAliveTime(long, TimeUnit)     */    public long getKeepAliveTime(TimeUnit unit) { return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);    }    /* User-level queue utilities */   //获取阻塞队列  便于外面监控队列大小    public BlockingQueue<Runnable> getQueue() { return workQueue;    }     public boolean remove(Runnable task) { boolean removed = workQueue.remove(task); tryTerminate(); // In case SHUTDOWN and now empty return removed;    } public void purge() { final BlockingQueue<Runnable> q = workQueue; try {     Iterator<Runnable> it = q.iterator();     while (it.hasNext()) {  Runnable r = it.next();  if (r instanceof Future<?> && ((Future<?>)r).isCancelled())      it.remove();     } } catch (ConcurrentModificationException fallThrough) {     // Take slow path if we encounter interference during traversal.     // Make copy for traversal and call remove for cancelled entries.     // The slow path is more likely to be O(N*N).     for (Object r : q.toArray())  if (r instanceof Future<?> && ((Future<?>)r).isCancelled())      q.remove(r); } tryTerminate(); // In case SHUTDOWN and now empty    }    public int getPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     // Remove rare and surprising possibility of     // isTerminated() && getPoolSize() > 0     return runStateAtLeast(ctl.get(), TIDYING) ? 
0  : workers.size(); } finally {     mainLock.unlock(); }    }  //获取执行线程数量    public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     int n = 0;     for (Worker w : workers)  if (w.isLocked())      ++n;     return n; } finally {     mainLock.unlock(); }    }    //最大线程数    public int getLargestPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     return largestPoolSize; } finally {     mainLock.unlock(); }    }    //获取任务数量  这个值是个动态值  可能不准    public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     long n = completedTaskCount;     for (Worker w : workers) {  n += w.completedTasks;  if (w.isLocked())      ++n;     }     return n + workQueue.size(); } finally {     mainLock.unlock(); }    }    // 获取执行结束的任务数量    public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     long n = completedTaskCount;     for (Worker w : workers)  n += w.completedTasks;     return n; } finally {     mainLock.unlock(); }    }public String toString() { long ncompleted; int nworkers, nactive; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     ncompleted = completedTaskCount;     nactive = 0;     nworkers = workers.size();     for (Worker w : workers) {  ncompleted += w.completedTasks;  if (w.isLocked())      ++nactive;     } } finally {     mainLock.unlock(); } int c = ctl.get(); String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :(runStateAtLeast(c, TERMINATED) ? 
"Terminated" : "Shutting down")); return super.toString() +     "[" + rs +     ", pool size = " + nworkers +     ", active threads = " + nactive +     ", queued tasks = " + workQueue.size() +     ", completed tasks = " + ncompleted +     "]";    }    /* Extension hooks */    //狗子函数  没实现 用于线程执行之前 做监控    protected void beforeExecute(Thread t, Runnable r) { }//     //狗子函数  没实现 用于线程执行之后 做监控    protected void afterExecute(Runnable r, Throwable t) { }     //狗子函数  没实现 用于线程池销毁之后  做监控    protected void terminated() { }    /* Predefined RejectedExecutionHandlers */// 阻塞队列和最大线程数都满了    //拒绝策略之一  如果被丢弃的线程任务未关闭,则执行线程池的线程执行该线程任务    public static class CallerRunsPolicy implements RejectedExecutionHandler { /**  * Creates a {@code CallerRunsPolicy}.  */ public CallerRunsPolicy() { } /**  * Executes task r in the caller's thread, unless the executor  * has been shut down, in which case the task is discarded.  *  * @param r the runnable task requested to be executed  * @param e the executor attempting to execute this task  */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {     if (!e.isShutdown()) {  r.run();     } }    }      //拒绝策略之一  拒绝策略  阻塞队列和最大线程数都满了 之后 直接抛异常    public static class AbortPolicy implements RejectedExecutionHandler { /**  * Creates an {@code AbortPolicy}.  */ public AbortPolicy() { } /**  * Always throws RejectedExecutionException.  *  * @param r the runnable task requested to be executed  * @param e the executor attempting to execute this task  * @throws RejectedExecutionException always  */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {     throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString()); }    }      //拒绝策略之一  丢弃当前的线程任务而不做任何处理  直接丢掉  不作处理    public static class DiscardPolicy implements RejectedExecutionHandler { /**  * Creates a {@code DiscardPolicy}.  */ public DiscardPolicy() { } /**  * Does nothing, which has the effect of discarding task r.  
*  * @param r the runnable task requested to be executed  * @param e the executor attempting to execute this task  */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }    }      //拒绝策略之一      移除当前最早提交的任务    public static class DiscardOldestPolicy implements RejectedExecutionHandler { /**  * Creates a {@code DiscardOldestPolicy} for the given executor.  */ public DiscardOldestPolicy() { }      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {     if (!e.isShutdown()) {  e.getQueue().poll();  e.execute(r);     } }    }}

处理流程

在每次往线程池中提交任务的时候,处理流程如下:

在这里插入图片描述

  • 1、判断当前线程数是否大于或等于corePoolSize。如果小于,则新建线程执行;如果大于或等于,则进入第二步。

  • 2、判断队列是否已满。如未满,则放入;如已满,则进入第三步。

  • 3、判断当前线程数是否大于或等于maxPoolSize。如果小于,则新建线程执行;如果大于或等于,则进入第四步。

  • 4、根据拒绝策略,拒绝任务。

  • 5、总结(判断顺序):首先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize,最后使用拒绝策略。

如果队列是无界的(如 LinkedBlockingQueue 没指定大小时,默认容量为int最大值;注意 ArrayBlockingQueue 必须指定容量,属于有界队列),队列几乎不会满,也就没有机会走到第三步,maxPoolSize没有
使用,也不会因为队列已满而走到第四步。因为队列没有满,超出corePoolSize的线程不会启动,拒绝策略也不会执行了(线程池关闭后再提交任务的情况除外)。

线程池关闭

当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。
一般不会强制去关闭线程,而是等任务执行完成再关闭。

    // Run state and worker count are packed into this single atomic int.
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // The run state lives in the top 3 bits of the int; the remaining
    // 29 bits store the number of worker threads.
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // The five run states of the pool.
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl

    // Extracts the run state (high 3 bits).
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

    // Extracts the worker count (low 29 bits).
    private static int workerCountOf(int c)  { return c & CAPACITY; }

    // Combines a run state and a worker count into one ctl value.
    private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl变量被拆成两半,最高的3位用来表示线程池的状态,低的29位表示线程的个数。
线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED。
在这里插入图片描述
从源码里看线程池关闭有2个方法 shutdown()和shutdownNow()

线程池还提供了其他几个钩子方法,这些方法的实现都是空的。上面源码中注释也写了。

关闭线程池

    public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //关闭加锁 try { //检查是否被关闭权限     checkShutdownAccess();     //设置线程池状态     advanceRunState(SHUTDOWN);     //中断线程     interruptIdleWorkers();     //空狗子方法     onShutdown(); // hook for ScheduledThreadPoolExecutor } finally {     mainLock.unlock(); } //销毁 tryTerminate();    }

最终的打断线程代码

    /**
     * Raises the run state to {@code targetState}, leaving it untouched when
     * the current state is already at least that far along. The worker count
     * stored in the same word is carried over unchanged.
     *
     * @param targetState the run state to advance to
     */
    private void advanceRunState(int targetState) {
        while (true) {
            final int current = ctl.get();
            if (runStateAtLeast(current, targetState)) {
                return; // already at or past the target
            }
            final int updated = ctlOf(targetState, workerCountOf(current));
            if (ctl.compareAndSet(current, updated)) {
                return;
            }
            // CAS lost a race with another update; re-read and try again.
        }
    }
    private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     for (Worker w : workers) {  Thread t = w.thread;  //如果没有被打断,并且尝试还能获取到锁,打断  if (!t.isInterrupted() && w.tryLock()) {      try {   t.interrupt();      } catch (SecurityException ignore) {      } finally {   w.unlock();      }  }  if (onlyOne)      break;     } } finally {     mainLock.unlock(); }    }

aqs里的代码

这里tryLock():一个线程在执行一个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线程是否处于空闲状态。tryLock()如果调用成功,说明线程处于空闲状态,向其发送中断信号;否则不发送。

shutdownNow

 public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {     checkShutdownAccess();     advanceRunState(STOP);     interruptWorkers();     //这里唯一不一样的是 返回队列里未执行的task     tasks = drainQueue(); } finally {     mainLock.unlock(); } tryTerminate(); return tasks;    }

提交任务分析

public void execute(Runnable command) { if (command == null)     throw new NullPointerException();     //获取线程池状态 int c = ctl.get(); // 如果当前线程数小于corePoolSize,则启动新线程 if (workerCountOf(c) < corePoolSize) { //添加Worker,并将command设置为Worker线程的第一个任务开始执行。     if (addWorker(command, true))  return;     c = ctl.get(); } // 如果当前的线程数大于或等于corePoolSize,则调用workQueue.offer放入队列 if (isRunning(c) && workQueue.offer(command)) {     int recheck = ctl.get();     // 如果线程池正在停止,则将command任务从队列移除,并拒绝command任务请求。     if (! isRunning(recheck) && remove(command))  reject(command);  // 放入队列中后发现没有线程执行任务,开启新线程     else if (workerCountOf(recheck) == 0)  addWorker(null, false); } // 线程数大于maxPoolSize,并且队列已满,调用拒绝策略 else if (!addWorker(command, false))     reject(command);    }
// 该方法用于启动新线程。如果第二个参数为true,则使用corePoolSize作为上限,否则使用 maxPoolSize作为上限。 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) {      int c = ctl.get();     int rs = runStateOf(c);     // Check if queue empty only if necessary.     // 如果线程池状态值起码是SHUTDOWN和STOP,或则第一个任务不是null,或者工作队列 为空      // 则添加worker失败,返回false     if (rs >= SHUTDOWN &&  ! (rs == SHUTDOWN &&     firstTask == null &&     ! workQueue.isEmpty()))  return false;     for (;;) {  int wc = workerCountOf(c);  // 工作线程数达到最大,要么是corePoolSize要么是maximumPoolSize,启动 线程失败  if (wc >= CAPACITY ||      wc >= (core ? corePoolSize : maximumPoolSize))      return false;      // 增加worker数量成功,返回到retry语句  if (compareAndIncrementWorkerCount(c))      break retry;  c = ctl.get();  // Re-read ctl  // 如果线程池运行状态不是初始值,则重试retry标签语句,CAS  if (runStateOf(c) != rs)      continue retry;  // else CAS failed due to workerCount change; retry inner loop     } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //新建worker对象     w = new Worker(firstTask);     //获取线程     final Thread t = w.thread;     if (t != null) {  final ReentrantLock mainLock = this.mainLock;  //加锁  mainLock.lock();  try {      // Recheck while holding lock.      // Back out on ThreadFactory failure or if      // shut down before lock acquired.      //获取线程池状态      int rs = runStateOf(ctl.get());      if (rs < SHUTDOWN ||   (rs == SHUTDOWN && firstTask == null)) {   //线程在运行状态    if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//添加线程对应worker到worker集合   workers.add(w);   int s = workers.size();   if (s > largestPoolSize)largestPoolSize = s;   workerAdded = true;      }  } finally {  //释放锁      mainLock.unlock();  }  //启动worker线程  if (workerAdded) {      t.start();      //工作      workerStarted = true;  }     } } finally {     if (! workerStarted)     //如果失败 工作线程-1  addWorkerFailed(w); } return workerStarted;    }

addWorkerFailed

    /**
     * Rolls back a failed worker creation.
     *
     * <p>While holding {@code mainLock}: removes the worker from
     * {@code workers} if one was created, decrements the worker count, and
     * attempts termination.
     *
     * @param w the worker that failed to start, or null if none was created
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock lock = this.mainLock;
        lock.lock();
        try {
            if (w != null) {
                workers.remove(w);
            }
            decrementWorkerCount();
            tryTerminate();
        } finally {
            lock.unlock();
        }
    }

4种拒绝策略

下面是 4 种拒绝策略的示例 demo,可以逐个运行体验一下,这里就不多说了。

package thread.juc.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates the four built-in {@link ThreadPoolExecutor} rejection policies.
 *
 * <p>Every demo uses the same setup: a pool with exactly one thread and a
 * bounded queue of capacity 3, into which ten 5-second tasks are submitted.
 * Only one task can run and three can wait, so the remaining submissions are
 * handed to the rejection policy under test.
 */
public class RejectedExecutionHandlerTest1 {

    public static void main(String[] args) throws InterruptedException {
        // Enable exactly one of the demos below.
        // AbortPolicy();
        // CallerRunsPolicy();
        // DiscardPolicy();
        DiscardOldestPolicy();
    }

    /**
     * DiscardOldestPolicy: drops the oldest task still waiting in the queue,
     * then retries submitting the new task.
     */
    private static void DiscardOldestPolicy() throws InterruptedException {
        runDemo(new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    /** DiscardPolicy: silently drops the rejected task. */
    private static void DiscardPolicy() throws InterruptedException {
        runDemo(new ThreadPoolExecutor.DiscardPolicy());
    }

    /**
     * CallerRunsPolicy: the rejected task is run directly in the thread that
     * submitted it (here: the main thread), unless the pool has been shut
     * down, in which case the task is dropped.
     */
    private static void CallerRunsPolicy() throws InterruptedException {
        runDemo(new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * AbortPolicy (the default policy): the submission throws
     * {@code RejectedExecutionException}.
     */
    static void AbortPolicy() throws InterruptedException {
        runDemo(new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Shared demo body: submits ten slow tasks to a 1-thread pool with a
     * 3-slot queue that uses the given rejection handler.
     *
     * <p>The pool is always shut down in a {@code finally} block. Without
     * that, its non-daemon core thread would keep the JVM alive forever,
     * including when {@code AbortPolicy} makes a submission throw.
     * {@code shutdown()} does not cancel tasks that were already accepted;
     * they still run to completion.
     *
     * @param handler the rejection policy to demonstrate
     * @throws InterruptedException if the submitting thread is interrupted
     *                              while pausing after the submissions
     */
    private static void runDemo(RejectedExecutionHandler handler) throws InterruptedException {
        ThreadPoolExecutor t = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3), handler);
        try {
            for (int i = 0; i < 10; i++) {
                int finalI = i;
                t.submit(() -> {
                    try {
                        Thread.sleep(5_000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the pool can observe it.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                    System.out.println(finalI + " 执行完成 " + Thread.currentThread().getName());
                });
                System.out.println(" e -- > " + i);
            }
            TimeUnit.SECONDS.sleep(1);
        } finally {
            t.shutdown();
        }
    }
}