ThreadLocal源码分析
文章目录
- (一)ThreadLocal介绍
- (二)ThreadLocal思想
- (三)源码分析
-
- ThreadLocal核心点和api
-
- (1)核心成员变量和辅助方法
- (2)setInitialValue方法
- (3)get方法(api)
- (4)set方法(api)
- (5)remove方法(api)
- ThreadLocalMap核心源码:
-
- (1)核心成员变量和辅助方法
- (2)构造函数(核心)
- (3)set方法(核心)
- (4)getEntry方法(核心)
- (5)rehash扩容(核心)
- (6)remove方法(核心)
- (四)ThreadLocal与内存泄漏
(一)ThreadLocal介绍
ThreadLocal工具类是一种线程本地变量,程序中定义了一个ThreadLocal共享变量,每个线程往这个ThreadLocal中读写是线程隔离,互相之间不会影响的。它提供了一种将可变数据通过每个线程有自己的独立副本从而实现线程封闭的机制。
(二)ThreadLocal思想
(1)Thread线程类有一个成员变量为ThreadLocal.ThreadLocalMap的实例变量threadLocals,也就是说每个线程有一个属于自身的ThreadLocalMap。
(2)ThreadLocalMap是ThreadLocal的内部类,有自身的实现,并不依赖于Map,ThreadLocalMap内部是使用Entry类型的数组table来存储,Entry中的key为ThreadLocal,value为代码中放入的值(实际上key并不是ThreadLocal本身,而是它的一个弱引用)。
(3)每个线程在往某个ThreadLocal里塞值的时候,都会往自己的ThreadLocalMap里存,读也是以某个ThreadLocal作为引用,在自己的map里找对应的key,从而实现了线程隔离。
(三)源码分析
ThreadLocal核心点和api
(1)核心成员变量和辅助方法
/** * 作为ThreadLocal对象的hashCode,主要作用于计算存储table中的位置 */ private final int threadLocalHashCode = nextHashCode(); /** * 服务于原子递增某个值进行得到hashCode */ private static AtomicInteger nextHashCode = new AtomicInteger(); /** * 一个固定值,与斐波那契散列有关,可以比较均匀的是其分布在2ⁿ数组中 */ private static final int HASH_INCREMENT = 0x61c88647; /** * 服务于threadLocalHashCode调用,为构造函数的threadLocalHashCode作为数据来源 */ private static int nextHashCode() {return nextHashCode.getAndAdd(HASH_INCREMENT); }
(2)setInitialValue方法
/*** 当ThreadLocal未被设置初始化值时会在get中获取值会空时被调用* initialValue方法可用于子类重写,可以自定义扩展初始值*/private T setInitialValue() { T value = initialValue();//子类如果重写的话,可自定义设置初始值,变相改变初始值 Thread t = Thread.currentThread();//当前线程 ThreadLocalMap map = getMap(t);//从t线程获取本地变量值threadLocals if (map != null) map.set(this, value);//更新值 else //createMap是重点核心,会在ThreadLocalMap中重点分析 createMap(t, value);//基于t封装指定的Entry对象,其Entry中的key是ThreadLocal的弱引用 return value;//返回初始值}
(3)get方法(api)
/** * 在线程中,通过ThreadLocal对象调用获取当前线程的本地变量值过程如下: * (1)先根据自身线程去获取本地threadLocals即map,map为空,则调用setInitialValue * (2)如果map不为空,则在threadLocals中获取Entry,如果Entry为空也调用setInitialValue, * 否则返回Entry中的value */public T get() { Thread t = Thread.currentThread();//获取当前线程 ThreadLocalMap map = getMap(t);//获取自身线程本地变量map if (map != null) {//判断本地map是否有被设置 //从自身线程获取的threadLocals中获取的map中获取线程封装的Entry ThreadLocalMap.Entry e = map.getEntry(this);//获取Entry if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value;//得到线程本地值 return result;//返回值 } } return setInitialValue();//本地map为空,调用自身初始化设置值方法}
(4)set方法(api)
/** * 代码逻辑: * (1)先获取当前线程在其自身获取ThreadLocalMap * (2)判断ThreadLocalMap是否为空,如果存在则调用其自身的set,否则则创建新的ThreadLocalMap */ public void set(T value) { Thread t = Thread.currentThread();//获取当前线程 ThreadLocalMap map = getMap(t);//根据指定线程获取自身的ThreadLocal.ThreadLocalMap if (map != null)//代表是否被设置 map.set(this, value);/更新值 else //createMap是重点核心,会在ThreadLocalMap中重点分析 createMap(t, value); }
(5)remove方法(api)
//remove基于当前线程获取自身线程本地变量threadLocals public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this);//基于弱引用从map中删除 }
ThreadLocalMap核心源码:
(1)核心成员变量和辅助方法
/** * 默认ThreadLocalMap中的table长度为16 */private static final int INITIAL_CAPACITY = 16;/** * 用于存储键值对的数组 */private Entry[] table;/** * 在table中实际存储的Entry数量 */private int size = 0;/** * 下一次扩容时的阈值,默认为零 */private int threshold; // Default to 0/** * 计算扩容时的阈值的方法,计算公式:threshold = 容量*2/3 */private void setThreshold(int len) { threshold = len * 2 / 3;}/** * 线性探测法 * 主要是为了解决hash冲突时向后进行逻辑上的环形模式设置 */private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0);}/** * 线性探测法 * 主要是为了解决hash冲突时向前进行逻辑上的环形模式设置 */private static int prevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1);}
(2)构造函数(核心)
/** * 代码逻辑: * (1)先初始化table:利用ThreadLocalMap中的默认容量 * (2)确定存储索引:使用ThreadLocal内部成员变量ThreadLocalHashCode与容量-1进行位运算 * (3)创建Entry:利用ThreadLocal在Entry构造方法中进行弱引用和创建Entry对象 * (4)设置初始size:直接在代码中写死为1 * (5)设置扩容阈值:使用ThreadLocalMap中的默认容量*2/3 */ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY];//初始化ThreadLocalMap中的table数组 int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);//计算索引 table[i] = new Entry(firstKey, firstValue);//基于threadLocal进行弱引用创建Entry size = 1;//初始值 setThreshold(INITIAL_CAPACITY);//默认阈值}/** * InheritableThreadLocal 父子线程中传递数据,但仅限于初始化线程时有效 * Thread线程初始化时会判断是否存在parent.inheritableThreadLocals是否存在 * 以父线程的inheritableThreadLocalMap为数据源,过滤出有效的entry, * 初始化到自己的inheritableThreadLocalMap中,其中childValue可以被重写。 */private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table;//父类table int len = parentTable.length;//父类table长度 setThreshold(len);//设置阈值 table = new Entry[len];//创建Entry初始化table for (int j = 0; j < len; j++) {//迁移父类有效Entry Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { //这里的childValue方法在InheritableThreadLocal中默认实现为返回本身值,可以被重写 Object value = key.childValue(e.value); Entry c = new Entry(key, value);//创建新的Entry int h = key.threadLocalHashCode & (len - 1);//计算索引 while (table[h] != null)//是否存在hash冲突 h = nextIndex(h, len); table[h] = c;//设置值 size++;//有效个数增加 } } }}
(3)set方法(核心)
/** * 代码逻辑: * (1)找到Entry且有效,直接进行替换更新值 * (2)探测过程中在索引计算下发现是hash冲突,发现无效Entry则调用replaceStaleEntry进行连续段清理和启发式清理 * 1)在连续段清理中发现key则将其提到索引后面的连续点且更新值 * 2)如果在连续段清理中未发现key,则在索引或索引后添加Entry * (3)在最后会判定是否达到阈值,其内部有优化,使用rehash函数会调用一次全量清理slot方法也即expungeStaleEntries, * 如果清理完后table大小超过了threshold - threshold/ 4,则进行扩容2倍 */private void set(ThreadLocal<?> key, Object value) { Entry[] tab = table;//获取table int len = tab.length;//计算长度 int i = key.threadLocalHashCode & (len-1);//确定索引 //利用线性探测法向后进行环形查找 for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get();//根据e(Entry的key为ThreadLocal弱引用) if (k == key) {//是否为同一个ThreadLocal e.value = value;//更新值 return; } //此处代码代表当前索引不是同一个key if (k == null) {//如果为空,代表该处索引的key是空(value有值)或这个位置的Entry本身是null //该处可能会进行的操作:hash冲突后的Entry处理办法、回收key为null且value存在的Entry replaceStaleEntry(key, value, i); return; } } //代码执行到这里代表被回收后,需要对该索引位置进行创建Entry tab[i] = new Entry(key, value); int sz = ++size;//size大小自增 if (!cleanSomeSlots(i, sz) && sz >= threshold)//是否需要扩容 rehash();//扩容 } //replaceStaleEntry 清理无效的Entry包含连续段和启发式清理 private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { Entry[] tab = table;//table数组 int len = tab.length;//table数组长度 Entry e; int slotToExpunge = staleSlot;//将索引位置赋值给slotToExpunge //向前进行线性探测找到key为空,Entry的value不为空的索引,主要是为清理工作做铺垫 for (int i=prevIndex(staleSlot,len);(e=tab[i])!=null;i=prevIndex(i,len)) if (e.get() == null)//key为空则进行更新slotToExpunge值 slotToExpunge = i;//更新slotToExpunge值,主要存储key不为空的下一个失效Entry的索引 //向后进行线性探测 for (int i=nextIndex(staleSlot,len);(e=tab[i])!=null;i=nextIndex(i, len)) { ThreadLocal<?> k = e.get();//获取弱引用 if (k == key) {//判定是否为同一个ThreadLocal e.value = value;//更新值 //staleSlot是一个半残Entry tab[i] = tab[staleSlot];//在找到弱引用的索引将其与计算的索引进行对换 //计算出来的索引与半残Entry对换即将找到的弱引用放到计算处可能是hash冲突后的延伸,好处是该地方的前一个是有效的Entry tab[staleSlot] = e; //向前线性探测找到最前失效Entry的索引与传参进来的staleSlot对比 if (slotToExpunge == staleSlot)//是否找到失效Entry slotToExpunge = i;//未找到失效Entry则以向后探测的更新请理值 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);//内部连续段清理+启发式清理 return; } if (k == null && slotToExpunge == staleSlot)//向前没有无效Entry且当前slot是失效 slotToExpunge = i;则从i开始进行清理 } tab[staleSlot].value = null;//help gc tab[staleSlot] = new Entry(key, value);//重新创建一个有效的Entry if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);//内部连续段清理+启发式清理 } /** * expungeStaleEntry连续段清理 清理核心方法 * 清理无效Entry和确保清理后的hash冲突Entry具有连续性 * 主要是根据指定staleSlot开始向后清理无效的Entry * 并对hash冲突的Entry进行移动确保冲突的Entry连续性(伴随着会进行rehash) */ private int expungeStaleEntry(int staleSlot) { Entry[] tab = table;//获取table int len = tab.length;//获取table长度 //开始准备消除无效Entry tab[staleSlot].value = null;//help GC tab[staleSlot] = null;//help GC size--;//Entry无效清除一个 // Rehash until we encounter null Entry e; int i; //线性探测向后清理无效Entry for (i = nextIndex(staleSlot,len);(e=tab[i])!=null;i=nextIndex(i,len)) { ThreadLocal<?> k = e.get();//获取软引用 if (k == null) {//无效Entry判定条件 e.value = null;//help GC tab[i] = null;//help GC size--;//数量减一 } else { int h = k.threadLocalHashCode & (len - 1); //对有效Entry的进行rehash确保hash冲突进行线性探测连续性 if (h != i) { tab[i] = null;//将当前向后线性探测的索引位置置空 while (tab[h] != null)//循环向后寻找rehash后的索引后面连接着的索引h = nextIndex(h, len); tab[h] = e;//将本次需要调整的索引Entry放到h找到的一个为null的位置,确保连续 } } } return i; } /** * 启发式清理和连续段清理,在插入时调用和replaceStaleEntry中清理无效Entry调用 * i代表有效Entry或Entry为null,不是失效Entry * n是用于控制扫描次数,正常情况下log n次扫描结束没发现无效Entry会结束cleanSomeSlots */ private boolean cleanSomeSlots(int i, int n) { boolean removed = false;//是否有清理 Entry[] tab = table; int len = tab.length; do { i = nextIndex(i, len);//线性探测向后 Entry e = tab[i]; if (e != null && e.get() == null) {//找到无效Entry n = len;//扩大扫描次数,使用容量来赋值 removed = true;//代表会被清理 i = expungeStaleEntry(i);//调用expungeStaleEntry进行连续段清理 } } while ( (n >>>= 1) != 0);//利用位运算仅有无符号右移并赋值给n,等于0是即退出循环 return removed; }
(4)getEntry方法(核心)
/** * getEntry 主要是在get方法中调用,从ThreadLocalMap中得到Entry * 根据索引和key找寻ThreadLocal,会伴随着线性探测法寻找 */private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1);//计算索引 Entry e = table[i]; if (e != null && e.get() == key)//判断当前索引是否为寻找的ThreadLocal return e;//如果是,则直接返回Entry else return getEntryAfterMiss(key, i, e);//线性探测法寻找即寻找的key发生了hash冲突}/** * 在get时发现该key发生了hash冲突,利用线性探测法进行寻找 */private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table;//存储table int len = tab.length;//table长度 //循环遍历整个table,直到找到或遍历完(不一定所有位置遍历完,hash冲突完时)结束 while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) expungeStaleEntry(i);//连续段清理 else i = nextIndex(i, len);//线性探测 e = tab[i];//判断条件 } return null;}
(5)rehash扩容(核心)
private void rehash() { expungeStaleEntries();//调用一次全量清理 /*** 考虑到调用了全量清理,size可能会变小,所以进行优化扩容条件* 优化后的值 = 当前阈值-当前阈值/4,如size为11,当前阈值为10,被全量清理后size为4,* 优化后的值为10-16/4 = 6,此时是4>=6不成立,则不进行扩容*/ if (size >= threshold - threshold / 4) resize(); } /** * 全量清理 * 对table进行非空的清理expungeStaleEntry连续段 */ private void expungeStaleEntries() { Entry[] tab = table; int len = tab.length; for (int j = 0; j < len; j++) { Entry e = tab[j]; if (e != null && e.get() == null)expungeStaleEntry(j); } } /** * 扩容方法 */ private void resize() { Entry[] oldTab = table;//获取当前table int oldLen = oldTab.length;//当前table的容量 int newLen = oldLen * 2;//扩容后的容量即翻倍 Entry[] newTab = new Entry[newLen];//创建新的扩容后的Entry int count = 0;//有效Entry个数 for (int j = 0; j < oldLen; ++j) {//遍历旧的table Entry e = oldTab[j];//临时变量用于转移 if (e != null) {//非空Entry转移ThreadLocal<?> k = e.get();//获取该slot的Entry中的弱引用即ThreadLocalif (k == null) {//代表无效Entry e.value = null; // Help the GC} else { int h = k.threadLocalHashCode & (newLen - 1);//计算新的索引 while (newTab[h] != null)//是否hash冲突 h = nextIndex(h, newLen);//利用线性探测法解决hash冲突 newTab[h] = e;//利用临时变量进行转移赋值 count++;//自增代表有效Entry个数} } } setThreshold(newLen);//设置新的阈值 size = count;//将有效Entry个数赋值给size table = newTab;//将扩容后的新数组赋值给原table }
(6)remove方法(核心)
private void remove(ThreadLocal<?> key) {Entry[] tab = table;//获取tableint len = tab.length;//获取table长度int i = key.threadLocalHashCode & (len-1);//计算索引for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {//遍历table,可能会伴随线性探测法 if (e.get() == key) {//匹配到指定ThreadLocal对象 e.clear();//将Entry中的key置空 expungeStaleEntry(i);//调用清理无效Entry方法(连续段和启发式清理) return; }} }
(四)ThreadLocal与内存泄漏
ThreadLocal是否会引起内存泄漏也是一个比较有讨论性的问题,原因如下:
(1)如果一个ThreadLocal对象被回收了,里面放的value对于[前线程->当前线程的threadLocals
(ThreadLocal.ThreadLocalMap对象)->Entry数组->某个entry.value]是一条强引用链是可达的因此value不会被回收,导致其内Entry一直存在。
(2)ThreadLocal不会引起内存泄漏的是因为ThreadLocal.ThreadLocalMap源码实现中在get和set中都有一套自我清理的机制。
其实ThreadLocal导致内存泄漏如果是独立的线程使用,是不会发生内存泄漏的,线程用完就伴随着销毁,真正出现问题的是在线程池中,线程池最大的特点是线程复用,里面的线程寿命很长,大对象长期不回收会影响系统安全和效率。
避免内存泄漏的最好办法是:线程(任务)执行完毕时,可显示调佣remove方法,可帮助GC进行回收;