0%

分布式锁轮子

分布式锁轮子

简介

在如今分布式的大环境中,我们需要保证线程安全就无法像单体应用中定义一个简单的锁,我们需要一个在任何地方都可以访问到的锁。

为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁的要素

1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
2、高可用的获取锁与释放锁;
3、高性能的获取锁与释放锁;
4、具备可重入特性;
5、具备锁失效机制,防止死锁;
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

分布式锁的实现有很多种,如:基于数据库的实现方式基于Redis的实现方式基于ZooKeeper的实现方式

那本文就讲一讲任何利用SpringBoot+Redis搭建一个较为完善的分布式demo

对于分布式锁的介绍,以及从最简单的分布式锁到后续使用lua脚本配合实现分布式锁可以参考我之前的文章

这边做一个简单的总结

  • 为什么锁要设置TTL生存时间

    如果一个线程获取到锁,但是在执行业务中挂掉或者宕机等等原因导致无法删除锁,那么就会形成死锁,往后其他服务的线程永远无法执行,除非将死锁删除

  • 为什么需要锁续期(看门狗)

    如果业务在执行过程中遇到网络拥堵也好、cpu未分配到资源也好,导致业务执行时间超过了TTL,那么TTL时间一到,锁失效,其他线程得到锁执行业务,那么就打破锁的基本要素(同一时间只能被一个机器的一个线程执行)

创建项目

首先我们创建一个SpringBoot+Redis的项目

image-20220307231404824

DistributedLock注解

首先我们在项目中创建一个DistributedLock注解,该注解的作用域在方法之上,被该注解修饰的方法具有分布式锁的效果

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 分布式锁注解类
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {

/**
* 锁的key
* @return
*/
String value() default "";

/**
* key的前缀
* @return
*/
String prefix() default "";

/**
* 参数是否参与锁的构建
* 底层使用参数的hashcode构建key,注意重写参数hash方法
* @return
*/
boolean paramLock() default false;

/**
* 锁的超时时间,单位毫秒
* @return
*/
long keyTimeout() default 1000L;

/**
* 最大重试次数
* 0 : 表示该方法一定要拿到锁并执行,如果一直获取不到锁会死循环
* @return
*/
int retryCount() default 3;

/**
* 重试间隔时间,单位毫秒
* @return
*/
long retryInterval() default 500L;
}

属性

  • value

    为redis的key,也就是锁的key值

  • prefix

    锁的前缀,不同锁可以设置不同前缀

  • paramLock

    方法参数是否分布锁的构建,也就是分布式锁的颗粒度是否到达参数级别

    底层实现为参数的hashcode,对于未重写hashcode的类型无效,Object的hashcode方法默认返回内存地址

  • keyTimeout

    锁超时时间

  • retryCount

    锁重试次数,0表示一定执行

  • retryInterval

    锁重试间隔,大小越接近业务方法执行时间越好(>=),越不会浪费空闲时间

image-20220307172820544

image-20220307172926307

image-20220307173023375

image-20220307174109983

image-20220307174127584

LocksAspect类

LocksAspect是实现分布式锁的核心类,内部使用AOP实现,拦截被DistributedLock注解修饰的方法。

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/**
* @program: distributed-locks-demo
* @description:
* @author: xpp011
* @create: 2022-03-05 22:41
**/
@Aspect
@Component
public class LocksAspect {

private static final Logger log = LoggerFactory.getLogger(LocksAspect.class);

private static final double KEY_TIMEOUT_OFFSET = 0.75;

private static final String suffix = "-locks";


/**
* 该线程池核心线程数为1
* 最大线程数为Integer.MAX_VALUE;
* 空闲线程最大活跃时间10000000纳秒
*/
private static final ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();

private static final Map<String, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();

private static final Random random = new Random();

static {
threadPoolTaskScheduler.initialize();
}

@Autowired
private RedisTemplate redisTemplate;

@Around(value = "@annotation(lock)", argNames = "joinPoint,lock")
void around(ProceedingJoinPoint joinPoint, DistributedLock lock) {
StringBuilder str = new StringBuilder(lock.value());
if (lock.paramLock()){
//参数参与锁的构建
Object[] args = joinPoint.getArgs();
for (Object arg : args) {
if (arg == null) continue;
str.append("-" + arg.hashCode());
}
}
//依据参数实现锁
String key = new StringBuilder()
.append(lock.prefix()).append(str).append(suffix)
.toString();
//重试次数
int retryCut = lock.retryCount(), sub = retryCut == 0 ? 0 : 1;
//重试间隔
long retryInterval = lock.retryInterval(), keyTimeout = lock.keyTimeout();

//锁的value
String value = UUID.randomUUID().toString();

verify(retryCut, retryInterval, key);

try {
while ((retryCut -= sub) >= 0) {

Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, value, keyTimeout, TimeUnit.MILLISECONDS);
if (absent) {
//拿到锁,开启看门狗进行锁续期
startWatchDog(key, value, keyTimeout);

//执行业务代码
joinPoint.proceed();

break;
} else {
//未拿到锁,停顿尝试重新获取
log.info("{} key:{} 未拿到锁,准备重试", Thread.currentThread().getName(), key);
Thread.sleep((long) (retryInterval + retryInterval * random.nextDouble()));
}
}
} catch (InterruptedException e) {
log.error("线程睡眠被打断");
e.printStackTrace();
} catch (Throwable e) {
e.printStackTrace();
} finally {
//删除锁, 非原子操作调用lua脚本
if (delLock(key, value)) {
//取消看门狗
removeWatchDog(key);
}
}
}

/**
* 调用lua脚本删除锁
*
* @param key
* @param value
*/
private boolean delLock(String key, String value) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("locks/del-lock.lua")));
redisScript.setResultType(Long.class);
Long execute = (Long) redisTemplate.execute(redisScript, Arrays.asList(key), value);
log.info("{} key:{} 删除{}",Thread.currentThread().getName(), key, execute == 0L ? "失败" : "成功");
return execute != 0L;
}

/**
* 检验参数合法性
*
* @param retryCut
* @param retryInterval
* @param key
*/
private void verify(int retryCut, double retryInterval, String key) {
StringBuilder errMsg = new StringBuilder();
if (retryCut < 0) errMsg.append("retryCut ");
if (retryInterval <= 0) errMsg.append("retryInterval ");
int lio = key.lastIndexOf(suffix);
if (lio == -1 || lio != key.length() - suffix.length()) errMsg.append("key ");
if (errMsg.length() != 0) throw new RuntimeException(errMsg.append("参数不合法").toString());
}

/**
* 看门狗
* 添加守护线程实现锁续期
* 看门狗这一块还没想到有什么好的方式实现,待我变强后来重构
*
* @param key
* @param keyTimeout
*/
private void startWatchDog(String key, String value, long keyTimeout) {
AtomicInteger i = new AtomicInteger();
ScheduledFuture<?> future = threadPoolTaskScheduler.scheduleAtFixedRate(() -> {
//锁续期
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("locks/pexpire-lock.lua")));
redisScript.setResultType(Long.class);
Long execute = (Long) redisTemplate.execute(redisScript, Arrays.asList(key), value, keyTimeout);
//续期失败删除看门狗
if (execute == 0L) removeWatchDog(key);
//锁续期失败
log.info("{} key:{},第{}锁续期{}",Thread.currentThread().getName(), key, i.incrementAndGet(), execute == 0L ? "失败" : "成功");
}, (long) (keyTimeout * KEY_TIMEOUT_OFFSET));

scheduledFutureMap.put(key, future);
}

/**
* 取消看门狗定时任务
* @param key
*/
private void removeWatchDog(@Nullable String key){
ScheduledFuture<?> future = scheduledFutureMap.get(key);
if (future != null && !future.isCancelled()) future.cancel(false);
scheduledFutureMap.remove(key);
log.info("取消看门狗 key:{}", key);
}
}

around()

around()方法实现了aop环绕通知,分布式锁的主要逻辑流程就在方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Around(value = "@annotation(lock)", argNames = "joinPoint,lock")
void around(ProceedingJoinPoint joinPoint, DistributedLock lock) {
StringBuilder str = new StringBuilder(lock.value());
if (lock.paramLock()){
//参数参与锁的构建
Object[] args = joinPoint.getArgs();
for (Object arg : args) {
if (arg == null) continue;
str.append("-" + arg.hashCode());
}
}
//依据参数实现锁
String key = new StringBuilder()
.append(lock.prefix()).append(str).append(suffix)
.toString();
//重试次数
int retryCut = lock.retryCount(), sub = retryCut == 0 ? 0 : 1;
//重试间隔
long retryInterval = lock.retryInterval(), keyTimeout = lock.keyTimeout();

//锁的value
String value = UUID.randomUUID().toString();

verify(retryCut, retryInterval, key);

try {
while ((retryCut -= sub) >= 0) {

Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, value, keyTimeout, TimeUnit.MILLISECONDS);
if (absent) {
//拿到锁,开启看门狗进行锁续期
startWatchDog(key, value, keyTimeout);

//执行业务代码
joinPoint.proceed();

break;
} else {
//未拿到锁,停顿尝试重新获取
log.info("{} key:{} 未拿到锁,准备重试", Thread.currentThread().getName(), key);
Thread.sleep((long) (retryInterval + retryInterval * random.nextDouble()));
}
}
} catch (InterruptedException e) {
log.error("线程睡眠被打断");
e.printStackTrace();
} catch (Throwable e) {
e.printStackTrace();
} finally {
//删除锁, 非原子操作调用lua脚本
if (delLock(key, value)) {
//取消看门狗
removeWatchDog(key);
}
}

锁竞争打散

使用random函数使睡眠时间在(retryInterval~2retryInterval)的时间,打散每一个线程竞争锁的时间,避免线程同一时间醒来,避免惊群效应

1
Thread.sleep((long) (retryInterval + retryInterval * random.nextDouble()));

verify()

verify()检验DistributedLock一些参数的合法性

1
2
3
4
5
6
7
8
private void verify(int retryCut, double retryInterval, String key) {
StringBuilder errMsg = new StringBuilder();
if (retryCut < 0) errMsg.append("retryCut ");
if (retryInterval <= 0) errMsg.append("retryInterval ");
int lio = key.lastIndexOf(suffix);
if (lio == -1 || lio != key.length() - suffix.length()) errMsg.append("key ");
if (errMsg.length() != 0) throw new RuntimeException(errMsg.append("参数不合法").toString());
}

startWatchDog()

看门狗方法,也就是启动一个守护线程,定时对锁进行续期

值得一提的是ThreadPoolTaskScheduler类,该类是Spring框架的线程池任务调度类

  • 该线程池核心线程数为1
  • 最大空闲线程数为Integer.MAX_VALUE
  • 空闲线程最大活跃时间10000000纳秒

也就是说ThreadPoolTaskScheduler内的守护线程会马上执行,不会因为核心线程数不够而被迫放到work队列,等待调度

这对守护线程很重要,如果不能及时的对锁续期就会导致其他线程抢占锁,同一时刻两个线程执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void startWatchDog(String key, String value, long keyTimeout) {
AtomicInteger i = new AtomicInteger();
ScheduledFuture<?> future = threadPoolTaskScheduler.scheduleAtFixedRate(() -> {
//锁续期
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("locks/pexpire-lock.lua")));
redisScript.setResultType(Long.class);
Long execute = (Long) redisTemplate.execute(redisScript, Arrays.asList(key), value, keyTimeout);
//续期失败删除看门狗
if (execute == 0L) removeWatchDog(key);
//锁续期失败
log.info("{} key:{},第{}锁续期{}",Thread.currentThread().getName(), key, i.incrementAndGet(), execute == 0L ? "失败" : "成功");
}, (long) (keyTimeout * KEY_TIMEOUT_OFFSET));

scheduledFutureMap.put(key, future);
}

锁续期

锁续期由于是非原子操作,所以需要调用lua脚本来实现锁续期

  • KEYS[]数组是redisTemplate.execute(RedisScript script, List keys, Object… args);中的keys参数,是一个集合,下标从1开始
  • ARGV[]数组是redisTemplate.execute(RedisScript script, List keys, Object… args);中的args参数是一个可变长参数,下标从1开始

lua

该lua脚本很简单,就是判断值是否相等,相等则执行pexpire设置生存时间

1
2
3
4
5
if redis.call("get",KEYS[1])==ARGV[1] then
return redis.call("pexpire",KEYS[1],ARGV[2])
else
return 0
end

为什么锁续期不是一个原子操作

判断锁的value值是否相等,相等再请求续期锁,明显不是一个原子操作,如果恰巧锁过期,那么pexpire命令就会作用在非本线程设置的锁上,故我们需要lua脚本确保原子性

image-20220309162211924

removeWatchDog()

删除守护线程(看门狗),当当前业务执行完成后,也就不需要对其锁进行续期,避免续期续到其他锁上去了,要及时删除看门狗

1
2
3
4
5
6
private void removeWatchDog(@Nullable String key){
ScheduledFuture<?> future = scheduledFutureMap.get(key);
if (future != null && !future.isCancelled()) future.cancel(false);
scheduledFutureMap.remove(key);
log.info("取消看门狗 key:{}", key);
}

delLock()

删除锁

1
2
3
4
5
6
7
8
private boolean delLock(String key, String value) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("locks/del-lock.lua")));
redisScript.setResultType(Long.class);
Long execute = (Long) redisTemplate.execute(redisScript, Arrays.asList(key), value);
log.info("{} key:{} 删除{}",Thread.currentThread().getName(), key, execute == 0L ? "失败" : "成功");
return execute != 0L;
}

lua

1
2
3
4
5
if redis.call("get",KEYS[1])==ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

原子性

删除锁同样需要保证原子性,删除锁和锁续期逻辑相同,但是不保证删除锁的原子性比不保证锁续期的危害性大得多

image-20220309162945561

测试

写一个Service,并发请求10000个,操作资源 i=0, j=0,其中操作i的方法加分布式锁

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int i = 0, j = 0;

@Override
@DistributedLock(prefix = "my-", value = "test", keyTimeout = 3000L, retryCount = 0, paramLock = true, retryInterval = 200L)
public String test(String s, int t, Object o) {
log.error("i : {}", i++);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "succeed";
}

@Override
public void test(){
log.error("j : {}", j++);
}

使用jmeter测压,并发1000,10轮,每轮间隔0.1秒。我试过一次10000并发,这样由于请求执行时间过长,一些请求死掉了没法测出锁的效果。

image-20220309173054381

可以看到最后打印结果,i=9999,符合加锁线程安全的要求,而j=9986,线程不安全了。

image-20220309173034074

以上代码开源至gitee仓库,欢迎star

文章作者:xpp011

发布时间:2022年03月07日 - 22:03

原始链接:http://xpp011.cn/2022/03/07/da3fa96d.html

许可协议: 转载请保留原文链接及作者。