线程池
线程池的简介
线程池可以看成是线程的集合,之所以推荐使用线程池是因为当并发的线程数量很多的时候,没个线程执行一段时间之后,任务就结束了,这样频繁的创建线程会大大的降低系统的效率,因为频繁的创建线程和销毁销毁线程需要时间。
线程池的API
jdk提供Excutor框架在使用线程池
- 提供任务提交额任务执行分离开来的机制(解耦)
线程池的总体API架构
Executor接口
ExecutorService接口
AbstractExecutorService类
ScheduledExecutorService接口:
ForkJoinPool线程池
在jdk1.7中新增的一个线程池,与ThreadPoolExecutor一样,同样继承了AbstractExecutorService。ForkJoinPool是Fork/join框架的两大核心类之一。与其他类型的ExecutorService相比,其主要的不同在于工作窃取算法(work-stealing):使用池中的线程会尝试找到并执行已被提交到池中的或由其他线程创建的任务。这样很少有线程会处于空闲状态,非常高效,这使得能够有效处理以下场景大多数由任务产生大量子任务的情况;从外部客户端大量提交小任务到池中情况。
Callable和Future
学到线程池,我们会发现很多API都有Callable和Future这两个东西
Future submit(Runnable task) Future submit(Callable task)
我们可以简单的认为:Callable就是Runnable的扩展
Runnable没有返回值,不能抛出受检查的异常,而Callable可以,也就是说当我们的任务需要返回值的时候,我们可以使用Callable
Future一般就是我们认为的Callable的返回值,但他其实代表的是任务的任命周期(当然,他是能够获取得到Callable的返回值)
简单的用法
import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class Demo { public static void main(String[] args) throws InterruptedException, ExecutionException { // 创建线程池对象 ExecutorService pool = Executors.newFixedThreadPool(2); // 可以执行Runnable对象或者Callable对象代表的线程 Future<Integer> f1 = pool.submit(new MyCallable(100)); Future<Integer> f2 = pool.submit(new MyCallable(200)); // V get() Integer i1 = f1.get(); Integer i2 = f2.get(); System.out.println(i1); System.out.println(i2); // 结束 pool.shutdown(); }}
import java.util.concurrent.Callable;public class MyCallable implements Callable<Integer> { private int number; public MyCallable(int number) { this.number = number; } @Override public Integer call() throws Exception { int sum = 0; for (int x = 1; x <= number; x++) { sum += x; } return sum; }}
结果
ThreadPoolExecutor
- 使用线程池的线程来完成任务,一般使用Executor工厂来配置
- 线程池提供了两个好处
- 线程重复使用,减少创建线程个数,提供性能
- 提供限定和管理的手段
- 该类提供很好的拓展性,但一般我们使用Executor工厂方法就可以创建三种非常好用的线程池
- 核心线程数量corePoolSize和最大线程数量maximumPoolSize
- 如果运行的线程少于corePoolSize,则创建新的线程来处理请求,即使其他的辅助线程是空闲的
- 如果运行的线程大于corePoolSize,小于maxmumPoolSize,则仅当队列满时创建新线程
- 如果设置的corePoolSize和maxmumPoolSize相同,则创建固定大小的线程池
- 如果设置了maxmumPoolSize最大值,那么允许池适应任意数量的并发任务
这里举一个例子:假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;
当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;
然后就将任务也分配给这4个临时工人做;
如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。
largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。
-
默认只有当任务请求过来,才会初始化线程,当然可以使用方法重写
-
新创建的线程默认由线程工厂创建的,都是属于同一个线程组,相同优先级,没有守护线程
-
如果线程创建失败(return null),执行器会继续执行,但是不会执行任务
-
如果线程数大量核心线程,如果空闲线程时间>keepAliveTime,那么会销毁
-
如果少于corePoolSize线程正在运行,Executor会添加一个新的线程而不是放入队列
-
如果队列满了(不能入列),会先开一个线程来处理,除非超过最大线程数,这样的话就会拒绝这个请求
-
三种排队策略
- 同步移交策略:
- 该策略不会将任务放到队列中,而是直接移交给执行它的线程
- 如果当前没有线程执行它,很可能会创建一个线程
- 一般用于线程池是无界限的情况
- 无界限策略
- 如果所有的核心线程都在工作,那么新的线程会子啊队列中等待
- 因此,线程的创建不会多于核心线程的数量(其他的都在队列中等待)
- 有界性策略
- 避免资源耗尽的情况
- 如果线程池较小,而队列比较大,一定程度上减少内存的使用量,但代价是限制吞吐量
- 同步移交策略:
-
拒绝任务
- 线程池关闭
- 线程池数量满了,队列满了
- 以上两种情况都会发生
-
拒绝策略
- 直接抛出异常,默认策略
- 用调用者所在的线程来执行任务
- 直接丢掉这个任务
- 丢弃最旧的一条任务
-
线程池状态:
- RUNNING:线程池能够接受新的任务,以及对新添加的任务进行处理
- SHUTDOWN:线程池不可以接受新的任务,但是可以对已添加的任务进行处理
- STOP:线程池不接收新的任务,不处理已添加的任务,并且会中断正在处理的任务
- TIDYING:当所有的任务已终止,ctl记录的任务数量为0,线程池会变为TIDYING状态,当线程池变为TIDYING状态时,会执行钩子函数terminated(),terminated在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现
- TERMINATED:线程池彻底终止的状态
状态 | 高三位 | 工作队列workers中的任务 | 阻塞队列workQueue中的任务 | 是否添加任务 |
---|---|---|---|---|
RUNNING | 111 | 继续处理 | 继续处理 | 添加 |
SHUTDOWN | 000 | 继续处理 | 继续处理 | 不添加 |
STOP | 001 | 尝试中断 | 不处理 | 不添加 |
TIDYING | 010 | 处理完了 | 如果由SHUTDOWN-TIDYING,那就是处理完了,如果由STOP-TIDYING,那就是不处理 | 不添加 |
TERMINATED | 011 | 同TIDYING | 同TIDYING | 不添加 |
常见的池
- newFixedThreadPool
- newCachedThreadPool
- SingleThreadExecutor
newFixedThreadPool
一个固定线程数的线程池,返回一个corePoolSize和maxmumPool相等的线程池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
newCacheThreadPool
非常有弹性的线程池,对于新的任务,如果此时线程池里没有空闲的线程,线程池会毫不犹豫的创建一个新的线程去处理这个任务
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
SingleThreadExecutor
使用单个worker线程的Executor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
构造方法
- 构造方法可以让我自定义线程池
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
- 指定线程池数量
- 指定最大线程数量
- 允许线程空闲时间
- 时间对象
- 阻塞队列
- 线程工厂
- 任务拒接策略
execute的执行方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to 如果只有少于核心线程数的线程在允许 * start a new thread with the given command as its first 尝试启动一个新的线程随着这个被给予的命令作为他的任务 * task. The call to addWorker atomically checks runState and 这这个要求addWorker检查运行状态和工人数量 * workerCount, and so prevents false alarms that would add 以防止误报警将会增加 * threads when it shouldn't, by returning false. 线程,当他实际上不需要,return false * * 2. If a task can be successfully queued, then we still need 如果一个任务可以成功被排队缓存,然而我们任然需要 * to double-check whether we should have added a thread 二级检查当我们需要添加一个线程的时候 * (because existing ones died since last checking) or that 因为存在现有的线程死亡从上一次检查开始 * the pool shut down since entry into this method. So we 或者这个线程池被关闭当进入这个方法的时候 * recheck state and if necessary roll back the enqueuing if 所以我们需要重新检查状态而且有必要时回滚入队当线程池停止的时候, * stopped, or start a new thread if there are none.或者重新启动一个新的线程当没有线程的时候 * * 3. If we cannot queue task, then we try to add a new 如果我们不能靠队列缓存任务的时候,那么我们需要增加一个新的线程 * thread. If it fails, we know we are shut down or saturated 如果失败,就以为线程池已经关闭或者饱和 * and so reject the task. 所以需要拒绝任务 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
线程池的关闭
shutdown():
不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():
立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
如何合理的配置
-
CPU密集型
尽量使用较小的线程池,一般为CPU核心数+1
因为CPU密集型任务CPU的使用率很高,若开过多的线程,只能增加线程上下文的切换次数,带来额外的开销 -
I/O密集型
- 使用较大的线程池,一般为CPU核心数*2
IO密集型CPU使用率不高,可以让CPU等待IO的时候处理别的任务,充分利用CPU时间 - 线程等待时间所占比例较高,需要越多的线程,线程CPU时间所占比例越高,需要越少线程
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目
- 使用较大的线程池,一般为CPU核心数*2
参考
https://www.cnblogs.com/dolphin0520/p/3932921.html
https://mp.weixin.qq.com/s?__biz=MzI4Njg5MDA5NA==&mid=2247484214&idx=1&sn=9b5c977e0f8329b2bf4c29d230c678fb&chksm=ebd74237dca0cb212f4505935f9905858b9166beddd4603c3d3b5386b5dd8cf240c460a8e7c4&scene=21wechat_redirect
https://www.cnblogs.com/cherish010/p/8334952.html