> 技术文档 > 万字解析Redission ---深入理解Redission上锁过程

万字解析Redission ---深入理解Redission上锁过程


Redisson获取锁过程 

RLock lock = redissonClient.getLock(\"lock:order\" + userId);boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);

调用tyrLock其实就是下面的方法,如果说没有指定锁的过期时间,可以看到这边设置为了-1

@Override public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { return tryLock(waitTime, -1, unit); }

再往下追,,只需要先看tryAcquire就行,这是获取锁的核心,tryLock后面还有一堆东西现在先不用管

这里将等待时间转化为毫秒,获取了当前线程id,当前时间

@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);

再往下追,可以看到会根据有没有设置锁的超时时间,调用不同的方法,没有设置的话调用的话会进入下面的代码设置看门狗时间getLockWatchdogTimeout,默认是30秒这里也是30*1000化为了毫秒

这里传入的参数分别是 获取锁的等待时间,锁的过期时间,时间单位,线程id

调用 tryLockInnerAsync 传入的参数是 获取锁的等待时间,锁的过期时间,时间单位,线程id, Redis 命令(如 EVAL),用于执行 Lua 脚本

private RFuture tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); }RFuture ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining) { scheduleExpirationRenewal(threadId); } });

核心步骤 

  • getName(): 获取锁在 Redis 中的键名(KEYS[1])。
  • internalLockLeaseTime: 将传入的锁租约时间 leaseTime 转换为毫秒。
  • getLockName(threadId): 生成一个唯一标识当前线程(或客户端)的字符串(ARGV[2])。
  • command: 通常是一个 Redis 命令(如 EVAL),用于执行 Lua 脚本。
 RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, \"if (redis.call(\'exists\', KEYS[1]) == 0) then \" + \"redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); \" + \"redis.call(\'pexpire\', KEYS[1], ARGV[1]); \" + \"return nil; \" + \"end; \" + \"if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then \" + \"redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); \" + \"redis.call(\'pexpire\', KEYS[1], ARGV[1]); \" + \"return nil; \" + \"end; \" + \"return redis.call(\'pttl\', KEYS[1]);\", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }

  1. 执行 Lua 脚本: 脚本逻辑是原子性的,确保并发安全。它包含三个主要分支:

    • 分支 1:锁不存在(首次获取)

      if (redis.call(\'exists\', KEYS[1](@ref) == 0) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); -- 创建Hash,字段ARGV[2]的值设为1(计数) redis.call(\'pexpire\', KEYS[1], ARGV[1](@ref); -- 设置整个锁Key的过期时间 return nil;  -- 返回nil表示获取成功end;
      • 检查锁 Key (KEYS[1]) 是否存在。
      • 如果不存在(exists == 0):
        • 使用 HINCRBY 命令创建一个 Hash 结构,Key 是 KEYS[1],字段(field)是当前线程标识 ARGV[2],值初始化为 1(表示锁计数)。
        • 使用 PEXPIRE 命令为整个锁 Key (KEYS[1]) 设置过期时间(毫秒),值为 ARGV[1](即 internalLockLeaseTime)。
        • 返回 nil,表示获取锁成功。
    • 分支 2:锁已存在且当前线程持有(锁重入)

      if (redis.call(\'hexists\', KEYS[1], ARGV[2](@ref) == 1) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); -- 字段ARGV[2]的值加1(增加重入计数) redis.call(\'pexpire\', KEYS[1], ARGV[1](@ref); -- 刷新整个锁Key的过期时间 return nil;  -- 返回nil表示获取成功(重入)end;
      • 检查锁 Key (KEYS[1]) 对应的 Hash 中,是否存在字段 ARGV[2](即当前线程标识)。
      • 如果存在(hexists == 1):
        • 使用 HINCRBY 命令将字段 ARGV[2] 的值加 1(实现可重入锁,计数增加)。
        • 使用 PEXPIRE 命令刷新整个锁 Key (KEYS[1]) 的过期时间(续租)。
        • 返回 nil,表示获取锁成功(重入成功)。
    • 分支 3:锁已存在但被其他线程持有(获取失败)

      return redis.call(\'pttl\', KEYS[1](@ref); -- 返回锁Key的剩余生存时间(毫秒)
      • 如果前两个分支都不满足(锁存在但不是当前线程持有):
        • 使用 PTTL 命令获取锁 Key (KEYS[1]) 的剩余生存时间(毫秒)。
        • 将这个剩余时间返回给调用者。

尝试获取锁之后的逻辑

@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);//这里获取到的依旧是以毫秒为单位,如果获取到锁返回null,没有获取到锁返回该锁的剩余时间 // lock acquired if (ttl == null) { return true;//获取成功直接返回true } time -= System.currentTimeMillis() - current; //time-上面代码所消耗的时间 if (time <= 0) { time是等待锁的时间,判断如果上面代码消耗的时间过长,其实就是获取锁的时间太长大于了锁的等待时间返回false acquireFailed(waitTime, unit, threadId); return false; } //如果还能等锁释放,继续执行下面的代码 current = System.currentTimeMillis(); //这里无需立即重新去获取锁了,因为你知道获取锁的那个人还在执行自己的业务 //这里订阅别人释放锁的信息, Redisson释放锁的时候会发布一条通知,这个后面会说 RFuture subscribeFuture = subscribe(threadId); // 阻塞当前线程,等待 subscribeFuture 代表的异步订阅操作完成(成功、失败或取消),但最多只等待指定的 time 毫秒,超时返回false if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> {  if (e == null) { unsubscribe(subscribeFuture, threadId);//超时直接取消订阅  } }); } acquireFailed(waitTime, unit, threadId); return false; } try { //再次获取剩余时间 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; }  while (true) { //依旧和上面的逻辑一样,先尝试获取锁,为空代表成功,返回true //判断剩余时间,不够返回false long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) {  return true; } time -= System.currentTimeMillis() - currentTime; if (time = 0 && ttl < time) {  //ttl就是锁的过期时间,而time则是我们可以等待的时间  //哪个小等待哪个时间就行,因为一个到时了另一个也没用了  subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else {  subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } //判断是否超时 time -= System.currentTimeMillis() - currentTime; if (time <= 0) {  acquireFailed(waitTime, unit, threadId);  return false; } //没有超时继续循环重新尝试获取锁 } } finally { unsubscribe(subscribeFuture, threadId); }// return get(tryLockAsync(waitTime, leaseTime, unit)); }
  1. 初始化时间与线程ID:
    • long time = unit.toMillis(waitTime);: 将用户指定的最大等待时间 waitTime 转换为毫秒 time
    • long current = System.currentTimeMillis();: 记录当前时间戳 current
    • long threadId = Thread.currentThread().getId();: 获取当前线程的唯一ID threadId
  2. 首次尝试获取锁:
    • Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);: 调用核心方法尝试获取锁。
      • 成功 (ttl == null): 直接返回 true
      • 失败 (ttl >= 0)ttl 表示锁当前的剩余生存时间(毫秒)。
  3. 扣除首次尝试耗时 & 检查剩余等待时间:
    • time -= System.currentTimeMillis() - current;: 计算首次尝试获取锁花费的时间,并从总等待时间 time 中扣除。
    • if (time <= 0) { ... return false; }: 如果扣除后剩余等待时间 time <= 0,说明等待时间已耗尽,调用 acquireFailed(记录失败指标)并返回 false
  4. 订阅锁释放通知频道:
    • RFuture subscribeFuture = subscribe(threadId);: 异步发起订阅操作,订阅与当前锁关联的频道,用于接收锁释放通知。返回 RFuture 对象 subscribeFuture
    • if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { ... }: 阻塞等待订阅操作完成,最多等待剩余的 time 毫秒。
      • 订阅超时 (!await(...)):
        • if (!subscribeFuture.cancel(false)) { ... }: 尝试取消订阅操作。如果取消失败(通常意味着订阅在取消瞬间完成了),则注册一个 onComplete 回调。这个回调的作用是:如果订阅最终成功完成 (e == null),则立即执行 unsubscribe 清理资源。
        • acquireFailed(...); return false;: 标记获取失败并返回 false
      • 订阅成功 (await(...) 返回 true): 继续执行后续流程。
  5. 扣除订阅耗时 & 再次检查剩余等待时间:
    • time -= System.currentTimeMillis() - current;: 计算订阅操作花费的时间,并从剩余等待时间 time 中扣除。
    • if (time <= 0) { ... return false; }: 如果剩余时间耗尽,标记失败并返回 false
  6. 循环尝试获取锁(核心重试逻辑):
    • while (true) { ... }: 进入一个无限循环,直到成功获取锁、等待超时或发生异常。
    • 记录循环开始时间: long currentTime = System.currentTimeMillis();
    • 再次尝试获取锁: ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
      • 成功 (ttl == null): 直接返回 true
      • 失败 (ttl >= 0): 继续后续步骤。
    • 扣除本次尝试耗时 & 检查剩余时间:
      • time -= System.currentTimeMillis() - currentTime;
      • if (time <= 0) { ... return false; }: 时间耗尽则失败返回。
    • 基于 TTL 的智能等待(关键优化):
      • currentTime = System.currentTimeMillis();: 记录等待开始时间。
      • if (ttl >= 0 && ttl < time) { ... } else { ... }
        • ttl < time (锁快过期): 调用 semaphore.tryAcquire(ttl, MILLISECONDS)只等待锁剩余生存时间 ttl 期望锁因过期自动释放或收到通知。
        • ttl >= time (锁活很久或无效): 调用 semaphore.tryAcquire(time, MILLISECONDS)只等待剩余的 time 期望在耐心耗尽前收到锁释放通知。
      • tryAcquire 行为:
        • 如果收到锁释放通知 (semaphore.release() 被调用),tryAcquire 会立刻返回 true (获取到\"许可\"),然后循环会再次尝试 tryAcquire 获取锁。
        • 如果超时 (未收到通知),tryAcquire 返回 false
    • 扣除等待耗时 & 最终检查剩余时间:
      • time -= System.currentTimeMillis() - currentTime;: 计算等待花费的时间。
      • if (time <= 0) { ... return false; }: 时间耗尽则失败返回。
      • 如果时间还有剩余,且 tryAcquire 超时返回 false (未收到通知),循环会再次执行,重新尝试获取锁 (ttl = tryAcquire(...))。这提供了主动重试的机会,即使没收到通知(比如通知丢失或锁自动过期但通知未触发)。
  7. finally 块 - 资源清理 (至关重要):
    • unsubscribe(subscribeFuture, threadId);: 无论 tryLock 方法最终是成功返回 (true)、失败返回 (false) 还是抛出异常 (InterruptedException 等),这段代码都会被执行。
    • 目的: 释放步骤 4 中建立的订阅关系。
    • 为什么必须放在 finally 里?
      • 防止资源泄漏: 如果不取消订阅,Redis 服务器会持续维护这个订阅连接和频道监听,消耗服务器资源(内存、连接数)。
      • 避免无效通知: 锁释放时,消息会发送到这个频道,但客户端线程已经不再关心(它要么获得了锁,要么放弃了),造成不必要的网络流量和处理。
      • 保证健壮性: 即使循环内部出现异常(虽然代码中未显式抛出,但理论上可能),也能确保订阅被清理。
      • 客户端资源管理: Redisson 客户端也需要管理其内部的订阅状态,及时清理不再需要的订阅。

看门狗”(Watchdog)机制核心作用

到这里其实还是有点问题,考虑一个问题

  1. 线程一 (Thread1):

    • 成功调用 tryLock 获取锁。
    • 开始执行临界区业务代码
    • 业务代码执行时间过长,超过了锁的租约时间 leaseTime
    • 锁在 Redis 中因 TTL 到期而被自动删除(超时释放)。
  2. 线程二 (Thread2):

    • 在 Thread1 持有锁期间尝试获取锁。
    • 首次 tryAcquire 失败,返回 ttl(锁的剩余时间)。
    • 成功订阅锁释放频道。
    • 在信号量上调用 tryAcquire(ttl, ...) 进行等待。
    • 当 Thread1 的锁因超时被 Redis 自动删除后:
      • 可能情况一: Redis 的 expire 机制删除锁时,不会主动发布锁释放消息。(这是关键!Redis 的 Key 过期是惰性删除+定期删除,删除事件不一定触发发布订阅通知)。
      • 可能情况二: 即使 Redis 有 __keyevent@__:expired 这样的 Keyspace 通知,Redisson 默认的锁释放监听是基于特定频道的普通发布订阅,通常不会监听 Key 过期事件
    • Thread2 的 tryAcquire(ttl, ...) 超时返回 false(因为它没收到锁释放的通知)。
    • Thread2 跳出等待,再次调用 tryAcquire
    • 此时锁已被 Redis 删除(超时释放),Thread2 成功获取锁
    • Thread2 进入临界区执行业务。
  3. 问题发生:

    • Thread1 仍在执行它的业务代码! 它以为自己还持有锁(因为它没有主动释放,也不知道锁被 Redis 强制移除了)。
    • Thread2 也开始执行相同的业务代码
    • 结果:两个线程同时进入了临界区,破坏了锁的互斥性,导致线程安全问题(如数据不一致)。

为什么单看上面的 tryLock 代码有安全隐患

  1. 锁的持有时间 (leaseTime) 是固定的: 在 tryLock 方法中,leaseTime 是由调用者指定的。一旦设置,锁在 Redis 中的 TTL 就是固定的。
  2. 业务执行时间不可控: 业务代码的执行时间可能因为各种原因(GC、网络延迟、复杂计算、死循环等)超出预期的 leaseTime
  3. 锁超时释放是静默的: Redis 在 Key 过期被删除时,默认不会向 Redisson 订阅的锁释放频道发送消息。等待锁的线程(Thread2)感知不到锁是因为超时而被删除的。它只能通过:
    • 被动等待通知: 这通常只在锁被主动释放(调用 unlock)时才会触发。
    • 主动重试: 在信号量等待超时后,Thread2 会再次尝试 tryAcquire。此时它才发现锁已经被删除了(超时释放),从而成功获取。但这发生在 Thread1 的业务还在执行期间。
  4. 线程无法感知锁丢失: Thread1 在执行超长的业务时,完全不知道 Redis 上的锁已经因为 TTL 到期而被删除了。它仍然认为自己持有锁,并继续执行对共享资源的操作。

 我们再来看看下面的这段代码

private  RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return;//发生异常,直接返回 } // lock acquired if (ttlRemaining == null) {//获取锁成功 scheduleExpirationRenewal(threadId);//启动锁的自动续期任务 } }); return ttlRemainingFuture; }
  • 这里调用 tryLockInnerAsync 尝试获取锁,但传入的租约时间不是 -1,而是配置的 lockWatchdogTimeout(默认 30,000 毫秒)
  • 返回一个 RFuture 对象 ttlRemainingFuture,代表这个异步获取锁操作的结果
  1. 注册回调函数

    ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); }});
    • 在 ttlRemainingFuture 上注册一个完成时触发的回调函数
    • 这个回调函数会在 tryLockInnerAsync 操作完成时(无论成功或失败)被调用

最后看看锁的自动续期相关代码 

getEntryName返回的就是线程的id和锁名称拼接起来的字符串,这里的EXPIRATION_RENEWAL_MAP是个静态MAP ,一个锁对应一个entry对象

private void scheduleExpirationRenewal(long threadId) { // 1. 创建新的续期记录 ExpirationEntry entry = new ExpirationEntry(); // 2. 尝试将续期记录放入全局管理Map ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); // 3. 处理续期记录 if (oldEntry != null) { // 3a. 如果已有记录存在:添加当前线程ID oldEntry.addThreadId(threadId); } else { // 3b. 如果是新记录:添加线程ID并启动续期任务 entry.addThreadId(threadId); renewExpiration(); // 启动看门狗定时任务 }}
  1. 创建续期记录

    • 创建一个新的 ExpirationEntry 实例
    • 这个对象将用于跟踪当前锁的续期状态
  2. 管理全局续期状态

    • 使用 putIfAbsent(getEntryName(), entry) 尝试将新记录放入全局映射
    • 这个方法原子性地执行:
      • 如果映射中不存在指定键的条目,则添加新条目并返回 null
      • 如果已存在,则返回现有条目
  3. 处理续期记录

    • 情况A:已有记录存在 (oldEntry != null)

      • 表示这个锁已经启动了续期任务
      • 只需将当前线程ID添加到现有记录:oldEntry.addThreadId(threadId)
      • 这支持锁的可重入性(同一线程多次获取同一锁)
    • 情况B:新记录 (oldEntry == null)

      • 表示这是第一次为此锁启动续期任务
      • 将当前线程ID添加到新记录:entry.addThreadId(threadId)
      • 启动续期任务:renewExpiration()

加油就剩最后一点了,我们可以看到看门狗的核心机制

private void renewExpiration() { //获取当前锁的续期记录 ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; // 如果记录不存在(锁已被释放),直接返回 } //创建定时任务,再delay时间到期以后才会执行,这个delay也是作为newTimeout的第二个参数 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) {  return; } //从entry中取出线程 Long threadId = ent.getFirstThreadId(); if (threadId == null) {  return; } //调用函数刷新有效期 RFuture future = renewExpirationAsync(threadId); future.onComplete((res, e) -> {  if (e != null) { log.error(\"Can\'t update lock \" + getName() + \" expiration\", e); return;  }  if (res) { // reschedule itself //续期成功递归调用 renewExpiration();  } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//这里就是之前的看门狗时间 ee.setTimeout(task); }

也就是这个定时任务十秒之后才会执行

protected RFuture renewExpirationAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, \"if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then \" + \"redis.call(\'pexpire\', KEYS[1], ARGV[1]); \" + \"return 1; \" + \"end; \" + \"return 0;\", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }

可以看到这里的刷新有效期还是通过lua脚本来实现的,作用就是重置锁的有效期