分布式锁轮子
简介
在如今分布式的大环境中,我们需要保证线程安全就无法像单体应用中定义一个简单的锁,我们需要一个在任何地方都可以访问到的锁。
为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁的要素
1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
2、高可用的获取锁与释放锁;
3、高性能的获取锁与释放锁;
4、具备可重入特性;
5、具备锁失效机制,防止死锁;
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
分布式锁的实现有很多种,如:基于数据库的实现方式
、基于Redis的实现方式
、基于ZooKeeper的实现方式
那本文就讲一讲任何利用SpringBoot
+Redis
搭建一个较为完善的分布式demo
对于分布式锁的介绍,以及从最简单的分布式锁到后续使用lua脚本配合实现分布式锁可以参考我之前的文章
这边做一个简单的总结
-
为什么锁要设置TTL生存时间
如果一个线程获取到锁,但是在执行业务中挂掉或者宕机等等原因导致无法删除锁,那么就会形成死锁,往后其他服务的线程永远无法执行,除非将死锁删除
-
为什么需要锁续期(看门狗)
如果业务在执行过程中遇到网络拥堵也好、cpu未分配到资源也好,导致业务执行时间超过了TTL,那么TTL时间一到,锁失效,其他线程得到锁执行业务,那么就打破锁的基本要素(同一时间只能被一个机器的一个线程执行)
创建项目
首先我们创建一个SpringBoot
+Redis
的项目
DistributedLock注解
首先我们在项目中创建一个DistributedLock
注解,该注解的作用域在方法之上,被该注解修饰的方法具有分布式锁的效果
code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface DistributedLock {
String value() default "";
String prefix() default "";
boolean paramLock() default false;
long keyTimeout() default 1000L;
int retryCount() default 3;
long retryInterval() default 500L; }
|
属性
-
value
为redis的key,也就是锁的key值
-
prefix
锁的前缀,不同锁可以设置不同前缀
-
paramLock
方法参数是否分布锁的构建,也就是分布式锁的颗粒度是否到达参数级别
底层实现为参数的hashcode,对于未重写hashcode的类型无效,Object的hashcode方法默认返回内存地址
-
keyTimeout
锁超时时间
-
retryCount
锁重试次数,0表示一定执行
-
retryInterval
锁重试间隔,大小越接近业务方法执行时间越好(>=),越不会浪费空闲时间
LocksAspect类
LocksAspect是实现分布式锁的核心类,内部使用AOP实现,拦截被DistributedLock
注解修饰的方法。
code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
|
@Aspect @Component public class LocksAspect {
private static final Logger log = LoggerFactory.getLogger(LocksAspect.class);
private static final double KEY_TIMEOUT_OFFSET = 0.75;
private static final String suffix = "-locks";
private static final ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
private static final Map<String, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();
private static final Random random = new Random();
static { threadPoolTaskScheduler.initialize(); }
@Autowired private RedisTemplate redisTemplate;
@Around(value = "@annotation(lock)", argNames = "joinPoint,lock") void around(ProceedingJoinPoint joinPoint, DistributedLock lock) { StringBuilder str = new StringBuilder(lock.value()); if (lock.paramLock()){ Object[] args = joinPoint.getArgs(); for (Object arg : args) { if (arg == null) continue; str.append("-" + arg.hashCode()); } } String key = new StringBuilder() .append(lock.prefix()).append(str).append(suffix) .toString(); int retryCut = lock.retryCount(), sub = retryCut == 0 ? 0 : 1; long retryInterval = lock.retryInterval(), keyTimeout = lock.keyTimeout();
String value = UUID.randomUUID().toString();
verify(retryCut, retryInterval, key);
try { while ((retryCut -= sub) >= 0) {
Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, value, keyTimeout, TimeUnit.MILLISECONDS); if (absent) { startWatchDog(key, value, keyTimeout);
joinPoint.proceed();
break; } else { log.info("{} key:{} 未拿到锁,准备重试", Thread.currentThread().getName(), key); Thread.sleep((long) (retryInterval + retryInterval * random.nextDouble())); } } } catch (InterruptedException e) { log.error("线程睡眠被打断"); e.printStackTrace(); } catch (Throwable e) { e.printStackTrace(); } finally { if (delLock(key, value)) { removeWatchDog(key); } } }
private boolean delLock(String key, String value) { DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("locks/del-lock.lua"))); redisScript.setResultType(Long.class); Long execute = (Long) redisTemplate.execute(redisScript, Arrays.asList(key), value); log.info("{} key:{} 删除{}",Thread.currentThread().getName(), key, execute == 0L ? "失败" : "成功"); return execute != 0L; }
private void verify(int retryCut, double retryInterval, String key) { StringBuilder errMsg = new StringBuilder(); if (retryCut < 0) errMsg.append("retryCut "); if (retryInterval <= 0) errMsg.append("retryInterval "); int lio = key.lastIndexOf(suffix); if (lio == -1 || lio != key.length() - suffix.length()) errMsg.append("key "); if (errMsg.length() != 0) throw new RuntimeException(errMsg.append("参数不合法").toString()); }
private void startWatchDog(String key, String value, long keyTimeout) { AtomicInteger i = new AtomicInteger(); ScheduledFuture<?> future = threadPoolTaskScheduler.scheduleAtFixedRate(() -> { DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("locks/pexpire-lock.lua"))); redisScript.setResultType(Long.class); Long execute = (Long) redisTemplate.execute(redisScript, Arrays.asList(key), value, keyTimeout); if (execute == 0L) removeWatchDog(key); log.info("{} key:{},第{}锁续期{}",Thread.currentThread().getName(), key, i.incrementAndGet(), execute == 0L ? "失败" : "成功"); }, (long) (keyTimeout * KEY_TIMEOUT_OFFSET));
scheduledFutureMap.put(key, future); }
private void removeWatchDog(@Nullable String key){ ScheduledFuture<?> future = scheduledFutureMap.get(key); if (future != null && !future.isCancelled()) future.cancel(false); scheduledFutureMap.remove(key); log.info("取消看门狗 key:{}", key); } }
|
around()
around()
方法实现了aop环绕通知,分布式锁的主要逻辑流程就在方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| @Around(value = "@annotation(lock)", argNames = "joinPoint,lock") void around(ProceedingJoinPoint joinPoint, DistributedLock lock) { StringBuilder str = new StringBuilder(lock.value()); if (lock.paramLock()){ Object[] args = joinPoint.getArgs(); for (Object arg : args) { if (arg == null) continue; str.append("-" + arg.hashCode()); } } String key = new StringBuilder() .append(lock.prefix()).append(str).append(suffix) .toString(); int retryCut = lock.retryCount(), sub = retryCut == 0 ? 0 : 1; long retryInterval = lock.retryInterval(), keyTimeout = lock.keyTimeout();
String value = UUID.randomUUID().toString();
verify(retryCut, retryInterval, key);
try { while ((retryCut -= sub) >= 0) {
Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, value, keyTimeout, TimeUnit.MILLISECONDS); if (absent) { startWatchDog(key, value, keyTimeout);
joinPoint.proceed();
break; } else { log.info("{} key:{} 未拿到锁,准备重试", Thread.currentThread().getName(), key); Thread.sleep((long) (retryInterval + retryInterval * random.nextDouble())); } } } catch (InterruptedException e) { log.error("线程睡眠被打断"); e.printStackTrace(); } catch (Throwable e) { e.printStackTrace(); } finally { if (delLock(key, value)) { removeWatchDog(key); } }
|
锁竞争打散
使用random函数使睡眠时间在(retryInterval~2retryInterval)的时间,打散每一个线程竞争锁的时间,避免线程同一时间醒来,避免惊群效应。
1
| Thread.sleep((long) (retryInterval + retryInterval * random.nextDouble()));
|
verify()
verify()
检验DistributedLock
一些参数的合法性
1 2 3 4 5 6 7 8
| private void verify(int retryCut, double retryInterval, String key) { StringBuilder errMsg = new StringBuilder(); if (retryCut < 0) errMsg.append("retryCut "); if (retryInterval <= 0) errMsg.append("retryInterval "); int lio = key.lastIndexOf(suffix); if (lio == -1 || lio != key.length() - suffix.length()) errMsg.append("key "); if (errMsg.length() != 0) throw new RuntimeException(errMsg.append("参数不合法").toString()); }
|
startWatchDog()
看门狗方法,也就是启动一个守护线程,定时对锁进行续期
值得一提的是ThreadPoolTaskScheduler
类,该类是Spring框架的线程池任务调度类
- 该线程池核心线程数为1
- 最大空闲线程数为Integer.MAX_VALUE
- 空闲线程最大活跃时间10000000纳秒
也就是说ThreadPoolTaskScheduler
内的守护线程会马上执行,不会因为核心线程数不够而被迫放到work队列,等待调度
这对守护线程很重要,如果不能及时的对锁续期就会导致其他线程抢占锁,同一时刻两个线程执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private void startWatchDog(String key, String value, long keyTimeout) { AtomicInteger i = new AtomicInteger(); ScheduledFuture<?> future = threadPoolTaskScheduler.scheduleAtFixedRate(() -> { DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("locks/pexpire-lock.lua"))); redisScript.setResultType(Long.class); Long execute = (Long) redisTemplate.execute(redisScript, Arrays.asList(key), value, keyTimeout); if (execute == 0L) removeWatchDog(key); log.info("{} key:{},第{}锁续期{}",Thread.currentThread().getName(), key, i.incrementAndGet(), execute == 0L ? "失败" : "成功"); }, (long) (keyTimeout * KEY_TIMEOUT_OFFSET));
scheduledFutureMap.put(key, future); }
|
锁续期
锁续期由于是非原子操作,所以需要调用lua脚本来实现锁续期
- KEYS[]数组是redisTemplate.execute(RedisScript script, List keys, Object… args);中的
keys
参数,是一个集合,下标从1开始
- ARGV[]数组是redisTemplate.execute(RedisScript script, List keys, Object… args);中的
args
参数是一个可变长参数,下标从1开始
lua
该lua脚本很简单,就是判断值是否相等,相等则执行pexpire
设置生存时间
1 2 3 4 5
| if redis.call("get",KEYS[1])==ARGV[1] then return redis.call("pexpire",KEYS[1],ARGV[2]) else return 0 end
|
为什么锁续期不是一个原子操作
判断锁的value值是否相等,相等再请求续期锁,明显不是一个原子操作,如果恰巧锁过期,那么pexpire
命令就会作用在非本线程设置的锁上,故我们需要lua脚本确保原子性
removeWatchDog()
删除守护线程(看门狗),当当前业务执行完成后,也就不需要对其锁进行续期,避免续期续到其他锁上去了,要及时删除看门狗
1 2 3 4 5 6
| private void removeWatchDog(@Nullable String key){ ScheduledFuture<?> future = scheduledFutureMap.get(key); if (future != null && !future.isCancelled()) future.cancel(false); scheduledFutureMap.remove(key); log.info("取消看门狗 key:{}", key); }
|
delLock()
删除锁
1 2 3 4 5 6 7 8
| private boolean delLock(String key, String value) { DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("locks/del-lock.lua"))); redisScript.setResultType(Long.class); Long execute = (Long) redisTemplate.execute(redisScript, Arrays.asList(key), value); log.info("{} key:{} 删除{}",Thread.currentThread().getName(), key, execute == 0L ? "失败" : "成功"); return execute != 0L; }
|
lua
1 2 3 4 5
| if redis.call("get",KEYS[1])==ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
|
原子性
删除锁同样需要保证原子性,删除锁和锁续期逻辑相同,但是不保证删除锁的原子性比不保证锁续期的危害性大得多
测试
写一个Service,并发请求10000个,操作资源 i=0, j=0,其中操作i的方法加分布式锁
code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| int i = 0, j = 0;
@Override @DistributedLock(prefix = "my-", value = "test", keyTimeout = 3000L, retryCount = 0, paramLock = true, retryInterval = 200L) public String test(String s, int t, Object o) { log.error("i : {}", i++); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return s + "succeed"; }
@Override public void test(){ log.error("j : {}", j++); }
|
使用jmeter测压,并发1000,10轮,每轮间隔0.1秒。我试过一次10000并发,这样由于请求执行时间过长,一些请求死掉了没法测出锁的效果。
可以看到最后打印结果,i=9999,符合加锁线程安全的要求,而j=9986,线程不安全了。
以上代码开源至gitee仓库,欢迎star