JAVA-线程池_java固定线程池
1. 固定大小线程池(FixedThreadPool)
基本概念
固定大小线程池是一种线程数量固定的线程池,初始化时指定线程数量,这些线程会一直存活,即使处于空闲状态。
创建方式
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
底层实现
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}
核心参数解析
-
corePoolSize:等于传入的nThreads,表示核心线程数
-
maximumPoolSize:等于传入的nThreads,表示最大线程数
-
keepAliveTime:0毫秒,表示空闲线程立即终止(但实际不会,因为corePoolSize=maximumPoolSize)
-
workQueue:使用无界的LinkedBlockingQueue(默认容量为Integer.MAX_VALUE)
工作特点
-
线程数量固定,创建后不会改变
-
当所有线程都处于活动状态时,新任务会被放入无界队列等待
-
线程会一直存活,除非线程池被关闭
-
使用无界队列,任务永远不会被拒绝(除非内存耗尽)‘
使用场景
-
负载较重、任务量比较稳定的系统
-
后端服务处理HTTP请求
-
需要长期运行的任务调度器
-
批处理作业处理
潜在问题
由于使用无界队列,当任务提交速度持续高于处理速度时,会导致任务不断堆积在队列中,最终可能导致内存溢出(OOM)。
示例代码
// 创建固定大小为5的线程池ExecutorService executor = Executors.newFixedThreadPool(5);// 提交10个任务for (int i = 0; i { System.out.println(\"执行任务 \" + taskId + \" 线程: \" + Thread.currentThread().getName()); try { Thread.sleep(1000); // 模拟任务执行 } catch (InterruptedException e) { e.printStackTrace(); } });}// 关闭线程池executor.shutdown();
2.可缓存线程池(CachedThreadPool)
基本概念
CachedThreadPool是一种可根据需要动态创建线程的线程池,当线程空闲超过一定时间后会被回收。
创建方式
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
底层实现
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());}
核心参数
-
corePoolSize: 0 (没有核心线程)
-
maximumPoolSize: Integer.MAX_VALUE (理论上无限)
-
keepAliveTime: 60秒 (空闲线程存活时间)
-
workQueue: SynchronousQueue (不存储元素的阻塞队列)
工作特点
-
线程数量不固定,根据需要动态创建
-
空闲线程60秒后会被回收
-
新任务到来时,如果有空闲线程则复用,否则创建新线程
-
使用SynchronousQueue,每个插入操作必须等待另一个线程的移除操作
使用场景
-
大量短生命周期的异步任务
-
请求突发性强、间隔长的服务
-
对响应速度要求较高的小型任务处理
-
测试环境或原型开发(不推荐生产环境使用)
潜在问题
-
可能创建大量线程导致系统资源耗尽
-
不适合执行长时间运行的任务
-
生产环境慎用,容易导致系统不稳定
示例代码
ExecutorService executor = Executors.newCachedThreadPool();for (int i = 0; i { System.out.println(\"执行任务 \" + taskId + \" 线程: \" + Thread.currentThread().getName()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } });}// 监控线程数量Thread.sleep(2000);System.out.println(\"当前线程数: \" + ((ThreadPoolExecutor)executor).getPoolSize());executor.shutdown();
3.单线程线程池(SingleThreadExecutor)
基本概念
单线程线程池是固定大小线程池的特例,内部只有一个工作线程,所有任务会按顺序执行。
创建方式
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
底层实现
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));}
核心参数解析
-
corePoolSize:1,表示只有一个核心线程
-
maximumPoolSize:1,表示最多只有一个线程
-
keepAliveTime:0毫秒(无意义,因为只有一个线程)
-
workQueue:使用无界的LinkedBlockingQueue
工作特点
-
只有一个工作线程执行所有任务
-
任务按提交顺序串行执行
-
如果线程异常终止,会创建一个新的线程替代
-
使用无界队列,任务永远不会被拒绝
使用场景
-
需要保证任务顺序执行的场景
-
日志写入(避免多线程并发写日志导致混乱)
-
文件读写操作(避免并发访问文件)
-
事件监听与处理队列
-
简单的后台任务处理
潜在问题
-
无界队列可能导致内存溢出
-
单个线程处理能力有限,不适合高负载场景
-
任务执行失败会影响后续任务
示例代码
ExecutorService executor = Executors.newSingleThreadExecutor();// 提交多个任务for (int i = 0; i { System.out.println(\"任务 \" + taskId + \" 开始执行\"); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(\"任务 \" + taskId + \" 执行完成\"); });}// 关闭线程池executor.shutdown();
4.自定义线程池(ThreadPoolExecutor)
基本概念
自定义线程池是通过直接实例化ThreadPoolExecutor
类来创建的线程池,可以精确控制所有参数。
创建方式
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
核心参数详解
1. corePoolSize(核心线程数)
-
定义:线程池中保持活动状态的最小线程数,即使它们处于空闲状态
-
设置建议:
-
CPU密集型任务:CPU核心数N或N+1
-
IO密集型任务:2*CPU核心数或根据公式计算
-
公式:
最佳线程数 = N(CPU核心数) * (1 + WT(线程等待时间)/ST(线程计算时间))
-
2. maximumPoolSize(最大线程数)
-
定义:线程池中允许存在的最大线程数
-
设置建议:
-
一般设置为核心线程数的2-4倍
-
需要根据系统资源和任务特性调整
-
注意:当使用无界队列时,此参数无效
-
. keepAliveTime(线程空闲时间)
-
定义:当线程数超过核心线程数时,多余的空闲线程在等待新任务时的最长存活时间
-
单位:与unit参数配合使用
-
设置建议:
-
通常设置为60秒(与网络请求超时时间一致)
-
对于定时任务可以设置更长
-
4. unit(时间单位)
-
定义:keepAliveTime的时间单位
-
常用值:TimeUnit.SECONDS、TimeUnit.MILLISECONDS等
5. workQueue(任务队列)
-
定义:用于存放等待执行的任务的阻塞队列
-
常见实现类:
-
ArrayBlockingQueue:基于数组的有界队列
-
LinkedBlockingQueue:基于链表的队列,默认无界
-
SynchronousQueue:不存储元素的队列,每个插入操作必须等待另一个线程的移除操作
-
PriorityBlockingQueue:具有优先级的无界队列
-
DelayQueue:支持延时获取元素的无界队列
-
6. threadFactory(线程工厂)
-
定义:用于创建新线程的工厂
-
作用:
-
设置线程名称(便于问题排查)
-
设置线程优先级
-
设置线程是否为守护线程
-
设置UncaughtExceptionHandler
-
-
推荐使用Guava的ThreadFactoryBuilder:
ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat(\"my-thread-%d\") .setDaemon(false) .build();
7. handler(拒绝策略)
-
定义:当线程池和队列都已满时,对新提交任务的处理策略
-
内置策略:
-
AbortPolicy(默认):抛出RejectedExecutionException
-
CallerRunsPolicy:由调用线程执行该任务
-
DiscardPolicy:直接丢弃任务
-
DiscardOldestPolicy:丢弃队列中最老的任务,然后尝试重新提交当前任务
-
-
自定义策略:实现RejectedExecutionHandler接口
工作流程
-
提交任务时,如果当前线程数 < corePoolSize,创建新线程执行任务
-
如果当前线程数 >= corePoolSize,将任务放入workQueue
-
如果workQueue已满且当前线程数 < maximumPoolSize,创建新线程执行任务
-
如果workQueue已满且当前线程数 >= maximumPoolSize,执行拒绝策略
使用场景
-
需要精确控制线程池行为的场景
-
高并发、高性能要求的系统
-
需要特殊队列或拒绝策略的业务
-
需要监控线程池状态的系统
示例代码
// 自定义线程工厂ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat(\"custom-pool-%d\") .setUncaughtExceptionHandler((thread, throwable) -> { System.err.println(\"线程\" + thread.getName() + \"发生异常: \" + throwable.getMessage()); }) .build();// 自定义拒绝策略RejectedExecutionHandler rejectionHandler = (runnable, executor) -> { System.err.println(\"任务被拒绝,正在记录日志...\"); // 可以将任务信息持久化,稍后重试};// 创建自定义线程池ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, // 核心线程数 10, // 最大线程数 60, // 空闲线程存活时间 TimeUnit.SECONDS, // 时间单位 new ArrayBlockingQueue(100), // 有界队列(容量100) threadFactory, // 线程工厂 rejectionHandler // 拒绝策略);// 监控线程ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);monitor.scheduleAtFixedRate(() -> { System.out.println(\"=== 线程池状态 ===\"); System.out.println(\"活跃线程数: \" + executor.getActiveCount()); System.out.println(\"核心线程数: \" + executor.getCorePoolSize()); System.out.println(\"最大线程数: \" + executor.getMaximumPoolSize()); System.out.println(\"已完成任务数: \" + executor.getCompletedTaskCount()); System.out.println(\"队列中的任务数: \" + executor.getQueue().size());}, 0, 5, TimeUnit.SECONDS);// 提交任务for (int i = 0; i { System.out.println(Thread.currentThread().getName() + \" 执行任务 \" + taskId); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } catch (RejectedExecutionException e) { System.err.println(\"任务 \" + taskId + \" 被拒绝\"); }}// 优雅关闭executor.shutdown();monitor.shutdown();
5.定时任务线程池(ScheduledThreadPool)
基本概念
ScheduledThreadPool可以用来执行延迟任务或定期执行任务,类似于定时器的功能。
创建方式
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
底层实现
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}// ScheduledThreadPoolExecutor继承自ThreadPoolExecutorpublic ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue());}
核心参数
-
corePoolSize: 指定的核心线程数
-
maximumPoolSize: Integer.MAX_VALUE
-
keepAliveTime: 10毫秒 (DEFAULT_KEEPALIVE_MILLIS)
-
workQueue: DelayedWorkQueue (专门用于定时任务的优先级队列)
工作特点
-
支持延迟执行和周期执行任务
-
可以设置多个线程来处理定时任务
-
使用DelayedWorkQueue,任务按执行时间排序
-
即使maximumPoolSize设置为Integer.MAX_VALUE,实际线程数不会无限增长
主要方法
-
schedule(Runnable command, long delay, TimeUnit unit)
: 延迟执行 -
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
: 固定频率执行 -
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
: 固定延迟执行
使用场景
-
定时清理缓存或日志
-
心跳检测机制
-
定期数据同步任务
-
延迟任务执行
-
轮询检查系统状态
示例代码
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);// 延迟执行executor.schedule(() -> { System.out.println(\"延迟5秒执行: \" + new Date());}, 5, TimeUnit.SECONDS);// 固定频率执行(每3秒执行一次,不考虑任务执行时间)executor.scheduleAtFixedRate(() -> { System.out.println(\"固定频率执行: \" + new Date()); try { Thread.sleep(1000); // 模拟任务执行时间 } catch (InterruptedException e) { e.printStackTrace(); }}, 0, 3, TimeUnit.SECONDS);// 固定延迟执行(任务完成后延迟2秒再执行下一次)executor.scheduleWithFixedDelay(() -> { System.out.println(\"固定延迟执行: \" + new Date()); try { Thread.sleep(1000); // 模拟任务执行时间 } catch (InterruptedException e) { e.printStackTrace(); }}, 0, 2, TimeUnit.SECONDS);// 10秒后关闭executor.schedule(() -> executor.shutdown(), 10, TimeUnit.SECONDS);
6.工作窃取线程池(WorkStealingPool) - Java 8+
基本概念
WorkStealingPool是Java 8引入的线程池,基于ForkJoinPool实现,使用工作窃取(work-stealing)算法提高并行处理能力。
创建方式
ExecutorService workStealingPool = Executors.newWorkStealingPool();// 指定并行级别ExecutorService workStealingPool = Executors.newWorkStealingPool(4);
底层实现
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);}
核心特点
-
基于ForkJoinPool实现
-
使用工作窃取算法:空闲线程可以从其他线程的任务队列尾部\"窃取\"任务执行
-
默认并行级别等于CPU核心数
-
适合处理大量相互独立的小任务
-
任务可以生成子任务(ForkJoinTask)
工作窃取算法优势
-
减少线程竞争
-
提高CPU利用率
-
自动负载均衡
使用场景
-
递归任务处理
-
分治算法实现
-
大量小任务的并行处理
-
计算密集型任务
示例代码
ExecutorService executor = Executors.newWorkStealingPool(4);List<Callable> tasks = Arrays.asList( () -> \"Task1\", () -> \"Task2\", () -> \"Task3\", () -> \"Task4\");try { // 提交所有任务并获取Future列表 List<Future> futures = executor.invokeAll(tasks); // 获取结果 for (Future future : futures) { System.out.println(future.get()); }} catch (InterruptedException | ExecutionException e) { e.printStackTrace();} finally { executor.shutdown();}
7.ForkJoinPool (分治线程池)
基本概念
ForkJoinPool是专门为分治算法设计的线程池,Java 7引入,是WorkStealingPool的底层实现。
创建方式
ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 指定并行级别
核心特点
-
使用工作窃取算法
-
适合处理可以递归分解的任务
-
任务可以fork(分解)和join(合并)
-
每个工作线程维护自己的任务队列
-
默认并行级别等于CPU核心数
与WorkStealingPool关系
-
WorkStealingPool是ForkJoinPool的简单封装
-
两者都使用工作窃取算法
-
ForkJoinPool提供更多控制和配置选项
使用场景
-
大规模并行计算
-
递归任务处理(如快速排序、归并排序)
-
大数据处理
-
复杂算法实现
示例代码
class FibonacciTask extends RecursiveTask { private final long n; FibonacciTask(long n) { this.n = n; } @Override protected Long compute() { if (n <= 1) { return n; } FibonacciTask f1 = new FibonacciTask(n - 1); f1.fork(); // 异步执行 FibonacciTask f2 = new FibonacciTask(n - 2); return f2.compute() + f1.join(); // 等待结果 }}public class Main { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(4); FibonacciTask task = new FibonacciTask(10); long result = pool.invoke(task); System.out.println(\"Fibonacci(10) = \" + result); pool.shutdown(); }}
8.SingleThreadScheduledExecutor (单线程定时任务线程池)
基本概念
SingleThreadScheduledExecutor是只有一个线程的定时任务线程池,保证任务按顺序执行。
创建方式
ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
底层实现
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1));}
核心特点
-
只有一个工作线程执行所有任务
-
任务按提交顺序执行
-
如果线程异常终止,会创建新线程替代
-
适合需要严格顺序执行的定时任务
使用场景
-
需要顺序执行的定时任务
-
简单的后台定时任务
-
单线程的事件调度器
示例代码
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();// 延迟执行executor.schedule(() -> System.out.println(\"延迟执行\"), 2, TimeUnit.SECONDS);// 固定频率执行executor.scheduleAtFixedRate(() -> { System.out.println(\"固定频率执行: \" + new Date());}, 0, 1, TimeUnit.SECONDS);// 10秒后关闭executor.schedule(() -> executor.shutdown(), 10, TimeUnit.SECONDS);
线程池类型对比总结
选择线程池的建议
-
CPU密集型任务:选择FixedThreadPool或自定义ThreadPoolExecutor,线程数设置为CPU核心数+1
-
IO密集型任务:选择自定义ThreadPoolExecutor,适当增加线程数,使用有界队列
-
定时/周期性任务:选择ScheduledThreadPool或SingleThreadScheduledExecutor
-
大量小任务并行处理:Java 8+环境下优先考虑WorkStealingPool
-
递归/分治算法:使用ForkJoinPool
-
生产环境:建议使用自定义ThreadPoolExecutor,避免使用无界队列
-
简单任务:如果只是偶尔执行简单任务,可以直接new Thread()
最佳实践总结
-
理解任务特性:根据任务类型(CPU/IO密集型)选择合适的线程池
-
避免无界队列:生产环境推荐使用有界队列并设置合理的拒绝策略
-
合理设置线程数:使用公式计算最佳线程数(CPU核心数 * (1 + 等待时间/计算时间))
-
监控线程池:定期收集线程池运行指标,如活跃线程数、队列大小等
-
资源隔离:不同业务使用不同线程池,避免相互影响
-
优雅关闭:使用shutdown()和awaitTermination()确保任务完成
-
异常处理:为线程设置UncaughtExceptionHandler,避免静默失败
-
线程命名:通过自定义线程工厂为线程命名,便于问题排查
通过合理选择和使用线程池,可以显著提高Java应用程序的性能和稳定性。在实际开发中,应根据具体业务场景和性能需求,选择最适合的线程池类型和配置参数。