线程池原理
线程池原理
线程池的实现原理
线程池的实现原理其实可以看成一个生产者消费者模型。
我们执行线程往线程池里提交任务,线程池里的线程去消费任务,其他的一些策略去保证线程池和系统安全,如拒绝策略,线程大小的控制,阻塞队列的大小。
JDK中线程继承图如下
可以看比较详细的具体的继承图
顶级接口
只有一个执行任务的方法
public interface Executor { void execute(Runnable command);}
ExecutorService 接口
额外添加了一些 关闭方法 和 其他一些提交任务,包括单独提交 有返回值的、无返回值的,批量提交任务的方法等。
public interface ExecutorService extends Executor { //关闭 void shutdown(); //关闭并且返回 未执行的任务 List<Runnable> shutdownNow(); //线程池是否被关闭 boolean isShutdown(); //线程池是否死亡 boolean isTerminated(); //等待多长时间 销毁线程池 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;//提交有有返回值的任务 <T> Future<T> submit(Callable<T> task); //提交有有返回值的任务 指定结果 可以适配 <T> Future<T> submit(Runnable task, T result); //提交无返回值的,无返回值可以阻塞获取 也就是等待执行完成 返回值只不过是void Future<?> submit(Runnable task); //批量提交 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; //批量提交 最多等待时间 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}
ExecutorService 的抽象类
public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { //这里异步执行任务 FutureTask 需要和 Future 一起看。 return new FutureTask<T>(callable); } //返回值方法提交 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } //其他省略.... }
可以看到 execute(task); 是顶级接口中需要提交任务方法,这也明白了 为什么顶级接口只有这一个方法
返回指的可以在此方法执行完后,在里面设置返回结果。
抽象累的具体实现,也就是线程池的构造方法
package java.util.concurrent;import java.security.AccessControlContext;import java.security.AccessController;import java.security.PrivilegedAction;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;import java.util.concurrent.atomic.AtomicInteger;import java.util.*;public class ThreadPoolExecutor extends AbstractExecutorService {private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP= 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }// 存放任务的阻塞队列 private final BlockingQueue<Runnable> workQueue; // 对线程池内部各种变量进行互斥访问控制 private final ReentrantLock mainLock = new ReentrantLock(); // 线程集合 private final HashSet<Worker> workers = new HashSet<Worker>(); /** * Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition(); /** * Tracks largest attained pool size. Accessed only under * mainLock. */ private int largestPoolSize; /** * Counter for completed tasks. Updated only on termination of * worker threads. Accessed only under mainLock. */ private long completedTaskCount; private volatile ThreadFactory threadFactory; /** * 拒绝策略 */ private volatile RejectedExecutionHandler handler; // 活跃时间 private volatile long keepAliveTime; /// 是否允许核心线程超时 private volatile boolean allowCoreThreadTimeOut; ///核心线程数 private volatile int corePoolSize; //最大线程数 private volatile int maximumPoolSize; //默认拒绝策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; //worker 线程 final Thread thread; //worker 接收到的第一个任务 Runnable firstTask; //执行完毕的任务个数 volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } /* * Methods for setting control state */ /** * Transitions runState to given target, or leaves it alone if * already at least the given target. * * @param targetState the desired state, either SHUTDOWN or STOP * (but not TIDYING or TERMINATED -- use tryTerminate for that) */ private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } /** * Transitions to TERMINATED state if either (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The method is non-private to * allow access from ScheduledThreadPoolExecutor. */ final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } /* * Methods for controlling interrupts to worker threads. */ /** * If there is a security manager, makes sure caller has * permission to shut down threads in general (see shutdownPerm). * If this passes, additionally makes sure the caller is allowed * to interrupt each worker thread. This might not be true even if * first check passed, if the SecurityManager treats some threads * specially. */ private void checkShutdownAccess() { SecurityManager security = System.getSecurityManager(); if (security != null) { security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) security.checkAccess(w.thread); } finally { mainLock.unlock(); } } } /** * Interrupts all threads, even if active. Ignores SecurityExceptions * (in which case some threads may remain uninterrupted). */ private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } /** * Common form of interruptIdleWorkers, to avoid having to * remember what the boolean argument means. */ private void interruptIdleWorkers() { interruptIdleWorkers(false); } private static final boolean ONLY_ONE = true;final void reject(Runnable command) { handler.rejectedExecution(command, this); } /** * Performs any further cleanup following run state transition on * invocation of shutdown. A no-op here, but used by * ScheduledThreadPoolExecutor to cancel delayed tasks. */ void onShutdown() { } /** * State check needed by ScheduledThreadPoolExecutor to * enable running tasks during shutdown. * * @param shutdownOK true if should return true if SHUTDOWN */ final boolean isRunningOrShutdown(boolean shutdownOK) { int rs = runStateOf(ctl.get()); return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); } /** * Drains the task queue into a new list, normally using * drainTo. But if the queue is a DelayQueue or any other kind of * queue for which poll or drainTo may fail to remove some * elements, it deletes them one by one. */ private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; } private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize)largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } /** * Rolls back the worker thread creation. * - removes worker from workers, if present * - decrements worker count * - rechecks for termination, in case the existence of this * worker was holding up termination */ private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } } private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } //默认 线程工厂 默认 拒绝策略 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } //默认 线程工厂 可以指定拒绝策略 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } //这个最全构造 线程工厂 可以指定拒绝策略 // corePoolSize:在线程池中始终维护的线程个数。// 在corePooSize 已满、队列也满的情况下,扩充线程至此值。// keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。// workQueue :线程池所用的队列类型。// threadFactory :线程创建工厂,可以自定义,有默认值 如上面// handler : corePoolSize已满,队列已满,maxPoolSize 已满,最后的拒绝策略。//// public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } public void execute(Runnable command) { if (command == null) throw new NullPointerException();int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } public boolean isShutdown() { return ! isRunning(ctl.get()); }public boolean isTerminating() { int c = ctl.get(); return ! isRunning(c) && runStateLessThan(c, TERMINATED); } public boolean isTerminated() { return runStateAtLeast(ctl.get(), TERMINATED); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }protected void finalize() { SecurityManager sm = System.getSecurityManager(); if (sm == null || acc == null) { shutdown(); } else { PrivilegedAction<Void> pa = () -> { shutdown(); return null; }; AccessController.doPrivileged(pa, acc); } } /** * Sets the thread factory used to create new threads. * * @param threadFactory the new thread factory * @throws NullPointerException if threadFactory is null * @see #getThreadFactory */ public void setThreadFactory(ThreadFactory threadFactory) { if (threadFactory == null) throw new NullPointerException(); this.threadFactory = threadFactory; } //获取默认线程池工厂 public ThreadFactory getThreadFactory() { return threadFactory; } ///设置拒绝策略 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { if (handler == null) throw new NullPointerException(); this.handler = handler; } //获取拒绝策略 public RejectedExecutionHandler getRejectedExecutionHandler() { return handler; } public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); else if (delta > 0) { // We don't really know how many new threads are "needed". // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so. int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } } /** * Returns the core number of threads. * * @return the core number of threads * @see #setCorePoolSize */ public int getCorePoolSize() { return corePoolSize; } public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; } public boolean allowsCoreThreadTimeOut() { return allowCoreThreadTimeOut; }public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); if (value != allowCoreThreadTimeOut) { allowCoreThreadTimeOut = value; if (value) interruptIdleWorkers(); } } public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); this.maximumPoolSize = maximumPoolSize; if (workerCountOf(ctl.get()) > maximumPoolSize) interruptIdleWorkers(); } public int getMaximumPoolSize() { return maximumPoolSize; } //设置活跃时间 public void setKeepAliveTime(long time, TimeUnit unit) { if (time < 0) throw new IllegalArgumentException(); if (time == 0 && allowsCoreThreadTimeOut()) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); long keepAliveTime = unit.toNanos(time); long delta = keepAliveTime - this.keepAliveTime; this.keepAliveTime = keepAliveTime; if (delta < 0) interruptIdleWorkers(); } /** * Returns the thread keep-alive time, which is the amount of time * that threads in excess of the core pool size may remain * idle before being terminated. * * @param unit the desired time unit of the result * @return the time limit * @see #setKeepAliveTime(long, TimeUnit) */ public long getKeepAliveTime(TimeUnit unit) { return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); } /* User-level queue utilities */ //获取阻塞队列 便于外面监控队列大小 public BlockingQueue<Runnable> getQueue() { return workQueue; } public boolean remove(Runnable task) { boolean removed = workQueue.remove(task); tryTerminate(); // In case SHUTDOWN and now empty return removed; } public void purge() { final BlockingQueue<Runnable> q = workQueue; try { Iterator<Runnable> it = q.iterator(); while (it.hasNext()) { Runnable r = it.next(); if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) it.remove(); } } catch (ConcurrentModificationException fallThrough) { // Take slow path if we encounter interference during traversal. // Make copy for traversal and call remove for cancelled entries. // The slow path is more likely to be O(N*N). for (Object r : q.toArray()) if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) q.remove(r); } tryTerminate(); // In case SHUTDOWN and now empty } public int getPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Remove rare and surprising possibility of // isTerminated() && getPoolSize() > 0 return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size(); } finally { mainLock.unlock(); } } //获取执行线程数量 public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w : workers) if (w.isLocked()) ++n; return n; } finally { mainLock.unlock(); } } //最大线程数 public int getLargestPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return largestPoolSize; } finally { mainLock.unlock(); } } //获取任务数量 这个值是个动态值 可能不准 public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) { n += w.completedTasks; if (w.isLocked()) ++n; } return n + workQueue.size(); } finally { mainLock.unlock(); } } // 获取执行结束的任务数量 public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); } }public String toString() { long ncompleted; int nworkers, nactive; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { ncompleted = completedTaskCount; nactive = 0; nworkers = workers.size(); for (Worker w : workers) { ncompleted += w.completedTasks; if (w.isLocked()) ++nactive; } } finally { mainLock.unlock(); } int c = ctl.get(); String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :(runStateAtLeast(c, TERMINATED) ? "Terminated" : "Shutting down")); return super.toString() + "[" + rs + ", pool size = " + nworkers + ", active threads = " + nactive + ", queued tasks = " + workQueue.size() + ", completed tasks = " + ncompleted + "]"; } /* Extension hooks */ //狗子函数 没实现 用于线程执行之前 做监控 protected void beforeExecute(Thread t, Runnable r) { }// //狗子函数 没实现 用于线程执行之后 做监控 protected void afterExecute(Runnable r, Throwable t) { } //狗子函数 没实现 用于线程池销毁之后 做监控 protected void terminated() { } /* Predefined RejectedExecutionHandlers */// 阻塞队列和最大线程数都满了 //拒绝策略之一 如果被丢弃的线程任务未关闭,则执行线程池的线程执行该线程任务 public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } //拒绝策略之一 拒绝策略 阻塞队列和最大线程数都满了 之后 直接抛异常 public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString()); } } //拒绝策略之一 丢弃当前的线程任务而不做任何处理 直接丢掉 不作处理 public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } //拒绝策略之一 移除当前最早提交的任务 public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }}
处理流程
在每次往线程池中提交任务的时候,处理流程如下:
-
1、判断当前线程数是否大于或等于corePoolSize。如果小于,则新建线程执行;如果大于,则进入第二步。
-
2、判断队列是否已满。如未满,则放入;如已满,则进入第三步。
-
3、判断当前线程数是否大于或等于maxPoolSize。如果小于,则新建线程执行;如果大于,则进入第四步。
-
4、根据拒绝策略,拒绝任务。
-
5、拒绝策略 : 首先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize,最后使用拒绝策略。
如果队列是无界的(如ArrayBlockingQueue
、LinkedBlockingQueue
没指定大小,默认int最大值),永远没有机会走到第三步,maxPoolSize没有
使用,也一定不会走到第四步。 因为队列没有满,max线程不会启动。拒绝策略也不回执行了。
线程池关闭
当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。
一般不会强制去关闭线程,等任务执行完成在关闭。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //这里 线程池状态被用Int 高三位表示 其他29位用来存线程个数 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits //线程池5种状态 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP= 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl //运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //线程数量 private static int workerCountOf(int c) { return c & CAPACITY; } //合并 private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl变量被拆成两半,最高的3位用来表示线程池的状态,低的29位表示线程的个数。
线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED。
从源码里看线程池关闭有2个方法 shutdown()和shutdownNow()
线程池还提供了其他几个钩子方法,这些方法的实现都是空的。上面源码中注释也写了。
关闭线程池
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //关闭加锁 try { //检查是否被关闭权限 checkShutdownAccess(); //设置线程池状态 advanceRunState(SHUTDOWN); //中断线程 interruptIdleWorkers(); //空狗子方法 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //销毁 tryTerminate(); }
最终的打断线程代码
private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; //如果没有被打断,并且尝试还能获取到锁,打断 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
aqs里的代码
这里tryLock():一个线程在执行一个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线程是否处于空闲状态。tryLock()如果调用成功,说明线程处于空闲状态,向其发送中断信号;否则不发送。
shutdownNow
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); //这里唯一不一样的是 返回队列里未执行的task tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
提交任务分析
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //获取线程池状态 int c = ctl.get(); // 如果当前线程数小于corePoolSize,则启动新线程 if (workerCountOf(c) < corePoolSize) { //添加Worker,并将command设置为Worker线程的第一个任务开始执行。 if (addWorker(command, true)) return; c = ctl.get(); } // 如果当前的线程数大于或等于corePoolSize,则调用workQueue.offer放入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果线程池正在停止,则将command任务从队列移除,并拒绝command任务请求。 if (! isRunning(recheck) && remove(command)) reject(command); // 放入队列中后发现没有线程执行任务,开启新线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 线程数大于maxPoolSize,并且队列已满,调用拒绝策略 else if (!addWorker(command, false)) reject(command); }
// 该方法用于启动新线程。如果第二个参数为true,则使用corePoolSize作为上限,否则使用 maxPoolSize作为上限。 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 如果线程池状态值起码是SHUTDOWN和STOP,或则第一个任务不是null,或者工作队列 为空 // 则添加worker失败,返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 工作线程数达到最大,要么是corePoolSize要么是maximumPoolSize,启动 线程失败 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 增加worker数量成功,返回到retry语句 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 如果线程池运行状态不是初始值,则重试retry标签语句,CAS if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //新建worker对象 w = new Worker(firstTask); //获取线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //获取线程池状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //线程在运行状态 if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//添加线程对应worker到worker集合 workers.add(w); int s = workers.size(); if (s > largestPoolSize)largestPoolSize = s; workerAdded = true; } } finally { //释放锁 mainLock.unlock(); } //启动worker线程 if (workerAdded) { t.start(); //工作 workerStarted = true; } } } finally { if (! workerStarted) //如果失败 工作线程-1 addWorkerFailed(w); } return workerStarted; }
addWorkerFailed
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
4种拒绝策略
4种拒绝策略 举例demo、可以运行一下体验一下,就不多说了。
package thread.juc.threadpool;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * 线程池 拒绝策略 详解 */public class RejectedExecutionHandlerTest1 { public static void main(String[] args) throws InterruptedException { //拒绝策略 //AbortPolicy(); // 如果被丢弃的线程任务未关闭,则执行线程池的线程执行该线程任务 //CallerRunsPolicy(); // 丢弃当前的线程任务而不做任何处理 直接丢掉 不作处理 //DiscardPolicy(); // 移除当前最早提交的任务 DiscardOldestPolicy(); } private static void DiscardOldestPolicy() throws InterruptedException { ThreadPoolExecutor t = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), new ThreadPoolExecutor.DiscardOldestPolicy()); // //移除当前最早提交的任务 for (int i = 0; i < 10; i++) { int finalI = i; t.submit(() -> { try { Thread.sleep(5_000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(finalI + " 执行完成 " + Thread.currentThread().getName()); }); System.out.println(" e -- > " + i); } TimeUnit.SECONDS.sleep(1); } private static void DiscardPolicy() throws InterruptedException { ThreadPoolExecutor t = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), new ThreadPoolExecutor.DiscardPolicy()); // 丢弃当前的线程任务而不做任何处理 for (int i = 0; i < 10; i++) { int finalI = i; t.submit(() -> { try { Thread.sleep(5_000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(finalI + " 执行完成 " + Thread.currentThread().getName()); }); System.out.println(" e -- > " + i); } TimeUnit.SECONDS.sleep(1); } private static void CallerRunsPolicy() throws InterruptedException { ThreadPoolExecutor t = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), new ThreadPoolExecutor.CallerRunsPolicy()); // 如果被丢弃的线程任务未关闭,则执行线程池的线程执行该线程任务 for (int i = 0; i < 10; i++) { int finalI = i; t.submit(() -> { try { Thread.sleep(5_000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(finalI + " 执行完成 " + Thread.currentThread().getName()); }); System.out.println(" e -- > " + i); } TimeUnit.SECONDS.sleep(1); } static void AbortPolicy() throws InterruptedException { ThreadPoolExecutor t = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), new ThreadPoolExecutor.AbortPolicy()); // 默认情况下也是 拒绝 抛异常 for (int i = 0; i < 10; i++) { int finalI = i; t.submit(() -> { try { Thread.sleep(5_000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(finalI + " 执行完成 " + Thread.currentThread().getName()); }); System.out.println(" e -- > " + i); } TimeUnit.SECONDS.sleep(1); }}