面试高频之线程池
在面试中关于线程池的问题出现率还是很高的,今天就做下总结。
为什么用线程池
线程池提供了一种限制和管理资源(包括执行一个任务)的方式。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。
这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:
-
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 -
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。 -
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池体系
-
Executor是整体线程池体系的顶层接口,只有一个execute方法,用于执行任务,对应的源码截图
-
ExecutorService 继承并拓展了 Executor,在 ExecutorService 内部提供了更全面的任务提交机制以及线程池关闭方法。 -
ThreadPoolExecutor 是 ExecutorService 的默认实现,所谓的线程池机制也大多封装在此类当中,其他常见的线程池都是通过此类的配置不同参数实现的。
public class ThreadPoolExecutor extends AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService
-
ScheduledExecutorService 继承自 ExecutorService,增加了定时任务相关方法 -
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口。
//ScheduledExecutorService继承ExecutorService接口
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
ThreadPoolExecutor(重点)
先看一下构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
就参数进行说明一下:
-
corePoolSize:核心线程数。除非设置了allowCoreThreadTimeOut,即使空闲时,也不会销毁。 -
maximumPoolSize:线程池中同时运行的最大线程数,必须大于或等于 1。如果和 corePoolSize 相等即是固定大小线程池。 -
workQueue:当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。 -
keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁 -
unit:线程空闲时的时间单位,枚举值有 MILLISECONDS、SECONDS、MINUTES、HOURS 等。 -
threadFactory:线程工厂,线程池中使用它来创建线程,如果传入的是 null,则使用默认工厂类 DefaultThreadFactory。 -
handler:任务饱和时,执行拒绝策略的对象。当 workQueue 满了之后并且活动线程数大于 maximumPoolSize 的时候,线程池通过该策略处理请求。有如下几种默认实现。
实际上拒绝策略都是实现自接口 RejectedExecutionException,开发者也可以通过实现此接口,定制自己的拒绝策略。
创建线程池
推荐使用 ThreadPoolExecutor 构造函数创建线程池
在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
为什么呢?
使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
另外,《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。 CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
我们逐个看一下这几个线程池的实现源码。
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
核心线程与最大线程数是通过调用时传入, 关键问题是采用无界队列 LinkedBlockingQueue(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列,任务堆积很多时,会造成OOM。
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
线程池中只有一个线程处理任务,也是采用的无界的任务队列,如果任务堆积多时,会出现和FixedThreadPool一样的问题。
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
CachedThreadPool 的corePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。
ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
ScheduledThreadPoolExecutor 使用的任务队列 DelayQueue 封装了一个 PriorityQueue,PriorityQueue 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask 的 time 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask 的 squenceNumber 变量小的先执行)。
实际项目中基本上不会用到,只做了解即可。不推荐使用的原因也是因为极端情况下会创建大量的线程,导致OOM。
线程池的运行过程
1.当前线程池中运行的线程数量还没有达到 corePoolSize 大小时,线程池会创建一个新线程执行提交的任务,无论之前创建的线程是否处于空闲状态。
public class ThreadPoolExample {
public static void main(String[] args) throws InterruptedException{
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
final int taskId = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread().getName()
+ " 正在执行task:" + taskId);
Thread.sleep(100);
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
Thread.sleep(2000);
}
fixedThreadPool.shutdown();
}
}
运行结果
线程:pool-1-thread-1 正在执行task:0
线程:pool-1-thread-2 正在执行task:1
线程:pool-1-thread-3 正在执行task:2
线程:pool-1-thread-1 正在执行task:3
线程:pool-1-thread-2 正在执行task:4
进程已结束,退出代码0
2.当前线程池中运行的线程数量已经达到 corePoolSize 大小时,线程池会把任务加入到等待队列中,直到某一个线程空闲了,线程池会根据我们设置的等待队列规则,从队列中取出一个新的任务执行。
public class ThreadPoolExample {
public static void main(String[] args) throws InterruptedException{
ThreadPoolExecutor fixedThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
final int taskId = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread().getName()
+ " 正在执行task:" + taskId);
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
System.out.println("此时等待队列中有 " + fixedThreadPool.getQueue().size() + "个元素");
Thread.sleep(500);
}
fixedThreadPool.shutdown();
}
}
运行结果
此时等待队列中有 0个元素
线程:pool-1-thread-1 正在执行task:0
此时等待队列中有 0个元素
线程:pool-1-thread-2 正在执行task:1
此时等待队列中有 0个元素
线程:pool-1-thread-3 正在执行task:2
此时等待队列中有 1个元素
线程:pool-1-thread-1 正在执行task:3
此时等待队列中有 1个元素
线程:pool-1-thread-2 正在执行task:4
进程已结束,退出代码0
3.如果线程数大于 corePoolSize 数量但是还没有达到最大线程数 maximumPoolSize,并且等待队列已满,则线程池会创建新的线程来执行任务。
public class ThreadPoolExample {
public static void main(String[] args) throws InterruptedException{
ThreadPoolExecutor fixedThreadPool = new ThreadPoolExecutor(2,10,
0l, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(2));
for (int i = 0; i <= 5; i++) {
final int taskId = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread().getName()
+ " 正在执行task:" + taskId);
Thread.sleep(4000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
System.out.println("此时等待队列中有 " + fixedThreadPool.getQueue().size() + "个元素" + " taskId: " + taskId);
}
fixedThreadPool.shutdown();
}
}
运行结果
此时等待队列中有 0个元素 taskId: 0
线程:pool-1-thread-1 正在执行task:0
此时等待队列中有 0个元素 taskId: 1
线程:pool-1-thread-2 正在执行task:1
此时等待队列中有 1个元素 taskId: 2
此时等待队列中有 2个元素 taskId: 3
此时等待队列中有 2个元素 taskId: 4
线程:pool-1-thread-3 正在执行task:4
此时等待队列中有 2个元素 taskId: 5
线程:pool-1-thread-4 正在执行task:5
线程:pool-1-thread-1 正在执行task:2
线程:pool-1-thread-2 正在执行task:3
4.最后如果提交的任务,无法被核心线程直接执行,又无法加入等待队列,又无法创建“非核心线程”直接执行,线程池将根据拒绝处理器定义的策略处理这个任务,如果没有设置策略,默认就是ThreadPoolExecutor.AbortPolicy,抛出RejectedExecutionException,所以在设置参数时一定要小心。
public class ThreadPoolExample {
public static void main(String[] args) throws InterruptedException{
ThreadPoolExecutor fixedThreadPool = new ThreadPoolExecutor(2,3,
0l, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(2));
for (int i = 0; i <= 5; i++) {
final int taskId = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程:" + Thread.currentThread().getName()
+ " 正在执行task:" + taskId);
Thread.sleep(4000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
System.out.println("此时等待队列中有 " + fixedThreadPool.getQueue().size() + "个元素" + " taskId: " + taskId);
}
fixedThreadPool.shutdown();
}
}
运行结果
此时等待队列中有 0个元素 taskId: 0
线程:pool-1-thread-1 正在执行task:0
此时等待队列中有 0个元素 taskId: 1
此时等待队列中有 1个元素 taskId: 2
此时等待队列中有 2个元素 taskId: 3
线程:pool-1-thread-2 正在执行task:1
此时等待队列中有 2个元素 taskId: 4
线程:pool-1-thread-3 正在执行task:4
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task thread.ThreadPoolExample$1@60e53b93 rejected from java.util.concurrent.ThreadPoolExecutor@5e2de80c[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at thread.ThreadPoolExample.main(ThreadPoolExample.java:11)
线程:pool-1-thread-3 正在执行task:2
线程:pool-1-thread-2 正在执行task:3
针对上述4中情况的处理,基本上可以用下面这张图概况一下
其他问题
execute() 和 submit()
-
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否; -
submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法的话,如果在 timeout 时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException。
shutdown() 和 shutdownNow()
-
shutdown():关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
-
shutdownNow():关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
*
There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List shutdownNow() {
List tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
isTerminated() 和 isShutdown()
-
isShutDown: 当调用 shutdown() 方法后返回为 true。 -
isTerminated: 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true
今天整理的线程池相关问题,希望能在大家平时工作中以及面试中有所帮助。