test_lock :
UUID:ThreadId:1
这里的1使用来做锁重入的。
pexpire
指令为test_lock这个key设置过期时间为30s,即:30s后这个key会⾃动过期被删除,key对应的锁在那时也就被释放了。
总体来看,加锁的逻辑很简单:
在key对应的hash数据结构中记录了⼀ 下当前是哪个客户端的哪个线程过来加锁了,然后设置了⼀下key的过期时间为30s。 3)加锁成功之后的锁续约
成功加锁后,lua脚本返回nil,即null。
加锁成功之后,tryLockInnerAsync()⽅法返回;再结合Java8的Stream,对加锁结果进一步处理;
因为加锁成功后返回的是nil,这是lua脚本的返回形式,体现到java代码中的返回值为:null。
又由于RLock#lock()方法传入的leaseTime是-1,所以进入到scheduleExpirationRenewal(long)
方法做锁续约。
renewExpirationAsync()方法负责做具体的锁续约:
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
这里LUA脚本的逻辑很简单:
- 判断当前key中,是否还被线程UUID:ThreadId持有锁,持有则设置过期时间为30s(续命)。
锁续约(看门狗机制)其实就是每次加锁成功后,会⻢上开启⼀个后台线程, 每隔10s检查⼀下key是否存在,如果存在就为key续期30s。
- 这里的10s,取自配置的
lockWatchdogTimeout
参数,默认为30 * 1000 ms; - 所以⼀个key往往当过期时间慢慢消逝到20s左右时就⼜会被定时任务重置为了30s,这样就能保证:只要这个定时任务还在、这个key还在,就⼀直维持加锁。
如果当前持有锁的线程被中断了,会停止锁续约,即杀死看门狗;
protected void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
if (threadId != null) {
task.removeThreadId(threadId);
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
所谓的停止锁续约,实际就是将当前线程的threadId从看门狗缓存中移除,后续在执行锁续约时,如果发现看门狗缓存中已经没有了当前线程threadId,则直接退出锁续约 并且 不再延时10s开启一个定时任务。
如果加锁时指定了leaseTime > 0,则不会开门狗机制,表示强制锁leaseTime 毫秒后过期。一共有三种加锁方式可以做到,如下:
- RLock#lock(long leaseTime, TimeUnit unit)
- RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
- RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
4)重入加锁(相同线程多次加锁)
再次回到加锁的LUA脚本:
同一个线程对分布式锁多次加锁时,会走以下逻辑:
- 判断当前key是否被当前线程持有,如果是则增加锁重入的次数,并重新设置锁的过期时间为30s;
对应的Redis命令为:
hexists test_lock UUID:ThreadId
hincrby test_lock UUID:ThreadId 1
pexpire test_lock 30000
此时Redis中test_key对应的数据结构从
test_lock :
UUID:ThreadId:1
test_lock :
UUID:ThreadId:2
并将key的过期时间重新设置为30s。
锁重入成功之后,后台也会开启⼀个watchdog后台线程做锁续约,每隔10s检查⼀下key,如果key存在就将key的过期时间重新设置为30s。
Redisson可重⼊加锁的语义,实际是通过Hash结构的key中某个线程(UUID:ThreadId)对应的加锁次数来表示的。
5)锁竞争(其他线程加锁失败)
再再次回到加锁的LUA脚本:
如果分布式锁已经被其他线程持有,LUA脚本会执行以下逻辑:
返回当前key的剩余存活时间,因为不是返回nil,也就代表着加锁失败;
对应的Redis的命令为:
针对加锁方式的不同,加锁失败的逻辑也不同;可以分两大类:指定了加锁失败的等待时间waitTime和未指定waitTime。
- 未执行加锁失败的等待时间
waitTime
:获取分布式锁失败会一直重试,直到获取锁成功。比如下列加锁方法:
Rlock#lock()
:一直尝试获取分布式锁,直到获取锁成功。RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
RLock#lock(long leaseTime, TimeUnit unit)
- 指定了加锁失败的等待时间
waitTime
:获取分布式锁会超时,超时之后返回加锁失败;Rlock#tryLock(long waitTime, TimeUnit unit)
:指定获取锁失败的等待时间。在等待时间范围之内进行重试,超时则返回加锁失败。Rlock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
:同样是指定获取锁失败的等待时间,并且强制指定锁过期的时间(不开启看门狗)。在等待时间范围之内进行重试,超时则返回加锁失败。
可以简单的概述为RLock接口下的tryLock()
方法获取锁会失败,lock()
方法获取锁一定会成功。
1> 一直重试直到加锁成功
以Rlock#lock()
方法为例:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
try {
while (true) {
// lock() 或 lockInterruptibly()为入口走到这里时。leaseTime为-1,表示会开始开门狗;如果leaseTime大于0,则不会开启开门狗;
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
// waiting for message
if (ttl >= 0) {
try {
// 因为Semaphore的可用资源为0,所以这里就等价于Thread.sleep(ttl);
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
} finally {
unsubscribe(entry, threadId);
首先订阅解锁channel(命名格式:redisson_lock__channel:{keyName}
),其他线程解锁后,会发布解锁的消息;这里收到消息会立即尝试获取锁;订阅解锁channel的超时时间默认为7.5s。也就说获取锁失败7.5s之内,如果其他线程释放锁,当前线程可以立即尝试获取到锁。
获取锁失败之后会进⼊⼀个while死循环中:
每休息锁的存活时间ttl
之后,就尝试去获取锁,直到成功获取到锁才会跳出while死循环。
2> 等待锁超时返回加锁失败
以Rlock#tryLock(long waitTime, TimeUnit unit)
为例:
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
// 获取锁剩余的等待时长
time -= System.currentTimeMillis() - current;
if (time <= 0) {
// 获取锁超时,返回获取分布式锁失败
acquireFailed(waitTime, unit, threadId);
return false;
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
// 订阅解锁channel的超时时长为 获取锁剩余的等待时长
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + time + "ms. " +
"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
try {
// 收到解锁channel的消息之后,走到这里,再次判断获取锁等待时长是否超时
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
// while循环中尝试去获取锁
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 如果获取锁失败后,锁存活时长 小于 剩余锁等待时长,则线程睡眠 锁存活时长
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 如果获取锁失败后,锁存活时间 大于等于 剩余锁等待时长,则线程睡眠 锁等待时长
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
加锁存在超时时间 相比于 一直重试直到加锁成功,只是多一个时间限制,具体差异体现在:订阅解锁channel的超时时长、获取锁失败后线程的睡眠时长、重试获取锁次数的限制;
获取分布式锁失败之后,立即判断当前获取锁是否超时,如果超时,则返回加锁失败;
否者,订阅解锁channel(命名格式:redisson_lock__channel:{keyName}
),其他线程解锁后,会发布解锁的消息;
订阅解锁channel的超时时间为 获取锁剩余的等待时长
。 在这个时间范围之内,如果其他线程释放锁,当前线程收到解锁channel的消息之后再次判断获取锁是否超时,如果不超时,尝试获取锁。
获取锁之后会进⼊⼀个while死循环中: 如果获取锁超时,则返回加锁失败;
否者让线程睡眠: 如果锁存活时长ttl
小于 剩余锁等待时长,则线程睡眠 锁存活时长;
如果锁存活时间ttl
大于等于 剩余锁等待时长,则线程睡眠 锁等待时长;
线程睡眠完之后,判断获取锁是否超时,不超时则尝试去获取锁。
3、释放锁流程
1)Client主动尝试释放锁
进入到Rlock#unlock()方法;
和加锁的方式⼀样,释放锁也是通过lua脚本来完成的;
LUA脚本参数解析:
- KEYS[1] 表示的是 getName() ,代表的是锁名 test_lock;
- KEYS[2] 表示getChanelName() 表示的是发布订阅过程中使用的Chanel;
- ARGV[1] 表示的是LockPubSub.unLockMessage,解锁消息,实际代表的是数字 0,代表解锁消息;
- ARGV[2] 表示的是internalLockLeaseTime 默认的有效时间 30s;
- ARGV[3] 表示的是 getLockName(thread.currentThread().getId()) 代表的是 UUID:ThreadId 用锁对象id+线程id, 表示当前访问线程,用于区分不同服务器上的线程。
LUA脚本逻辑:
- 如果锁名称不存在;
- 可能是因为锁过期导致锁不存在,也可能是并发解锁。
- 则发布锁解除的消息,返回1,lua脚本执行完毕;
- 如果锁存在,检测当前线程是否持有锁;
- 如果是当前线程持有锁,定义变量counter,接收执行incrby将该线程重入的次数–的结果;
- 如果重入次数大于0,表示该线程还有其他任务需要执行;重新设置锁的过期时间;返回0,lua脚本执行完毕;
- 否则表示该线程执行结束,del删除该锁;并且publish发布该锁解除的消息;返回1,lua脚本执行完毕;
- 如果不是当前线程持有锁 或 其他情况,都返回nil,lua脚本执行完毕。
脚本执行结束之后,如果返回值不是0或1,即当前线程去释放其他线程的加锁时,抛出异常。
通过LUA脚本释放锁成功之后,会将看门狗杀死;
2)Client主动强制释放锁
forceUnlockAsync()
方法被调用的地方很多,大多都是在清理资源时删除锁。
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
LUA脚本逻辑:
逻辑比较简单粗暴:删除锁成功则并发布锁被删除的消息,返回1结束,否则返回0结束。
3)Client宕机,锁超时释放
如果Redisson客户端刚加锁成功,并且未指定releaseTime,后台会启动一个定时任务watchdog每隔10s检查key:key如果存在就为它⾃动续命到30s;在watchdog定时任务存在的情况下,如果不是主动释放锁,那么key将会⼀直的被watchdog这个定时任务维持加锁。
但是如果客户端宕机了,定时任务watchdog也就没了,也就没有锁续约机制了,那么过完30s之后,key会⾃动被删除、key对应的锁也自动被释放了。
4)不启动锁续约的超时释放锁
如果在加锁时指定了leaseTime,加锁成功之后,后台并不会启动一个定时任务watchdog做锁续约;key存活leaseTime 毫秒之后便会自动被删除、key对应的锁也就自动被释放了;无论当前线程的业务逻辑是否执行完毕。
比如使用如下方式加锁:
- RLock#lock(long leaseTime, TimeUnit unit)
- RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
- RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
到此这篇关于Redisson如何实现分布式锁、锁续约的文章就介绍到这了,更多相关redisson分布式锁、锁续约内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
您可能感兴趣的文章:
电脑版 - 返回首页
2006-2023 脚本之家 JB51.Net , All Rights Reserved.
苏ICP备14036222号