> 文档中心 > 【Java面试突击-7】Java并发编程(线程池)

【Java面试突击-7】Java并发编程(线程池)

文章目录

  • 线程
    • 使用线程池的好处
    • Executor 框架
    • Executor 框架结构(主要由三大部分组成)
      • 1,任务(Runnable /Callable)
      • 2,任务的执行(Executor)
      • 3,异步计算的结果(Future)
    • Executor 框架的使用
    • ThreadPoolExecutor 类简单介绍
      • ThreadPoolExecutor 类分析
      • ThreadPoolExecutor 7个 参数描述:
      • ThreadPoolExecutor 饱和策略定义:
      • Executors 创建线程池的弊端:
    • ThreadPoolExecutor 使用
      • Runnable+ThreadPoolExecutor 示例
      • Callable+ThreadPoolExecutor 示例
    • 线程池的状态
      • 线程池状态流转
    • 线程池流程分析
    • 线程池方法的对比
      • Runnable vs Callable
      • execute() vs submit()
      • shutdown() VS shutdownNow()
      • isTerminated() VS isShutdown()
    • 常见的线程池详解
      • FixedThreadPool
        • 介绍
        • 执行任务过程
        • 不推荐使用
      • SingleThreadExecutor
        • 介绍
        • 执行过程
        • 不推荐使用
      • CachedThreadPool
        • 介绍
        • 执行过程
        • 不推荐使用
      • ScheduledThreadPoolExecutor
        • 介绍
        • 执行过程
        • 执行周期任务的步骤

线程池

使用线程池的好处

线程池提供了一种限制和管理资源(包括执行一个任务)的方式。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

使用线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Executor 框架

Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销。

Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。

Executor 框架结构(主要由三大部分组成)

1,任务(Runnable /Callable)

执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

2,任务的执行(Executor)

如下图所示,包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。

ThreadPoolExecutor 类定义:

//AbstractExecutorService实现了ExecutorService接口public class ThreadPoolExecutor extends AbstractExecutorService

ScheduledThreadPoolExecutor 类定义:

//ScheduledExecutorService继承ExecutorService接口public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService

类关系图:
【Java面试突击-7】Java并发编程(线程池)

3,异步计算的结果(Future)

Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。

当我们把 Runnable 接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)

Executor 框架的使用

Executor 框架的使用示意图:
【Java面试突击-7】Java并发编程(线程池)
流程:

1,主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
2,把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable task))。
3,如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
4,最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

ThreadPoolExecutor 类简单介绍

线程池实现类 ThreadPoolExecutor 是 Executor 框架最核心的类。

ThreadPoolExecutor 类分析

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生。

 /**     * 用给定的初始参数创建一个新的ThreadPoolExecutor。     */    public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量  int maximumPoolSize,//线程池的最大线程数  long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间  TimeUnit unit,//时间单位  BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列  ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可  RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务   ) { if (corePoolSize < 0 ||     maximumPoolSize <= 0 ||     maximumPoolSize < corePoolSize ||     keepAliveTime < 0)     throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null)     throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;    }

ThreadPoolExecutor 7个 参数描述:

  • corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
  • keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory :executor 创建新线程的时候会用到。
  • handler :饱和策略。关于饱和策略下面单独介绍一下。

线程池中各个参数的相互关系图:
【Java面试突击-7】Java并发编程(线程池)

ThreadPoolExecutor 饱和策略定义:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy :抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy :调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy :不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy : 此策略将丢弃最早的未处理的任务请求。

Executors 创建线程池的弊端:

《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险;

使用 Executors 创建的线程池对象的弊端如下:

FixedThreadPool SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

CachedThreadPoolScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

ThreadPoolExecutor 使用

Runnable+ThreadPoolExecutor 示例

首先创建一个 Runnable 接口的实现类。

import java.util.Date;/** * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。 * @author shuang.kou */public class MyRunnable implements Runnable {    private String command;    public MyRunnable(String s) { this.command = s;    }    @Override    public void run() { System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date()); processCommand(); System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());    }    private void processCommand() { try {     Thread.sleep(5000); } catch (InterruptedException e) {     e.printStackTrace(); }    }    @Override    public String toString() { return this.command;    }}

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ThreadPoolExecutorDemo {    private static final int CORE_POOL_SIZE = 5;    private static final int MAX_POOL_SIZE = 10;    private static final int QUEUE_CAPACITY = 100;    private static final Long KEEP_ALIVE_TIME = 1L;    public static void main(String[] args) { //使用阿里巴巴推荐的创建线程池的方式 //通过ThreadPoolExecutor构造函数自定义参数创建 ThreadPoolExecutor executor = new ThreadPoolExecutor(  CORE_POOL_SIZE,  MAX_POOL_SIZE,  KEEP_ALIVE_TIME,  TimeUnit.SECONDS,  new ArrayBlockingQueue<>(QUEUE_CAPACITY),  new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++) {     //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)     Runnable worker = new MyRunnable("" + i);     //执行Runnable     executor.execute(worker); } //终止线程池 executor.shutdown(); while (!executor.isTerminated()) { } System.out.println("Finished all threads");    }}

上面的代码创建了一个线程池:

1,corePoolSize: 核心线程数为 5。
2,maximumPoolSize :最大线程数 10
3,keepAliveTime : 等待时间为 1L。
4,unit: 等待时间的单位为 TimeUnit.SECONDS。
5,workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
6,handler:饱和策略为 CallerRunsPolicy。

Callable+ThreadPoolExecutor 示例

import java.util.concurrent.Callable;public class MyCallable implements Callable<String> {    @Override    public String call() throws Exception { Thread.sleep(1000); //返回执行当前 Callable 的线程名字 return Thread.currentThread().getName();    }}
import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class CallableDemo {    private static final int CORE_POOL_SIZE = 5;    private static final int MAX_POOL_SIZE = 10;    private static final int QUEUE_CAPACITY = 100;    private static final Long KEEP_ALIVE_TIME = 1L;    public static void main(String[] args) { //使用阿里巴巴推荐的创建线程池的方式 //通过ThreadPoolExecutor构造函数自定义参数创建 ThreadPoolExecutor executor = new ThreadPoolExecutor(  CORE_POOL_SIZE,  MAX_POOL_SIZE,  KEEP_ALIVE_TIME,  TimeUnit.SECONDS,  new ArrayBlockingQueue<>(QUEUE_CAPACITY),  new ThreadPoolExecutor.CallerRunsPolicy()); List<Future<String>> futureList = new ArrayList<>(); Callable<String> callable = new MyCallable(); for (int i = 0; i < 10; i++) {     //提交任务到线程池     Future<String> future = executor.submit(callable);     //将返回值 future 添加到 list,我们可以通过 future 获得 执行 Callable 得到的返回值     futureList.add(future); } for (Future<String> fut : futureList) {     try {  System.out.println(new Date() + "::" + fut.get());     } catch (InterruptedException | ExecutionException e) {  e.printStackTrace();     } } //关闭线程池 executor.shutdown();    }}

线程池的状态

  • RUNNING:接受新任务并处理排队的任务。
  • SHUTDOWN:不接受新任务,但处理排队的任务。
  • STOP:不接受新任务,不处理排队的任务,并中断正在进行的任务。
  • TIDYING:所有任务都已终止,workerCount 为零,线程转换到TIDYING 状态将运行 terminated() 钩子方法。
  • TERMINATED:terminated() 已完成。
    线程池状态流转

线程池状态流转

【Java面试突击-7】Java并发编程(线程池)

  • 当创建线程池后,初始时,线程池处于RUNNING状态。
  • 如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕。
  • 如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务。

线程池流程分析

在上面的例子中我们使用 executor.execute(worker)来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:

 // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    private static int workerCountOf(int c) { return c & CAPACITY;    }    //任务队列    private final BlockingQueue<Runnable> workQueue;    public void execute(Runnable command) { // 如果任务为null,则抛出异常。 if (command == null)     throw new NullPointerException(); // ctl 中保存的线程池当前的一些状态信息 int c = ctl.get(); //  下面会涉及到 3 步 操作 // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 if (workerCountOf(c) < corePoolSize) {     if (addWorker(command, true))  return;     c = ctl.get(); } // 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里 // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去 if (isRunning(c) && workQueue.offer(command)) {     int recheck = ctl.get();     // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。     if (!isRunning(recheck) && remove(command))  reject(command);  // 如果当前线程池为空就新创建一个线程并执行。     else if (workerCountOf(recheck) == 0)  addWorker(null, false); } //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。 else if (!addWorker(command, false))     reject(command);    }

通过下图可以更好的对上面这 3 步做一个展示。

【Java面试突击-7】Java并发编程(线程池)

addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。

 // 全局锁,并发操作必备    private final ReentrantLock mainLock = new ReentrantLock();    // 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合    private int largestPoolSize;    // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合    private final HashSet<Worker> workers = new HashSet<>();    //获取线程池状态    private static int runStateOf(int c)     { return c & ~CAPACITY; }    //判断线程池的状态是否为 Running    private static boolean isRunning(int c) { return c < SHUTDOWN;    }    /**     * 添加新的工作线程到线程池     * @param firstTask 要执行     * @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小     * @return 添加成功就返回true否则返回false     */   private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) {     //这两句用来获取线程池的状态     int c = ctl.get();     int rs = runStateOf(c);     // Check if queue empty only if necessary.     if (rs >= SHUTDOWN &&  ! (rs == SHUTDOWN &&     firstTask == null &&     ! workQueue.isEmpty()))  return false;     for (;;) { //获取线程池中工作的线程的数量  int wc = workerCountOf(c);  // core参数为true的话表明队列也满了,线程池大小变为 maximumPoolSize  if (wc >= CAPACITY ||      wc >= (core ? corePoolSize : maximumPoolSize))      return false; //原子操作将workcount的数量加1  if (compareAndIncrementWorkerCount(c))      break retry;  // 如果线程的状态改变了就再次执行上述操作  c = ctl.get();  if (runStateOf(c) != rs)      continue retry;  // else CAS failed due to workerCount change; retry inner loop     } } // 标记工作线程是否启动成功 boolean workerStarted = false; // 标记工作线程是否创建成功 boolean workerAdded = false; Worker w = null; try {     w = new Worker(firstTask);     final Thread t = w.thread;     if (t != null) {// 加锁  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {     //获取线程池状态      int rs = runStateOf(ctl.get());     //rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中    //(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker     // firstTask == null证明只新建线程而不执行任务      if (rs < SHUTDOWN ||   (rs == SHUTDOWN && firstTask == null)) {   if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();   workers.add(w);  //更新当前工作线程的最大容量   int s = workers.size();   if (s > largestPoolSize)largestPoolSize = s; // 工作线程是否启动成功   workerAdded = true;      }  } finally {      // 释放锁      mainLock.unlock();  }   如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例  if (workerAdded) {      t.start();    /// 标记线程启动成功      workerStarted = true;  }     } } finally {    // 线程启动失败,需要从工作线程中移除对应的Worker     if (! workerStarted)  addWorkerFailed(w); } return workerStarted;    }

线程池方法的对比

Runnable vs Callable

Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入。Runnable 接口不会返回结果或抛出检查异常,但是 Callable 接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。

工具类 Executors 可以实现将 Runnable 对象转换成 Callable 对象。(Executors.callable(Runnable task) 或 Executors.callable(Runnable task, Object result))。

execute() vs submit()

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;

submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法的话,如果在 timeout 时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException。

使用 get(long timeout,TimeUnit unit) 方法获取返回值。

ExecutorService executorService = Executors.newFixedThreadPool(3);Future<String> submit = executorService.submit(() -> {    try { Thread.sleep(5000L);    } catch (InterruptedException e) { e.printStackTrace();    }    return "abc";});String s = submit.get(3, TimeUnit.SECONDS);System.out.println(s);executorService.shutdown();

输出:

Exception in thread "main" java.util.concurrent.TimeoutExceptionat java.util.concurrent.FutureTask.get(FutureTask.java:205)

shutdown() VS shutdownNow()

  • shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
  • shutdownNow() :关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

isTerminated() VS isShutdown()

  • isShutDown 当调用 shutdown() 方法后返回为 true。
  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

常见的线程池详解

FixedThreadPool

介绍

/**     * 创建一个可重用固定数量线程的线程池     */    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads,   0L, TimeUnit.MILLISECONDS,   new LinkedBlockingQueue<Runnable>(),   threadFactory);    }

从上面源代码可以看出新创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为 nThreads,这个 nThreads 参数是我们使用的时候自己传递的。

执行任务过程

FixedThreadPool 的 execute() 方法运行示意图:
【Java面试突击-7】Java并发编程(线程池)
说明:

1,如果当前运行的线程数小于 corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务;
2,当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入 LinkedBlockingQueue;
3,线程池中的线程执行完 手头的任务后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行;

不推荐使用

1,FixedThreadPool 使用无界队列 LinkedBlockingQueue(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列。
2,所以新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize
3,运行中的 FixedThreadPool 永远不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。

SingleThreadExecutor

介绍

SingleThreadExecutor 是只有一个线程的线程池。下面看看SingleThreadExecutor 的实现:

  /**     *返回只有一个线程的线程池     */    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService     (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory));    }

从上面源代码可以看出新创建的 SingleThreadExecutor 的 corePoolSize 和 maximumPoolSize 都被设置为 1.其他参数和 FixedThreadPool 相同。

执行过程

SingleThreadExecutor 的运行示意图:
【Java面试突击-7】Java并发编程(线程池)
说明:

1,如果当前运行的线程数少于 corePoolSize,则创建一个新的线程执行任务;
2,当前线程池中有一个运行的线程后,将任务加入 LinkedBlockingQueue
3,线程执行完当前的任务后,会在循环中反复从LinkedBlockingQueue 中获取任务来执行;

不推荐使用

SingleThreadExecutor 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor 使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool 相同。无限的队列也是会导致 OOM。

CachedThreadPool

介绍

CachedThreadPool 是一个会根据需要创建新线程的线程池。下面通过源码来看看 CachedThreadPool 的实现:

 /**     * 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。     */    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,   60L, TimeUnit.SECONDS,   new SynchronousQueue<Runnable>(),   threadFactory);    }

CachedThreadPool 的corePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。

执行过程

CachedThreadPool 的 execute() 方法的执行示意图:
【Java面试突击-7】Java并发编程(线程池)

说明:

1,首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤 2;
2,当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成;

不推荐使用

CachedThreadPool允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

ScheduledThreadPoolExecutor

介绍

ScheduledThreadPoolExecutor 使用的任务队列 DelayQueue 封装了一个 PriorityQueue,PriorityQueue 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask 的 time 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask 的 squenceNumber 变量小的先执行)。

执行过程

【Java面试突击-7】Java并发编程(线程池)

ScheduledThreadPoolExecutor 的执行主要分为两大部分:

1,当调用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay() 方法时,会向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask 。
2,线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor做了如下修改:

  • 使用 DelayQueue 作为任务队列;
  • 获取任务的方不同
  • 执行周期任务后,增加了额外的处理

执行周期任务的步骤

【Java面试突击-7】Java并发编程(线程池)

1, 线程 1 从 DelayQueue 中获取已到期的 ScheduledFutureTask(DelayQueue.take())。到期任务是指 ScheduledFutureTask的 time 大于等于当前系统的时间;
2, 线程 1 执行这个 ScheduledFutureTask;
3, 线程 1 修改 ScheduledFutureTask 的 time 变量为下次将要被执行的时间;
4, 线程 1 把这个修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。