并发编程-ConcurrentHashMap线程安全的哈希表_并发安全的hash表
- ConcurrentHashMap是JUC中的的一种线程安全的哈希表实现
- HashMap在多线程环境下扩容会出现CPU占用接近100%的情况,因为HashMap并不是线程安全的,我们可以通过
Collections.synchronizedMap(Map m)
将HashMap包装成一个线程安全的map
public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);}}
- 不过,这种方式并不是最优雅的。相对于HashMap,ConcurrentHashMap就是线程安全的map,其中利用了锁分段的思想大大提高了并发的效率
- ConcurrentHashMap在1.8版本舍弃了segment,并且使用了synchronized+CAS以保证线程安全性
- 为什么不用ReentrantLock而是sync呢?
- 实际上,sync做了很多的优化,包括偏向锁、轻量级锁、重量级锁,可以一次向上升级锁状态,因此,sync相较于ReentrantLock的性能其实差不多,甚至在某些情况下更优
ConcurrentHashMap的变化
JDK1.7
- ConcurrentHashMap在JDK1.7中,提供了一种更细的加锁机制,这种机制叫分段锁。
- 整个哈希表被分为多段。每个段都独立锁定。读取操作不需要加锁,写入操作仅锁定相关的段。这减小了锁冲突的几率,从而提高并发性能
- 这种机制的优点:在并发环境下将实现更高的吞吐量,而在单线程环境下只损失非常小的性能
- 有些方法需要跨段,比如size(),isEmpty(),containsValue(),它们可能需要锁定整个表而不仅仅是某个段,这些需要按顺序锁定所有短,操作完后,需要按顺序释放所有段的锁
- ConcurrentHashMap是由Segment数据结构和HashEntry数组构成。Segment是一种可重入锁ReentrantLcok,HashEntry则用于存储键值对数据
- ConcurrentHashMap读写过程如下:
- get方法:
- 为输入的key做Hash运算,得到hash值
- 再次通过hash值,定位到Segment当中数据的具体位置
- 通过hash的值,定位到对应的Segment对象
- put方法:
- 为输入的key做Hash运算,得到hash值
- 释放锁
- 插入或覆盖HashEntry对象
- 再次通过hash值,定位到Segment当中数据的具体位置
- 获取重入锁
- 通过hash值,定位到对应的Segment对象
- get方法:
JDK1.8
- 在JDK1.8中,ConcurrentHashMap主要做了两个优化:
- 同HashMap一样,链表也会在长度达到8的时候转换为红黑树,这样可以提升大量冲突时候的查询效率
- 以某个位置的头节点(链表的头结点或红黑树的root节点)为锁,配合自旋+CAS避免不必要的锁开销,进一步提升并发性能
- 相比于JDK1.7中的ConcurrentHashMap,JDK1.8中的取消了Segment分段锁,采用CAS+sync来保证并发安全
- JDK1.8 中的 ConcurrentHashMap 对节点 Node 类中的共享变量,和 JDK1.7 一样,使用 volatile 关键字,保证多线程操作时,变量的可见性
static class Node implements Map.Entry { final int hash; final K key; volatile V val; volatile Node next; Node(int hash, K key, V val, Node next) { this.hash = hash; this.key = key; this.val = val; this.next = next; }......}
ConcurrentHashMap的字段
transient volatile Node[] table;
装载Node的数组,作为ConcurrentHashMap的底层容器。采用懒加载方式。直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为2的幂次方
private transient volatile Node[] nextTable;
扩容时使用,平时为nul,只有在扩容的时候才为非null
private transient volatile int sizeCtl;
该属性用来控制table数据的大小,根据是否初始化和是否正在扩容有几种情况:
当值为负数时:如果为-1表示正在初始化,如果为-N则表示当前正有N-1个线程进行扩容操作
当值为正数时:如果当前数组为null的话表示table在初始化过程中,sizeCtl表示为需要新建数组的长度;若已经初始化了,表示当前数据容器(table数组)可用容量,也可以理解成临界值(插入节点数超过了该临界值就需要扩容),具体指为数组的长度n乘以加载因此loadFactor;
当值为0时:即数组长度为默认初始值
private static final Unsafe U = Unsafe.getUnsafe();
在ConcurrentHashMap的实现中,可以看到用了大量的U.compareAndSwapXXX
方法去修改ConcurrentHashMap的一些属性
ConcurrentHashMap的内部类
static class Node implements Map.Entry { final int hash; final K key; volatile V val; volatile Node next; 。。。。。。。}
- Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域
- 另外volatile保证内存可见性
static final class TreeNode extends Node { TreeNode parent; // red-black tree links TreeNode left; TreeNode right; TreeNode prev; // needed to unlink next upon deletion boolean red; 。。。。。。。}
- 树节点,继承于承载数据的Node类。红黑树的操作是针对TreeBin类的,从该类的注释,也可以看出,TreeBin是对TreeNode的再一次封装
static final class TreeBin extends Node { TreeNode root; volatile TreeNode first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock 。。。。。。。。}
- 这个类并不负责用户的key、value信息,而是封装了很多TreeNode节点。实际的ConcurrentHashMap数组中,存放的都是TreeBin对象,而不是TreeNode对象
static final class ForwardingNode extends Node { final Node[] nextTable; ForwardingNode(Node[] tab) { super(MOVED, null, null); this.nextTable = tab; } 。。。。。。。}
- 在扩容会出现的特殊节点,其key、value、hash全部为null。并拥有nextTable引用的新table数组
ConcurrentHashMap的CAS
- 下面是几个常用的利用CAS算法来保证线程安全的操作
static final Node tabAt(Node[] tab, int i) { return (Node)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);}
- 该方法用来获取table数组中索引为i的Node元素
static final boolean casTabAt(Node[] tab, int i, Node c, Node v) { return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);}
- 利用CAS操作设置table数据索引为i的元素
static final void setTabAt(Node[] tab, int i, Node v) { U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);}
- 该方法用来设置table数组中索引为i的元素
ConcurrentHashMap的方法
构造方法
//构造一个空的map,即table数组还未初始化,初始化放在第一次插入数据时,默认大小为16public ConcurrentHashMap() {}//给定map的大小public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, LOAD_FACTOR, 1);}//给定一个Mappublic ConcurrentHashMap(Map m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m);}//给定map的大小以及加载因子public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1);}//给定map大小、加载因子、并发度(预计同时操作数据的线程)public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { //校验入参是否合法 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //设置最小容量:确保初始化容量不小于并发级别 if (initialCapacity = (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);//tableSizeFor将计算结果转化为2的幂次方 this.sizeCtl = cap;}
- 调用构造方法之后,sizeCtl的大小就代表ConcurrentHashMap的大小,即table数组的长度
- 调用构造方法并计算出table数组的长度,此时还没有完成初始化,当第一项ConcurrentHashMap插入数据才真正完成初始化,并创建tabl数组
initTable方法
private final Node[] initTable() { Node[] tab; int sc; //循环检查:确保只有一条线程能初始化哈希表 while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings(\"unchecked\") Node[] nt = (Node[])new Node[n]; table = tab = nt; //设置扩容阈值:下次扩的的阈值 sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab;}
put方法
final V putVal(K key, V value, boolean onlyIfAbsent) { //参数校验 if (key == null || value == null) throw new NullPointerException(); //计算hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node[] tab = table;;) { Node f; int n, i, fh; K fk; V fv; //如果表未被初始化,则通过initTable初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //定位桶位:通过(n - 1) & hash)定位到对应的桶 //若桶为空,使用CAS插入新节点 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node(hash, key, value))) break; // no lock when adding to empty bin } //若桶头节点为MOVED,说明当前正在扩容,协助迁移数据 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //检查是否已存在键,已存在则直接返回旧值 else if (onlyIfAbsent // check first node without acquiring lock && fh == hash && ((fk = f.key) == key || (fk != null && key.equals(fk))) && (fv = f.val) != null) return fv; //加锁插入节点 else { V oldVal = null; synchronized (f) { //若为链表节点,加锁后插入或更新 if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node e = f;; ++binCount) { K ek; //找到hash值相同的key,覆盖旧值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; if ((e = e.next) == null) { //如果链表末尾仍未找到,则直接将新值插入到链表尾部 pred.next = new Node(hash, key, value); break; } } } //若为红黑树节点,调用putTreeVal插入或更新 else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } //若为ReservationNode,抛出异常,防止递归更新 else if (f instanceof ReservationNode) throw new IllegalStateException(\"Recursive update\"); } } //链表转红黑树 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //更新计数并可能触发扩容 addCount(1L, binCount); return null;}
put方法总结:
- 对每一个放入的值,先用spread方法对key进行hash运算,用来确定这个值在table中的位置
- 如果当前table数据还未初始化,进行初始化操作
- 如果这个位置是null,那么使用CAS操作直接放入
- 如果这个位置存在节点,说明发生了hash冲突,先判断这个节点类型,如果节点是MOVED的话,说明正在进行扩容
- 如果是链表节点(fh>0),先获取头节点,在依次向后变量确定这个新加入的节点的位置。如果遇到key相同的节点,直接覆盖。否则在链表尾插入
- 如果这个节点的类型是TreeBin,直接调用红黑树的插入方法插入新的节点
- 插入完节点之后,再次检查链表的长度,如果长度大于8,就把这个链表转换为红黑树
- 对当前容量大小进行检查,如果超过临界值,就需要扩容
get方法
public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; //hash运算,定位位置 int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //如果桶中的第一个节点匹配键,则直接返回对应的值 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //如果桶中的结点是树节点,则调用find方法在树中查找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //否则遍历链表查找匹配的键并返回值 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null;}
- 哈希:对传入的键的哈希值进行散列,这有助于减少哈希冲突的可能性。使用spread方法可以保证不同的键更均匀的分布在桶数组中
- 直接查找:查找的第一步是检查键的哈希值是否位于表的正确位置。如果在该桶的第一个元素中找到了键,则直接返回该元素的值。这里使用==操作符和equals方法来比较键,有助于处理可能的null值和确保正确的相等性
- 红黑树查找:如果第一个节点的hash小于0,那么这个桶的数据结构是红黑树。这种情况下,使用find方法在红黑树中查找键
- 链表查找:如果前面两个条件都不满足,那么代码将遍历该桶中的链表。如果在来链表中找到了具有相同hash和键的元素,则返回其值,如果变量完整个链表都未找到,则返回null
transfer方法
private final void transfer(Node[] tab, Node[] nextTab) { int n = tab.length, stride; //计算每个线程迁移任务的步长stride if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //初始化新表,大小为原来的两倍 if (nextTab == null) { // initiating try { @SuppressWarnings(\"unchecked\") Node[] nt = (Node[])new Node[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode fwd = new ForwardingNode(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node f; int fh; //确定迁移任务的范围 //使用transferIndex控制迁移任务的划分与分配,确保多个线程能安全的分工迁移桶 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i = n || i + n >= nextn) { int sc; //迁移完成处理 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n <>> 1);//更新sizeCtl为新的扩容阈值 return; } if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //空桶处理 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //已迁移桶处理 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { //分为高位链表和低位链表 Node ln, hn; //链表迁移 if (fh >= 0) { int runBit = fh & n; Node lastRun = f; for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node(ph, pk, pv, ln); else hn = new Node(ph, pk, pv, hn); } //设置低位链表到旧索引位置 setTabAt(nextTab, i, ln); //设置高位链表到i+n位置 setTabAt(nextTab, i + n, hn); //设置转发节点 setTabAt(tab, i, fwd); advance = true; } //红黑树迁移 else if (f instanceof TreeBin) { TreeBin t = (TreeBin)f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof ReservationNode) throw new IllegalStateException(\"Recursive update\"); } } } }}
整个扩容操作分为两个部分:
- 第一部分:是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的
- 第二部分:是将原来table中的元素复制到nextTable中,主要是遍历复制的过程。得到当前遍历的数组位置i,然后利用tabAt方法获取i位置的元素:
- 如果这个位置为空,就在原table中的i放入forwardNode节点,这个是触发并发扩容的关键
- 如果这个位置是Node节点(fh>=0),并且是链表的头结点,就把这个链表分裂成两个链表,把他们分别放在nextTable的i和i+n的位置
- 如果这个位置是TreeBin,也做一个反序处理,并且判断是否需要untreeify,把处理结果分别放在nexttable的i和i+n的位置上
- 遍历所有的结点,就完成复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍,完成扩容
size相关方法
- size方法返回Map中的元素数量,但是结果被限制在 Integer.MAX_VALUE 内。如果超过,返回 Integer.MAX_VALUE 内。如果小于0,则返回0
public int size() { long n = sumCount(); return ((n (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);}
- mappingCount 方法也返回 Map 中的元素数量,但允许返回一个 long 值,因此可以表示大于 Integer.MAX_VALUE 的数量。与
size()
方法类似,该方法也会忽略负值,返回 0。
public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values}
- sumCount 方法计算 Map 的实际大小。ConcurrentHashMap 使用一个基础计数 baseCount 和一个 CounterCell 数组 counterCells 来跟踪大小。这种结构有助于减少多线程环境中的争用,因为不同的线程可能会更新不同的 CounterCell。
final long sumCount() { CounterCell[] cs = counterCells; long sum = baseCount; if (cs != null) { for (CounterCell c : cs) if (c != null) sum += c.value; } return sum;}
ConcurrentHashMap示例
package org.example.code_case.Thread;import java.util.concurrent.ConcurrentHashMap;/** * @Author: what * @Date: 2025/7/22 * @Description: TODO **/public class ConcurrentHashMap示例 { private final ConcurrentHashMap visitCountMap; public ConcurrentHashMap示例() { this.visitCountMap = new ConcurrentHashMap(); } // 用户访问时调用的方法 public void userVisited(String userId) { visitCountMap.compute(userId, (key, value) -> value == null ? 1 : value + 1); } // 获取用户的访问次数 public int getVisitCount(String userId) { return visitCountMap.getOrDefault(userId, 0); } public static void main(String[] args) { ConcurrentHashMap示例 counter = new ConcurrentHashMap示例(); // 模拟用户访问 counter.userVisited(\"user1\"); counter.userVisited(\"user1\"); counter.userVisited(\"user2\"); System.out.println(\"User1 visit count: \" + counter.getVisitCount(\"user1\")); // 输出: User1 visit count: 2 System.out.println(\"User2 visit count: \" + counter.getVisitCount(\"user2\")); // 输出: User2 visit count: 1 }}
小结
ConcurrentHashMap 是线程安全的,支持完全并发的读取,并且有很多线程可以同时执行写入。在早期版本(例如 JDK 1.7)中,ConcurrentHashMap 使用分段锁技术。整个哈希表被分成一些段(Segment),每个段独立加锁。这样,在不同段上的操作可以并发进行。从 JDK 1.8 开始,ConcurrentHashMap 的内部实现有了很大的变化。它放弃了分段锁技术,转而采用了更先进的并发控制策略,如 CAS 操作和红黑树等,进一步提高了并发性能。