前言
我们在工作等一些场景中,一定是用到过分布式锁的。
无论采用Zookeeper
还是Redis
做分布式锁,锁是应该有一个有效期的。
如果我们不设置有效期,可能会出现死锁问题,导致锁不能释放,影响系统功能。
但是如果我们锁释放时间设置的不合理,导致锁释放后实际业务逻辑还没处理完,那也是不行的。
当然,我们可以预估业务逻辑的处理时间范围,合理的设置锁的释放时间,但是总归会可能出现业务处理时间过长的情况,这时候锁已经释放了,业务逻辑执行过程中就会处于不安全的环境里。
我们如何完全避免这种情况呢?
这就是我们这节要说的锁续期的一些知识。
正文
上述的情况其实非常好理解,就是如果有这种情况,我们直接让当前线程持有的锁在续期一段时间,保证业务逻辑能正常走完。
我们这儿以Redis
分布式锁来进行说明。
分布式锁举例
关于Redis
分布式锁,我们可以使用开源类库,如 Redisson
就是一款优秀的工具包,其中包含了Redis
分布式锁的实现。
其获取分布式锁的代码如下:
1 2 3 4 5 6 7 8 9 10 11
| private static RedissonClient redissonClient(){ Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0); return Redisson.create(config); } public static void main(String[] args) { RedissonClient client = redissonClient(); RLock lock = client.getLock(REDIS_KEY); }
|
注:这儿我们为了方便,使用了单一 Redis
实例,当然也是支持 Redis
集群和主从模式的,这儿就不过多介绍了。
我们拿到lock
后,可以进行加锁、解锁等逻辑,如下图。
可以看到它继承Lock
接口,当然Lock
接口的几个方法也会实现。
我们知道,对于没有设置锁超时时间的方法,如tryLock()
,如果加锁后某些原因导致锁不能释放,会出现死锁问题。
Redisson
对于这种情况是怎么处理的呢?我们来看一下。
首先,我们先来看如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class RedissonRenewalLock {
private static final String REDIS_KEY = "redisson_renewal_lock";
private static RedissonClient redissonClient(){ Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0); return Redisson.create(config); }
public static void main(String[] args) { RedissonClient client = redissonClient(); RLock lock = client.getLock(REDIS_KEY); boolean isLock = false; try { isLock = lock.tryLock(1,30,TimeUnit.SECONDS); if(isLock){ for(int i=0;i<50;i++){ TimeUnit.SECONDS.sleep(1); System.out.println(lock.remainTimeToLive()); }
}else{ System.out.println("未成功获得锁......"); } }catch (Exception e){ e.printStackTrace(); }finally { if(isLock){ if(lock.isHeldByCurrentThread()){ lock.unlock(); } } } } }
|
上述代码的意思是获取锁后,我们设置锁超时时间为30s,假设我们需要处理一个耗时50s的任务,每隔1s输出下锁的剩余时间。
可以看到30s后返回时间变为-2,表示锁已经被释放,此时业务逻辑还没执行完,执行处于不安全环境中。
我们这样使用,如果业务逻辑处理时间超过我们锁的超时时间,就会有些问题。
我们将上述的lock.tryLock(1,30,TimeUnit.SECONDS);
改为 lock.tryLock();
方法来看下。
1 2 3 4 5
| try { isLock = lock.tryLock(); if(isLock){ ......
|
查看日志我们可以发现锁的剩余时间每隔一段时间就会增加,这就是我们今天要说的锁续期。
分布式锁续期原理
上面代码我们肯定有所疑问,我们来看下。
- 我们通过
tryLock()
没有设置锁的超时时间,看日志它好像是默认30s计时? - 当每隔10s后,如果业务逻辑还没执行完,会进行锁的续期?
这些问题我们都需要通过源码来看下。
首先通过 tryLock()
方法找到 RedissonLock
的 tryLock()
方法实现。
1 2 3 4
| @Override public boolean tryLock() { return get(tryLockAsync()); }
|
通过tryLockAsync()
方法可以找到具体实现逻辑,如下:
1 2 3 4 5 6 7 8
| @Override public RFuture<Boolean> tryLockAsync() { return tryLockAsync(Thread.currentThread().getId()); } @Override public RFuture<Boolean> tryLockAsync(long threadId) { return tryAcquireOnceAsync(-1, null, threadId); }
|
关键方法tryAcquireOnceAsync
,其中leaseTime
传入 -1,会执行tryLockInnerAsync
方法,传入一个过期时间,同时会有一个续期的逻辑。
tryLockInnerAsync
方法就是去执行一段 Lua
脚本,设置锁。
这段Lua
脚本的**KEYS[1]
**代表的是加锁的那个key,比如redisson_renewal_lock
;
**ARGV[1]
**代表的是锁key的默认生存时间,默认30秒;
**ARGV[2]
**代表的是加锁的客户端的ID,如 25008516-595b-4a9f-af59-165a14330820:1
。
逻辑如下:
加锁逻辑
第一段if
判断语句,用exists redisson_renewal_lock
命令判断一下,如果要加锁的那个key不存在的话,就进行加锁;
加锁操作,hset redisson_renewal_lock 25008516-595b-4a9f-af59-165a14330820:1 1
;
redisson_renewal_lock
的数据如下:
1 2 3 4
| redisson_renewal_lock { 25008516-595b-4a9f-af59-165a14330820:1 1 }
|
设置锁过期时间,pexpire redisson_renewal_lock 30000
;
至此加锁完成。
锁互斥逻辑
- 当客户端2想尝试获取锁时,此时只有客户端id不同,比如
ARGV[2]
为 f0b30905-e2a0-43c4-91a2-9aa608a3ba1e:1
; - 第一段
if
判断语句,明显锁已被客户端1持有,判断逻辑进不去; - 来到第二段
if
判断语句,判断 hash
数据结构中是否包含客户端2的id,显然不存在,里面是客户端1的id; - 所以客户端2会获取到
pttl redisson_renewal_lock
返回的一个数字,这个数字代表了redisson_renewal_lock
这个锁key的剩余生存时间。 - 客户端2会等待,并尝试重新获取锁。
重入锁逻辑
当客户端1未释放锁,又想获取该锁时(重入);
第一段if
判断语句,明显锁已经存在,判断逻辑进不去;
来到第二段if
判断语句,hexists redisson_renewal_lock 25008516-595b-4a9f-af59-165a14330820:1
,判断通过,存在客户端1的id;
调用 hincrby redisson_renewal_lock 25008516-595b-4a9f-af59-165a14330820:1 1
,意思是为其+1;
因此redisson_renewal_lock
的数据变为了如下:
1 2 3 4
| redisson_renewal_lock { 25008516-595b-4a9f-af59-165a14330820:1 2 }
|
设置锁过期时间,pexpire redisson_renewal_lock 30000
;
至此重入锁逻辑完成。
注: 这儿使用Lua
脚本的原因就是为了保证操作的原子性。
上述传入的时间我们也可以很快找到,默认设置的是30s。
1
| private long lockWatchdogTimeout = 30 * 1000;
|
这也就解决了我们的第一个疑问。
可以看到,Redisson
的 tryLock()
方法默认设置的30s的锁有效期,每隔一定时间,就会为锁续期,续期是通过scheduleExpirationRenewal
保证的。
我们来看下这个方法,关键调用方法如下:
其关键部分就是创建一个TimerTask
执行续约逻辑,该任务的间隔时间是internalLockLeaseTime / 3
,默认的情况下是10s,也就是我们在日志中看到的每个10s,锁的超时时间又变为了30s的原因。
其锁续约的代码也是一段Lua
脚本。
这段Lua
脚本就不过多介绍了。
锁续期的原理图如下:
我们再来看下释放锁的逻辑,这儿只上关键性部分,如下:
释放锁的Lua
脚本如下:
其主要逻辑是:
- 判断当前线程锁存不存在,不存在就不用处理了;
- 如果存在,进行次数-1操作;
- 如果次数还是大于0的(重入锁),就重新设置过期时间;
- 否则就删除锁key(释放锁),同时进行通知。
这里的**KEYS[1]
**代表的是加锁的那个key,比如redisson_renewal_lock
;
**ARGV[1]
和KEYS[2]
**代表的是释放锁后通知的消息信息;
**ARGV[2]
**代表的是锁key的默认生存时间,默认30秒;
**ARGV[3]
**代表的是加锁的客户端的ID,如 25008516-595b-4a9f-af59-165a14330820:1
。
自写Redis续期锁
我们根据上面逻辑,可以手写一个Redis
续期锁,这儿我们依靠守护线程来延期锁key
。
首先我们需要锁key
、加锁时间、守护线程参数等,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class MyRedisLock {
private String lockKey;
private long lockTime;
private boolean success = false;
private DaemonCheckProcessor checkProcessor; private Thread checkThread;
private String clientId = UUID.randomUUID().toString()+":"+Thread.currentThread().getId();
private RedisRepository redisRepository; }
|
success
用来记录锁状态,clientId
用于标识该锁的持有者。
构造方法只要传入锁key
就行,我们这儿还传入了RedisRepository(即 RedisTemplate)
,其实可以用Spring
注入,我们这儿为了方便就直接引入了。
1 2 3 4 5
| public MyRedisLock(String lockKey,RedisRepository redisRepository) { this.lockKey = lockKey; this.redisRepository = redisRepository; this.lockTime = 30000; }
|
锁需要有几个关键方法:tryLock
、unLock
等,此外还需要能判断锁是否加锁成功及是否当前线程持有锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public boolean tryLock(){ String script = "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);"; Long waitTime = redisRepository.eval(script,Long.class,lockKey,lockTime,clientId); if(waitTime!=null){ return false; } checkProcessor = new DaemonCheckProcessor(lockKey, clientId, lockTime,redisRepository); checkThread = new Thread(checkProcessor); checkThread.setDaemon(Boolean.TRUE); checkThread.start(); success = true; return true; }
public void unLock(){ String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "return 1; "+ "end; " + "return nil;"; redisRepository.eval(script,Integer.class,lockKey,lockTime,clientId); if(checkProcessor != null){ checkProcessor.stop(); } if(checkThread!=null){ checkThread.interrupt(); } success =false; }
|
这儿我们构建一个守护线程类,该类每隔 2/3 lockTime
就会去尝试延期一下,如果锁还没被释放的话。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| class DaemonCheckProcessor implements Runnable{
private static final long MARK_EXPIRE_SUCCESS = 1L;
private String key;
private String clientId;
private long lockTime;
private RedisRepository redisRepository;
public DaemonCheckProcessor(String key, String clientId, long lockTime,RedisRepository redisRepository) { this.key = key; this.clientId = clientId; this.lockTime = lockTime; this.signal = Boolean.TRUE; this.redisRepository = redisRepository; }
private volatile Boolean signal;
void stop() { this.signal = Boolean.FALSE; }
@Override public void run() { long waitTime = lockTime * 2 / 3; while (signal) { try { Thread.sleep(waitTime); if (expandLockTime(key, clientId, lockTime) != MARK_EXPIRE_SUCCESS) { this.stop(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
private Long expandLockTime(String key,String clientId,long lockTime){ String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;"; return redisRepository.eval(script,Long.class,key,lockTime,clientId); } }
|
基本流程就是这样,我下面附上自写的完整代码。

|
public class MyRedisLock {
private String lockKey;
private long lockTime;
private boolean success = false;
private DaemonCheckProcessor checkProcessor; private Thread checkThread;
private String clientId = UUID.randomUUID().toString()+":"+Thread.currentThread().getId();
private RedisRepository redisRepository;
public MyRedisLock(String lockKey,RedisRepository redisRepository) { this.lockKey = lockKey; this.redisRepository = redisRepository; this.lockTime = 30000; }
public MyRedisLock(String lockKey,RedisRepository redisRepository,long lockTime) { this.lockKey = lockKey; this.redisRepository = redisRepository; this.lockTime = lockTime; }
public boolean tryLock(){ String script = "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);"; Long waitTime = redisRepository.eval(script,Long.class,lockKey,lockTime,clientId); if(waitTime!=null){ return false; } checkProcessor = new DaemonCheckProcessor(lockKey, clientId, lockTime,redisRepository); checkThread = new Thread(checkProcessor); checkThread.setDaemon(Boolean.TRUE); checkThread.start(); success = true; return true; }
public Long remainTimeToLive(){ String script = "return redis.call('pttl', KEYS[1]);"; return redisRepository.eval(script,Long.class,lockKey); }
public void unLock(){ String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "return 1; "+ "end; " + "return nil;"; redisRepository.eval(script,Integer.class,lockKey,lockTime,clientId); if(checkProcessor != null){ checkProcessor.stop(); } if(checkThread!=null){ checkThread.interrupt(); } success =false; }
public boolean isSuccess() { return success; }
public boolean isHeldByCurrentThread(){ String script = "return redis.call('hexists', KEYS[1], ARGV[1])"; Long val = redisRepository.eval(script,Long.class,lockKey,clientId); if(val !=null && val > 0){ return true; } return false; }
public static void main(String[] args) {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration(); config.setDatabase(0); config.setHostName("127.0.0.1"); config.setPort(6379); JedisConnectionFactory conn = new JedisConnectionFactory(config);
RedisTemplate<String,String> template = new RedisTemplate<>(); template.setConnectionFactory(conn); template.afterPropertiesSet(); RedisRepository redisRepository = new RedisRepository(template); redisRepository.set("test","test"); System.out.println(redisRepository.get("test")); redisRepository.del("test");
MyRedisLock lock = new MyRedisLock("lock_test",redisRepository); try { if(lock.tryLock()){ for (int i=0;i<50;i++){ TimeUnit.SECONDS.sleep(1); System.out.println(lock.remainTimeToLive()); } } }catch (Exception e){ e.printStackTrace(); }finally { if(lock.isSuccess()){ if(lock.isHeldByCurrentThread()){ lock.unLock(); } } } } }
class DaemonCheckProcessor implements Runnable{
private static final long MARK_EXPIRE_SUCCESS = 1L;
private String key;
private String clientId;
private long lockTime;
private RedisRepository redisRepository;
public DaemonCheckProcessor(String key, String clientId, long lockTime,RedisRepository redisRepository) { this.key = key; this.clientId = clientId; this.lockTime = lockTime; this.signal = Boolean.TRUE; this.redisRepository = redisRepository; }
private volatile Boolean signal;
void stop() { this.signal = Boolean.FALSE; }
@Override public void run() { long waitTime = lockTime * 2 / 3; while (signal) { try { Thread.sleep(waitTime); if (expandLockTime(key, clientId, lockTime) != MARK_EXPIRE_SUCCESS) { this.stop(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
private Long expandLockTime(String key,String clientId,long lockTime){ String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;"; return redisRepository.eval(script,Long.class,key,lockTime,clientId); } }
|
另外这儿还涉及到了 RedisRepository
这个类,这个类其实就是包装了下 RedisTemplate
,如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class RedisRepository {
private RedisTemplate<String, String> redisTemplate;
public RedisRepository(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; }
protected RedisSerializer<String> getRedisSerializer() { return redisTemplate.getStringSerializer(); }
public <T> T eval(final String luaScript, Class<T> resultClass, String key, Object... args) { return redisTemplate.execute((RedisCallback<T>) connection -> { RedisSerializer<String> serializer = getRedisSerializer(); List<Object> list = new ArrayList<>(); list.add(key); list.addAll(Arrays.asList(args)); List<byte[]> var1 = list.stream().map(x -> serializer.serialize("" + x)).collect(Collectors.toList()); byte[][] var2 = var1.toArray(new byte[var1.size()][]); return connection.eval(serializer.serialize(luaScript), ReturnType.fromJavaType(resultClass), 1, var2); }); } }
|
我们用上面的类运行一下,可以看到输出日志,锁在剩余10s左右的时候续期了。
总结
以上就是关于分布式锁续期的一些内容,我们通过分析Redisson
的加锁逻辑,发现其锁续期的一些内容,其通过Lua
脚本+ TimeTask
等实现锁续期,使用到了Redis
哈希表及其一些操作hset、hexists
等来实现分布式锁,保证其唯一性和可重入性。
而后我们又自己手写了一个基于Redis
的续期锁,加深对续期锁的理解。
到这儿差不多就结束了,另外我们需要补充一点,是一种有可能发生的情况。
这儿需要注意的是我们以上是针对的单机Redis
来说的,如果是主从服务或者集群Redis
,这种分布式锁可能会有一个小问题。如下:
- 客户端1对某个
redis master
实例,写入了锁key
的value
,此时会异步复制给对应的master slave
实例; - 但是在复制过程中发生了
redis master
宕机,主备切换,redis slave
变为了redis master
,redis slave
上并不存在锁key
; - 客户端2来尝试加锁的时候,在新的
redis master
上完成了加锁,而客户端1也以为自己成功加了锁; - 此时就会导致多个客户端对一个分布式锁完成了加锁,这样就有可能在业务上产生一些问题。
关于上面这种情况,我们可以使用红锁RedLock
来解决,这部分内容可以查看 Redis分布式锁之红锁(RedLock) 这篇文章。