注册

为什么推荐用Redisson实现分布式锁,看完直呼好好好

开心一刻


一男人站在楼顶准备跳楼,楼下有个劝解员拿个喇叭准备劝解
劝解员:兄弟,别跳
跳楼人:我不想活了
劝解员:你想想你媳妇
跳楼人:媳妇跟人跑了
劝解员:你还有兄弟
跳楼人:就是跟我兄弟跑的
劝解员:你想想你家孩子
跳楼人:孩子是他俩的
劝解员:死吧,妈的,你活着也没啥意义了


开心一刻

写在前面


关于锁,相信大家都不陌生,一般我们用其在多线程环境中控制对共享资源的并发访问;单服务下,用 JDK 中的 synchronizedLock 的实现类可实现对共享资源的并发访问,分布式服务下,JDK 中的锁就显得力不从心了,分布式锁也就应运而生了;分布式锁的实现方式有很多,常见的有如下几种



  1. 基于 MySQL,利用行级悲观锁(select ... for update)
  2. 基于 Redis,利用其 (setnx + expire) 或 set
  3. 基于 Zookeeper,利用其临时目录和事件回调机制   

本文不讲这些,网上资料很多,感兴趣的小伙伴自行去查阅;本文的重点是基于 Redis 的 Redisson,从源码的角度来看看为什么推荐用 Redisson 来实现分布式锁;推荐大家先去看看



搞懂了 Redis 的订阅、发布与Lua,Redisson的加锁原理就好理解了



有助于理解后文


分布式锁特点


可以类比 JDK 中的锁



  1. 互斥

    不仅要保证同个服务中不同线程的互斥,还需要保证不同服务间、不同线程的互斥;如何处理互斥,是自旋、还是阻塞 ,还是其他 ?


  2. 超时

    锁超时设置,防止程序异常奔溃而导致锁一直存在,后续同把锁一直加不上


  3. 续期

    程序具体执行的时长无法确定,所以过期时间只能是个估值,那么就不能保证程序在过期时间内百分百能运行完,所以需要进行锁续期,保证业务是在加锁的情况下完成的


  4. 可重入

    可重入锁又名递归锁,是指同一个线程在外层方法已经获得锁,再进入该线程的中层或内层方法会自动获取锁;简单点来说,就是同个线程可以反复获取同一把锁


  5. 专一释放

    通俗点来讲:谁加的锁就只有它能释放这把锁;为什么会出现这种错乱释放的问题了,举个例子就理解了



    线程 T1 对资源 lock_zhangsan 加了锁,由于某些原因,业务还未执行完,锁已经过期自动释放了,此时线程 T2 对资源 lock_zhangsan 加锁成功,T2 还在执行业务的过程中,T1 业务执行完后释放资源 lock_zhangsan 的锁,结果把 T2 加的锁给释放了




  6. 公平与非公平

    公平锁:多个线程按照申请锁的顺序去获得锁,所有线程都在队列里排队,这样就保证了队列中的第一个先得到锁


    非公平锁:多个线程不按照申请锁的顺序去获得锁,而是同时直接去尝试获取锁


    JDK 中的 ReentrantLock 就有公平和非公平两种实现,有兴趣的可以去看看它的源码;多数情况下用的是非公平锁,但有些特殊情况下需要用公平锁



你们可能会有这样的疑问



引入一个简单的分布式锁而已,有必要考虑这么多吗?



虽然绝大部分情况下,我们的程序都是在跑正常流程,但不能保证异常情况 100% 跑不到,出于健壮性考虑,异常情况都需要考虑到;下面我们就来看看 Redisson 是如何实现这些特点的


Redisson实现分布式锁


关于 Redisson,更多详细信息可查看官方文档,它提供了非常丰富的功能,分布式锁 只是其中之一;我们基于 Redisson 3.13.6,来看看分布式锁的实现



  1. 先将 Redis 信息配置给 Redisson,创建出 RedissonClient 实例

    Redis 的部署方式不同,Redisson 配置模式也会不同,详细信息可查看:Configuration,我们就以最简单的 Single mode 来配置


    @Before
    public void before() {
    Config config = new Config();
    config.useSingleServer()
    .setAddress("redis://192.168.1.110:6379");
    redissonClient = Redisson.create(config);
    }


  2. 通过 RedissonClient 实例获取锁

    RedissonClient 实例创建出来后,就可以通过它来获取锁


    /**
    * 多线程
    * @throws Exception
    */

    @Test
    public void multiLock() throws Exception {

    RLock testLock = redissonClient.getLock("multi_lock");
    int count = 5;
    CountDownLatch latch = new CountDownLatch(count);

    for (int i=1; i<=count; i++) {
    new Thread(() -> {
    try {
    System.out.println("线程 " + Thread.currentThread().getName() + " 尝试获取锁");
    testLock.lock();
    System.out.println(String.format("线程 %s 获取到锁, 执行业务中...", Thread.currentThread().getName()));
    try {
    TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(String.format("线程 %s 业务执行完成", Thread.currentThread().getName()));
    latch.countDown();
    } finally {
    testLock.unlock();
    System.out.println(String.format("线程 %s 释放锁完成", Thread.currentThread().getName()));
    }
    }, "t" + i).start();
    }

    latch.await();
    System.out.println("结束");
    }

    完整示例代码:redisson-demo



用 Redisson 实现分布式锁就是这么简单,但光会使用肯定是不够的,我们还得知道其底层实现原理



知其然,并知其所以然!



那如何知道其原理呢?当然是看其源码实现


客户端创建


客服端的创建过程中,会生成一个 id 作为唯一标识,用以区分分布式下不同节点中的客户端


client

id 值就是一个 UUID,客户端启动时生成;至于这个 id 有什么用,大家暂且在脑中留下这个疑问,我们接着往下看


锁获取


我们从 lock 开始跟源码


lock

最终会来到有三个参数的 lock 方法


private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();

// 尝试获取锁;ttl为null表示锁获取成功; ttl不为null表示获取锁失败,其值为其他线程占用该锁的剩余时间
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}

// 锁被其他线程占用而获取失败,使用redis的发布订阅功能来等待锁的释放通知,而非自旋监测锁的释放
RFuture<RedissonLockEntry> future = subscribe(threadId);

// 当前线程会阻塞,直到锁被释放时当前线程被唤醒(有超时等待,默认 7.5s,而不会一直等待)
// 持有锁的线程释放锁之后,redis会发布消息,所有等待该锁的线程都会被唤醒,包括当前线程
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}

try {
while (true) {
// 尝试获取锁;ttl为null表示锁获取成功; ttl不为null表示获取锁失败,其值为其他线程占用该锁的剩余时间
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}

// waiting for message
if (ttl >= 0) {
try {
// future.getNow().getLatch() 返回的是 Semaphore 对象,其初始许可证为 0,以此来控制线程获取锁的顺序
// 通过 Semaphore 控制当前服务节点竞争锁的线程数量
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 退出锁竞争(锁获取成功或者放弃获取锁),则取消锁的释放订阅
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}

主要三个点:尝试获取锁订阅取消订阅



  1. 尝试获取锁
    尝试获取锁

    尝试获取锁主要做了两件事:1、尝试获取锁,2、锁续期;尝试获取锁主要涉及到一段 Lua 代码


    尝试获取锁Lua脚本

    结合 搞懂了 Redis 的订阅、发布与Lua,Redisson的加锁原理就好理解了 来看这段 Lua 脚本,还是很好理解的




    1. 用 exists 判断 key 不存在,则用 hash 结构来存放锁,key = 资源名,field = uuid + : + threadId,value 自增 1;设置锁的过期时间(默认是 lockWatchdogTimeout = 30 * 1000 毫秒),并返回 nil
    2. 用 hexists 判断 field = uuid + : + threadId 存在,则该 field 的 value 自增 1,并重置过期时间,最后返回 nil

      这里相当于实现了锁的重入


    3. 上面两种情况都不满足,则说明锁被其他线程占用了,直接返回锁的过期时间


    给你们提个问题



    为什么 field = uuid + : + threadId,而不是 field = threadId


    友情提示:从多个服务(也就是多个 Redisson 客户端)来考虑


    这个问题想清楚了,那么前面提到的:在 Redisson 客户端创建的过程中生成的 id(一个随机的 uuid 值),它的作用也就清楚了



    尝试获取锁成功之后,会启动一个定时任务(即 WatchDog,亦称 看门狗)实现锁续期,也涉及到一段 Lua 脚本


    看门狗Lua

    这段脚本很简单,相信你们都能看懂



    默认情况下,锁的过期时间是 30s,锁获取成功之后每隔 10s 进行一次锁续期,重置过期时间成 30s


    若锁已经被释放了,则定时任务也会停止,不会再续期




  2. 订阅
    订阅

    获取锁的过程中,尝试获取锁失败(锁被其他线程锁占有),则会完成对该锁频道的订阅,订阅过程中线程会阻塞;持有锁的线程释放锁时会向锁频道发布消息,订阅了该锁频道的线程会被唤醒,继续去获取锁,


    给你们提个问题



    如果持有锁的线程意外停止了,未向锁频道发布消息,那订阅了锁频道的线程该如何唤醒



    Redisson 其实已经考虑到了,提供了超时机制来处理


    锁频道超时机制

    默认超时时长 = 3000 + 1500 * 3 = 7500 毫秒


    再给你们提个问题



    为什么要用 Redis 的发布订阅



    如果我们不用 Redis 的发布订阅,我们该如何实现,自旋?自旋有什么缺点? 自旋频率难以掌控,太高会增大 CPU 的负担,太低会不及时(锁都释放半天了才检测到);可以类比 生产者与消费者 来考虑这个问题


  3. 取消订阅

    有订阅,肯定就有取消订阅;当阻塞的线程被唤醒并获取到锁时需要取消对锁频道的订阅,当然,取消获取锁的线程也需要取消对锁频道的订阅


    取消订阅

    比较好理解,就是取消当前线程对锁频道的订阅



锁释放


我们从 unlock 开始


unlock

代码比较简单,我们继续往下跟


unlock_跟源码

主要有两点:释放锁取消续期定时任务



  1. 释放锁

    重点在于一个 Lua 脚本


    释放锁Lua脚本

    我们把参数具象化,脚本就好理解了



    KEYS[1] = 锁资源,KEYS[2] = 锁频道


    ARGV[1] = 锁频道消息类型,ARGV[2] = 过期时间,ARGV[3] = uuid + : + threadId



    1. 如果当前线程未持有锁,直接返回 nil
    2. hash 结构的 field 的 value 自减 1,counter = 自减后的 value 值

      如果 counter > 0,表示线程重入了,重置锁的过期时间,返回 0


      如果 counter <= 0,删除锁,并对锁频道发布锁释放消息(频道订阅者则可收到消息,然后唤醒线程去获取锁),返回 1


    3. 上面 1、2 都不满足,则直接返回 nil

    两个细节:1、重入锁的释放,2、锁彻底释放后的消息发布




  2. 取消续期定时任务
    取消续期定时任务

    比较简单,没什么好说的


    总结


    我们从分布式锁的特点出发,来总结下 Redisson 是如何实现这些特点的



    1. 互斥

      Redisson 采用 hash 结构来存锁资源,通过 Lua 脚本对锁资源进行操作,保证线程之间的互斥;互斥之后,未获取到锁的线程会订阅锁频道,然后进入一定时长的阻塞


    2. 超时

      有超时设置,给 hash 结构的 key 加上过期时间,默认是 30s


    3. 续期

      线程获取到锁之后会开启一个定时任务(watchdog 即 看门狗),每隔一定时间(默认 10s)重置 key 的过期时间


    4. 可重入

      通过 hash 结构解决,key 是锁资源,field(值:uuid + : + threadId) 是持有锁的线程,value 表示重入次数


    5. 专一释放

      通过 hash 结构解决,field 中存放了线程信息,释放的时候就能够知道是不是当前线程加上的锁,是才能够进行锁释放


    6. 公平与非公平

      由你们在评论区补充





作者:青石路
来源:juejin.cn/post/7425786548061683727

0 个评论

要回复文章请先登录注册