分布式锁续期问题

前言

我们在工作等一些场景中,一定是用到过分布式锁的。

无论采用Zookeeper还是Redis做分布式锁,锁是应该有一个有效期的。

如果我们不设置有效期,可能会出现死锁问题,导致锁不能释放,影响系统功能。

但是如果我们锁释放时间设置的不合理,导致锁释放后实际业务逻辑还没处理完,那也是不行的。

当然,我们可以预估业务逻辑的处理时间范围,合理的设置锁的释放时间,但是总归会可能出现业务处理时间过长的情况,这时候锁已经释放了,业务逻辑执行过程中就会处于不安全的环境里。

我们如何完全避免这种情况呢?

这就是我们这节要说的锁续期的一些知识。

正文

上述的情况其实非常好理解,就是如果有这种情况,我们直接让当前线程持有的锁在续期一段时间,保证业务逻辑能正常走完。

我们这儿以Redis分布式锁来进行说明。

分布式锁举例

关于Redis分布式锁,我们可以使用开源类库,如 Redisson 就是一款优秀的工具包,其中包含了Redis分布式锁的实现。

其获取分布式锁的代码如下:

1
2
3
4
5
6
7
8
9
10
11
//配置,这些东西可以放到配置文件里
private static RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
return Redisson.create(config);
}
public static void main(String[] args) {
RedissonClient client = redissonClient();
//通过客户端获取Redis分布式锁
RLock lock = client.getLock(REDIS_KEY);
}

注:这儿我们为了方便,使用了单一 Redis实例,当然也是支持 Redis 集群和主从模式的,这儿就不过多介绍了。

我们拿到lock后,可以进行加锁、解锁等逻辑,如下图。

可以看到它继承Lock接口,当然Lock接口的几个方法也会实现。

我们知道,对于没有设置锁超时时间的方法,如tryLock() ,如果加锁后某些原因导致锁不能释放,会出现死锁问题。

Redisson 对于这种情况是怎么处理的呢?我们来看一下。

首先,我们先来看如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class RedissonRenewalLock {

/**
* 测试续期锁key
*/
private static final String REDIS_KEY = "redisson_renewal_lock";

/**
* 配置信息
* @return
*/
private static RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
return Redisson.create(config);
}

public static void main(String[] args) {
RedissonClient client = redissonClient();
RLock lock = client.getLock(REDIS_KEY);
boolean isLock = false;
try {
isLock = lock.tryLock(1,30,TimeUnit.SECONDS);
if(isLock){
// do something
//假设执行任务花费了50s
for(int i=0;i<50;i++){
TimeUnit.SECONDS.sleep(1);
System.out.println(lock.remainTimeToLive());
}

}else{
System.out.println("未成功获得锁......");
}
}catch (Exception e){
e.printStackTrace();
}finally {
if(isLock){
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
}
}

上述代码的意思是获取锁后,我们设置锁超时时间为30s,假设我们需要处理一个耗时50s的任务,每隔1s输出下锁的剩余时间。

可以看到30s后返回时间变为-2,表示锁已经被释放,此时业务逻辑还没执行完,执行处于不安全环境中。

我们这样使用,如果业务逻辑处理时间超过我们锁的超时时间,就会有些问题。

我们将上述的lock.tryLock(1,30,TimeUnit.SECONDS); 改为 lock.tryLock();方法来看下。

1
2
3
4
5
try {
//isLock = lock.tryLock(1,30,TimeUnit.SECONDS);
isLock = lock.tryLock();//改为tryLock();
if(isLock){
......

查看日志我们可以发现锁的剩余时间每隔一段时间就会增加,这就是我们今天要说的锁续期。

分布式锁续期原理

上面代码我们肯定有所疑问,我们来看下。

  • 我们通过tryLock()没有设置锁的超时时间,看日志它好像是默认30s计时?
  • 当每隔10s后,如果业务逻辑还没执行完,会进行锁的续期?

这些问题我们都需要通过源码来看下。

首先通过 tryLock()方法找到 RedissonLocktryLock() 方法实现。

1
2
3
4
@Override
public boolean tryLock() {
return get(tryLockAsync());
}

通过tryLockAsync()方法可以找到具体实现逻辑,如下:

1
2
3
4
5
6
7
8
@Override
public RFuture<Boolean> tryLockAsync() {
return tryLockAsync(Thread.currentThread().getId());
}
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return tryAcquireOnceAsync(-1, null, threadId);
}

关键方法tryAcquireOnceAsync ,其中leaseTime 传入 -1,会执行tryLockInnerAsync方法,传入一个过期时间,同时会有一个续期的逻辑。

tryLockInnerAsync 方法就是去执行一段 Lua脚本,设置锁。

这段Lua脚本的**KEYS[1]**代表的是加锁的那个key,比如redisson_renewal_lock

**ARGV[1]**代表的是锁key的默认生存时间,默认30秒;

**ARGV[2]**代表的是加锁的客户端的ID,如 25008516-595b-4a9f-af59-165a14330820:1

逻辑如下:

加锁逻辑

  1. 第一段if判断语句,用exists redisson_renewal_lock命令判断一下,如果要加锁的那个key不存在的话,就进行加锁;

  2. 加锁操作,hset redisson_renewal_lock 25008516-595b-4a9f-af59-165a14330820:1 1

  3. redisson_renewal_lock 的数据如下:

    1
    2
    3
    4
    redisson_renewal_lock
    {
    25008516-595b-4a9f-af59-165a14330820:1 1
    }
  4. 设置锁过期时间,pexpire redisson_renewal_lock 30000

  5. 至此加锁完成。

锁互斥逻辑

  1. 当客户端2想尝试获取锁时,此时只有客户端id不同,比如 ARGV[2]f0b30905-e2a0-43c4-91a2-9aa608a3ba1e:1;
  2. 第一段if判断语句,明显锁已被客户端1持有,判断逻辑进不去;
  3. 来到第二段if判断语句,判断 hash数据结构中是否包含客户端2的id,显然不存在,里面是客户端1的id;
  4. 所以客户端2会获取到pttl redisson_renewal_lock返回的一个数字,这个数字代表了redisson_renewal_lock 这个锁key的剩余生存时间。
  5. 客户端2会等待,并尝试重新获取锁。

重入锁逻辑

  1. 当客户端1未释放锁,又想获取该锁时(重入);

  2. 第一段if判断语句,明显锁已经存在,判断逻辑进不去;

  3. 来到第二段if判断语句,hexists redisson_renewal_lock 25008516-595b-4a9f-af59-165a14330820:1 ,判断通过,存在客户端1的id;

  4. 调用 hincrby redisson_renewal_lock 25008516-595b-4a9f-af59-165a14330820:1 1,意思是为其+1;

  5. 因此redisson_renewal_lock 的数据变为了如下:

    1
    2
    3
    4
    redisson_renewal_lock
    {
    25008516-595b-4a9f-af59-165a14330820:1 2
    }
  6. 设置锁过期时间,pexpire redisson_renewal_lock 30000

  7. 至此重入锁逻辑完成。

注: 这儿使用Lua脚本的原因就是为了保证操作的原子性

上述传入的时间我们也可以很快找到,默认设置的是30s。

1
private long lockWatchdogTimeout = 30 * 1000;

这也就解决了我们的第一个疑问。

可以看到,RedissontryLock()方法默认设置的30s的锁有效期,每隔一定时间,就会为锁续期,续期是通过scheduleExpirationRenewal 保证的。

我们来看下这个方法,关键调用方法如下:

其关键部分就是创建一个TimerTask 执行续约逻辑,该任务的间隔时间是internalLockLeaseTime / 3,默认的情况下是10s,也就是我们在日志中看到的每个10s,锁的超时时间又变为了30s的原因。

其锁续约的代码也是一段Lua脚本。

这段Lua脚本就不过多介绍了。

锁续期的原理图如下:

我们再来看下释放锁的逻辑,这儿只上关键性部分,如下:

释放锁的Lua脚本如下:

其主要逻辑是:

  1. 判断当前线程锁存不存在,不存在就不用处理了;
  2. 如果存在,进行次数-1操作;
  3. 如果次数还是大于0的(重入锁),就重新设置过期时间;
  4. 否则就删除锁key(释放锁),同时进行通知。

这里的**KEYS[1]**代表的是加锁的那个key,比如redisson_renewal_lock

**ARGV[1]KEYS[2]**代表的是释放锁后通知的消息信息;

**ARGV[2]**代表的是锁key的默认生存时间,默认30秒;

**ARGV[3]**代表的是加锁的客户端的ID,如 25008516-595b-4a9f-af59-165a14330820:1

自写Redis续期锁

我们根据上面逻辑,可以手写一个Redis续期锁,这儿我们依靠守护线程来延期锁key

首先我们需要锁key、加锁时间、守护线程参数等,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MyRedisLock {
/**
* 锁key
*/
private String lockKey;
/**
* 锁持有时间,默认30s
*/
private long lockTime;
/**
* 是否加锁成功
*/
private boolean success = false;

/**
* 守护线程参数
*/
private DaemonCheckProcessor checkProcessor;
private Thread checkThread;

/**
* 加锁id
*/
private String clientId = UUID.randomUUID().toString()+":"+Thread.currentThread().getId();
/**
* redis操作类,直接用redisTemplate也可以,这儿为了main运行方便直接引入
*/
private RedisRepository redisRepository;
//......
}

success用来记录锁状态,clientId用于标识该锁的持有者。

构造方法只要传入锁key就行,我们这儿还传入了RedisRepository(即 RedisTemplate),其实可以用Spring注入,我们这儿为了方便就直接引入了。

1
2
3
4
5
public MyRedisLock(String lockKey,RedisRepository redisRepository) {
this.lockKey = lockKey;
this.redisRepository = redisRepository;
this.lockTime = 30000;
}

锁需要有几个关键方法:tryLockunLock等,此外还需要能判断锁是否加锁成功及是否当前线程持有锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public boolean tryLock(){
String script = "if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);";
Long waitTime = redisRepository.eval(script,Long.class,lockKey,lockTime,clientId);
if(waitTime!=null){
//如果存在并且不是当前线程,返回加锁失败,这儿也可以返回需要等待的时间waitTime
return false;
}
//开启一个守护线程计时
checkProcessor = new DaemonCheckProcessor(lockKey, clientId, lockTime,redisRepository);
checkThread = new Thread(checkProcessor);
checkThread.setDaemon(Boolean.TRUE);
checkThread.start();
success = true;
return true;
}

public void unLock(){
String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"return 1; "+
"end; " +
"return nil;";
redisRepository.eval(script,Integer.class,lockKey,lockTime,clientId);
//需要释放守护线程资源
if(checkProcessor != null){
checkProcessor.stop();
}
if(checkThread!=null){
checkThread.interrupt();
}
success =false;
}

这儿我们构建一个守护线程类,该类每隔 2/3 lockTime就会去尝试延期一下,如果锁还没被释放的话。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class DaemonCheckProcessor implements Runnable{

private static final long MARK_EXPIRE_SUCCESS = 1L;

private String key;

private String clientId;

private long lockTime;

private RedisRepository redisRepository;

public DaemonCheckProcessor(String key, String clientId, long lockTime,RedisRepository redisRepository) {
this.key = key;
this.clientId = clientId;
this.lockTime = lockTime;
this.signal = Boolean.TRUE;
this.redisRepository = redisRepository;
}

/**
* 守护线程停止标记
*/
private volatile Boolean signal;

void stop() {
this.signal = Boolean.FALSE;
}


@Override
public void run() {
//每隔 2/3 时间自动检测
long waitTime = lockTime * 2 / 3;
while (signal) {
try {
Thread.sleep(waitTime);
//延长过期时间
if (expandLockTime(key, clientId, lockTime) != MARK_EXPIRE_SUCCESS) {
//如果不成功就停掉任务
this.stop();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 锁续期
* @param key
* @param clientId
* @param lockTime
* @return
*/
private Long expandLockTime(String key,String clientId,long lockTime){
String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;";
return redisRepository.eval(script,Long.class,key,lockTime,clientId);
}
}

基本流程就是这样,我下面附上自写的完整代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
/**
* @author zwt
* @version 1.0
* @desc 自定义续期锁
* @date 2021/7/26 10:34
*/
public class MyRedisLock {
/**
* 锁key
*/
private String lockKey;
/**
* 锁持有时间,默认30s
*/
private long lockTime;
/**
* 是否加锁成功
*/
private boolean success = false;

/**
* 守护线程参数
*/
private DaemonCheckProcessor checkProcessor;
private Thread checkThread;

/**
* 加锁id
*/
private String clientId = UUID.randomUUID().toString()+":"+Thread.currentThread().getId();
/**
* redis操作类,直接用redisTemplate也可以,这儿为了main运行方便直接引入
*/
private RedisRepository redisRepository;

/**
* 构造方法
* @param lockKey
* @param redisRepository
*/
public MyRedisLock(String lockKey,RedisRepository redisRepository) {
this.lockKey = lockKey;
this.redisRepository = redisRepository;
this.lockTime = 30000;
}

public MyRedisLock(String lockKey,RedisRepository redisRepository,long lockTime) {
this.lockKey = lockKey;
this.redisRepository = redisRepository;
this.lockTime = lockTime;
}

/**
* 加锁
* @return
*/
public boolean tryLock(){
String script = "if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);";
Long waitTime = redisRepository.eval(script,Long.class,lockKey,lockTime,clientId);
if(waitTime!=null){
//如果存在并且不是当前线程,返回加锁失败,这儿也可以返回需要等待的时间waitTime
return false;
}
//开启一个守护线程计时
checkProcessor = new DaemonCheckProcessor(lockKey, clientId, lockTime,redisRepository);
checkThread = new Thread(checkProcessor);
checkThread.setDaemon(Boolean.TRUE);
checkThread.start();
success = true;
return true;
}

/**
* 查询剩余时间
* @return
*/
public Long remainTimeToLive(){
String script = "return redis.call('pttl', KEYS[1]);";
return redisRepository.eval(script,Long.class,lockKey);
}

/**
* 解锁
*/
public void unLock(){
String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"return 1; "+
"end; " +
"return nil;";
redisRepository.eval(script,Integer.class,lockKey,lockTime,clientId);
//需要释放守护线程资源
if(checkProcessor != null){
checkProcessor.stop();
}
if(checkThread!=null){
checkThread.interrupt();
}
success =false;
}

/**
* 是否加锁成功
* @return
*/
public boolean isSuccess() {
return success;
}

/**
* 是否当前线程持有锁
* @return
*/
public boolean isHeldByCurrentThread(){
String script = "return redis.call('hexists', KEYS[1], ARGV[1])";
Long val = redisRepository.eval(script,Long.class,lockKey,clientId);
if(val !=null && val > 0){
//存在并且大于0
return true;
}
return false;
}


public static void main(String[] args) {
//构建RedisTemplate

RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setDatabase(0);
config.setHostName("127.0.0.1");
config.setPort(6379);
JedisConnectionFactory conn = new JedisConnectionFactory(config);

RedisTemplate<String,String> template = new RedisTemplate<>();
template.setConnectionFactory(conn);
template.afterPropertiesSet();
RedisRepository redisRepository = new RedisRepository(template);
//测试一下是不是能用
redisRepository.set("test","test");
System.out.println(redisRepository.get("test"));
redisRepository.del("test");

//测试自写的续期锁
MyRedisLock lock = new MyRedisLock("lock_test",redisRepository);
try {
if(lock.tryLock()){
//do something
for (int i=0;i<50;i++){
TimeUnit.SECONDS.sleep(1);
System.out.println(lock.remainTimeToLive());
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
if(lock.isSuccess()){
if(lock.isHeldByCurrentThread()){
lock.unLock();
}
}
}
}
}

/**
* 守护进程类
*/
class DaemonCheckProcessor implements Runnable{

private static final long MARK_EXPIRE_SUCCESS = 1L;

private String key;

private String clientId;

private long lockTime;

private RedisRepository redisRepository;

public DaemonCheckProcessor(String key, String clientId, long lockTime,RedisRepository redisRepository) {
this.key = key;
this.clientId = clientId;
this.lockTime = lockTime;
this.signal = Boolean.TRUE;
this.redisRepository = redisRepository;
}

/**
* 守护线程停止标记
*/
private volatile Boolean signal;

void stop() {
this.signal = Boolean.FALSE;
}


@Override
public void run() {
//每隔 2/3 时间自动检测
long waitTime = lockTime * 2 / 3;
while (signal) {
try {
Thread.sleep(waitTime);
//延长过期时间
if (expandLockTime(key, clientId, lockTime) != MARK_EXPIRE_SUCCESS) {
//如果不成功就停掉任务
this.stop();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 锁续期
* @param key
* @param clientId
* @param lockTime
* @return
*/
private Long expandLockTime(String key,String clientId,long lockTime){
String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;";
return redisRepository.eval(script,Long.class,key,lockTime,clientId);
}
}

另外这儿还涉及到了 RedisRepository 这个类,这个类其实就是包装了下 RedisTemplate,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class RedisRepository {
/**
* Spring Redis Template
*/
private RedisTemplate<String, String> redisTemplate;

public RedisRepository(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
//......部分方法略,见RedisTemplate
/**
* 获取 RedisSerializer
*
* @return the redis serializer
*/
protected RedisSerializer<String> getRedisSerializer() {
return redisTemplate.getStringSerializer();
}
/**
* Lua 脚本执行器
*/
public <T> T eval(final String luaScript, Class<T> resultClass, String key, Object... args) {
return redisTemplate.execute((RedisCallback<T>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
List<Object> list = new ArrayList<>();
list.add(key);
list.addAll(Arrays.asList(args));
List<byte[]> var1 = list.stream().map(x -> serializer.serialize("" + x)).collect(Collectors.toList());
byte[][] var2 = var1.toArray(new byte[var1.size()][]);
return connection.eval(serializer.serialize(luaScript), ReturnType.fromJavaType(resultClass), 1, var2);
});
}
}

我们用上面的类运行一下,可以看到输出日志,锁在剩余10s左右的时候续期了。

总结

以上就是关于分布式锁续期的一些内容,我们通过分析Redisson的加锁逻辑,发现其锁续期的一些内容,其通过Lua脚本+ TimeTask等实现锁续期,使用到了Redis哈希表及其一些操作hset、hexists 等来实现分布式锁,保证其唯一性和可重入性。

而后我们又自己手写了一个基于Redis的续期锁,加深对续期锁的理解。

到这儿差不多就结束了,另外我们需要补充一点,是一种有可能发生的情况。

这儿需要注意的是我们以上是针对的单机Redis来说的,如果是主从服务或者集群Redis,这种分布式锁可能会有一个小问题。如下:

  1. 客户端1对某个redis master实例,写入了锁keyvalue,此时会异步复制给对应的master slave实例;
  2. 但是在复制过程中发生了redis master宕机,主备切换,redis slave变为了redis masterredis slave上并不存在锁key
  3. 客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁;
  4. 此时就会导致多个客户端对一个分布式锁完成了加锁,这样就有可能在业务上产生一些问题。

关于上面这种情况,我们可以使用红锁RedLock来解决,这部分内容可以查看 Redis分布式锁之红锁(RedLock) 这篇文章。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

您的支持就是我创作的动力!

欢迎关注我的其它发布渠道