【并发编程五:Java中的线程池(2)-线程池的实现原理】
衔接上一章 【编发编程四:Java中的线程池(1)】
学习路线
-
- 1.6一些开源项目实现的拒绝策略
- 1.7线程池的实现原理
- 1.8线程池的底层源码实现
-
- 1.8.1十进制、二进制、八进制、十六进制
- 1.8.2线程池源码-构造方法
- 1.8.4线程池源码-线程池状态值
-
- (1) RUNNING = -1 << 29
- (2) SHUTDOWN = 0 << 29
- (3) STOP = 1 << 29
- (4) TIDYING = 2 << 29
- (5) TERMINATED = 3 << 29
- 1.8.5线程池源码-拆解线程状态和线程池工作线程数
- 1.8.6线程池源码-拆解线程状态和线程池工作线程数-ctlOf
- 1.8.7线程池源码-拆解线程状态和线程池工作线程数-runStateOf
- 1.8.8线程池源码-拆解线程状态和线程池工作线程数-workerCountOf
- 1.8.9线程池源码-线程池状态和工作线程数为什么是一个变量?
-
- ctl 是线程池中的非常重要的成员变量,它代表两个含义;
- 它是如何解决共享变量线程安全问题的?
- 1.8.10线程池源码-核心源码解读-execute方法
- 1.8.111.8.11线程池源码-核心源码解读-addWorker方法
- 1.8.12线程池源码-核心源码解读-runWorker方法
- 1.8.13线程池源码-核心源码解读-getTask方法
- 1.8.14线程池源码-核心源码解读-线程池复用
- 1.8.15线程池源码-核心源码解读-线程池大小变化
- 1.8.16线程池源码-核心源码解读-画图总结
1.6一些开源项目实现的拒绝策略
org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport
当dubbo的工作线程触发了线程拒绝策略后,为了能尽量让使用者清楚触发线程拒绝策略的真实原因,在拒绝策略中它做了4件事情:
- 1、输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息,日志打印非常详细,让开发和运维更容易容易定位到问题所在;
- 2、输出当前线程堆栈详情,将发生拒绝策略时的现场情况dump线程上下文信息到一个文件中,助你发现问题所在;
- 3、发送事件;
- 4、抛出拒绝执行异常,使本次任务失败,这个使用的是JDK默认拒绝策略的异常;
org.elasticsearch.common.util.concurrent.EsAbortPolicy
org.elasticsearch.common.util.concurrent.EsExecutors.ForceQueuePolicy
io.netty.util.concurrent.RejectedExecutionHandlers
稍微总结下:
AbortPolicy异常中止策略:异常中止,无特殊场景;
DiscardPolicy丢弃策略:无关紧要的任务(文章点击量、商品浏览量等);
DiscardOldestPolicy弃老策略:允许丢掉老数据的场景;
CallerRunsPolicy调用者运行策略:不允许失败场景(对性能要求不高、并发量较小);
1.7线程池的实现原理
1.8线程池的底层源码实现
1.8.1十进制、二进制、八进制、十六进制
进制是为了计数的需要而产生的;10 11
日常中我们用的最多的是十进制(0 - 9),人有10个指头;(系绳法)
计算机使用的是二进制(0,1),计算机只认识0和1,其它的都不认识,不管是十进制、八进制、还是十六进制,计算机在处理的时候都会转化为二进制,不仅如此,即使是色彩斑斓的图片、视频、声音等,想让计算机识别,都需要转化为二进制,所以在计算机中,只有0和1;
虽然计算机只认识0和1,但是在编写程序的时候如果使用二进制来表示数字书写就太长了,比如十进制数字100,用二进制表示为:01100100?,这样不利于程序员阅读,所以又延伸出了八进制和十六进制,比如十进制数值100,用八进制表示为0144,用十六进制表示为0x64;
八进制和十六进制缩短了二进制数字的长度,又容易识别,也容易在各进制之间相互转化,所以八进制、十六进制也会在计算机中采用;
十六进制以0x或0X开头
十进制直接写数字
八进制以0(零)开头
二进制以0b或0B开头
1.8.2线程池源码-构造方法
1.8.3线程池源码-控制变量
private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;COUNT_BITS = 29CAPACITY = (1 << COUNT_BITS) - 1int 类型的数是占用4字节,32位,所以前面填了一堆0;原码:00000000 00000000 00000000 00000001左移:00100000 00000000 00000000 00000000减一:00011111 11111111 11111111 11111111 (536870911 = 5亿多)
1.8.4线程池源码-线程池状态值
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP= 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;RUNNING = -1 << COUNT_BITS;
(1) RUNNING = -1 << 29
在计算机中,负数以其正数的补码形式表示;原码:一个整数,按照绝对值大小转换成的二进制数,称为原码;1的原码:00000000 00000000 00000000 00000001;反码:将二进制数按位取反,所得的新二进制数称为原二进制数的反码;1的反码:11111111 11111111 11111111 11111110称:11111111 11111111 11111111 11111110是00000000 00000000 00000000 00000001的反码,反码是相互的,所以也可称这两个数互为反码;补码:反码加1称为补码;1的补码:11111111 11111111 11111111 11111110 + 1 = 11111111 11111111 11111111 11111111所以计算 RUNNING = -1 << 29原码:00000000 00000000 00000000 00000001反码:11111111 11111111 11111111 11111110(按位取反)补码:11111111 11111111 11111111 11111111(反码+1)左移:11100000 00000000 00000000 00000000
SHUTDOWN = 0 << COUNT_BITS;
(2) SHUTDOWN = 0 << 29
初始值:00000000 00000000 00000000 00000000左移后:00000000 00000000 00000000 00000000
STOP = 1 << COUNT_BITS;
(3) STOP = 1 << 29
初始值:00000000 00000000 00000000 00000001左移后:00100000 00000000 00000000 00000000
TIDYING = 2 << COUNT_BITS;
(4) TIDYING = 2 << 29
初始值:00000000 00000000 00000000 00000010左移后:01000000 00000000 00000000 00000000
TERMINATED = 3 << COUNT_BITS;
(5) TERMINATED = 3 << 29
初始值:00000000 00000000 00000000 00000011左移后:01100000 00000000 00000000 00000000
1.8.5线程池源码-拆解线程状态和线程池工作线程数
// Packing and unpacking ctlprivate static int runStateOf(int c) { return c & ~CAPACITY; }private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }
1.8.6线程池源码-拆解线程状态和线程池工作线程数-ctlOf
先看private static int ctlOf(int rs, int wc) { return rs | wc; }比如 ctlOf(RUNNING, 0)RUNNING | 011100000 00000000 00000000 00000000 |00000000 00000000 00000000 00000000=11100000 00000000 00000000 00000000注:&同为1时为1,否则为0;|只要一个为1就为1,否则为0------------------------------------------------------------------------比如 ctlOf(RUNNING, 1)RUNNING | 111100000 00000000 00000000 00000000 |00000000 00000000 00000000 00000001=11100000 00000000 00000000 00000001
1.8.7线程池源码-拆解线程状态和线程池工作线程数-runStateOf
计算线程状态的方法
计算线程状态的方法private static int runStateOf(int c) { return c & ~CAPACITY; }runStateOf(ctl.get())ctl.get() & ~CAPACITYctlOf(RUNNING, 0) & ~CAPACITY11100000 00000000 00000000 00000000&(原始00011111 11111111 11111111 11111111取反11100000 00000000 00000000 00000000)=11100000 00000000 00000000 00000000 (RUNNING)------------------------------------------------------------------------------如果是1:runStateOf(ctl.get())ctl.get() & ~CAPACITYctlOf(TIDYING, 1) & ~CAPACITYctlOf(TIDYING, 1)TIDYING | 101000000 00000000 00000000 00000000 |00000000 00000000 00000000 00000001=01000000 00000000 00000000 00000001&11100000 00000000 00000000 00000000=01000000 00000000 00000000 00000000 (TIDYING)
1.8.8线程池源码-拆解线程状态和线程池工作线程数-workerCountOf
计算线程工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }workerCountOf(ctl.get())ctl.get() & CAPACITYctlOf(RUNNING, 0) & CAPACITY11100000 00000000 00000000 00000000&00011111 11111111 11111111 11111111=00000000 00000000 00000000 00000000-------------------------------------------------------------------------------如果是1:ctlOf(RUNNING, 1) & CAPACITY11100000 00000000 00000000 00000001&00011111 11111111 11111111 11111111=00000000 00000000 00000000 00000001整个推算过程如上所分析(如果没有看太懂,可以多看几遍)
1.8.9线程池源码-线程池状态和工作线程数为什么是一个变量?
ctl 是线程池中的非常重要的成员变量,它代表两个含义;
1、表示线程池的运行状态;
2、表示线程池的工作线程数;
为什么线程池要用一个ctl变量代表两个含义呢?
考虑共享变量,在多线程条件下存在线程安全问题;
一个变量代表两个含义就能解决线程安全问题吗?ctl变量是一个成员变量;
它是如何解决共享变量线程安全问题的?
(1)synchronize 写多读少
(2)cas+volatile 写少读多(cas只能保证一个变量的原子性);
在多线程的环境下,运行状态和有效线程数量往往需要保证统一,不能出现一个修改而另一个没有修改的情况,如果将他们放在同一个AtomicInteger中,利用AtomicInteger的原子操作,就可以保证这两个值始终是统一的;
1.8.10线程池源码-核心源码解读-execute方法
1.//Worker类 2.private?final?class?Worker 3. extends?AbstractQueuedSynchronizer 4. implements?Runnable?{ 5. 6. /?Thread?this?worker?is?running?in. Null?if?factory?fails.?*/ 7. final?Thread?thread; 8. 9. /?Initial?task?to?run. Possibly?null.?*/ 10. Runnable?firstTask; 11. 12. Worker(Runnable?firstTask)?{ 13. setState(-1);?//?inhibit?interrupts?until?runWorker 14. this.firstTask?=?firstTask; 15. this.thread?=?getThreadFactory().newThread(this); 16. 17. //Thread?t?=?new?Thread(worker); 18. //t.start();?==>?worker.run() 19. } 20. 21. @Override 22. public?void?run()?{ 23. runWorker(this); 24. } 25. 26. //.........................?省略 27.}
1.8.111.8.11线程池源码-核心源码解读-addWorker方法
1.private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{ 2. retry: 3. //?外层for循环 4. for?(;;)?{ 5. //?获取线程池控制变量的值 6. int?c?=?ctl.get(); 7. //?获取线程池运行状态 8. int?rs?=?runStateOf(c); 9. 10. //?if判断,如果rs?>=?SHUTDOWN?并且?(判断3个条件,只要有1个不满足),返回false; 11. //?Check?if?queue?empty?only?if?necessary. 12. if?(rs?>=?SHUTDOWN?&& 13. !?(rs?==?SHUTDOWN?&& 14.?firstTask?==?null?&& 15.?!?workQueue.isEmpty())) 16. return?false; 17.18. //?内存for循环 19. for?(;;)?{ 20. //?获取线程池线程数 21. int?wc?=?workerCountOf(c); 22. //?如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false; 23. //?core为addWorker方法的第二个参数,若为true根据corePoolSize比较,若为false根据maximumPoolSize比较; 24. if?(wc?>=?CAPACITY?|| 25. wc?>=?(core ?corePoolSize?:?maximumPoolSize)) 26. return?false; 27. //?尝试增加workerCount,如果成功,则跳出外层for循环 28. if?(compareAndIncrementWorkerCount(c)) 29. break?retry; 30. //?如果增加workerCount失败,则重新获取控制变量ctl的值 31. c?=?ctl.get(); //?Re-read?ctl 32. //?如果当前线程池的运行状态不等于rs,说明线程池运行状态已被改变,返回外层for循环继续执行 33. if?(runStateOf(c)?!=?rs) 34. continue?retry; 35. //?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop 36. } 37. } 38. 39. //?Worker线程是否启动 40. boolean?workerStarted?=?false; 41. //?Worker线程是否添加 42. boolean?workerAdded?=?false; 43. Worker?w?=?null; 44. try?{ 45. //?根据firstTask来创建Worker对象 46. w?=?new?Worker(firstTask); 47. //?每一个Worker对象都会创建一个线程 48. final?Thread?t?=?w.thread; 49. if?(t?!=?null)?{ 50. final?ReentrantLock?mainLock?=?this.mainLock; 51. mainLock.lock(); 52. try?{ 53. //?检查线程池运行状态 54. //?Recheck?while?holding?lock. 55. //?Back?out?on?ThreadFactory?failure?or?if 56. //?shut?down?before?lock?acquired. 57. int?rs?=?runStateOf(ctl.get()); 58. //?rs?<?SHUTDOWN表示是RUNNING状态; 59. //?如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 60. //?因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务 61. if?(rs?<?SHUTDOWN?|| 62. (rs?==?SHUTDOWN?&&?firstTask?==?null))?{ 63. //?检查线程已经是运行状态,抛出非法线程状态异常 64. if?(t.isAlive())?//?precheck?that?t?is?startable 65. throw?new?IllegalThreadStateException(); 66. //?workers是一个HashSet 67. workers.add(w); 68. int?s?=?workers.size(); 69. //?largestPoolSize记录着线程池中出现过的最大线程数量 70. if?(s?>?largestPoolSize) 71. //?把历史上出现过的最大线程数的值更新一下 72. largestPoolSize?=?s; 73. //?Worker线程添加成功 74. workerAdded?=?true; 75. } 76. }?finally?{ 77. //?释放ReentrantLock锁 78. mainLock.unlock(); 79. } 80. if?(workerAdded)?{ 81. //?启动线程 82. t.start(); 83. //?Worker线程已经启动 84. workerStarted?=?true; 85. } 86. } 87. }?finally?{ 88. //?Worker线程没有启动成功 89. if?(!?workerStarted) 90. addWorkerFailed(w); 91. } 92. //?返回Worker线程是否启动成功 93. return?workerStarted; 94.} 1.//Worker类 2.private?final?class?Worker 3. extends?AbstractQueuedSynchronizer 4. implements?Runnable?{ 5. 6. /?Thread?this?worker?is?running?in. Null?if?factory?fails.?*/ 7. final?Thread?thread; 8. 9. /?Initial?task?to?run. Possibly?null.?*/ 10. Runnable?firstTask; 11. 12. Worker(Runnable?firstTask)?{ 13. setState(-1);?//?inhibit?interrupts?until?runWorker 14. this.firstTask?=?firstTask; 15. this.thread?=?getThreadFactory().newThread(this); 16. 17. //Thread?t?=?new?Thread(worker); 18. //t.start();?==>?worker.run() 19. } 20. 21. @Override 22. public?void?run()?{ 23. runWorker(this); 24. } 25. 26. //.........................?省略 27.}
1.8.12线程池源码-核心源码解读-runWorker方法
1.final?void?runWorker(Worker?w){ 2. 3. Thread?wt?=?Thread.currentThread(); 4. Runnable?task?=?w.firstTask; 5. w.firstTask?=?null; 6. 7. //允许响应中断 8. w.unlock(); 9. //?线程退出的原因,true是任务导致,false是线程正常退出 10. boolean?completedAbruptly?=?true; 11. 12. try{ 13. //?当前任务为空,且当前任务队列为空,停止循环 14. while?(task?!=?null?||?(task?=?getTask())?!=?null)?{ 15.16. //?上锁处理并发问题,防止在shutdown()时终止正在运行的worker 17. w.lock(); 18.19. //?如果线程池是stop状态,并且线程没有被中断,就要确保线程被中断,如果线程池不是,确保线程池没有被中断; 20. //?清除当前线程的中断标志,做一个recheck来应对shutdownNow方法; 21. if?((runStateAtLeast(ctl.get(),STOP)?||?(Thread.interrupted()?&&?runStateAtLeast(ctl.get(),STOP)))?&&?!wt.isInterrupted()) 22. wt.interrupt(); 23. try?{ 24. //?执行前(空方法,由子类重写实现) 25. beforeExecute(wt,?task); 26. Throwable?thrown?=?null; 27. try?{ 28. //?执行Runnable类的run()方法 29. task.run(); 30. }?catch?(RuntimeException?x)?{ 31. thrown?=?x;?throw?x; 32. }?catch?(Error?x)?{ 33. thrown?=?x;?throw?x; 34. }?catch?(Throwable?x)?{ 35. thrown?=?x;?throw?new?Error(x); 36. }?finally?{ 37. //?执行后(空方法,由子类重写实现) 38. afterExecute(task,?thrown); 39. } 40. }?finally?{ 41. task?=?null; 42. //?完成的任务数+1 43. w.completedTasks++; 44. //?释放锁 45. w.unlock(); 46. } 47. } 48. //?到此,线程是正常退出 49. completedAbruptly?=?false; 50. }?finally?{ 51. //?处理worker的退出 52. processWorkerExit(w,completedAbruptly); 53. } 54.}
1.8.13线程池源码-核心源码解读-getTask方法
1.private?Runnable?getTask()?{ 2. 3. //?表示上一次从任务队列中取任务时是否超时 4. boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out ?5. for?(;;)?{ 6. int?c?=?ctl.get(); 7. int?rs?=?runStateOf(c); 8. 9. //?Check?if?queue?empty?only?if?necessary. 10. /*?11. ? 如果线程池为`SHUTDOWN`状态且任务队列为空(线程池shutdown状态可以处理任务队列中的任务,不再接受新任务)?12. ? 或者?13. ? 线程池状态>=STOP,则意味着线程池不必再获取任务了,?14. ? 将当前工作线程数量-1并返回null;?15. */ 16. if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{ 17. decrementWorkerCount(); 18. return?null; 19. } 20. 21. //线程数 22. int?wc?=?workerCountOf(c); 23. //?Are?workers?subject?to?culling ?24. /*?25. ?timed变量用于判断是否需要进行超时控制;?26. ?allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;?27. ?wc?>?corePoolSize,表示当前线程池中的线程数量大于核心线程数量;?28. ?表示对于超过核心线程数量的这些线程,需要进行超时控制(默认情况)?29. */ 30. boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize; 31. 32. /*?33. ?*?两个条件全部为true,则通过CAS使工作线程数-1,即去除工作线程:?34. ?*?条件1:工作线程数大于maximumPoolSize,或(工作线程需要超时控制且上次在任务队列拉取任务超时)?35. ?*?条件2:wc?>?1或任务队列为空?36. ?*?如果减1失败,则返回重试;?37. ?*/ 38. if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))?&&?(wc?>?1?||?workQueue.isEmpty()))?{ 39. if?(compareAndDecrementWorkerCount(c)) 40. return?null; 41. continue; 42. } 43. try?{ 44. /*?45. ?*?执行到这里,说明已经经过前面的校验,开始真正获取task;?46. ?*?根据timed来判断,如果工作线程有超时时间,则通过任务队列的poll方法进行超时等待方式获取任务,?47. ?*?如果在keepAliveTime时间内没有获取到任务,则返回null,否则通过take方法;?48. ?*?take方法表示如果这时任务队列为空,则会阻塞直到任务队列不为空;?49. ?*?一般poll()用于普通线程、take()用于核心线程?50. ?*/ 51. Runnable?r?=?timed ?workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:?workQueue.take(); 52. //?r不为空,则返回该Runnable 53. if?(r?!=?null) ?54. return?r; 55. //?如果?r?==?null,说明已经超时得不到任务,timedOut设置为true 56. timedOut?=?true; 57. }?catch?(InterruptedException?retry)?{ 58. //?如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试 59. timedOut?=?false; 60. } 61. } }?
1.8.14线程池源码-核心源码解读-线程池复用
1、threadPoolExecutor.execute(runnable)2、addWorker(command, boolean)3、Worker w = new Worker(firstTask); //已经创建了Thread4、HashSet workers.add(w);5、t.start(); //w.thread.start();6、worker.run();7、runWorker(this)8、task = w.firstTask 或者 task = getTask()9、task.run();
线程的复用是通过while循环实现的,worker会首先获取当前的firstTask进行run,然后不停地循环从等待队列中获取新的任务task,如果有新任务则直接调用task的run方法,不会再去新建一个线程,从而实现线程复用;
1.8.15线程池源码-核心源码解读-线程池大小变化
1.private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{ 2. 3. //completedAbruptly为true表示线程异常执行结束 4. //completedAbruptly为false表示线程正常执行结束 5. if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted 6. decrementWorkerCount(); 7. 8. //从线程set集合中移除工作线程,该过程需要加锁,因为HashSet是线程不安全的集合 9. final?ReentrantLock?mainLock?=?this.mainLock; 10. mainLock.lock(); 11. try?{ 12. //统计完成的任务数:将该worker已完成的任务数追加到线程池已完成的任务数 13. completedTaskCount?+=?w.completedTasks; 14. //从HashSet中移除该worker 15. workers.remove(w); 16. }?finally?{ 17. //释放锁 18. mainLock.unlock(); 19. } 20. //根据线程池状态进行判断是否结束线程池 21. tryTerminate(); 22. 23. int?c?=?ctl.get(); 24. //当线程池是RUNNING或SHUTDOWN状态时 25. if?(runStateLessThan(c,?STOP))?{ 26. //如果worker不是异常结束: 27. if?(!completedAbruptly)?{ 28. //如果allowCoreThreadTimeOut=true,最小线程个数就可以变为0; 29. int?min?=?allowCoreThreadTimeOut ?0?:?corePoolSize; 30. //但是,如果等待队列有任务,至少保留一个worker来处理任务; 31. if?(min?==?0?&&?!?workQueue.isEmpty()) 32. min?=?1; 33. //如果工作线程大于等于核心线程,直接return就行了,否则就需要添加一个线程; 34. if?(workerCountOf(c)?>=?min) 35. return;?//?replacement?not?needed 36. } 37. //是异常执行结束的,添加一个线程去执行任务 38. addWorker(null,?false); 39. } 40.}
1.8.16线程池源码-核心源码解读-画图总结
【衔接下一章【并发编程六:Java中的线程池(3)-线程池的应用】】