前言
我们在工作等一些场景中,一定是用到过分布式锁的。
无论采用Zookeeper
还是Redis
做分布式锁,锁是应该有一个有效期的。
如果我们不设置有效期,可能会出现死锁问题,导致锁不能释放,影响系统功能。
但是如果我们锁释放时间设置的不合理,导致锁释放后实际业务逻辑还没处理完,那也是不行的。
当然,我们可以预估业务逻辑的处理时间范围,合理的设置锁的释放时间,但是总归会可能出现业务处理时间过长的情况,这时候锁已经释放了,业务逻辑执行过程中就会处于不安全的环境里。
我们如何完全避免这种情况呢?
这就是我们这节要说的锁续期的一些知识。
正文
上述的情况其实非常好理解,就是如果有这种情况,我们直接让当前线程持有的锁在续期一段时间,保证业务逻辑能正常走完。
我们这儿以Redis
分布式锁来进行说明。
分布式锁举例
关于Redis
分布式锁,我们可以使用开源类库,如 Redisson
就是一款优秀的工具包,其中包含了Redis
分布式锁的实现。
其获取分布式锁的代码如下:
1 2 3 4 5 6 7 8 9 10 11
| private static RedissonClient redissonClient(){ Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0); return Redisson.create(config); } public static void main(String[] args) { RedissonClient client = redissonClient(); RLock lock = client.getLock(REDIS_KEY); }
|
注:这儿我们为了方便,使用了单一 Redis
实例,当然也是支持 Redis
集群和主从模式的,这儿就不过多介绍了。
我们拿到lock
后,可以进行加锁、解锁等逻辑,如下图。
可以看到它继承Lock
接口,当然Lock
接口的几个方法也会实现。
我们知道,对于没有设置锁超时时间的方法,如tryLock()
,如果加锁后某些原因导致锁不能释放,会出现死锁问题。
Redisson
对于这种情况是怎么处理的呢?我们来看一下。
首先,我们先来看如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class RedissonRenewalLock {
private static final String REDIS_KEY = "redisson_renewal_lock";
private static RedissonClient redissonClient(){ Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0); return Redisson.create(config); }
public static void main(String[] args) { RedissonClient client = redissonClient(); RLock lock = client.getLock(REDIS_KEY); boolean isLock = false; try { isLock = lock.tryLock(1,30,TimeUnit.SECONDS); if(isLock){ for(int i=0;i<50;i++){ TimeUnit.SECONDS.sleep(1); System.out.println(lock.remainTimeToLive()); }
}else{ System.out.println("未成功获得锁......"); } }catch (Exception e){ e.printStackTrace(); }finally { if(isLock){ if(lock.isHeldByCurrentThread()){ lock.unlock(); } } } } }
|
上述代码的意思是获取锁后,我们设置锁超时时间为30s,假设我们需要处理一个耗时50s的任务,每隔1s输出下锁的剩余时间。
可以看到30s后返回时间变为-2,表示锁已经被释放,此时业务逻辑还没执行完,执行处于不安全环境中。
我们这样使用,如果业务逻辑处理时间超过我们锁的超时时间,就会有些问题。
我们将上述的lock.tryLock(1,30,TimeUnit.SECONDS);
改为 lock.tryLock();
方法来看下。
1 2 3 4 5
| try { isLock = lock.tryLock(); if(isLock){ ......
|
查看日志我们可以发现锁的剩余时间每隔一段时间就会增加,这就是我们今天要说的锁续期。
分布式锁续期原理
上面代码我们肯定有所疑问,我们来看下。
- 我们通过
tryLock()
没有设置锁的超时时间,看日志它好像是默认30s计时? - 当每隔10s后,如果业务逻辑还没执行完,会进行锁的续期?
这些问题我们都需要通过源码来看下。
首先通过 tryLock()
方法找到 RedissonLock
的 tryLock()
方法实现。
1 2 3 4
| @Override public boolean tryLock() { return get(tryLockAsync()); }
|
通过tryLockAsync()
方法可以找到具体实现逻辑,如下:
1 2 3 4 5 6 7 8
| @Override public RFuture<Boolean> tryLockAsync() { return tryLockAsync(Thread.currentThread().getId()); } @Override public RFuture<Boolean> tryLockAsync(long threadId) { return tryAcquireOnceAsync(-1, null, threadId); }
|
关键方法tryAcquireOnceAsync
,其中leaseTime
传入 -1,会执行tryLockInnerAsync
方法,传入一个过期时间,同时会有一个续期的逻辑。
tryLockInnerAsync
方法就是去执行一段 Lua
脚本,设置锁。
这段Lua
脚本的**KEYS[1]
**代表的是加锁的那个key,比如redisson_renewal_lock
;
**ARGV[1]
**代表的是锁key的默认生存时间,默认30秒;
**ARGV[2]
**代表的是加锁的客户端的ID,如 25008516-595b-4a9f-af59-165a14330820:1
。
逻辑如下:
加锁逻辑
第一段if
判断语句,用exists redisson_renewal_lock
命令判断一下,如果要加锁的那个key不存在的话,就进行加锁;
加锁操作,hset redisson_renewal_lock 25008516-595b-4a9f-af59-165a14330820:1 1
;
redisson_renewal_lock
的数据如下:
1 2 3 4
| redisson_renewal_lock { 25008516-595b-4a9f-af59-165a14330820:1 1 }
|
设置锁过期时间,pexpire redisson_renewal_lock 30000
;
至此加锁完成。
锁互斥逻辑
- 当客户端2想尝试获取锁时,此时只有客户端id不同,比如
ARGV[2]
为 f0b30905-e2a0-43c4-91a2-9aa608a3ba1e:1
; - 第一段
if
判断语句,明显锁已被客户端1持有,判断逻辑进不去; - 来到第二段
if
判断语句,判断 hash
数据结构中是否包含客户端2的id,显然不存在,里面是客户端1的id; - 所以客户端2会获取到
pttl redisson_renewal_lock
返回的一个数字,这个数字代表了redisson_renewal_lock
这个锁key的剩余生存时间。 - 客户端2会等待,并尝试重新获取锁。
重入锁逻辑
当客户端1未释放锁,又想获取该锁时(重入);
第一段if
判断语句,明显锁已经存在,判断逻辑进不去;
来到第二段if
判断语句,hexists redisson_renewal_lock 25008516-595b-4a9f-af59-165a14330820:1
,判断通过,存在客户端1的id;
调用 hincrby redisson_renewal_lock 25008516-595b-4a9f-af59-165a14330820:1 1
,意思是为其+1;
因此redisson_renewal_lock
的数据变为了如下:
1 2 3 4
| redisson_renewal_lock { 25008516-595b-4a9f-af59-165a14330820:1 2 }
|
设置锁过期时间,pexpire redisson_renewal_lock 30000
;
至此重入锁逻辑完成。
注: 这儿使用Lua
脚本的原因就是为了保证操作的原子性。
上述传入的时间我们也可以很快找到,默认设置的是30s。
1
| private long lockWatchdogTimeout = 30 * 1000;
|
这也就解决了我们的第一个疑问。
可以看到,Redisson
的 tryLock()
方法默认设置的30s的锁有效期,每隔一定时间,就会为锁续期,续期是通过scheduleExpirationRenewal
保证的。
我们来看下这个方法,关键调用方法如下:
其关键部分就是创建一个TimerTask
执行续约逻辑,该任务的间隔时间是internalLockLeaseTime / 3
,默认的情况下是10s,也就是我们在日志中看到的每个10s,锁的超时时间又变为了30s的原因。
其锁续约的代码也是一段Lua
脚本。
这段Lua
脚本就不过多介绍了。
锁续期的原理图如下:
我们再来看下释放锁的逻辑,这儿只上关键性部分,如下:
释放锁的Lua
脚本如下:
其主要逻辑是:
- 判断当前线程锁存不存在,不存在就不用处理了;
- 如果存在,进行次数-1操作;
- 如果次数还是大于0的(重入锁),就重新设置过期时间;
- 否则就删除锁key(释放锁),同时进行通知。
这里的**KEYS[1]
**代表的是加锁的那个key,比如redisson_renewal_lock
;
**ARGV[1]
和KEYS[2]
**代表的是释放锁后通知的消息信息;
**ARGV[2]
**代表的是锁key的默认生存时间,默认30秒;
**ARGV[3]
**代表的是加锁的客户端的ID,如 25008516-595b-4a9f-af59-165a14330820:1
。
自写Redis续期锁
我们根据上面逻辑,可以手写一个Redis
续期锁,这儿我们依靠守护线程来延期锁key
。
首先我们需要锁key
、加锁时间、守护线程参数等,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class MyRedisLock {
private String lockKey;
private long lockTime;
private boolean success = false;
private DaemonCheckProcessor checkProcessor; private Thread checkThread;
private String clientId = UUID.randomUUID().toString()+":"+Thread.currentThread().getId();
private RedisRepository redisRepository; }
|
success
用来记录锁状态,clientId
用于标识该锁的持有者。
构造方法只要传入锁key
就行,我们这儿还传入了RedisRepository(即 RedisTemplate)
,其实可以用Spring
注入,我们这儿为了方便就直接引入了。
1 2 3 4 5
| public MyRedisLock(String lockKey,RedisRepository redisRepository) { this.lockKey = lockKey; this.redisRepository = redisRepository; this.lockTime = 30000; }
|
锁需要有几个关键方法:tryLock
、unLock
等,此外还需要能判断锁是否加锁成功及是否当前线程持有锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public boolean tryLock(){ String script = "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);"; Long waitTime = redisRepository.eval(script,Long.class,lockKey,lockTime,clientId); if(waitTime!=null){ return false; } checkProcessor = new DaemonCheckProcessor(lockKey, clientId, lockTime,redisRepository); checkThread = new Thread(checkProcessor); checkThread.setDaemon(Boolean.TRUE); checkThread.start(); success = true; return true; }
public void unLock(){ String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "return 1; "+ "end; " + "return nil;"; redisRepository.eval(script,Integer.class,lockKey,lockTime,clientId); if(checkProcessor != null){ checkProcessor.stop(); } if(checkThread!=null){ checkThread.interrupt(); } success =false; }
|
这儿我们构建一个守护线程类,该类每隔 2/3 lockTime
就会去尝试延期一下,如果锁还没被释放的话。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| class DaemonCheckProcessor implements Runnable{
private static final long MARK_EXPIRE_SUCCESS = 1L;
private String key;
private String clientId;
private long lockTime;
private RedisRepository redisRepository;
public DaemonCheckProcessor(String key, String clientId, long lockTime,RedisRepository redisRepository) { this.key = key; this.clientId = clientId; this.lockTime = lockTime; this.signal = Boolean.TRUE; this.redisRepository = redisRepository; }
private volatile Boolean signal;
void stop() { this.signal = Boolean.FALSE; }
@Override public void run() { long waitTime = lockTime * 2 / 3; while (signal) { try { Thread.sleep(waitTime); if (expandLockTime(key, clientId, lockTime) != MARK_EXPIRE_SUCCESS) { this.stop(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
private Long expandLockTime(String key,String clientId,long lockTime){ String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;"; return redisRepository.eval(script,Long.class,key,lockTime,clientId); } }
|
基本流程就是这样,我下面附上自写的完整代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
|
public class MyRedisLock {
private String lockKey;
private long lockTime;
private boolean success = false;
private DaemonCheckProcessor checkProcessor; private Thread checkThread;
private String clientId = UUID.randomUUID().toString()+":"+Thread.currentThread().getId();
private RedisRepository redisRepository;
public MyRedisLock(String lockKey,RedisRepository redisRepository) { this.lockKey = lockKey; this.redisRepository = redisRepository; this.lockTime = 30000; }
public MyRedisLock(String lockKey,RedisRepository redisRepository,long lockTime) { this.lockKey = lockKey; this.redisRepository = redisRepository; this.lockTime = lockTime; }
public boolean tryLock(){ String script = "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);"; Long waitTime = redisRepository.eval(script,Long.class,lockKey,lockTime,clientId); if(waitTime!=null){ return false; } checkProcessor = new DaemonCheckProcessor(lockKey, clientId, lockTime,redisRepository); checkThread = new Thread(checkProcessor); checkThread.setDaemon(Boolean.TRUE); checkThread.start(); success = true; return true; }
public Long remainTimeToLive(){ String script = "return redis.call('pttl', KEYS[1]);"; return redisRepository.eval(script,Long.class,lockKey); }
public void unLock(){ String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "return 1; "+ "end; " + "return nil;"; redisRepository.eval(script,Integer.class,lockKey,lockTime,clientId); if(checkProcessor != null){ checkProcessor.stop(); } if(checkThread!=null){ checkThread.interrupt(); } success =false; }
public boolean isSuccess() { return success; }
public boolean isHeldByCurrentThread(){ String script = "return redis.call('hexists', KEYS[1], ARGV[1])"; Long val = redisRepository.eval(script,Long.class,lockKey,clientId); if(val !=null && val > 0){ return true; } return false; }
public static void main(String[] args) {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration(); config.setDatabase(0); config.setHostName("127.0.0.1"); config.setPort(6379); JedisConnectionFactory conn = new JedisConnectionFactory(config);
RedisTemplate<String,String> template = new RedisTemplate<>(); template.setConnectionFactory(conn); template.afterPropertiesSet(); RedisRepository redisRepository = new RedisRepository(template); redisRepository.set("test","test"); System.out.println(redisRepository.get("test")); redisRepository.del("test");
MyRedisLock lock = new MyRedisLock("lock_test",redisRepository); try { if(lock.tryLock()){ for (int i=0;i<50;i++){ TimeUnit.SECONDS.sleep(1); System.out.println(lock.remainTimeToLive()); } } }catch (Exception e){ e.printStackTrace(); }finally { if(lock.isSuccess()){ if(lock.isHeldByCurrentThread()){ lock.unLock(); } } } } }
class DaemonCheckProcessor implements Runnable{
private static final long MARK_EXPIRE_SUCCESS = 1L;
private String key;
private String clientId;
private long lockTime;
private RedisRepository redisRepository;
public DaemonCheckProcessor(String key, String clientId, long lockTime,RedisRepository redisRepository) { this.key = key; this.clientId = clientId; this.lockTime = lockTime; this.signal = Boolean.TRUE; this.redisRepository = redisRepository; }
private volatile Boolean signal;
void stop() { this.signal = Boolean.FALSE; }
@Override public void run() { long waitTime = lockTime * 2 / 3; while (signal) { try { Thread.sleep(waitTime); if (expandLockTime(key, clientId, lockTime) != MARK_EXPIRE_SUCCESS) { this.stop(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
private Long expandLockTime(String key,String clientId,long lockTime){ String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;"; return redisRepository.eval(script,Long.class,key,lockTime,clientId); } }
|
另外这儿还涉及到了 RedisRepository
这个类,这个类其实就是包装了下 RedisTemplate
,如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class RedisRepository {
private RedisTemplate<String, String> redisTemplate;
public RedisRepository(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; }
protected RedisSerializer<String> getRedisSerializer() { return redisTemplate.getStringSerializer(); }
public <T> T eval(final String luaScript, Class<T> resultClass, String key, Object... args) { return redisTemplate.execute((RedisCallback<T>) connection -> { RedisSerializer<String> serializer = getRedisSerializer(); List<Object> list = new ArrayList<>(); list.add(key); list.addAll(Arrays.asList(args)); List<byte[]> var1 = list.stream().map(x -> serializer.serialize("" + x)).collect(Collectors.toList()); byte[][] var2 = var1.toArray(new byte[var1.size()][]); return connection.eval(serializer.serialize(luaScript), ReturnType.fromJavaType(resultClass), 1, var2); }); } }
|
我们用上面的类运行一下,可以看到输出日志,锁在剩余10s左右的时候续期了。
总结
以上就是关于分布式锁续期的一些内容,我们通过分析Redisson
的加锁逻辑,发现其锁续期的一些内容,其通过Lua
脚本+ TimeTask
等实现锁续期,使用到了Redis
哈希表及其一些操作hset、hexists
等来实现分布式锁,保证其唯一性和可重入性。
而后我们又自己手写了一个基于Redis
的续期锁,加深对续期锁的理解。
到这儿差不多就结束了,另外我们需要补充一点,是一种有可能发生的情况。
这儿需要注意的是我们以上是针对的单机Redis
来说的,如果是主从服务或者集群Redis
,这种分布式锁可能会有一个小问题。如下:
- 客户端1对某个
redis master
实例,写入了锁key
的value
,此时会异步复制给对应的master slave
实例; - 但是在复制过程中发生了
redis master
宕机,主备切换,redis slave
变为了redis master
,redis slave
上并不存在锁key
; - 客户端2来尝试加锁的时候,在新的
redis master
上完成了加锁,而客户端1也以为自己成功加了锁; - 此时就会导致多个客户端对一个分布式锁完成了加锁,这样就有可能在业务上产生一些问题。
关于上面这种情况,我们可以使用红锁RedLock
来解决,这部分内容可以查看 Redis分布式锁之红锁(RedLock) 这篇文章。