> 文档中心 > Java并发编程(多线程) -- 第四部分(JUC - 1)

Java并发编程(多线程) -- 第四部分(JUC - 1)


十六、JUC下常用类(包含源码) - 第一部分

1. AQS原理

在这里插入图片描述

基本思想:
1.获取锁的逻辑:

while(state 状态不允许获取) {if(队列中还没有此线程) {入队并阻塞}}当前线程出队

2.释放锁的逻辑:

if(state 状态允许了) {恢复阻塞的线程(s)}

AQS 要实现的功能目标
阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire
获取锁超时机制
通过打断取消机制
独占机制及共享机制
条件不满足时的等待机制

1. 概述

全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

特点:
用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
getState - 获取 state 状态
setState - 设置 state 状态
compareAndSetState - cas 机制设置 state 状态

独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively

2. AQS实现不可重入锁

public class MyLock implements Lock {    class MySync extends AbstractQueuedSynchronizer { @Override // 加锁 protected boolean tryAcquire(int arg) {     if (compareAndSetState(0, 1)) {  setExclusiveOwnerThread(Thread.currentThread());  return true;     }     return false; } @Override // 解锁 protected boolean tryRelease(int arg) {     setExclusiveOwnerThread(null);     setState(0);     return true; } @Override // 判断是否加锁 protected boolean isHeldExclusively() {     return getState() == 1; }  public Condition newCondition() {     return new ConditionObject(); }    } MySync sync = new MySync(); @Override    // 加锁,不成功进入队列    public void lock() { sync.acquire(1);    }    @Override    // 加锁,不成功进入队列,可打断    public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1);    }    @Override    // 尝试一次,不成功返回,不进入队列    public boolean tryLock() { return sync.tryAcquire(1);    }    @Override    // 带超时的尝试加锁,不成功进入队列    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time));    }    @Override    // 释放锁    public void unlock() { sync.release(1);    }    @Override    // 生成条件变量    public Condition newCondition() { return sync.newCondition();    }}

2. ReentrantLock原理

在这里插入图片描述

1. 非公平锁实现原理

如果调用ReentrantLock(),构造方法默认使用的就是非公平锁,NonfairSync 继承自 AQS

public ReentrantLock() { sync = new NonfairSync();    }

加锁流程

没有竞争时,Thread - 0 拥有锁
static final class NonfairSync extends Sync {    private static final long serialVersionUID = 7316153563782823691L; final void lock() {  // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁 if (compareAndSetState(0, 1))  // Thread - 0 获取CAS成功,直接设置持有对象     setExclusiveOwnerThread(Thread.currentThread()); else     acquire(1);     }......}

在这里插入图片描述

第一个竞争线程 Thread - 1 出现

在这里插入图片描述

Thread-1 执行步骤
1. CAS 尝试将 state 由 0 改为 1,结果失败

 final void lock() {  if (compareAndSetState(0, 1))      setExclusiveOwnerThread(Thread.currentThread());  else// Thread - 1 CAS竞争失败,进入acquire逻辑      acquire(1);     }

进入acquire()逻辑

 public final void acquire(int arg) { if (!tryAcquire(arg) &&     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))     selfInterrupt();    }

2. 先进入 tryAcquire 逻辑,这时 state 已经是1,当前线程不是锁持有线程,结果仍然失败,返回false,取反为真,进入addWaiter()逻辑

protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);    }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 如果还没有获得锁 if (c == 0) { // 没有线程占有锁,直接尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列     if (compareAndSetState(0, acquires)) {  setExclusiveOwnerThread(current);  return true;     } } // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入 else if (current == getExclusiveOwnerThread()) {     int nextc = c + acquires;     if (nextc < 0) // overflow  throw new Error("Maximum lock count exceeded");     setState(nextc);     return true; } // 结果仍然失败,返回false return false;    }

3. 接下来先进入 addWaiter 逻辑,构造 Node 队列

  • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
  • Node 的创建是懒惰的
  • 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
    在这里插入图片描述
/*1. 将新节点的前驱节点,指向之前的尾节点2. 将新节点设置为tail尾节点3. 最后将之前尾节点的后继节点指向新节点*/private Node addWaiter(Node mode) {// 将当前线程关联到一个 Node 对象上, 模式为独占模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;  // 保存之前的尾节点 // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部 if (pred != null) {     node.prev = pred;     if (compareAndSetTail(pred, node)) {  pred.next = node;  return node;     } } enq(node);  // 这一步,用作哑元节点的初始化,并添加新节点到尾部 return node;    }
/* 同上,只不过是多了初始化哑元节点1. 将新节点的前驱节点,指向之前的尾节点2. 将新节点设置为tail尾节点3. 最后将之前尾节点的后继节点指向新节点*/private Node enq(final Node node) { for (;;) {     Node t = tail; // 保存之前的尾节点     // 如果队列为空,设置head为哑元节点,然后再循环添加     if (t == null) { // Must initialize   if (compareAndSetHead(new Node()))      tail = head;     } else {  // 初始化完毕,进入else     // 将新节点的前驱节点,指向尾节点(哑元节点,因为头尾一致)  node.prev = t;  if (compareAndSetTail(t, node)) { // CAS将尾节点替换为新节点      t.next = node;  // 尾节点的      return t;  }     } }    }

4. 当前线程进入 acquireQueued 逻辑

1.acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
2.如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
3.进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,(注意:waitStatus就是图中节点上的菱形中的数值,0无意义,-1时则表示需要park该节点的下一个节点),这次返回 false
在这里插入图片描述
4.shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败
5.当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true
6.进入 parkAndCheckInterrupt, 在等待队列中阻塞等待,Thread-1 park(灰色表示)
在这里插入图片描述

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try {     boolean interrupted = false;     // acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞     for (;;) {final Node p = node.predecessor(); // 得到前驱节点  // 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁  if (p == head && tryAcquire(arg)) {  // 获取成功, 设置自己(当前线程对应的 node)为 head      setHead(node);      p.next = null; // help GC      failed = false;      // 没有被中断,返回中断标记 false      return interrupted;  }    if (shouldParkAfterFailedAcquire(p, node) &&  // 判断是否应当 park      parkAndCheckInterrupt())  // park 等待, 此时 Node 的状态被置为 Node.SIGNAL      interrupted = true;     } } finally {     if (failed)  cancelAcquire(node); }    }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 查看其前驱节点的waitStatus // Node.SIGNAL = -1, 为真表明要park该node节点(上一个节点都阻塞了,自己也阻塞) if (ws == Node.SIGNAL)     return true; // > 0 表示取消状态,线程不要了 if (ws > 0) {// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试     do {  node.prev = pred = pred.prev;     } while (pred.waitStatus > 0);     pred.next = node; } else {     // 这次还没有阻塞// 但下次如果重试不成功即再次tryAcquire()失败, 则需要阻塞// 这时需要设置上一个节点状态为 Node.SIGNAL     compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 设置前驱结点的waitStatus为-1 } return false;    }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);  // park当前线程 return Thread.interrupted();    }

再次有多个线程经历上述过程竞争失败,变成这个样子
在这里插入图片描述

解锁流程

Thread - 0 释放锁

1. 进入 release() 方法,走tryRelease流程

// 解锁实现public void unlock() {sync.release(1);}

如果成功
设置 exclusiveOwnerThread 为 null;state = 0
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,将其从等待队列中退出,本例中即为 Thread-1

public final boolean release(int arg) {// 尝试释放锁 if (tryRelease(arg)) { // 成功则进行unpark操作     Node h = head;   // 当前队列不为 null,并且 head 的 waitStatus = -1     if (h != null && h.waitStatus != 0)  unparkSuccessor(h); // unpark AQS 中等待的线程     return true; } return false;    }
protected final boolean tryRelease(int releases) {int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread())     throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {  // 支持锁重入, 只有 state 减为 0, 才释放成功     free = true;     setExclusiveOwnerThread(null);  // 设置持有线程为null } setState(c);  // 设置状态为0 return free;      }
private void unparkSuccessor(Node node) { // 如果状态为 Node.SIGNAL 尝试重置状态为 0// 不成功也可以 int ws = node.waitStatus; if (ws < 0)     compareAndSetWaitStatus(node, ws, 0);// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的 Node s = node.next;  // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点 if (s == null || s.waitStatus > 0) {     s = null;     for (Node t = tail; t != null && t != node; t = t.prev)  if (t.waitStatus <= 0)      s = t; } if (s != null)     LockSupport.unpark(s.thread);    }

2. 回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),在tryAcquire()中会设置,exclusiveOwnerThread 为 Thread-1,state = 1
然后 head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
原本的 head 因为从链表断开,而可被垃圾回收

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try {     boolean interrupted = false;     for (;;) {  final Node p = node.predecessor(); // 得到前驱节点  // 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁  if (p == head && tryAcquire(arg)) {  // 获取成功, 设置自己(当前线程对应的 node)为 head      setHead(node);      p.next = null; // help GC      failed = false;      // 没有被中断,返回中断标记 false      return interrupted;  }    if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())      interrupted = true;     } } finally {     if (failed)  cancelAcquire(node); }    }

在这里插入图片描述

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了,如果不巧又被 Thread-4 占了先
Thread-4 被设置为 exclusiveOwnerThread,state = 1
Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
在这里插入图片描述

2. 可重入原理

加锁时流程

final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {     if (compareAndSetState(0, acquires)) {  setExclusiveOwnerThread(current);  return true;     } } // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入 else if (current == getExclusiveOwnerThread()) {     int nextc = c + acquires;  // 锁状态增加     if (nextc < 0) // overflow  throw new Error("Maximum lock count exceeded");     setState(nextc);     return true; } return false;    }

解锁时流程

protected final boolean tryRelease(int releases) { int c = getState() - releases;  // 锁状态减少 if (Thread.currentThread() != getExclusiveOwnerThread())     throw new IllegalMonitorStateException(); boolean free = false; // 支持锁重入, 只有 state 减为 0, 才释放成功 if (c == 0) {     free = true;     setExclusiveOwnerThread(null); } setState(c); return free;    }

3. 不可打断 \ 可打断原理

不可打断模式
private final boolean parkAndCheckInterrupt() {// 如果被打断,打断标记为true,则park会失效 LockSupport.park(this); // 返回Thread.interrupted(),interrupted()会清除打断标记,以便可以再次park return Thread.interrupted(); // 打断后范围true    }
 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try {     boolean interrupted = false;     for (;;) {  final Node p = node.predecessor();  if (p == head && tryAcquire(arg)) {      setHead(node);      p.next = null; // help GC      failed = false;      // interrupted 标记在这里会被返回      return interrupted;  }  if (shouldParkAfterFailedAcquire(p, node) &&      parkAndCheckInterrupt())  // 这里返回true,进入if块      interrupted = true; // 只是标记打断状态为true,再次进入循环,依旧会被park     } } finally {     if (failed)  cancelAcquire(node); }    }
public final void acquire(int arg) { if (!tryAcquire(arg) &&     acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 这里打断标记返回true,进入if块     selfInterrupt();  // 这里会进行一次重新的打断    }
static void selfInterrupt() { Thread.currentThread().interrupt();    }

在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了;整个不可打断逻辑,保证的就是线程在没有获取锁的情况下不能被打断,只有获取到锁并且曾经被打断过,才会重新进行一次打断操作

可打断模式
public final void acquireInterruptibly(int arg)     throws InterruptedException { if (Thread.interrupted())     throw new InterruptedException(); // 如果没有获得到锁,进入doAcquireInterruptibly() if (!tryAcquire(arg))     doAcquireInterruptibly(arg);    }
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try {     for (;;) {  final Node p = node.predecessor();  if (p == head && tryAcquire(arg)) {      setHead(node);      p.next = null; // help GC      failed = false;      return;  }  if (shouldParkAfterFailedAcquire(p, node) &&      parkAndCheckInterrupt())      // 只有这里的逻辑不一样, park 过程中如果被打断了,返回true进入此if块      // 直接抛出异常,表示被打断,不会再进入for(;;)尝试获取锁      throw new InterruptedException();     } } finally {     if (failed)  cancelAcquire(node); }    }

4. 公平锁实现原理

static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() {     acquire(1); }......}
public final void acquire(int arg) { if (!tryAcquire(arg) &&     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))     selfInterrupt();    }
protected final boolean tryAcquire(int acquires) {     final Thread current = Thread.currentThread();     int c = getState();     if (c == 0) {     // 先检查 AQS 队列中是否有前驱节点     // 没有前驱节点 或者 当前线程等于前驱节点中的线程才去竞争  if (!hasQueuedPredecessors() &&      compareAndSetState(0, acquires)) {      setExclusiveOwnerThread(current);      return true;  }     }     else if (current == getExclusiveOwnerThread()) {  int nextc = c + acquires;  if (nextc < 0)      throw new Error("Maximum lock count exceeded");  setState(nextc);  return true;     }     return false; }    }
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; // // h != t 时(头尾不相等)表示队列中有 Node return h != t && // (s = h.next) == null 表示队列中还有没有老二,如果有返回true,没有返回false // 或者队列中老二线程不是此线程,如果不是返回true     ((s = h.next) == null || s.thread != Thread.currentThread());    }

5. 条件变量实现原理

await()流程

开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程
创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
在这里插入图片描述
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁
在这里插入图片描述
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
在这里插入图片描述
park 阻塞 Thread-0
在这里插入图片描述

public final void await() throws InterruptedException { if (Thread.interrupted())     throw new InterruptedException(); // 添加一个 Node 至队列 Node node = addConditionWaiter();// 释放节点持有的所有的锁 int savedState = fullyRelease(node); int interruptMode = 0; // 如果该节点还没有转移至 AQS 队列, 阻塞 while (!isOnSyncQueue(node)) {     // 在队列中,park 阻塞     LockSupport.park(this);     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE)     interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled     unlinkCancelledWaiters(); if (interruptMode != 0)     reportInterruptAfterWait(interruptMode);    }
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) {     unlinkCancelledWaiters();     t = lastWaiter; }// 设置新节点,设置节点状态为-2,表示被await,并添加到队列尾部 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null)     firstWaiter = node; else     t.nextWaiter = node; lastWaiter = node; return node;    }
// 因为某线程可能重入,需要将 state 全部释放final int fullyRelease(Node node) { boolean failed = true; try {     int savedState = getState();     if (release(savedState)) {  failed = false;  return savedState;     } else {  throw new IllegalMonitorStateException();     } } finally {     if (failed)  node.waitStatus = Node.CANCELLED; }    }
// 释放锁的同时,唤醒下一个节点的线程public final boolean release(int arg) { if (tryRelease(arg)) {     Node h = head;     if (h != null && h.waitStatus != 0)  unparkSuccessor(h);     return true; } return false;    }
signal()流程

假设 Thread-1 要来唤醒 Thread-0
在这里插入图片描述
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
在这里插入图片描述
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1
在这里插入图片描述

public final void signal() {if (!isHeldExclusively())     throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null)     doSignal(first);    }
private void doSignal(Node first) { do { // 已经是尾节点了,即最后一个节点,将其置空     if ( (firstWaiter = first.nextWaiter) == null)  lastWaiter = null;     first.nextWaiter = null; // 将等待队列中的 Node 转移至 AQS 队列, 不成功(线程可能被取消)且队列还有节点则继续循环取下一个被await的Node } while (!transferForSignal(first) &&   (first = firstWaiter) != null);    }
// 该方法用于将唤醒的线程Node,放入等待队列尾部// 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功final boolean transferForSignal(Node node) { // 如果状态已经不是 Node.CONDITION, 说明被取消了 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))     return false;  Node p = enq(node);  // 将被唤醒的节点添加到等待队列尾部,返回其上一个节点 int ws = p.waitStatus; // ws > 0 表示上一个节点被取消 // !compareAndSetWaitStatus(p, ws, Node.SIGNAL),表示上一个节点状态设置为-1失败 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))     LockSupport.unpark(node.thread); return true;    }

3. 读写锁使用及原理

1. ReentrantReadWriteLock 基本使用

public class MyReadWriteLock {    public static void main(String[] args) { // 读读不阻塞 /*new Thread(()->{     read(); }, "t1").start(); new Thread(()->{     read(); }, "t2").start(); 获取读锁... 获取读锁... 读取数据data 读取数据data 释放读锁... 释放读锁...*/ // 读写阻塞 /*new Thread(()->{     read(); }, "t1").start(); new Thread(()->{     write(); }, "t2").start(); 获取读锁... 读取数据data 释放读锁... 获取写锁... 修改数据data 释放写锁...*/ // 写写阻塞 /*new Thread(()->{     write(); }, "t1").start(); new Thread(()->{     write(); }, "t2").start(); 获取写锁... 修改数据data 释放写锁... 获取写锁... 修改数据data 释放写锁...*/    } private static String data;    private static ReentrantReadWriteLock rw = new ReentrantReadWriteLock();    private final static ReentrantReadWriteLock.ReadLock readLock = rw.readLock();    private final static ReentrantReadWriteLock.WriteLock writeLock = rw.writeLock();    public static void read() { readLock.lock(); System.out.println("获取读锁..."); try {     try {  Thread.sleep(1000);  System.out.println("读取数据data");     } catch (InterruptedException e) {  e.printStackTrace();     } } finally {     System.out.println("释放读锁...");     readLock.unlock(); }    }    public static void write() { writeLock.lock(); System.out.println("获取写锁..."); try {     System.out.println("修改数据data"); } finally {     System.out.println("释放写锁...");     writeLock.unlock(); }    }}

2. 读写锁实现一致性缓存

在没有加锁的情况下,这里要注意先更新数据库,还是先清除缓存
如果先清除缓存,在还没来得及将新值写入数据库的时候;另外的线程先来做了查询操作,这时查询到的是旧值;然后才将新值更新到了数据库中。导致查询到的是旧值。
在这里插入图片描述
所以应该先更新数据库,防止来查询的线程查询到旧值
在这里插入图片描述

加锁实现一致性缓存

class GenericCachedDao<T> {    // HashMap 作为缓存,非线程安全, 需要保护    HashMap<SqlPair, T> map = new HashMap<>();    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();    GenericDao genericDao = new GenericDao();    public int update(String sql, Object... params) { SqlPair key = new SqlPair(sql, params); // 加写锁, 防止其它线程同时对缓存读取和更改 lock.writeLock().lock(); try { // 先更新数据库,再清除缓存     int rows = genericDao.update(sql, params);     map.clear();     return rows; } finally {     lock.writeLock().unlock(); }    }    public T queryOne(Class<T> beanClass, String sql, Object... params) { SqlPair key = new SqlPair(sql, params); // 加读锁, 防止其它线程对缓存更改 lock.readLock().lock(); try {     T value = map.get(key);     if (value != null) {  return value;     } } finally {     lock.readLock().unlock(); } // 加写锁, 防止其它线程对缓存读取和更改 lock.writeLock().lock(); try {     // get 方法上面部分是可能多个线程进来的, 会有多线程读取到空值,进入下面的逻辑// 但同时也可能有线程已经向缓存填充了数据     // 这时为了防止重复查询数据库, 再次验证     T value = map.get(key);     if (value == null) {  // 如果没有, 查询数据库  value = genericDao.queryOne(beanClass, sql, params);  map.put(key, value);     }     return value; } finally {     lock.writeLock().unlock(); }    } // 作为 key 保证其是不可变的    class SqlPair { private String sql; private Object[] params; public SqlPair(String sql, Object[] params) {     this.sql = sql;     this.params = params; } @Override public boolean equals(Object o) {     if (this == o) {  return true;     }     if (o == null || getClass() != o.getClass()) {  return false;     }     SqlPair sqlPair = (SqlPair) o;     return sql.equals(sqlPair.sql) &&      Arrays.equals(params, sqlPair.params); } @Override public int hashCode() {     int result = Objects.hash(sql);     result = 31 * result + Arrays.hashCode(params);     return result; }    }}

3. ReentrantReadWriteLock 原理

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个
整体流程与RentrantLock基本一致,只是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位

t1 w.lock,t2 r.lock

1) t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位
在这里插入图片描述
2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败
在这里插入图片描述
3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 (读锁)Node.SHARED 模式而非 (写锁)Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态
在这里插入图片描述
4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁
5)如果没有成功,在 doAcquireShared 内 for (;😉 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;😉 循环一次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park
在这里插入图片描述

t3 r.lock,t4 w.lock

这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子
在这里插入图片描述

t1 w.unlock

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2(读锁) 在 doAcquireShared 内
parkAndCheckInterrupt() 处恢复运行
这回再来一次 for ( ;; ) 执行 tryAcquireShared 成功则让读锁计数加一
在这里插入图片描述
这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
在这里插入图片描述
事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用
doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行

在这里插入图片描述
这回再来一次 for ( ;; ) 执行 tryAcquireShared 成功则让读锁计数加一
在这里插入图片描述
这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
在这里插入图片描述
下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点
t2和t3争抢锁…

t2 r.unlock,t3 r.unlock

在这里插入图片描述
t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零
t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入
doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即
在这里插入图片描述
之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;😉 这次自己是老二,并且没有其他
竞争,tryAcquire(1) 成功,修改头结点,流程结束
在这里插入图片描述

1. 写锁

加锁
// 写锁部分public static class WriteLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -4992448646407690164L;private final Sync sync;protected WriteLock(ReentrantReadWriteLock lock) {    sync = lock.sync;}public void lock() {  // 加写锁    sync.acquire(1);}......}
public final void acquire(int arg) { if (!tryAcquire(arg) && // 尝试获得写锁 // 失败,将当前线程关联到一个 Node 对象上, 模式为独占模式     // 进入 AQS 队列阻塞     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))      selfInterrupt();    }

进入ReentrantReadWriteLock实现的tryAcquire

protected final boolean tryAcquire(int acquires) {/*  * Walkthrough:  * 1. If read count nonzero or write count nonzero  *    and owner is a different thread, fail.  * 如果读数非零或写入计数非零和所有者是一个不同的线程,失败  *   * 2. If count would saturate, fail. (This can only  *    happen if count is already nonzero.)  * 如果计数会饱和,失败。 (这只能发生在计数已经是非零的情况下)  *   * 3. Otherwise, this thread is eligible for lock if  *    it is either a reentrant acquire or  *    queue policy allows it. If so, update state  *    and set owner.  * 否则,如果它是refeRant获取或队列策略允许,此线程有资格锁定。 如果是这样,更新状态和设置所有者  */ Thread current = Thread.currentThread(); int c = getState();  // 获取state值 int w = exclusiveCount(c);  // 获取写部分计数 if (c != 0) {  // 如果state不为0,说明有锁     // (Note: if c != 0 and w == 0 then shared count != 0)     // 如果此时w == 0, 证明没有加写锁但加了读锁,读锁中不能有写锁,返回false     // 或者在重入的情况下,发现不是同一个线程重入,返回false     if (w == 0 || current != getExclusiveOwnerThread())  return false;     // 重入计数大于16为最大值,超过次数,抛出异常     if (w + exclusiveCount(acquires) > MAX_COUNT)  throw new Error("Maximum lock count exceeded");     // Reentrant acquire     // 如果没有上述情况,说明重入成功,state + 1,返回true     setState(c + acquires);     return true; } // 如果 state 为0,证明没有加过锁 // 此时由于是非公平锁,writerShouldBlock()总是返回false,直接CAS尝试修改state // 如果修改成功,加锁成功,修改线程拥有者,返回true // 修改失败,则返回false if (writerShouldBlock() ||     !compareAndSetState(c, c + acquires))     return false; setExclusiveOwnerThread(current); return true;    }

加锁失败,就进入AQS队列park

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try {     boolean interrupted = false;     for (;;) {  final Node p = node.predecessor();  if (p == head && tryAcquire(arg)) {  // 如果获取了写锁锁,就和重入锁流程一致了  // 然后 head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread// 原本的 head 因为从链表断开,而可被垃圾回收      setHead(node);      p.next = null; // help GC      failed = false;      return interrupted;  }  if (shouldParkAfterFailedAcquire(p, node) &&      parkAndCheckInterrupt())  // 加锁失败,这个地方会park      interrupted = true;     } } finally {     if (failed)  cancelAcquire(node); }    }
解锁

特别注意:unpark()了写锁,就去找写锁的逻辑,unpark()了读锁,就去找读锁的逻辑,他们都阻塞在同一个AQS队列中

static final class NonfairSync extends Sync {......public void unlock() {sync.releaseShared(1);}......}
public final boolean release(int arg) { if (tryRelease(arg)) { // unpark AQS 中等待的线程     Node h = head;     if (h != null && h.waitStatus != 0)  // 头结点不为空,且它可以唤醒下一个节点  unparkSuccessor(h);     return true; } return false;    }
protected final boolean tryRelease(int releases) { if (!isHeldExclusively())     throw new IllegalMonitorStateException(); // 因为可重入的原因, 写锁计数为 0, 才算释放成功 int nextc = getState() - releases;   boolean free = exclusiveCount(nextc) == 0; if (free)     setExclusiveOwnerThread(null); setState(nextc); return free;    }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0)     compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) {     s = null;     for (Node t = tail; t != null && t != node; t = t.prev)  if (t.waitStatus <= 0)      s = t; } if (s != null)     LockSupport.unpark(s.thread);    }

2. 读锁

加锁
public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) {     sync = lock.sync; } public void lock() {     sync.acquireShared(1); }......}
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)  // tryAcquireShared 返回负数, 表示获取读锁失败     doAcquireShared(arg);    }

tryAcquireShared 返回值表示

  • -1表示失败
  • 0 表示成功,但后继节点不会继续唤醒
  • 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1
 protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread();  // 获取当前线程 int c = getState(); // 获取锁状态 // 写锁状态不为0 并且 当前线程与锁拥有线程不一致,返回-1 失败 // 这里如果写锁状态不为0 但是 线程是同一个,因为可以锁降级,所以还可以向下尝试加读锁 if (exclusiveCount(c) != 0 &&     getExclusiveOwnerThread() != current)     return -1; int r = sharedCount(c);  // 获取读锁状态 // readerShouldBlock() 非公平锁则返回false // r < MAX_COUNT 查看读锁计数是否合法 // compareAndSetState(c, c + SHARED_UNIT) 尝试增加读锁计数 if (!readerShouldBlock() &&     r < MAX_COUNT &&     compareAndSetState(c, c + SHARED_UNIT)) {     if (r == 0) {  firstReader = current;  firstReaderHoldCount = 1;     } else if (firstReader == current) {  firstReaderHoldCount++;     } else {  HoldCounter rh = cachedHoldCounter;  if (rh == null || rh.tid != getThreadId(current))      cachedHoldCounter = rh = readHolds.get();  else if (rh.count == 0)      readHolds.set(rh);  rh.count++;     }     return 1;  // 加锁成功返回 1 } return fullTryAcquireShared(current);    }
 private void doAcquireShared(int arg) { // 将当前线程关联到一个 Node 对象上, 模式为共享模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try {     boolean interrupted = false;     for (;;) {  final Node p = node.predecessor();  // 查看是否是老二节点  if (p == head) {    // 如果是老二节点,再一次尝试获取读锁      int r = tryAcquireShared(arg);      // 如果获取成功了      if (r >= 0) {      // r 表示可用资源数, 在这里总是 1 允许传播//(唤醒 AQS 中下一个 Share 节点)   setHeadAndPropagate(node, r);   p.next = null; // help GC   if (interrupted)selfInterrupt();   failed = false;   return;      }  }  // 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)  // shouldParkAfterFailedAcquire(p, node) 将上一个节点标记设置为Node.SIGNAL  // parkAndCheckInterrupt() park当前线程  if (shouldParkAfterFailedAcquire(p, node) &&      parkAndCheckInterrupt())      interrupted = true;     } } finally {     if (failed)  cancelAcquire(node); }    }
重点,读读可以并发的原因
 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /*  * Try to signal next queued node if:  *   Propagation was indicated by caller,  *     or was recorded (as h.waitStatus either before  *     or after setHead) by a previous operation  *     (note: this uses sign-check of waitStatus because  *      PROPAGATE status may transition to SIGNAL.)  * and  *   The next node is waiting in shared mode,  *     or we don't know, because it appears null  *  * The conservatism in both of these checks may cause  * unnecessary wake-ups, but only when there are multiple  * racing acquires/releases, so most need signals now or soon  * anyway.  */ if (propagate > 0 || h == null || h.waitStatus < 0 ||     (h = head) == null || h.waitStatus < 0) {     Node s = node.next;     if (s == null || s.isShared())  // 如果下一个节点是共享状态  doReleaseShared(); }    }

这里就是递归的,将所有共享的读锁全部unpark唤醒;或者将一个写锁唤醒

private void doReleaseShared() { /*  * Ensure that a release propagates, even if there are other  * in-progress acquires/releases.  This proceeds in the usual  * way of trying to unparkSuccessor of head if it needs  * signal. But if it does not, status is set to PROPAGATE to  * ensure that upon release, propagation continues.  * Additionally, we must loop in case a new node is added  * while we are doing this. Also, unlike other uses of  * unparkSuccessor, we need to know if CAS to reset status  * fails, if so rechecking.  */ for (;;) {     Node h = head;     if (h != null && h != tail) {  int ws = h.waitStatus;  if (ws == Node.SIGNAL) {  // 将头结点的状态改为 -1, 防止其他线程重复唤醒,产生干扰      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))   continue;     // loop to recheck cases      unparkSuccessor(h);  // 将头结点的后继节点unpark唤醒  }  else if (ws == 0 &&    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))      continue;  // loop on failed CAS     }     if (h == head)     // loop if head changed  break; }    }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0)     compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) {     s = null;     for (Node t = tail; t != null && t != node; t = t.prev)  if (t.waitStatus <= 0)      s = t; } if (s != null)     LockSupport.unpark(s.thread);    }
解锁

特别注意:unpark()了写锁,就去找写锁的逻辑,unpark()了读锁,就去找读锁的逻辑,他们都阻塞在同一个AQS队列中

public void unlock() {sync.releaseShared(1);    }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {     doReleaseShared();     return true; } return false;    }
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) {     // assert firstReaderHoldCount > 0;     if (firstReaderHoldCount == 1)  firstReader = null;     else  firstReaderHoldCount--; } else {     HoldCounter rh = cachedHoldCounter;     if (rh == null || rh.tid != getThreadId(current))  rh = readHolds.get();     int count = rh.count;     if (count <= 1) {  readHolds.remove();  if (count <= 0)      throw unmatchedUnlockException();     }     --rh.count; } // 主要的流程,状态减1 for (;;) {     int c = getState();     int nextc = c - SHARED_UNIT;     if (compareAndSetState(c, nextc))  // Releasing the read lock has no effect on readers,  // but it may allow waiting writers to proceed if  // both read and write locks are now free.  return nextc == 0; }    }
private void doReleaseShared() { /*  * Ensure that a release propagates, even if there are other  * in-progress acquires/releases.  This proceeds in the usual  * way of trying to unparkSuccessor of head if it needs  * signal. But if it does not, status is set to PROPAGATE to  * ensure that upon release, propagation continues.  * Additionally, we must loop in case a new node is added  * while we are doing this. Also, unlike other uses of  * unparkSuccessor, we need to know if CAS to reset status  * fails, if so rechecking.  */ for (;;) {     Node h = head;     if (h != null && h != tail) {  int ws = h.waitStatus;  if (ws == Node.SIGNAL) {      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))   continue;     // loop to recheck cases      unparkSuccessor(h);  }  else if (ws == 0 &&    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))      continue;  // loop on failed CAS     }     if (h == head)     // loop if head changed  break; }    }
// 从后向前查找可唤醒的线程,并唤醒private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0)     compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) {     s = null;     for (Node t = tail; t != null && t != node; t = t.prev)  if (t.waitStatus <= 0)      s = t; } if (s != null)     LockSupport.unpark(s.thread);    }

3. 总结

  1. 读锁和写锁的加锁过程,会将没有成功获取到锁的线程封装到Node中,加入同一个AQS队列中park
  2. 解锁过程中:如果是写锁调用unlock(),就是正常的ReentrantLock过程,将老二节点unpark唤醒,尝试获取锁;如果是读锁调用了unlock(),就会直接尝试递归的unpark所有头结点后相连接的(读锁)共享的线程。
  3. 如果被unpark的是写锁,那么还是正常的ReentrantLock过程,尝试争抢锁,成功后取出;如果unpark的是读锁,首先尝试争抢锁,在成功获取到锁之后,就会尝试递归的unpark所有头结点后相连接的(读锁)共享的线程
  4. 写锁删除AQS队列中获取到锁的节点是直接调用setHead()方法完成的;读锁删除AQS队列中获取到锁的节点是通过调用setHeadAndPropagate()方法,内部简介调用了setHead()方法完成的。
  5. 删除节点的流程和ReentrantLock是一致的,首先保存原head引用给变量p,然后将head指向获取到锁的Node,将该Node的Thread和前驱节点置为空,然后将原head的next置为空,将他移出队列,被GC回收。