> 文档中心 > 必背的线程池知识你知道了吗?

必背的线程池知识你知道了吗?


线程池原理

1、为什么要使⽤线程池

使⽤线程池主要有以下三个原因:

  1. 创建/销毁线程需要消耗系统资源,线程池可以复⽤已创建的线程。
  2. 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从⽽造成服务器 崩溃。(主要原因)
  3. 可以对线程做统⼀管理。

2 线程池的原理

Java中的线程池顶层接⼝是 Executor 接⼝, ThreadPoolExecutor 是这个接⼝的实 现类。

我们先看看 ThreadPoolExecutor 类。

2.1 ThreadPoolExecutor提供的构造⽅法

⼀共有四个构造⽅法:

// 五个参数的构造函数public ThreadPoolExecutor(int corePoolSize,     int maximumPoolSize,     long keepAliveTime,     TimeUnit unit,     BlockingQueue<Runnable> workQueue)    // 六个参数的构造函数-1public ThreadPoolExecutor(int corePoolSize,     int maximumPoolSize,     long keepAliveTime,     TimeUnit unit,     BlockingQueue<Runnable> workQueue,     ThreadFactory threadFactory)    // 六个参数的构造函数-2public ThreadPoolExecutor(int corePoolSize,     int maximumPoolSize,     long keepAliveTime,     TimeUnit unit,     BlockingQueue<Runnable> workQueue,     RejectedExecutionHandler handler)    // 七个参数的构造函数public ThreadPoolExecutor(int corePoolSize,     int maximumPoolSize,     long keepAliveTime,     TimeUnit unit,     BlockingQueue<Runnable> workQueue,     ThreadFactory threadFactory,     RejectedExecutionHandler handler)

涉及到5~7个参数,我们先看看必须的5个参数是什么意思:

  • int corePoolSize:该线程池中核⼼线程数最⼤值

核⼼线程:线程池中有两类线程,核⼼线程和⾮核⼼线程。核⼼线程默 认情况下会⼀直存在于线程池中,即使这个核⼼线程什么都不⼲(铁饭 碗),⽽⾮核⼼线程如果⻓时间的闲置,就会被销毁(临时⼯)。

  • int maximumPoolSize:该线程池中线程总数最⼤值 。

该值等于核⼼线程数量 + ⾮核⼼线程数量。

  • long keepAliveTime:⾮核⼼线程闲置超时时⻓。

⾮核⼼线程如果处于闲置状态超过该值,就会被销毁。如果设置 allowCoreThreadTimeOut(true),则会也作⽤于核⼼线程。

  • TimeUnit unit:keepAliveTime的单位。

TimeUnit是⼀个枚举类型 ,包括以下属性:

NANOSECONDS : 1微毫秒 = 1微秒 / 1000

MICROSECONDS : 1微秒 = 1毫秒 / 1000

MILLISECONDS : 1毫秒 = 1秒 /1000

SECONDS : 秒

MINUTES : 分

HOURS : ⼩时

DAYS : 天

  • BlockingQueue workQueue:阻塞队列,维护着等待执⾏的Runnable任务 对象。

常⽤的⼏个阻塞队列:

​ 1.LinkedBlockingQueue

链式阻塞队列,底层数据结构是链表,默认⼤⼩是 Integer.MAX_VALUE , 也可以指定⼤⼩。

​ 2. ArrayBlockingQueue

数组阻塞队列,底层数据结构是数组,需要指定队列的⼤⼩。

​ 3. SynchronousQueue

同步队列,内部容量为0,每个put操作必须等待⼀个take操作,反之亦 然。

  1. DelayQueue

延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列 中获取到该元素

介绍完5个必须的参数之后,还有两个⾮必须的参数。

  • ThreadFactory threadFactory

创建线程的⼯⼚ ,⽤于批量创建线程,统⼀在创建线程时设置⼀些参数,如是 否守护线程、线程的优先级等。如果不指定,会新建⼀个默认的线程⼯⼚。

static class DefaultThreadFactory implements ThreadFactory {     // 省略属性     // 构造函数     DefaultThreadFactory() {  SecurityManager s = System.getSecurityManager();  group = (s != null) ? s.getThreadGroup() :  Thread.currentThread().getThreadGroup();  namePrefix = "pool-" +  poolNumber.getAndIncrement() +  "-thread-";     }     // 省略}
  • RejectedExecutionHandler handler

    拒绝处理策略,线程数量⼤于最⼤线程数就会采⽤拒绝处理策略,四种拒绝处 理的策略为 :

    1. ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛 出RejectedExecutionException异常。
    2. ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异 常。
    3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的) 的任务,然后重新尝试执⾏程序(如果再次失败,重复此过程)。
    4. ThreadPoolExecutor.CallerRunsPolicy:由调⽤线程处理该任务。

2.2 ThreadPoolExecutor的策略

线程池本身有⼀个调度线程,这个线程就是⽤于管理整个线程池⾥的各种任务 和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。

故线程池也有⾃⼰的状态。 ThreadPoolExecutor 类中定义了⼀个 volatile int 变 量runState来表示线程池的状态 ,分别为RUNNING、SHURDOWN、STOP、 TIDYING 、TERMINATED。

  • 线程池创建后处于RUNNING状态。

  • 调⽤shutdown()⽅法后处于SHUTDOWN状态,线程池不能接受新的任务,清 除⼀些空闲worker,会等待阻塞队列的任务完成。

  • 调⽤shutdownNow()⽅法后处于STOP状态,**线程池不能接受新的任务,中断 所有线程,阻塞队列中没有被执⾏的任务全部丢弃。**此时,poolsize=0,阻塞队 列的size也为0。

  • 当所有的任务已终⽌,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。 接着会执⾏terminated()函数。

    ThreadPoolExecutor中有⼀个控制状态的属性叫ctl,它是⼀个 AtomicInteger类型的变量。

  • 线程池处在TIDYING状态时,执⾏完terminated()⽅法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态。

必背的线程池知识你知道了吗?

2.3 线程池主要的任务处理流程

处理任务的核⼼⽅法是 execute ,我们看看 JDK 1.8 源码中 ThreadPoolExecutor 是 如何处理线程任务的:

// JDK 1.8public void execute(Runnable command) {     if (command == null) throw new NullPointerException();     int c = ctl.get();  // 1.当前线程数⼩于corePoolSize,则调⽤addWorker创建核⼼线程执⾏任务     if (workerCountOf(c) < corePoolSize) {     if (addWorker(command, true))  return;      c = ctl.get();     }  // 2.如果不⼩于corePoolSize,则将任务添加到workQueue队列。     if (isRunning(c) && workQueue.offer(command)) {     int recheck = ctl.get();     // 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执⾏拒绝策略。     if (! isRunning(recheck) && remove(command))     reject(command);     // 2.2 线程池处于running状态,但是没有线程,则创建线程     else if (workerCountOf(recheck) == 0)    addWorker(null, false);     }  // 3.如果放⼊workQueue失败,则创建⾮核⼼线程执⾏任务,     // 如果这时创建⾮核⼼线程失败(当前线程总数不⼩于maximumPoolSize时),就会执⾏拒绝策略。     else if (!addWorker(command, false))reject(command);}

ctl.get() 是获取线程池状态,⽤ int 类型表示。

第⼆步中,⼊队前进⾏了⼀ 次 isRunning 判断,⼊队之后,⼜进⾏了⼀次 isRunning 判断。 为什么要⼆次检查线程池的状态?

在多线程的环境下,线程池的状态是时刻发⽣变化的。很有可能刚获取线程池状态 后线程池状态就改变了。判断是否将 command 加⼊ workqueue 是线程池之前的状 态。倘若没有⼆次检查,万⼀线程池处于⾮RUNNING状态(在多线程环境下很有 可能发⽣),那么 command 永远不会执⾏。

总结⼀下处理流程

  1. 线程总数量 < corePoolSize,⽆论线程是否空闲,都会新建⼀个核⼼线程执⾏ 任务(让核⼼线程数量快速达到corePoolSize,在核⼼线程数量 < corePoolSize时)。注意,这⼀步需要获得全局锁。
  2. 线程总数量 >= corePoolSize时,新来的线程任务会进⼊任务队列中等待,然 后空闲的核⼼线程会依次去缓存队列中取任务来执⾏(体现了线程复⽤)。
  3. 当缓存队列满了,说明这个时候任务已经多到爆棚,需要⼀些“临时⼯”来执⾏ 这些任务了。于是会创建⾮核⼼线程去执⾏这个任务。注意,这⼀步需要获得 全局锁。
  4. 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上⾯提到的 拒绝策略进⾏处理。

整个过程如图所示:

必背的线程池知识你知道了吗?

2.4 ThreadPoolExecutor如何做到线程复⽤的?

我们知道,⼀个线程在创建的时候会指定⼀个线程任务,当执⾏完这个线程任务之 后,线程⾃动销毁。但是线程池却可以复⽤线程,即⼀个线程执⾏完线程任务后不 销毁,继续执⾏另外的线程任务。那么,线程池如何做到线程复⽤呢?

原来,ThreadPoolExecutor在创建线程时,会将线程封装成⼯作线程worker,并放 ⼊⼯作线程组中,然后这个worker反复从阻塞队列中拿任务去执⾏

源码中ThreadPoolExecutor中有个内置对象Worker,每个worker都是一个线程,worker线程数量和参数有关,每个worker会while死循环从阻塞队列中取数据,通过置换worker中Runnable对象,运行其run方法起到线程置换的效果,这样做的好处是避免多线程频繁线程切换,提高程序运行性能。

3 四种常⻅的线程池

Executors 类中提供的⼏个静态⽅法来创建线程池。⼤家到了这⼀步,如果看懂了 前⾯讲的 ThreadPoolExecutor 构造⽅法中各种参数的意义,那么⼀看 到 Executors 类中提供的线程池的源码就应该知道这个线程池是⼲嘛的。

3.1 newCachedThreadPool

public static ExecutorService newCachedThreadPool() {     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}/*按需要创建新线程的线程池。核心线程数为0,最大线程数为 Integer.MAX_VALUE,keepAliveTime为60秒,工作队列使用同步移交 SynchronousQueue。该线程池可以无限扩展,当需求增加时,可以添加新的线程,而当需求降低时会自动回收空闲线程。适用于执行很多的短期异步任务,或者是负载较轻的服务器。*/

CacheThreadPool 的运⾏流程如下:

  1. 提交任务进线程池。
  2. 因为corePoolSize为0的关系,不创建核⼼线程,线程池最⼤为 Integer.MAX_VALUE。
  3. 尝试将任务添加到SynchronousQueue队列。
  4. 如果SynchronousQueue⼊列成功,等待被当前运⾏的线程空闲后拉取执⾏。 如果当前没有空闲线程,那么就创建⼀个⾮核⼼线程,然后从 SynchronousQueue拉取任务并在当前线程执⾏。
  5. 如果SynchronousQueue已有任务在等待,⼊列操作将会阻塞。

当需要执⾏很多短时间的任务时,CacheThreadPool的线程复⽤率⽐较⾼, 会显 著的提⾼性能。⽽且线程60s后会回收,意味着即使没有任务进来, CacheThreadPool并不会占⽤很多资源。

3.2 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {     return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,      new LinkedBlockingQueue<Runnable>());} //固定线程数的线程池。corePoolSize = maximumPoolSize, //keepAliveTime为0,工作队列使用无界的LinkedBlockingQueue。 //适用于为了满足资源管理的需求,而需要限制当前线程数量的场景,适用于负载比较重的服务器。

核⼼线程数量和总线程数量相等,都是传⼊的参数nThreads,所以只能创建核⼼线 程,不能创建⾮核⼼线程。因为LinkedBlockingQueue的默认⼤⼩是 Integer.MAX_VALUE,故如果核⼼线程空闲,则交给核⼼线程处理;如果核⼼线程 不空闲,则⼊列等待,直到核⼼线程空闲。

与CachedThreadPool的区别:

  • 因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核⼼线程。 ⽽CachedThreadPool因为corePoolSize=0,所以只会创建⾮核⼼线程。
  • 在 getTask() ⽅法,如果队列⾥没有任务可取,线程会⼀直阻塞在LinkedBlockingQueue.take() ,线程不会被回收。 CachedThreadPool会在60s后收回。
  • 由于线程不会被回收,会⼀直卡在阻塞,所以没有任务的情况下, FixedThreadPool占⽤资源更多。
  • 都⼏乎不会触发拒绝策略,但是原理不同。FixedThreadPool是因为阻塞队列 可以很⼤(最⼤为Integer最⼤值),故⼏乎不会触发拒绝策略; CachedThreadPool是因为线程池很⼤(最⼤为Integer最⼤值),⼏乎不会导 致线程数量⼤于最⼤线程数,故⼏乎不会触发拒绝策略。

3.3 newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor(1, 1,   0L, TimeUnit.MILLISECONDS,   new LinkedBlockingQueue<Runnable>()));}//只有一个线程的线程池。corePoolSize = maximumPoolSize = 1,//keepAliveTime为0, 工作队列使用无界的LinkedBlockingQueue。//适用于需要保证顺序的执行各个任务的场景。

有且仅有⼀个核⼼线程( corePoolSize == maximumPoolSize=1),使⽤了 LinkedBlockingQueue(容量很⼤),所以,不会创建⾮核⼼线程。所有任务按照 先来先执⾏的顺序执⾏。如果这个唯⼀的线程不空闲,那么新来的任务会存储在任 务队列⾥等待执⾏。

3.4 newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize return new ScheduledThreadPoolExecutor(corePoolSize);}//ScheduledThreadPoolExecutor():public ScheduledThreadPoolExecutor(int corePoolSize) {     super(corePoolSize, Integer.MAX_VALUE,      DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue());}//创建一个以延迟或定时的方式来执行任务的线程池,//工作队列为 DelayedWorkQueue。//适用于需要多个后台线程执行周期任务。

四种常⻅的线程池基本够我们使⽤了,但是《阿⾥把把开发⼿册》不建议我们直接 使⽤Executors类中的线程池,⽽是通过ThreadPoolExecutor的⽅式,这样的处理 ⽅式让写的同学需要更加明确线程池的运⾏规则,规避资源耗尽的⻛险。

但如果你及团队本身对线程池⾮常熟悉,⼜确定业务规模不会⼤到资源耗尽的程度 (⽐如线程数量或任务队列⻓度可能达到Integer.MAX_VALUE)时,其实是可以使 ⽤JDK提供的这⼏个接⼝的,它能让我们的代码具有更强的可读性。

4、线程池大小确定

要想合理的配置线程池大小,首先我们需要区分任务是CPU 密集型还是I/O密集型

对于CPU 密集型,设置 线程数 = CPU数 + 1,通常能实现最优的利用率。

对于I/O密集型,网上常见的说法是设置 线程数 = CPU数 * 2 ,这个做法是可以的,但不是最优的。

在我们日常的开发中,我们的任务几乎是离不开I/O的,常见的网络I/O(RPC调用)、磁盘I/O(数据库操作),并且I/O的等待时间通常会占整个任务处理时间的很大一部分,在这种情况下,开启更多的线程可以让 CPU 得到更充分的使用,一个较合理的计算公式如下:

线程数 = CPU数 * CPU利用率 * (任务等待时间 / 任务计算时间 + 1)

例如我们有个定时任务,部署在4核的服务器上,该任务有100ms在计算,900ms在I/O等待,

则线程数约为:4 * 1 * (1 + 900 / 100) = 40个。

当然,具体我们还要结合实际的使用场景来考虑。如果要求比较精确,可以通过压测来获取一个合理的值。

如何判断是 CPU 密集任务还是 IO 密集任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。