> 技术文档 > SpringBoot 线程池 配置使用详解_线程池配置

SpringBoot 线程池 配置使用详解_线程池配置


一、核心特性

Springboot 集成
支持 @Async 注解,简化异步方法调用。
参数可配置化

核心线程数、最大线程数、队列容量、拒绝策略等均可通过配置调整。
生命周期管理

实现 Lifecycle 接口,支持线程池的启动和关闭(如应用关闭时优雅终止任务)。
任务装饰器

支持通过 TaskDecorator 对任务进行装饰(如传递上下文信息)

二、添加依赖

pom.xml 文件中添加 Spring Boot Starter AOP 依赖

 org.springframework.boot spring-boot-starter-aop org.springframework.boot spring-boot-starter-web

三、参数详解

通过 Spring 配置文件或 @Bean 定义线程池时,需设置以下关键参数:

参数名称 说明 默认值 corePoolSize 核心线程数,即使空闲也不会被回收 1 maxPoolSize 最大线程数,当队列满时创建新线程直到达到此值 Integer.MAX_VALUE queueCapacity 任务队列容量(使用 LinkedBlockingQueue 或 ArrayBlockingQueue) Integer.MAX_VALUE keepAliveSeconds 非核心线程的空闲存活时间(秒) 60 threadNamePrefix 线程名前缀,便于日志追踪 \"task-executor-\" allowCoreThreadTimeOut 是否允许核心线程超时回收 false rejectedExecutionHandler 拒绝策略(如 AbortPolicy、CallerRunsPolicy) AbortPolicy(直接抛出异常)

四、配置线程池

@Configuration@EnableAsyncpublic class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Value(\"${async.executor.thread.core_pool_size}\") private int corePoolSize; @Value(\"${async.executor.thread.max_pool_size}\") private int maxPoolSize; @Value(\"${async.executor.thread.queue_capacity}\") private int queueCapacity; @Value(\"${async.executor.thread.name.prefix}\") private String namePrefix; @Bean(name = \"asyncServiceExecutor\") public Executor asyncServiceExecutor() { logger.info(\"start asyncServiceExecutor\"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; }}

@Value是我配置在 application.yml,可以参考配置,自由定义 

# 异步线程配置# 配置核心线程数async.executor.thread.core_pool_size = 5# 配置最大线程数async.executor.thread.max_pool_size = 5# 配置队列大小async.executor.thread.queue_capacity = 99999# 配置线程池中的线程的名称前缀async.executor.thread.name.prefix = async-service-

五、应用实践

1、异步任务处理

创建一个服务类 AsyncService,并在其方法上使用 @Async 注解来定义异步任务:

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.scheduling.annotation.Async;import org.springframework.stereotype.Service;@Servicepublic class AsyncService { private static final Logger logger = LoggerFactory.getLogger(AsyncService.class); @Async(\"taskExecutor\") public void asyncTask(String taskName) { logger.info(Thread.currentThread().getName() + \" 开始执行任务: \" + taskName); try { Thread.sleep(2000); // 模拟耗时操作 } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error(\"任务执行被中断\", e); } finally { logger.info(Thread.currentThread().getName() + \" 任务执行完成: \" + taskName); } }}

创建一个控制器类 AsyncController,用于触发异步任务(线程安全的)

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.AsyncResult;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.Future;@RestControllerpublic class AsyncController { private static final Logger logger = LoggerFactory.getLogger(AsyncController.class); @Autowired private AsyncService asyncService; @GetMapping(\"/trigger\") public String triggerAsyncTasks() { logger.info(\"开始触发异步任务\"); for (int i = 0; i < 10; i++) { asyncService.asyncTask(\"任务 \" + i); } return \"异步任务已触发\"; }}

创建一个监控组件 ThreadPoolMonitor,用于定期监控线程池的状态

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;@Componentpublic class ThreadPoolMonitor { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolMonitor.class); @Autowired private ThreadPoolTaskExecutor taskExecutor; @Scheduled(fixedRate = 60000) // 每分钟执行一次 public void monitorThreadPool() { int activeCount = taskExecutor.getActiveCount(); int poolSize = taskExecutor.getPoolSize(); int corePoolSize = taskExecutor.getCorePoolSize(); int maxPoolSize = taskExecutor.getMaxPoolSize(); int queueSize = taskExecutor.getThreadPoolExecutor().getQueue().size(); int completedTaskCount = taskExecutor.getThreadPoolExecutor().getCompletedTaskCount(); logger.info(\"线程池状态 - 活动线程数: {}, 当前线程数: {}, 核心线程数: {}, 最大线程数: {}, 队列大小: {}, 已完成任务数: {}\", activeCount, poolSize, corePoolSize, maxPoolSize, queueSize, completedTaskCount); // 检查线程池是否接近饱和 if (activeCount >= maxPoolSize * 0.8 || queueSize >= taskExecutor.getQueueCapacity() * 0.8) { logger.warn(\"线程池负载过高!请考虑优化配置或检查任务执行情况\"); } }}

 确保在启动类上添加 @EnableAsync 注解,以启用异步任务支持

import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableAsync;@SpringBootApplication@EnableAsyncpublic class AsyncDemoApplication { public static void main(String[] args) { SpringApplication.run(AsyncDemoApplication.class, args); }}

测试:

启动 Spring Boot 应用后,访问 http://localhost:8080/trigger,即可看到异步任务在线程池中执行的情况,同时线程池的状态也会定期输出到日志中。

代码说明

  • @EnableAsync 注解 :用于启用 Spring 的异步方法执行支持,确保 Spring 容器能够识别和处理带有 @Async 注解的方法。

  • @Async 注解 :用于标注希望异步执行的方法,需指定所使用的线程池 Bean 的名称,在本例中为 “taskExecutor”。当该方法被调用时,Spring 会将其提交到指定的线程池中执行。

  • ThreadPoolTaskExecutor :是 Spring 提供的一个线程池任务执行器,通过设置核心线程数、最大线程数、队列容量等参数,可以根据应用的需求灵活地配置线程池。

  • 异步任务失败处理 :通过自定义的拒绝策略,在线程池满时记录详细信息并抛出异常,以便及时发现任务执行失败的情况。

  • 线程池监控 :使用 @Scheduled 注解定期监控线程池的状态,包括活动线程数、当前线程数、核心线程数、最大线程数、队列大小和已完成任务数等,帮助开发者了解线程池的运行情况,以便及时进行优化和调整

2、高并发请求处理

在 Web 应用中处理大量并发请求,避免阻塞主线程

@RestControllerpublic class MyController { @Autowired private ThreadPoolTaskExecutor taskExecutor; @GetMapping(\"/process\") public CompletableFuture handleRequest() { return CompletableFuture.supplyAsync(() -> { // 耗时操作 return \"Result\"; }, taskExecutor); } }

3、定时任务调度

@EnableScheduling@Configurationpublic class SchedulerConfig { @Bean public ThreadPoolTaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(5); scheduler.setThreadNamePrefix(\"Scheduler-\"); return scheduler; } } @Servicepublic class ScheduledService { @Scheduled(fixedRate = 5000) public void scheduledTask() { // 定时任务逻辑 } }

 

拒绝策略(Rejected Policies

当线程池和队列均满时,处理新任务的策略:

策略类 行为描述 AbortPolicy 直接抛出 RejectedExecutionException(默认) CallerRunsPolicy 由提交任务的线程直接执行任务(同步阻塞提交者) DiscardPolicy 静默丢弃新任务,不抛异常 DiscardOldestPolicy 丢弃队列中最旧的任务,然后重试提交新任务

如下给出不同拒绝策略的配置类,请结合上面的配置类整合使用

import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;@Configurationpublic class ThreadPoolConfig { @Bean(name = \"abortPolicyExecutor\") public ThreadPoolTaskExecutor abortPolicyExecutor() { return createExecutor(new ThreadPoolExecutor.AbortPolicy()); } @Bean(name = \"callerRunsPolicyExecutor\") public ThreadPoolTaskExecutor callerRunsPolicyExecutor() { return createExecutor(new ThreadPoolExecutor.CallerRunsPolicy()); } @Bean(name = \"discardPolicyExecutor\") public ThreadPoolTaskExecutor discardPolicyExecutor() { return createExecutor(new ThreadPoolExecutor.DiscardPolicy()); } @Bean(name = \"discardOldestPolicyExecutor\") public ThreadPoolTaskExecutor discardOldestPolicyExecutor() { return createExecutor(new ThreadPoolExecutor.DiscardOldestPolicy()); } private ThreadPoolTaskExecutor createExecutor(ThreadPoolExecutor.RejectedExecutionHandler rejectedExecutionHandler) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); // 核心线程数 executor.setMaxPoolSize(10); // 最大线程数 executor.setQueueCapacity(100); // 队列容量 executor.setThreadNamePrefix(\"Task-Executor-\"); // 线程名前缀 executor.setRejectedExecutionHandler(rejectedExecutionHandler); executor.initialize(); return executor; }}

创建一个服务类 TaskService,用于执行任务

import org.springframework.scheduling.annotation.Async;import org.springframework.stereotype.Service;@Servicepublic class TaskService { @Async(\"abortPolicyExecutor\") public void executeWithAbortPolicy(String taskName) { executeTask(taskName); } @Async(\"callerRunsPolicyExecutor\") public void executeWithCallerRunsPolicy(String taskName) { executeTask(taskName); } @Async(\"discardPolicyExecutor\") public void executeWithDiscardPolicy(String taskName) { executeTask(taskName); } @Async(\"discardOldestPolicyExecutor\") public void executeWithDiscardOldestPolicy(String taskName) { executeTask(taskName); } private void executeTask(String taskName) { try { System.out.println(Thread.currentThread().getName() + \" 开始执行任务: \" + taskName); Thread.sleep(2000); // 模拟任务执行时间 System.out.println(Thread.currentThread().getName() + \" 任务执行完成: \" + taskName); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println(\"任务执行被中断: \" + taskName); } }}

创建一个控制器类 TaskController,用于触发任务执行

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class TaskController { @Autowired private TaskService taskService; @GetMapping(\"/trigger/abort\") public String triggerAbortPolicy(@RequestParam String taskName) { taskService.executeWithAbortPolicy(taskName); return \"任务已提交到使用 AbortPolicy 的线程池\"; } @GetMapping(\"/trigger/caller\") public String triggerCallerRunsPolicy(@RequestParam String taskName) { taskService.executeWithCallerRunsPolicy(taskName); return \"任务已提交到使用 CallerRunsPolicy 的线程池\"; } @GetMapping(\"/trigger/discard\") public String triggerDiscardPolicy(@RequestParam String taskName) { taskService.executeWithDiscardPolicy(taskName); return \"任务已提交到使用 DiscardPolicy 的线程池\"; } @GetMapping(\"/trigger/discardoldest\") public String triggerDiscardOldestPolicy(@RequestParam String taskName) { taskService.executeWithDiscardOldestPolicy(taskName); return \"任务已提交到使用 DiscardOldestPolicy 的线程池\"; }}

启动 Spring Boot 应用后,分别访问以下 URL 来测试不同拒绝策略的行为:

  • http://localhost:8080/trigger/abort?taskName=任务1

  • http://localhost:8080/trigger/caller?taskName=任务2

  • http://localhost:8080/trigger/discard?taskName=任务3

  • http://localhost:8080/trigger/discardoldest?taskName=任务4

  • 代码说明

  • 线程池配置

    • 使用 ThreadPoolTaskExecutor 创建线程池。

    • 配置了 4 个不同的线程池,每个线程池使用不同的拒绝策略。

    • 每个线程池的核心线程数为 5,最大线程数为 10,队列容量为 100。

  • 拒绝策略

    • AbortPolicy:直接抛出 RejectedExecutionException

    • CallerRunsPolicy:由提交任务的线程直接执行任务。

    • DiscardPolicy:静默丢弃新任务,不抛异常。

    • DiscardOldestPolicy:丢弃队列中最旧的任务,然后重试提交新任务。

    • 任务执行

      • TaskService 类中的每个方法都使用 @Async 注解,并指定使用的线程池。

      • executeTask 方法模拟任务执行,包含一个 2 秒的睡眠时间。

      • 通过这个示例,你可以观察不同拒绝策略在任务被拒绝时的行为。例如,当线程池满时,AbortPolicy 会抛出异常,CallerRunsPolicy 会让提交任务的线程执行任务,DiscardPolicy 会静默丢弃任务,而 DiscardOldestPolicy 会丢弃最旧的任务并尝试提交新任务

    • 6、最佳配置

    • · 合理设置线程池参数
      CPU 密集型任务:核心线程数 ≈ CPU 核心数
      I/O 密集型任务:核心线程数 ≈ CPU 核心数 * 2,并增大队列容量。
      · 避免队列无限堆积
      设置合理的 queueCapacity,防止内存溢出(OOM)。
      · 统一异常处理
      通过 AsyncUncaughtExceptionHandler 捕获异步任务中的异常:

    • @Configuration public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // ... 配置参数 return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> { // 处理异常 }; } }

      应用退出时,调用 shutdown() 并等待剩余任务执行完毕

    • executor.shutdown(); try { if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); }

      总结:

    • ThreadPoolTaskExecutor 是 Spring 生态中管理线程任务的利器,通过灵活的配置和与 Spring 的无缝集成,能够高效处理异步任务、高并发请求和定时调度。合理设置参数、选择拒绝策略,并结合监控手段,可显著提升系统性能和稳定性。