分布式锁redis正确实现姿势

redis 正确实现分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106

package io.binghe.concurrent.chapter19.service.impl;

import io.binghe.concurrent.chapter19.service.RedisDistributeLock;
import io.binghe.concurrent.chapter19.task.UpdateLockTimeoutTask;
import io.binghe.concurrent.chapter19.utils.ThreadUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

/**
* @author binghe
* @version 1.0.0
* @description 实现Redis分布式锁 19.6.11 解决锁失效问题
*/
@Service("redisDistributeLockV8")
public class RedisDistributeLockV9Impl implements RedisDistributeLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
private ThreadLocal<Integer> threadLocalCount = new ThreadLocal<Integer>();
@Override
public boolean tryLock(String key, long timeout, TimeUnit unit) {
Boolean isLocked = false;
if (threadLocal.get() == null){
String currentThreadId = this.getCurrentThreadId();
threadLocal.set(currentThreadId);
isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, currentThreadId, timeout, unit);

//如果获取锁失败,则执行自旋操作,直到获取锁成功
if (!isLocked){
for (;;){
isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, currentThreadId, timeout, unit);
if (isLocked){
break;
}
}
}
//启动线程执行定时更新超时时间的方法
new Thread(new UpdateLockTimeoutTask(currentThreadId, stringRedisTemplate, key)).start();

}else{
isLocked = true;
}
//加锁成功后,计数器的值加1
if (isLocked){
Integer count = threadLocalCount.get() == null ? 0 : threadLocalCount.get();
threadLocalCount.set(++count);
}
return isLocked;
}

@Override
public void releaseLock(String key) {
//当前线程中绑定的线程id与Redis中的线程id相同时,再执行删除锁的操作
String currentThreadId = stringRedisTemplate.opsForValue().get(key);
if (threadLocal.get().equals(currentThreadId)){
Integer count = threadLocalCount.get();
if (count == null || --count <= 0){
stringRedisTemplate.delete(key);
//防止内存泄露
threadLocal.remove();
threadLocalCount.remove();

//通过当前线程的id从Redis中获取更新超时时间的线程id
String updateTimeThreadId = stringRedisTemplate.opsForValue().get(currentThreadId);
if (updateTimeThreadId != null && !"".equals(updateTimeThreadId.trim())){
Thread updateTimeThread = ThreadUtils.getThreadByThreadId(Long.parseLong(updateTimeThreadId));
if (updateTimeThread != null){
//中断更新超时时间的线程
updateTimeThread.interrupt();
stringRedisTemplate.delete(currentThreadId);

}
}
}else {
threadLocalCount.set(count);
}

}
}

private static final String LUA_UN_LOCK =
"if redis.call('get',KEYS[1]) == ARGV[1] then\n" +
" return redis.call('del',KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";

public boolean releaseLock2(String key, String lockValue) {
DefaultRedisScript<String> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(LUA_UN_LOCK);
redisScript.setResultType(String.class);
Object result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), lockValue);
return "1".equals(result.toString());
}

private String getCurrentThreadId(){
return String.valueOf(Thread.currentThread().getId());
}
}

watchdog看门狗机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package io.binghe.concurrent.chapter19.task;

import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

/**
* @author binghe
* @version 1.0.0
* @description 更新分布式锁的超时时间
*/
public class UpdateLockTimeoutTask implements Runnable{
private String currentThreadId;
private StringRedisTemplate stringRedisTemplate;
private String key;

public UpdateLockTimeoutTask(String currentThreadId,
StringRedisTemplate stringRedisTemplate,
String key) {
this.currentThreadId = currentThreadId;
this.stringRedisTemplate = stringRedisTemplate;
this.key = key;
}

@Override
public void run() {
//以传递的线程id为key,当前执行更新超时时间的线程为value,保存到redis中
stringRedisTemplate.opsForValue().set(currentThreadId, String.valueOf(Thread.currentThread().getId()));
while (true){
if (Thread.interrupted()){
break;
}
stringRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
try {
//每个10秒执行一次
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}


__END__