为什么推荐用Redisson实现分布式锁,看完直呼好好好
开心一刻
一男人站在楼顶准备跳楼,楼下有个劝解员拿个喇叭准备劝解
劝解员:兄弟,别跳
跳楼人:我不想活了
劝解员:你想想你媳妇
跳楼人:媳妇跟人跑了
劝解员:你还有兄弟
跳楼人:就是跟我兄弟跑的
劝解员:你想想你家孩子
跳楼人:孩子是他俩的
劝解员:死吧,妈的,你活着也没啥意义了
写在前面
关于锁,相信大家都不陌生,一般我们用其在多线程环境中控制对共享资源的并发访问;单服务下,用 JDK 中的 synchronized
或 Lock
的实现类可实现对共享资源的并发访问,分布式服务下,JDK 中的锁就显得力不从心了,分布式锁也就应运而生了;分布式锁的实现方式有很多,常见的有如下几种
- 基于 MySQL,利用行级悲观锁(select ... for update)
- 基于 Redis,利用其 (setnx + expire) 或 set
- 基于 Zookeeper,利用其临时目录和事件回调机制
本文不讲这些,网上资料很多,感兴趣的小伙伴自行去查阅;本文的重点是基于 Redis 的 Redisson,从源码的角度来看看为什么推荐用 Redisson 来实现分布式锁;推荐大家先去看看
搞懂了 Redis 的订阅、发布与Lua,Redisson的加锁原理就好理解了
有助于理解后文
分布式锁特点
可以类比 JDK 中的锁
- 互斥
不仅要保证同个服务中不同线程的互斥,还需要保证不同服务间、不同线程的互斥;如何处理互斥,是自旋、还是阻塞 ,还是其他 ?
- 超时
锁超时设置,防止程序异常奔溃而导致锁一直存在,后续同把锁一直加不上
- 续期
程序具体执行的时长无法确定,所以过期时间只能是个估值,那么就不能保证程序在过期时间内百分百能运行完,所以需要进行锁续期,保证业务是在加锁的情况下完成的
- 可重入
可重入锁又名递归锁,是指同一个线程在外层方法已经获得锁,再进入该线程的中层或内层方法会自动获取锁;简单点来说,就是同个线程可以反复获取同一把锁
- 专一释放
通俗点来讲:谁加的锁就只有它能释放这把锁;为什么会出现这种错乱释放的问题了,举个例子就理解了
线程 T1 对资源 lock_zhangsan 加了锁,由于某些原因,业务还未执行完,锁已经过期自动释放了,此时线程 T2 对资源 lock_zhangsan 加锁成功,T2 还在执行业务的过程中,T1 业务执行完后释放资源 lock_zhangsan 的锁,结果把 T2 加的锁给释放了
- 公平与非公平
公平锁:多个线程按照申请锁的顺序去获得锁,所有线程都在队列里排队,这样就保证了队列中的第一个先得到锁
非公平锁:多个线程不按照申请锁的顺序去获得锁,而是同时直接去尝试获取锁
JDK 中的 ReentrantLock 就有公平和非公平两种实现,有兴趣的可以去看看它的源码;多数情况下用的是非公平锁,但有些特殊情况下需要用公平锁
你们可能会有这样的疑问
引入一个简单的分布式锁而已,有必要考虑这么多吗?
虽然绝大部分情况下,我们的程序都是在跑正常流程,但不能保证异常情况 100% 跑不到,出于健壮性考虑,异常情况都需要考虑到;下面我们就来看看 Redisson 是如何实现这些特点的
Redisson实现分布式锁
关于 Redisson
,更多详细信息可查看官方文档,它提供了非常丰富的功能,分布式锁 只是其中之一;我们基于 Redisson 3.13.6
,来看看分布式锁的实现
- 先将 Redis 信息配置给 Redisson,创建出 RedissonClient 实例
Redis 的部署方式不同,Redisson 配置模式也会不同,详细信息可查看:Configuration,我们就以最简单的
Single mode
来配置@Before
public void before() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://192.168.1.110:6379");
redissonClient = Redisson.create(config);
} - 通过 RedissonClient 实例获取锁
RedissonClient 实例创建出来后,就可以通过它来获取锁
/**
* 多线程
* @throws Exception
*/
@Test
public void multiLock() throws Exception {
RLock testLock = redissonClient.getLock("multi_lock");
int count = 5;
CountDownLatch latch = new CountDownLatch(count);
for (int i=1; i<=count; i++) {
new Thread(() -> {
try {
System.out.println("线程 " + Thread.currentThread().getName() + " 尝试获取锁");
testLock.lock();
System.out.println(String.format("线程 %s 获取到锁, 执行业务中...", Thread.currentThread().getName()));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("线程 %s 业务执行完成", Thread.currentThread().getName()));
latch.countDown();
} finally {
testLock.unlock();
System.out.println(String.format("线程 %s 释放锁完成", Thread.currentThread().getName()));
}
}, "t" + i).start();
}
latch.await();
System.out.println("结束");
}完整示例代码:redisson-demo
用 Redisson 实现分布式锁就是这么简单,但光会使用肯定是不够的,我们还得知道其底层实现原理
知其然,并知其所以然!
那如何知道其原理呢?当然是看其源码实现
客户端创建
客服端的创建过程中,会生成一个 id
作为唯一标识,用以区分分布式下不同节点中的客户端
id 值就是一个 UUID,客户端启动时生成;至于这个 id 有什么用,大家暂且在脑中留下这个疑问,我们接着往下看
锁获取
我们从 lock
开始跟源码
最终会来到有三个参数的 lock 方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 尝试获取锁;ttl为null表示锁获取成功; ttl不为null表示获取锁失败,其值为其他线程占用该锁的剩余时间
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 锁被其他线程占用而获取失败,使用redis的发布订阅功能来等待锁的释放通知,而非自旋监测锁的释放
RFuture<RedissonLockEntry> future = subscribe(threadId);
// 当前线程会阻塞,直到锁被释放时当前线程被唤醒(有超时等待,默认 7.5s,而不会一直等待)
// 持有锁的线程释放锁之后,redis会发布消息,所有等待该锁的线程都会被唤醒,包括当前线程
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
// 尝试获取锁;ttl为null表示锁获取成功; ttl不为null表示获取锁失败,其值为其他线程占用该锁的剩余时间
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
// future.getNow().getLatch() 返回的是 Semaphore 对象,其初始许可证为 0,以此来控制线程获取锁的顺序
// 通过 Semaphore 控制当前服务节点竞争锁的线程数量
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 退出锁竞争(锁获取成功或者放弃获取锁),则取消锁的释放订阅
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
主要三个点:尝试获取锁
、订阅
、取消订阅
- 尝试获取锁
尝试获取锁主要做了两件事:1、尝试获取锁,2、锁续期;尝试获取锁主要涉及到一段 Lua 代码
结合 搞懂了 Redis 的订阅、发布与Lua,Redisson的加锁原理就好理解了 来看这段 Lua 脚本,还是很好理解的
- 用 exists 判断 key 不存在,则用 hash 结构来存放锁,key = 资源名,field = uuid + : + threadId,value 自增 1;设置锁的过期时间(默认是 lockWatchdogTimeout = 30 * 1000 毫秒),并返回 nil
- 用 hexists 判断 field = uuid + : + threadId 存在,则该 field 的 value 自增 1,并重置过期时间,最后返回 nil
这里相当于实现了锁的重入
- 上面两种情况都不满足,则说明锁被其他线程占用了,直接返回锁的过期时间
给你们提个问题
为什么 field = uuid + : + threadId,而不是 field = threadId
友情提示:从多个服务(也就是多个 Redisson 客户端)来考虑
这个问题想清楚了,那么前面提到的:在 Redisson 客户端创建的过程中生成的 id(一个随机的 uuid 值),它的作用也就清楚了
尝试获取锁成功之后,会启动一个定时任务(即
WatchDog
,亦称看门狗
)实现锁续期,也涉及到一段 Lua 脚本这段脚本很简单,相信你们都能看懂
默认情况下,锁的过期时间是 30s,锁获取成功之后每隔 10s 进行一次锁续期,重置过期时间成 30s
若锁已经被释放了,则定时任务也会停止,不会再续期
- 订阅
获取锁的过程中,尝试获取锁失败(锁被其他线程锁占有),则会完成对该锁频道的订阅,订阅过程中线程会阻塞;持有锁的线程释放锁时会向锁频道发布消息,订阅了该锁频道的线程会被唤醒,继续去获取锁,
给你们提个问题
如果持有锁的线程意外停止了,未向锁频道发布消息,那订阅了锁频道的线程该如何唤醒
Redisson 其实已经考虑到了,提供了超时机制来处理
默认超时时长 = 3000 + 1500 * 3 = 7500 毫秒
再给你们提个问题
为什么要用 Redis 的发布订阅
如果我们不用 Redis 的发布订阅,我们该如何实现,自旋?自旋有什么缺点? 自旋频率难以掌控,太高会增大 CPU 的负担,太低会不及时(锁都释放半天了才检测到);可以类比
生产者与消费者
来考虑这个问题 - 取消订阅
有订阅,肯定就有取消订阅;当阻塞的线程被唤醒并获取到锁时需要取消对锁频道的订阅,当然,取消获取锁的线程也需要取消对锁频道的订阅
比较好理解,就是取消当前线程对锁频道的订阅
锁释放
我们从 unlock
开始
代码比较简单,我们继续往下跟
主要有两点:释放锁
和 取消续期定时任务
- 释放锁
重点在于一个 Lua 脚本
我们把参数具象化,脚本就好理解了
KEYS[1] = 锁资源,KEYS[2] = 锁频道
ARGV[1] = 锁频道消息类型,ARGV[2] = 过期时间,ARGV[3] = uuid + : + threadId
- 如果当前线程未持有锁,直接返回 nil
- hash 结构的 field 的 value 自减 1,counter = 自减后的 value 值
如果 counter > 0,表示线程重入了,重置锁的过期时间,返回 0
如果 counter <= 0,删除锁,并对锁频道发布锁释放消息(频道订阅者则可收到消息,然后唤醒线程去获取锁),返回 1
- 上面 1、2 都不满足,则直接返回 nil
两个细节:1、重入锁的释放,2、锁彻底释放后的消息发布
- 取消续期定时任务
比较简单,没什么好说的
总结
我们从分布式锁的特点出发,来总结下 Redisson 是如何实现这些特点的
- 互斥
Redisson 采用 hash 结构来存锁资源,通过 Lua 脚本对锁资源进行操作,保证线程之间的互斥;互斥之后,未获取到锁的线程会订阅锁频道,然后进入一定时长的阻塞
- 超时
有超时设置,给 hash 结构的 key 加上过期时间,默认是 30s
- 续期
线程获取到锁之后会开启一个定时任务(watchdog 即 看门狗),每隔一定时间(默认 10s)重置 key 的过期时间
- 可重入
通过 hash 结构解决,key 是锁资源,field(值:uuid + : + threadId) 是持有锁的线程,value 表示重入次数
- 专一释放
通过 hash 结构解决,field 中存放了线程信息,释放的时候就能够知道是不是当前线程加上的锁,是才能够进行锁释放
- 公平与非公平
由你们在评论区补充
来源:juejin.cn/post/7425786548061683727