> 文档中心 > 关于线程池需要注意的几点

关于线程池需要注意的几点


关于线程池需要注意的几点

指定线程名,异常处理

自定义ThreadFactory指定
public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), (r) -> {    Thread thread = new Thread(r);    thread.setName("xxxx-pool-" + atomicInteger.getAndIncrement());    thread.setUncaughtExceptionHandler((t, e) -> { loggerHelper.errorLog(t.getName() + " 线程处理异常 : " + e.getMessage()); e.printStackTrace(); System.out.println(t.getName() + " 线程处理异常 : " + e.getMessage());    });    return thread;}, new ThreadPoolExecutor.AbortPolicy());
ThreadPoolTaskExecutor 前缀指定方式
 AtomicInteger atomicInteger = new AtomicInteger(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity);// 前缀指定方式 //executor.setThreadNamePrefix("xxxx"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); executor.setThreadFactory((runnable)->{     Thread t = new Thread(runnable);     t.setName(threadNamePrefix + atomicInteger.getAndIncrement());     t.setUncaughtExceptionHandler((th, e) -> {  loggerHelper.errorLog(th.getName() + " xxxxxx 线程处理异常 : " + e.getMessage());  e.printStackTrace();     });     return t; }); return executor;
线程池线程默认构造
    static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() {     SecurityManager s = System.getSecurityManager();     group = (s != null) ? s.getThreadGroup() :      Thread.currentThread().getThreadGroup();     namePrefix = "pool-" +     poolNumber.getAndIncrement() +    "-thread-"; } public Thread newThread(Runnable r) {     Thread t = new Thread(group, r,      namePrefix + threadNumber.getAndIncrement(),      0);     if (t.isDaemon())  t.setDaemon(false);     if (t.getPriority() != Thread.NORM_PRIORITY)  t.setPriority(Thread.NORM_PRIORITY);     return t; }    }

线程池队列类型待探讨指定

SynchronousQueue队列 适用场景:
执行时间很短的任务

特点: 执行时间很短的任务,无容量队列,提交一个任务创建一个线程。

使用 SynchronousQueue队列,特点是没有容量,没有线程是放不进去的。

对于执行时间较长任务,建议使用有界阻塞队列ArrayBlockingQueue,LinkedBlockingQueue建议指定容量

线程池线程数,以及针对不同业务线程池

core核心数和最大核心数根据实际业务来设置,具体需要压测测试

线程池关闭问题(局部变量,切记关闭线程池)

否则请求一次会线程数增长,core不会被回收

测试示例

    static void test(){ //方法内 定义线程池 ExecutorService pool = new org.apache.tomcat.util.threads.ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS,  new SynchronousQueue<Runnable>(),  Executors.defaultThreadFactory(),  new org.apache.tomcat.util.threads.ThreadPoolExecutor.AbortPolicy()); pool.execute(()->{     System.out.println(System.currentTimeMillis()); });      // 打开和关闭测试// pool.shutdown();    }    public static void main(String[] args) throws InterruptedException { while (true){     TimeUnit.SECONDS.sleep(2);     test(); }    }

使用visualVm可以观察进程线程数是否处于稳定

关闭线程

pool.shutdown(); //设置中断标志,等待执行任务线程执行结束后被打断,立即打断其他空闲线程
pool.shutdownNow(); //打断所有线程,任务队列里任务会被返回

根据jvm规范关闭线程,关闭守护线程,守护线程中非守护线程也会被关闭

实例如下

public static final ThreadPoolExecutor cardPool = new ThreadPoolExecutor(1, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(MAX_TASK_SIZE),     (r) -> {  Thread t = new Thread(r);  t.setDaemon(true);  t.setName("xxxx" + ai.getAndIncrement());  t.setUncaughtExceptionHandler((th,ex)->{      //System.out.println(th.getName() + " " + ex.getMessage());      loggerHelper.errorLog(th.getName() + " " + ex.getMessage());      ex.printStackTrace();  });  return t;     });    public static void main(String[] args) throws InterruptedException { cardPool.execute(()->{     try {  TimeUnit.SECONDS.sleep(6);     } catch (InterruptedException e) {  e.printStackTrace();     }     System.out.println(11111); }); TimeUnit.SECONDS.sleep(2); System.out.println("main end");    }

提交任务

提交任务如果不是自己自定义异常,需要捕获异常,进行异常处理

     try { pool.execute(() -> {  //任务。。。});     }catch (RejectedExecutionException handler){//异常处理 根据业务 处理loggerHelper.infoLog(" xxxxxx :  " + object + " 任务被拒绝...");     }

关于CompletableFuture.runAsync异步处理任务

CompletableFuture.runAsync 默认是使用 ForkJoinPool.commonPool()线程池, 默认是cpu核心数 -1 线程池线程大小(1个cpu是1个cpu)

   if (parallelism < 0 && // default 1 less than #cores     (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)     parallelism = 1; if (parallelism > MAX_CAP)     parallelism = MAX_CAP;

默认线程池(大于1核心,最大核心数 - 1) 并且线程异常无法捕获

CompletableFuture.runAsync 异常处理

 CompletableFuture.runAsync(() -> {    System.out.println(" ------- executor --- ");    System.out.println(" ---------- " + 1 / 0);}, ExecutorUtil.executor).whenComplete((re,t)->{     System.out.println("结果: " + re);     System.out.println(t.getMessage());     t.printStackTrace(); });

也可以在任务里面 try catch 捕获处理

多事通