> 文档中心 > Redis 分布式锁

Redis 分布式锁


Redis 分布式锁

目录

  • Redis 分布式锁
    • 一. 问题场景
    • 二. 基本用法
    • 三. 解决超时问题
    • 四. Redisson 实现分布式锁

一. 问题场景

  1. 在用户操作中,一个线程去修改用户的状态,首先从数据库中读出当前用户状态,然后在内存中进行修改,修改完成之后再存回数据库中。在单线程中,这个操作是没有问题的。
  2. 但在多线程中,由于读取、修改、存储 这是三个操作,不是原子操作,所以在多线程中,这样是会出问题的。
  3. 对于这样的问题,我们可以使用分布式锁来限制程序的并发执行。

二. 基本用法

  1. 分布式锁实现的思路很简单,就是进来的线程先占位,当别的线程进来操作时,发现已经有人占位了,就会放弃或者稍后再试。

  2. 在 Redis 中,占位一般使用 setnx 指令,先进来的线程占位,线程执行完成后,再调用 del 指令释放位子。

        public static void main(String[] args) { new Redis().execute(jedis -> {     Long setnx = jedis.setnx("k1", "v1");     if (setnx == 1) {  //说明没有人占位,可以进行操作  jedis.set("name", "peng");  String name = jedis.get("name");  System.out.println("name = " + name);  //操作完之后,释放 k1  jedis.del("k1");     } else {  //来到这里说明有人在操作了     } });    }

    (1) 这里的 Redis 是封装了连接池的,代码如下

    public class Redis {    private JedisPool jedisPool;    public Redis() { //这里等同于创建了一个连接池,在这里进行配置 GenericObjectPoolConfig<Jedis> poolConfig = new GenericObjectPoolConfig<>(); //最大闲置数 poolConfig.setMaxTotal(10); //最大连接数 poolConfig.setMaxTotal(20); //最小闲置数 poolConfig.setMinIdle(5); jedisPool = new JedisPool(poolConfig,"192.168.73.128", 6379, 5000, "123");    }    public void execute(IJedisExec jedisExec) { //从连接池中获取一个 jedis 对象 Jedis jedis = jedisPool.getResource(); //把这个对象传到接口中 jedisExec.call(jedis); //回收资源 jedisPool.returnResource(jedis);    }}

    (2) 其中的 IJedisExec 是一个接口,代码如下

    public interface IJedisExec {    public void call(Jedis jedis);}
  3. 当然我们关注的重点还是在第一段代码。在第一段代码中,如果在代码的运行的过程中跑出来了异常亦或是挂掉了,这样会导致一个结果,那就是 del 指令没有被调用,从而导致 k1 没有被释放,后面的请求都会被阻塞在这里,造成了 死锁

  4. 那么如何解决这个问题呢?我们可以给锁添加一个过期时间,确保锁在一定的时间之后,能够得到释放。

        public static void main(String[] args) { new Redis().execute(jedis -> {     Long setnx = jedis.setnx("k1", "v1");     if (setnx == 1) {  //给 k1 设置一个过期时间,防止死锁  jedis.expire("k1", 5L);  //说明没有人占位,可以进行操作  jedis.set("name", "peng");  String name = jedis.get("name");  System.out.println("name = " + name);  //操作完之后,释放 k1  jedis.del("k1");     } else {  //来到这里说明有人在操作了     } });    }
  5. 改造之后,还有一个问题,那就是在获取锁和设置过期时间之间如果服务器突然挂掉了,这个时候锁被占用,无法及时得到释放,也会造成死锁。因为获取锁和设置过期时间是两个操作,不具备原子性

  6. 为了解决这个问题,从 Redis2.8 开始,setnx 和 expire 可以通过一个命令一起执行。

        public static void main(String[] args) { new Redis().execute(jedis -> {     SetParams setParams = new SetParams()      //相当于执行了 setnx      .nx()      //相当于执行了 expire      .ex(5L);     String setnx = jedis.set("k1", "v1", setParams);     if ("OK".equals(setnx)) {  //说明没有人占位,可以进行操作  jedis.set("name", "peng");  String name = jedis.get("name");  System.out.println("name = " + name);  //操作完之后,释放 k1  jedis.del("k1");     } else {  //来到这里说明有人在操作了     } });    }

三. 解决超时问题

  1. 为了防止业务代码执行的过程中抛出异常或挂掉,我们给锁添加了一个超时时间(上面设置的是 5 秒),超时之后,锁会被自动释放。但这样也带来了一个新的问题:如果执行的业务非常耗时,就有可能出现混乱。

  2. 假设一个线程获取到锁,开始执行业务代码,但这个业务代码比较耗时,执行了 7 秒(或是更久),但我设置的超时时间是 5 秒,这样会在第一个线程还没有执行完成便将锁释放了;此时第二个线程获取到锁开始执行,在第二个线程执行到 2 秒时,第一个线程也执行完成了,此时第一个线程会释放锁(执行到了 del 那里了)。但是,第一个线程此时释放的是第二个线程的锁,释放之后,第三个线程进来…

  3. 对于这个问题,我们可以从两个角度入手

    • 尽量避免在获取锁之后,执行耗时操作
    • 可以在锁上面做文章,将锁的 value 设置为一个随机字符串,每次释放锁的时候,都去比较随机字符串是否一致,如果一致,再去释放,否则不释放
  4. 对于第二种方案,由于释放锁的时候,要去查看锁的 value,接着比较 value 之间是否一致,然后再释放锁。这三个步骤很明显不具备原子性,所以这里需要引入 Lua 脚本

    Lua 脚本的优势:

    • 使用方便,Redis 中内置了对 Lua 脚本的支持
    • Lua 脚本可以在 Redis 服务端原子的执行多个 Redis 命令
    • 由于网络在很大程度上会影响到 Redis 的性能,而使用 Lua 脚本可以让多个命令一起执行,可以有效的解决网络给 Redis 带来的性能问题
  5. 提前在 Redis 服务端写好 Lua 脚本,然后在 Java 客户端去调用脚本。在 Redis 安装目录创建一个 lua 文件夹,在里面新建一个后缀是 .lua 的文件,并添加上下面的脚本

    if redis.call("get",KEYS[1])==ARGV[1] then    return redis.call("del",KEYS[1])else    return 0 end
  6. 接下来,在 Redis 中给 Lua 脚本求一个 SHA1 和。script load 这个命令会在 Redis 服务器中缓存 Lua 脚本,并返回脚本内容的 SHA1 校验和,然后在 Java 端调用时,传入 SHA1 校验和作为参数,这样 Redis 服务端就知道执行哪个脚本了。
    通过Lua脚本生成的字符串

  7. 然后在 Java 端再次对代码进行修改,如下

        public static void main(String[] args) { new Redis().execute(jedis -> {     SetParams setParams = new SetParams()      //相当于执行了 setnx      .nx()      //相当于执行了 expire      .ex(5L);     String value = UUID.randomUUID().toString();     String setnx = jedis.set("k1", value, setParams);     if ("OK".equals(setnx)) {  //说明没有人占位,可以进行操作  jedis.set("name", "peng");  String name = jedis.get("name");  System.out.println("name = " + name);  //释放锁  jedis.evalsha("b8059ba43af6ffe8bed3db65bac35d452f8115d8"   , Arrays.asList("k1"), Arrays.asList(value));     } else {  //来到这里说明有人在操作了     } });    }

四. Redisson 实现分布式锁

  1. 相对于 Jedis 这种原生态应用,Redisson 对 Redis 请求做了较多封装,对于锁,也提供了对应的方法可以直接使用

        public static void main(String[] args) { Config config = new Config(); //配置 Redis 的基本信息 config.useSingleServer().setAddress("redis://192.168.73.128:6379").setPassword("123"); //获取一个 RedissonClient 对象 RedissonClient redissonClient = Redisson.create(config); //获取一个锁对象实例 RLock mylock = redissonClient.getLock("mylock"); try {     //第一个参数是获取锁的等待时间     //第二个参数是锁的超时时间     boolean b = mylock.tryLock(5L, 10L, TimeUnit.SECONDS);     if (b) {  //获取到锁了  RBucket<Object> rBucket = redissonClient.getBucket("k1");  rBucket.set("peng");  Object o = rBucket.get();  System.out.println("o = " + o);     } else {  //没有获取到锁  System.out.println("没有获取到锁");     } } catch (InterruptedException e) {     e.printStackTrace(); }finally {     //释放锁     mylock.unlock(); }    }
  2. 在 Redisson 中,核心的就是 mylock.tryLock(5L, 10L, TimeUnit.SECONDS) ,第一个参数表示尝试获取锁等待时间 5 秒;第二个参数锁的超时时间为 10 秒,即这个锁在 10 秒后会自动失效;第三个参数是参数的单位为 秒。

  3. 这里面的配置与上面的思路是一样的,Redisson 只不过是将锁相关的方法封装起来了而已。