> 技术文档 > 并发编程-ConcurrentHashMap线程安全的哈希表_并发安全的hash表

并发编程-ConcurrentHashMap线程安全的哈希表_并发安全的hash表

  • ConcurrentHashMap是JUC中的的一种线程安全的哈希表实现
  • HashMap在多线程环境下扩容会出现CPU占用接近100%的情况,因为HashMap并不是线程安全的,我们可以通过Collections.synchronizedMap(Map m)将HashMap包装成一个线程安全的map
public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);}}
  • 不过,这种方式并不是最优雅的。相对于HashMap,ConcurrentHashMap就是线程安全的map,其中利用了锁分段的思想大大提高了并发的效率
  • ConcurrentHashMap在1.8版本舍弃了segment,并且使用了synchronized+CAS以保证线程安全性
  • 为什么不用ReentrantLock而是sync呢?
    • 实际上,sync做了很多的优化,包括偏向锁、轻量级锁、重量级锁,可以一次向上升级锁状态,因此,sync相较于ReentrantLock的性能其实差不多,甚至在某些情况下更优

ConcurrentHashMap的变化

JDK1.7

  • ConcurrentHashMap在JDK1.7中,提供了一种更细的加锁机制,这种机制叫分段锁。
  • 整个哈希表被分为多段。每个段都独立锁定。读取操作不需要加锁,写入操作仅锁定相关的段。这减小了锁冲突的几率,从而提高并发性能
  • 这种机制的优点:在并发环境下将实现更高的吞吐量,而在单线程环境下只损失非常小的性能
  • 有些方法需要跨段,比如size(),isEmpty(),containsValue(),它们可能需要锁定整个表而不仅仅是某个段,这些需要按顺序锁定所有短,操作完后,需要按顺序释放所有段的锁

  • ConcurrentHashMap是由Segment数据结构和HashEntry数组构成。Segment是一种可重入锁ReentrantLcok,HashEntry则用于存储键值对数据
  • ConcurrentHashMap读写过程如下:
    • get方法:
      • 为输入的key做Hash运算,得到hash值
      • 再次通过hash值,定位到Segment当中数据的具体位置
      • 通过hash的值,定位到对应的Segment对象
    • put方法:
      • 为输入的key做Hash运算,得到hash值
      • 释放锁
      • 插入或覆盖HashEntry对象
      • 再次通过hash值,定位到Segment当中数据的具体位置
      • 获取重入锁
      • 通过hash值,定位到对应的Segment对象

JDK1.8

  • 在JDK1.8中,ConcurrentHashMap主要做了两个优化:
    • 同HashMap一样,链表也会在长度达到8的时候转换为红黑树,这样可以提升大量冲突时候的查询效率
    • 以某个位置的头节点(链表的头结点或红黑树的root节点)为锁,配合自旋+CAS避免不必要的锁开销,进一步提升并发性能

  • 相比于JDK1.7中的ConcurrentHashMap,JDK1.8中的取消了Segment分段锁,采用CAS+sync来保证并发安全
  • JDK1.8 中的 ConcurrentHashMap 对节点 Node 类中的共享变量,和 JDK1.7 一样,使用 volatile 关键字,保证多线程操作时,变量的可见性
static class Node implements Map.Entry { final int hash; final K key; volatile V val; volatile Node next; Node(int hash, K key, V val, Node next) { this.hash = hash; this.key = key; this.val = val; this.next = next; }......}

ConcurrentHashMap的字段

  • transient volatile Node[] table;

装载Node的数组,作为ConcurrentHashMap的底层容器。采用懒加载方式。直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为2的幂次方

  • private transient volatile Node[] nextTable;

扩容时使用,平时为nul,只有在扩容的时候才为非null

  • private transient volatile int sizeCtl;

该属性用来控制table数据的大小,根据是否初始化和是否正在扩容有几种情况:

当值为负数时:如果为-1表示正在初始化,如果为-N则表示当前正有N-1个线程进行扩容操作

当值为正数时:如果当前数组为null的话表示table在初始化过程中,sizeCtl表示为需要新建数组的长度;若已经初始化了,表示当前数据容器(table数组)可用容量,也可以理解成临界值(插入节点数超过了该临界值就需要扩容),具体指为数组的长度n乘以加载因此loadFactor;

当值为0时:即数组长度为默认初始值

  • private static final Unsafe U = Unsafe.getUnsafe();

在ConcurrentHashMap的实现中,可以看到用了大量的U.compareAndSwapXXX方法去修改ConcurrentHashMap的一些属性

ConcurrentHashMap的内部类

static class Node implements Map.Entry { final int hash; final K key; volatile V val; volatile Node next; 。。。。。。。}
  • Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域
  • 另外volatile保证内存可见性
static final class TreeNode extends Node { TreeNode parent; // red-black tree links TreeNode left; TreeNode right; TreeNode prev; // needed to unlink next upon deletion boolean red; 。。。。。。。}
  • 树节点,继承于承载数据的Node类。红黑树的操作是针对TreeBin类的,从该类的注释,也可以看出,TreeBin是对TreeNode的再一次封装
static final class TreeBin extends Node { TreeNode root; volatile TreeNode first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock 。。。。。。。。}
  • 这个类并不负责用户的key、value信息,而是封装了很多TreeNode节点。实际的ConcurrentHashMap数组中,存放的都是TreeBin对象,而不是TreeNode对象
static final class ForwardingNode extends Node { final Node[] nextTable; ForwardingNode(Node[] tab) { super(MOVED, null, null); this.nextTable = tab; } 。。。。。。。}
  • 在扩容会出现的特殊节点,其key、value、hash全部为null。并拥有nextTable引用的新table数组

ConcurrentHashMap的CAS

  • 下面是几个常用的利用CAS算法来保证线程安全的操作
static final  Node tabAt(Node[] tab, int i) { return (Node)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);}
  • 该方法用来获取table数组中索引为i的Node元素
static final  boolean casTabAt(Node[] tab, int i,  Node c, Node v) { return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);}
  • 利用CAS操作设置table数据索引为i的元素
static final  void setTabAt(Node[] tab, int i, Node v) { U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);}
  • 该方法用来设置table数组中索引为i的元素

ConcurrentHashMap的方法

构造方法

//构造一个空的map,即table数组还未初始化,初始化放在第一次插入数据时,默认大小为16public ConcurrentHashMap() {}//给定map的大小public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, LOAD_FACTOR, 1);}//给定一个Mappublic ConcurrentHashMap(Map m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m);}//给定map的大小以及加载因子public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1);}//给定map大小、加载因子、并发度(预计同时操作数据的线程)public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { //校验入参是否合法 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //设置最小容量:确保初始化容量不小于并发级别 if (initialCapacity = (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);//tableSizeFor将计算结果转化为2的幂次方 this.sizeCtl = cap;}
  • 调用构造方法之后,sizeCtl的大小就代表ConcurrentHashMap的大小,即table数组的长度
  • 调用构造方法并计算出table数组的长度,此时还没有完成初始化,当第一项ConcurrentHashMap插入数据才真正完成初始化,并创建tabl数组

initTable方法

private final Node[] initTable() { Node[] tab; int sc; //循环检查:确保只有一条线程能初始化哈希表 while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl)  0) ? sc : DEFAULT_CAPACITY;  @SuppressWarnings(\"unchecked\")  Node[] nt = (Node[])new Node[n];  table = tab = nt;  //设置扩容阈值:下次扩的的阈值  sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab;}

put方法

final V putVal(K key, V value, boolean onlyIfAbsent) { //参数校验 if (key == null || value == null) throw new NullPointerException(); //计算hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node[] tab = table;;) { Node f; int n, i, fh; K fk; V fv; //如果表未被初始化,则通过initTable初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //定位桶位:通过(n - 1) & hash)定位到对应的桶 //若桶为空,使用CAS插入新节点 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node(hash, key, value))) break;  // no lock when adding to empty bin } //若桶头节点为MOVED,说明当前正在扩容,协助迁移数据 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //检查是否已存在键,已存在则直接返回旧值 else if (onlyIfAbsent // check first node without acquiring lock  && fh == hash  && ((fk = f.key) == key || (fk != null && key.equals(fk)))  && (fv = f.val) != null) return fv; //加锁插入节点 else { V oldVal = null; synchronized (f) { //若为链表节点,加锁后插入或更新 if (tabAt(tab, i) == f) {  if (fh >= 0) { binCount = 1; for (Node e = f;; ++binCount) { K ek; //找到hash值相同的key,覆盖旧值 if (e.hash == hash && ((ek = e.key) == key ||  (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent)  e.val = value; break; } Node pred = e; if ((e = e.next) == null) { //如果链表末尾仍未找到,则直接将新值插入到链表尾部 pred.next = new Node(hash, key, value); break; } }  }  //若为红黑树节点,调用putTreeVal插入或更新  else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key,  value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; }  }  //若为ReservationNode,抛出异常,防止递归更新  else if (f instanceof ReservationNode) throw new IllegalStateException(\"Recursive update\"); } } //链表转红黑树 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD)  treeifyBin(tab, i); if (oldVal != null)  return oldVal; break; } } } //更新计数并可能触发扩容 addCount(1L, binCount); return null;}

put方法总结:

  • 对每一个放入的值,先用spread方法对key进行hash运算,用来确定这个值在table中的位置
  • 如果当前table数据还未初始化,进行初始化操作
  • 如果这个位置是null,那么使用CAS操作直接放入
  • 如果这个位置存在节点,说明发生了hash冲突,先判断这个节点类型,如果节点是MOVED的话,说明正在进行扩容
  • 如果是链表节点(fh>0),先获取头节点,在依次向后变量确定这个新加入的节点的位置。如果遇到key相同的节点,直接覆盖。否则在链表尾插入
  • 如果这个节点的类型是TreeBin,直接调用红黑树的插入方法插入新的节点
  • 插入完节点之后,再次检查链表的长度,如果长度大于8,就把这个链表转换为红黑树
  • 对当前容量大小进行检查,如果超过临界值,就需要扩容

get方法

public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; //hash运算,定位位置 int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //如果桶中的第一个节点匹配键,则直接返回对应的值 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //如果桶中的结点是树节点,则调用find方法在树中查找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //否则遍历链表查找匹配的键并返回值 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null;}
  • 哈希:对传入的键的哈希值进行散列,这有助于减少哈希冲突的可能性。使用spread方法可以保证不同的键更均匀的分布在桶数组中
  • 直接查找:查找的第一步是检查键的哈希值是否位于表的正确位置。如果在该桶的第一个元素中找到了键,则直接返回该元素的值。这里使用==操作符和equals方法来比较键,有助于处理可能的null值和确保正确的相等性
  • 红黑树查找:如果第一个节点的hash小于0,那么这个桶的数据结构是红黑树。这种情况下,使用find方法在红黑树中查找键
  • 链表查找:如果前面两个条件都不满足,那么代码将遍历该桶中的链表。如果在来链表中找到了具有相同hash和键的元素,则返回其值,如果变量完整个链表都未找到,则返回null

transfer方法

private final void transfer(Node[] tab, Node[] nextTab) { int n = tab.length, stride; //计算每个线程迁移任务的步长stride if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //初始化新表,大小为原来的两倍 if (nextTab == null) { // initiating try { @SuppressWarnings(\"unchecked\") Node[] nt = (Node[])new Node[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode fwd = new ForwardingNode(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node f; int fh; //确定迁移任务的范围 //使用transferIndex控制迁移任务的划分与分配,确保多个线程能安全的分工迁移桶 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex)  stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i = n || i + n >= nextn) { int sc; //迁移完成处理 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n <>> 1);//更新sizeCtl为新的扩容阈值 return; } if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)  return; finishing = advance = true; i = n; // recheck before commit } } //空桶处理 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //已迁移桶处理 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) {  //分为高位链表和低位链表  Node ln, hn;  //链表迁移  if (fh >= 0) { int runBit = fh & n; Node lastRun = f; for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node(ph, pk, pv, ln); else hn = new Node(ph, pk, pv, hn); } //设置低位链表到旧索引位置 setTabAt(nextTab, i, ln); //设置高位链表到i+n位置 setTabAt(nextTab, i + n, hn); //设置转发节点 setTabAt(tab, i, fwd); advance = true;  }  //红黑树迁移  else if (f instanceof TreeBin) { TreeBin t = (TreeBin)f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null)  lo = p; else  loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null)  hi = p; else  hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true;  }  else if (f instanceof ReservationNode) throw new IllegalStateException(\"Recursive update\"); } } } }}

整个扩容操作分为两个部分:

  • 第一部分:是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的
  • 第二部分:是将原来table中的元素复制到nextTable中,主要是遍历复制的过程。得到当前遍历的数组位置i,然后利用tabAt方法获取i位置的元素:
    • 如果这个位置为空,就在原table中的i放入forwardNode节点,这个是触发并发扩容的关键
    • 如果这个位置是Node节点(fh>=0),并且是链表的头结点,就把这个链表分裂成两个链表,把他们分别放在nextTable的i和i+n的位置
    • 如果这个位置是TreeBin,也做一个反序处理,并且判断是否需要untreeify,把处理结果分别放在nexttable的i和i+n的位置上
    • 遍历所有的结点,就完成复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍,完成扩容

size相关方法

  • size方法返回Map中的元素数量,但是结果被限制在 Integer.MAX_VALUE 内。如果超过,返回 Integer.MAX_VALUE 内。如果小于0,则返回0
public int size() { long n = sumCount(); return ((n  (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);}
  • mappingCount 方法也返回 Map 中的元素数量,但允许返回一个 long 值,因此可以表示大于 Integer.MAX_VALUE 的数量。与 size() 方法类似,该方法也会忽略负值,返回 0。
public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values}
  • sumCount 方法计算 Map 的实际大小。ConcurrentHashMap 使用一个基础计数 baseCount 和一个 CounterCell 数组 counterCells 来跟踪大小。这种结构有助于减少多线程环境中的争用,因为不同的线程可能会更新不同的 CounterCell。
 final long sumCount() { CounterCell[] cs = counterCells; long sum = baseCount; if (cs != null) { for (CounterCell c : cs) if (c != null) sum += c.value; } return sum;}

ConcurrentHashMap示例

package org.example.code_case.Thread;import java.util.concurrent.ConcurrentHashMap;/** * @Author: what * @Date: 2025/7/22 * @Description: TODO **/public class ConcurrentHashMap示例 { private final ConcurrentHashMap visitCountMap; public ConcurrentHashMap示例() { this.visitCountMap = new ConcurrentHashMap(); } // 用户访问时调用的方法 public void userVisited(String userId) { visitCountMap.compute(userId, (key, value) -> value == null ? 1 : value + 1); } // 获取用户的访问次数 public int getVisitCount(String userId) { return visitCountMap.getOrDefault(userId, 0); } public static void main(String[] args) { ConcurrentHashMap示例 counter = new ConcurrentHashMap示例(); // 模拟用户访问 counter.userVisited(\"user1\"); counter.userVisited(\"user1\"); counter.userVisited(\"user2\"); System.out.println(\"User1 visit count: \" + counter.getVisitCount(\"user1\")); // 输出: User1 visit count: 2 System.out.println(\"User2 visit count: \" + counter.getVisitCount(\"user2\")); // 输出: User2 visit count: 1 }}

小结

ConcurrentHashMap 是线程安全的,支持完全并发的读取,并且有很多线程可以同时执行写入。在早期版本(例如 JDK 1.7)中,ConcurrentHashMap 使用分段锁技术。整个哈希表被分成一些段(Segment),每个段独立加锁。这样,在不同段上的操作可以并发进行。从 JDK 1.8 开始,ConcurrentHashMap 的内部实现有了很大的变化。它放弃了分段锁技术,转而采用了更先进的并发控制策略,如 CAS 操作和红黑树等,进一步提高了并发性能。