c语言开发的基于内存的数据库,读写速度很快,因此广泛用于缓存方向。
我们在系统中修改已有数据时,需要先读取,然后进行修改保存,此时很容易遇到并发问题。由于修改和保存不是原子操作,在并发场景下,部分对数据的操作可能会丢失。在单服务器系统我们常用本地锁来避免并发带来的问题,然而,当服务采用集群方式部署时,本地锁无法在多个服务器之间生效,这时候保证数据的一致性就需要分布式锁来实现。
redis原生命令实现
使用redis实现分布式锁,主要是使用redis的SETNX命令(set if not exist)
加锁命令:SETNX key value,当键不存在的时候设置键并返回成功,否者返回失败,key是锁的唯一标识,一般按照业务内容来命名
解锁命令:DEL key,通过删除键值对来释放锁,以便其他线程来使用加锁命令获取锁
锁超时:EXPIRE key timeout设置key的超时时间,当锁没有被线程显式的释放时 ,会在达到超时时间后,自动删除锁,避免死锁。
简单的加锁代码实现如下:
public boolean tryLock(String key,String requset,int timeout) {
Long result = jedis.setnx(key, requset);
if (result == 1L) {
return jedis.expire(key, timeout) == 1L;
} else {
return false;
上述代码有一些问题:
1.setnx 和 expire 两个操作是非原子性的
如果执行setnx命令设置锁成功,在执行expire命令设置失效时间时服务器宕机或者重启等其他问题导致了expire命令没有执行成功,此时锁没有设置超时时间,会有死锁的可能。
利用lua脚本将这两个操作原子化:
public boolean tryLock_with_lua(String key, String UniqueId, int seconds) {
String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
"redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"
List<String> keys = new ArrayList<>()
List<String> values = new ArrayList<>()
keys.add(key)
values.add(UniqueId)
values.add(String.valueOf(seconds))
Object result = jedis.eval(lua_scripts, keys, values)
//判断是否成功
return result.equals(1L)
2.锁误解除
线程A获取锁,设置超时时间后,A线程的执行时间超过了超时时间,在超时时间达到后,自动释放锁,此时线程B获取到锁,当A执行完成之后,手动的将锁释放了,而此时释放的是B获取的锁,从而产生了锁误解除的问题
针对锁误解除的问题,我们可以在设置key的时候设置对应的value,value可以看成是获取锁的线程或者线程的唯一标识,可以使用uuid来作为唯一标识,在删除锁之前校验key对应的value与线程持有的value是否相同,从而避免删除了不是自己持有的锁。
3.超时解锁导致并发执行
线程A获取锁后开始执行,但是执行时间超过了锁的超时时间,此时会自动释放锁,线程B获取锁开始执行,此时就会导致线程A和线程B同时执行。
A和B两个线程并发执行解决方法:
将过期时间设置的足够长,保证代码能够在过期时间内执行完成
为拿到锁的线程设置守护线程,给要过期但是未释放的锁增加有效时间
4.不可重入
当线程在持有锁的情况下,再次请求该锁。一个锁支持在一个线程多次加锁,那么这个锁就是可重入的。反之,如果一个不可重入的锁已经被持有的线程再次加锁,那么再次加锁会失败。Redis可以对锁的重入进行计数,在加锁的时候+1,在解锁的时候-1,当技术归于0的时候,锁释放。
如下是使用本地缓存ThreadLocal的简单实现:
private static ThreadLocal<Map<String, Integer>> LOCKERS = ThreadLocal.withInitial(HashMap::new);
// 加锁
public boolean lock(String key) {
Map<String, Integer> lockers = LOCKERS.get();
if (lockers.containsKey(key)) {
lockers.put(key, lockers.get(key) + 1);
return true;
} else {
if (SET key uuid NX EX 30) {
lockers.put(key, 1);
return true;
return false;
// 解锁
public void unlock(String key) {
Map<String, Integer> lockers = LOCKERS.get();
if (lockers.getOrDefault(key, 0) <= 1) {
lockers.remove(key);
DEL key
} else {
lockers.put(key, lockers.get(key) - 1);
使用redis的 Map 数据结构在设置key的同时,计入重入次数
5.无法等待锁释放
以上的方式都是直接返回失败或者成功的结果的,如果客户端可以等待锁释放就不能使用了
可以通过客户端轮询的方式解决这个问题,当未获取到锁时,等待一段时间后重新获取结果,直到获取到锁或者等待超时。这种方法比较消耗服务器的资源,在并发量较大的时候效率较低。
使用Redis的发布订阅功能,在获取锁失败以后,订阅释放锁的信息,当锁被释放的时候,发送释放锁的信息
Redisson实现
线程获取锁的时候执行的锁lua脚本,保证了原子性
线程获取锁失败的时候,会一直通过while循环尝试获取锁,直到获取成功,再执行lua脚本(这里也包含了等待时间)
支持watch doc自动延期,这一点是针对上面锁说的超时解锁导致并发执行的情况,这里watch dog在后台开启了一个线程,不断延长key的生存时间。但是这个相当于监控线程的watch dog会对性能有一定的影响
实现了可重入锁的机制
redis本身的存储数据结构支持Map
Map的key值可以表示当前的线程信息,value可以用来记录重入的次数