Java队列、线程池及ThreadPoolExecutor自定义线程池实现
目录
- 1、阻塞队列
- 2、队列分类
- 3、API使用
- 4、线程池
-
- 4.1、线程池参数
- 4.2、线程池实现
- 4.3、任务执行流程
- 4.4、拒绝策略
- 4.5、参数合理值设置
- 5、自定义线程池流程
- 6、自定义线程池实现
-
- 6.1、AsyncFactory及AsyncBean实现
- 6.2、ThreadManager
- 6.3、ThreadPoolExecutor配置
- 6.4、Controller
- 6.5、源码地址:
1、阻塞队列
使用阻塞队列开发时不需要关心什么时候阻塞线程,什么时候唤醒线程,一切由阻塞队列管理。
2、队列分类
阻塞队列有7大分类,其中3个最常用:
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列,大小默认为Integer的MAX。
SynchronousQueue:一个不存储元素的阻塞队列,即单个元素的队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
3、API使用
注意:element、peek是指检查队列第一个元素是是什么
抛出异常:
是指当阻塞队列满时候,再往队列里插入元素,会抛出 IllegalStateException(“Queue full”) 异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。
返回特殊值:
插入方法会返回是否成功,成功则返回 true。移除方法,则是从队列里拿出当前移除的元素对象,如果没有则返回 null。
一直阻塞:
当阻塞队列满时,往队列里 put 元素,队列会一直阻塞线程或者响应中断,直到队列有线程被取出。当队列空时,从队列里 take 元素时,队列会一直阻塞线程,直到队列有新的线程被插入。
超时退出:
当阻塞队列满时,队列会阻塞线程一段时间,如果超过一定的时间,线程就会退出。
4、线程池
4.1、线程池参数
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(…params) 七大参数:
- corePoolSize 核心线程数,指保留的线程池大小。
- maximumPoolSize 指的是线程池的最大大小。
- keepAliveTime 指的是多余空闲线程(空闲数大于核心数)时,超过keepAliveTime 时间后将停止,直到只剩下corePoolSize个线程。
- unit 是一个枚举,表示 keepAliveTime 的单位。
- workQueue 表示存放任务的队列(存放需要被线程池执行的线程队列)。
- threadFactory 线程工厂,一般默认即可。
- handler 拒绝策略(线程池数量已满时任务会添加失败,添加任务失败后如何处理该任务)。
4.2、线程池实现
线程池通过Executor框架实现,使用了Executor、Executors、ExecutorService、ThreadPoolExecutor。
线程池有四种实现方式:
Executors.newSingleThreadExecutor(); //创建使用单个线程的线程池Executors.newFixedThreadPool(n); // 创建使用固定线程数的线程池Executors.newCachedThreadPool(); // 可扩容线程数的线程池Executors.newScheduledThreadPool() // 创建固定线程数的线程池,支持定时及周期性任务执行。
这些线程池的创建,底层通过ThreadPoolExecutor实现。
1、SingleThreadExecutor:
单线程线程池,如果当前线程出现异常会开启一个新的线程来执行,保证任务的执行顺序和提交顺序一致,初始化参数如下:
• corePoolSize:1 • maximumPoolSize: 1• keepAliveTime: 0L• workQueue:new LinkedBlockingQueue<Runnable>()
2、FixedThreadPool:
固定大小的线程池,最大线程数与核心线程数一致,每提交一个任务就创建一个线程,直到达到最大线程数,当某一个线程出现异常后会创建一个新的线程,适用于固定并发,初始化参数如下:
• corePoolSize: nThreads• maximumPoolSize: nThreads• keepAliveTime: 0L• workQueue:new LinkedBlockingQueue<Runnable>()
3、CachedThreadPool:
是一个无界的、可以扩容的线程池,线程数量的上限取决于服务器性能,每当有任务时立马线程来执行,当有空闲线程时,默认60秒才会停止,主要初始化参数如下:
• corePoolSize: 0• maximumPoolSize: Integer.MAX_VALUE• keepAliveTime: 60L // TimeUnit.SECONDS• workQueue:new SynchronousQueue<Runnable>()
4、ScheduledThreadPool:
是一个固定线程核心数,无界的线程池,支持定时执行任务,如果存在空闲线程,会被立刻停止,初始化参数如下:
• corePoolSize: corePoolSize• maximumPoolSize: Integer.MAX_VALUE• keepAliveTime: 0• workQueue:new DelayedWorkQueue()
4.3、任务执行流程
步骤说明:
1、当任务进入时,运行的线程数小于corePoolSize,立马创建线程执行任务。
2、线程数量大于或等于corePoolSize时任务加入到队列中。
3、当队列满了,但是线程数量小于maximumPoolSize,则创建非核心线程来执行任务。
4、当队列满了,并且线程数量大于或等于maximumPoolSize,则进行拒绝策略。
5、当空闲的线程数超过keepAliveTime后会被释放。
当一个线程完成任务后,会继续从队列中取出任务来执行,但是当没有任务可以执行时,会在超过keepAliveTime后并且运行中的线程数量大于corePoolSize则空闲的线程会停止。
4.4、拒绝策略
- AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。
- DiscardPolicy:也是丢弃任务,但是不抛出异常。
- DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
- CallerRunsPolicy:由调用线程处理该任务。
- 自定义拒绝策略
RejectedExecutionHandler handler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 自定义拒绝后的操作 } };
4.5、参数合理值设置
核心线程数:CPU密集型:CPU核心数+1,IO密集型:CPU核心数/(1-阻尼系数),阻尼系数:0.8~0.9
或者使用以下计算公式:
- tasktime:每个任务平均执行时间,假如0.1秒。
- tasks:系统每秒需要处理的任务个数,假如100~200个。
- responseTime:系统期望的最大响应时间,假如1秒。响应时间不能太大。
- CorePoolSize:tasks* tasktime,即0.1*(100~200) = 10~20个线程,根据8020原则,如果80%情况下任务小于160(200x80%),则CorePoolSize数为16(20x80%),反之则CorePoolSize为20,假如取16个。
workQueue:(corePoolSize/tasktime)responsetime,即(16/0.1)*1 =160个。 - maxPoolSize:每秒最多任务数为200,每秒通常160个任务时,需要16个线程,所以当持续达到最大任务数时,(最大任务数- workQueue)x( CorePoolSize /每秒通常任务数),即(200-160)x(16/160) = 30,如果最大任务数小于workQueue数,则maxPoolSize = CorePoolSize 。
5、自定义线程池流程
1、通过new ThreadPoolExecutor()方法创建线程池。
2、自定义一个拒绝策略以及LinkedBlockingQueue队列来实现缓存队列,当任务被拒绝后,通过自定义拒绝策略加入缓存队列中。
3、通过Executors.newScheduledThreadPool()创建一个定时任务,每隔一段时间判断当前线程池中已存在的队列数量是否小于设定的队列大小,如果是从缓存队列中获取线程然后加入到线程池中。
4、创建Runnable接口实现类,通过threadPool.execute(Runable runable) 创建任务,如果想要传递参数,可以在初始化Runnable接口实现类时,通过构造器传递参数。
6、自定义线程池实现
代码实现目录结构如下:
6.1、AsyncFactory及AsyncBean实现
AsyncBean:用于存储线程传递的参数的Bean对象。
AsyncFactory:异步工厂,用于执行execute
开启线程。
AsyncBean:
/** * 用于存储线程传递的参数的Bean对象 */public class AsyncBean { /** * 具体的业务执行类 */ private Class<?> threadManagerClass; /** * 线程id */ private String threadId; /** * 业务类[如何业务实体类有id那么尽量和threadId一致] */ private Object object; public AsyncBean(Class<?> threadManagerClass, String threadId, Object object) { this.threadManagerClass = threadManagerClass; this.threadId = threadId; this.object = object; } public String getThreadId() { return threadId; } public void setThreadId(String threadId) { this.threadId = threadId; } public Object getObject() { return object; } public void setObject(Object object) { this.object = object; } public Class<?> getThreadManagerClass() { return threadManagerClass; } public void setThreadManagerClass(Class<?> threadManagerClass) { this.threadManagerClass = threadManagerClass; }}
AsyncFactory:
public class AsyncFactory { private AsyncBean asyncBean; public AsyncFactory(AsyncBean asyncBean) { this.asyncBean = asyncBean; } public AsyncBean getThread() { return asyncBean; } public void setAsyncBean(AsyncBean asyncBean) { this.asyncBean = asyncBean; } /** * 执行线程代码 * * @param threadPool */ public void execute(ThreadPoolExecutor threadPool) { // 利用反射找到相应的线程实现类 try { Class<?> zClass = asyncBean.getThreadManagerClass(); BaseThreadManager runnable = (BaseThreadManager) zClass.getConstructor(AsyncBean.class).newInstance(asyncBean); threadPool.execute(runnable); } catch (Exception e) { e.printStackTrace(); } }}
6.2、ThreadManager
ThreadManager
分为BaseThreadManager
以及实现的子类,ThreadManager
的作用就是执行具体的任务
,比如:
BaseThreadManager:
BaseThreadManager
该类是固定写法,是所有具体执行任务类的父类。
public class BaseThreadManager implements Runnable { public AsyncBean asyncBean; public BaseThreadManager(AsyncBean asyncBean) { // 将当前线程对象加入到map中 ThreadPoolConfig.consoleThreadMap.put(asyncBean.getThreadId(), Thread.currentThread()); this.asyncBean = asyncBean; } public AsyncBean getAsyncBean() { return asyncBean; } @Override public void run() { }}
AThreadManager:
一个执行具体任务的类,为BaseThreadManager
的子类;
@Scope("prototype")//spring 多例public class AThreadManager extends BaseThreadManager { private final Logger logger = LoggerFactory.getLogger(AThreadManager.class); public AThreadManager(AsyncBean asyncBean) { super(asyncBean); } @Override public void run() { //执行A线程相关业务代码 try { String threadId = asyncBean.getThreadId(); logger.info("线程Id:" + threadId); // 具体执行的业务 logger.info("执行完成"); } catch (InterruptedException e) { logger.error("结束任务"); } }}
BThreadManager:
一个执行具体任务的类,为BaseThreadManager
的子类;
@Scope("prototype")//spring 多例public class BThreadManager extends BaseThreadManager { private final Logger logger = LoggerFactory.getLogger(AThreadManager.class); public AThreadManager(AsyncBean asyncBean) { super(asyncBean); } @Override public void run() { //执行B线程相关业务代码 try { String threadId = asyncBean.getThreadId(); logger.info("线程Id:" + threadId); // 具体执行的业务 logger.info("执行完成"); } catch (InterruptedException e) { logger.error("结束任务"); } }}
6.3、ThreadPoolExecutor配置
ThreadPoolConfig:
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.context.annotation.Configuration;import java.util.Map;import java.util.Queue;import java.util.concurrent.*;@Configurationpublic class ThreadPoolConfig { private final Logger logger = LoggerFactory.getLogger(ThreadPoolConfig.class); /** * 线程池维护线程的最少数量,20 */ private final static int CORE_POOL_SIZE = 5; /** * 线程池维护线程的最大数量 */ private final static int MAX_POOL_SIZE = 20; /** * 线程池维护线程所允许的空闲时间 */ private final static int KEEP_ALIVE_TIME = 10; /** * 线程池所使用的缓冲队列大小,200 */ private final static int WORK_QUEUE_SIZE = 20; /** * 用于储存在队列中的订单,防止重复提交,在真实场景中,可用redis代替 验证重复 */ private final Map<String, Object> cacheMap = new ConcurrentHashMap<>(); /** * 订单的缓冲队列,当线程池满了,则将订单存入到此缓冲队列 */ private final Queue<Object> msgQueue = new LinkedBlockingQueue<>(); //创建hashmap,用于存储线程 public static ConcurrentHashMap<String, Thread> consoleThreadMap = new ConcurrentHashMap<>(); /** * 当线程池的容量满了执行拒绝策略 * ThreadPoolExecutor.AbortPolicy(); //默认 * ThreadPoolExecutor.CallerRunsPolicy(); * ThreadPoolExecutor.DiscardPolicy(); * ThreadPoolExecutor.DiscardOldestPolicy(); */ final RejectedExecutionHandler handler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //加入到缓冲队列,方便调度池进行调度 AsyncBean asyncBean = ((BaseThreadManager) r).getAsyncBean(); msgQueue.offer(asyncBean); logger.info("加入调度线程池,线程:" + asyncBean.getThreadId()); } }; /** * 创建线程池 */ final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(WORK_QUEUE_SIZE), this.handler); /** * 线程池的定时任务----> 称为(调度线程池)。此线程池支持 定时以及周期性执行任务的需求。 */ final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(CORE_POOL_SIZE); /** * 检查(调度线程池),每.5秒执行一次,查看订单的缓冲队列是否有 订单记录,则重新加入到线程池 */ final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { //判断缓冲队列是否存在记录 if (!msgQueue.isEmpty()) { //当线程池的队列容量少于WORK_QUEUE_SIZE,则开始把缓冲队列的订单 加入到 线程池 if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) { AsyncBean asyncBean = (AsyncBean) msgQueue.poll(); AsyncFactory asyncFactory = new AsyncFactory(asyncBean); logger.info("取缓存执行,线程:" + asyncBean.getThreadId()); asyncFactory.execute(threadPool); } else { // 如果要保证每任务都需要加入到线程池中,则不要下列代码 msgQueue.poll(); } } } }, 0, 50, TimeUnit.MILLISECONDS); /** * 获取消息缓冲队列 */ public Queue<Object> getMsgQueue() { return msgQueue; } /** * 将任务加入订单线程池 */ public void start(AsyncBean asyncBean) { //验证当前进入的订单是否已经存在 if (cacheMap.get(asyncBean.getThreadId()) == null) { cacheMap.put(asyncBean.getThreadId(), new Object()); AsyncFactory asyncFactory = new AsyncFactory(asyncBean); asyncFactory.execute(threadPool); } } /** * 关闭线程池 - 通常情况下不会进行关闭 */ public void shutdown() { //true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止 System.out.println("终止调度线程池:" + scheduledFuture.cancel(false)); logger.info("关闭线程服务,调度线程池还有未处理的信息条数:" + msgQueue.size()); //清空缓存,清空后缓存中后的后续任务不会继续执行 msgQueue.clear(); // 停止已经在运行的线程 logger.info("关闭线程池,正在运行的线程数:" + threadPool.getActiveCount()); // 待任务执行完成后依次关闭 threadPool.shutdown(); // 强转关闭所有线程,可能会出现InterruptedException异常 // try { // threadPool.shutdownNow(); // } catch (Exception e) { // logger.error("关闭执行中的线程"); // } } /** * 关闭具体任务 - 通常情况下不会结束某个线程 * * @param threadId 具体的业务线程Id */ public void remove(String threadId) { Thread thread = consoleThreadMap.get(threadId); if (thread != null) { consoleThreadMap.remove(threadId); thread.interrupt(); } }}
6.4、Controller
@RestControllerpublic class PoolController { @Resource private ThreadPoolConfig threadPoolConfig; /** * 测试模拟请求 入口 * * @return */ @GetMapping("/start") public String start() { //模拟启动不同线程 long millis = (long) (Math.random() * 100); Class<?> zClass = millis % 2 == 0 ? AThreadManager.class : BThreadManager.class; AsyncBean asyncBean = new AsyncBean(zClass, UUID.randomUUID().toString(), null); threadPoolConfig.start(asyncBean); return "Test ThreadPoolExecutor start"; } /** * 停止服务 * * @return */ @GetMapping("/end") public String end() { threadPoolConfig.shutdown(); return "Test ThreadPoolExecutor start"; } /** * 关闭某个线程 * * @return */ @GetMapping("/remove/{id}") public String remove(@PathVariable String id) { threadPoolConfig.remove(id); return "Test ThreadPoolExecutor start"; }}
6.5、源码地址:
gitee-源码地址:https://gitee.com/lhzlx/thread-pool-demo.git