对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》
写在前面
工作中遇到,整理 reids
做简单分布式锁的思考
博文适合刚接触 redis
的小伙伴
理解不足小伙伴帮忙指正
对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》
假设现在有这样一个需求,需要做排队预约
住宿的功能,当前宿舍住满了,有新的同学需要来入住,可以进行排队预约,排队编号通过累加的方式生成
我们设计这样一张数据表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 CREATE TABLE `ams_student_queue_check_in_sync` ( `queue_check_in_id` INT (11 ) NOT NULL AUTO_INCREMENT COMMENT '学生队列ID' , `student_name` VARCHAR (50 ) NOT NULL COMMENT '学生姓名' COLLATE 'utf8mb4_general_ci' , `student_uid` VARCHAR (50 ) NULL DEFAULT NULL COMMENT '学生uid' COLLATE 'utf8mb4_general_ci' , `student_card` VARCHAR (30 ) NULL DEFAULT NULL COMMENT '学生身份证号' COLLATE 'utf8mb4_general_ci' , `student_contact_number` VARCHAR (20 ) NOT NULL COMMENT '学生联系电话' COLLATE 'utf8mb4_general_ci' , `student_email` VARCHAR (50 ) NULL DEFAULT NULL COMMENT '学生电子邮件地址' COLLATE 'utf8mb4_general_ci' , `student_gender` TINYINT(4 ) NOT NULL DEFAULT '0' COMMENT '学生性别' , `student_emergency_contact_name` VARCHAR (100 ) NULL DEFAULT NULL COMMENT '第二联系人姓名' COLLATE 'utf8mb4_general_ci' , `student_emergency_contact_number` VARCHAR (20 ) NULL DEFAULT NULL COMMENT '第二联系人电话' COLLATE 'utf8mb4_general_ci' , `student_status` TINYINT(4 ) NULL DEFAULT '1' COMMENT '学生排队状态(1.待入住,2.以入住 3.以取消)' , `arrival_dates` DATETIME NULL DEFAULT NULL COMMENT '预计入住时间' , `departure_dates` DATETIME NULL DEFAULT NULL COMMENT '预计离开日期' , `queue_position` INT (11 ) NULL DEFAULT NULL COMMENT '学生在排队中的位置' , `check_in_remark` TEXT NULL DEFAULT NULL COMMENT '备注' COLLATE 'utf8mb4_general_ci' , `extended1` VARCHAR (50 ) NULL DEFAULT NULL COMMENT '扩展字段1' COLLATE 'utf8mb4_general_ci' , `extended2` VARCHAR (50 ) NULL DEFAULT NULL COMMENT '扩展字段2' COLLATE 'utf8mb4_general_ci' , `extended3` VARCHAR (50 ) NULL DEFAULT NULL COMMENT '扩展字段3' COLLATE 'utf8mb4_general_ci' , `created_at` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , `updated_at` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间' , PRIMARY KEY (`queue_check_in_id`) USING BTREE, INDEX `student_uid` (`student_uid`) USING BTREE ) COMMENT= '入住排队表' COLLATE = 'utf8mb4_general_ci' ENGINE= InnoDB AUTO_INCREMENT= 1363 ;
queue_position
为每一位同学的排队编号,需要根据当前的学生编号最大来累加
下面为实现的基础代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 @ApiOperation("入住排队接口" ) @PostMapping("/checkInQueue" ) @Transactional public AjaxResult checkInQueue( @RequestHeader("UID" ) String uid, @RequestBody AmsStudentQueueCheckIn amsStudentQueueCheckIn){ if (Objects.isNull(uid)){ return AjaxResult.error("Uid 为空" ); } if (Objects.isNull(amsStudentQueueCheckIn.getStudentCard())){ return AjaxResult.error("身份证号为空" ); } if (Objects.isNull(amsStudentQueueCheckIn.getStudentEmergencyContactNumber())){ return AjaxResult.error("电话号为空" ); } if (Objects.isNull(amsStudentQueueCheckIn.getStudentName())){ return AjaxResult.error("姓名为空" ); } if (Objects.isNull(amsStudentQueueCheckIn.getStudentGender())){ return AjaxResult.error("性别为空" ); } StringBuilder stringBuilder = new StringBuilder(); String studentContactNumber = amsStudentQueueCheckIn.getStudentContactNumber(); List<AmsStudentQueueCheckIn> amsStudentQueueCheckIns1 = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInList(new AmsStudentQueueCheckIn().setStudentContactNumber(studentContactNumber)); Integer count = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInListCount(amsStudentQueueCheckIn.getStudentGender()); if (Objects.nonNull(amsStudentQueueCheckIns1) && amsStudentQueueCheckIns1.size() !=0 ){ stringBuilder.append("已经排队预约啦,请耐心等待 ^_^" ) .append(", 预约编号为 " ).append(amsStudentQueueCheckIns1.get(0).getQueuePosition()) .append(", 前面还有 " ).append(count - 1).append( " 人" ); return AjaxResult.success(stringBuilder.toString(),ImmutableMap.of("queuePosition" ,amsStudentQueueCheckIns1.get(0).getQueuePosition(),"beforePeopleBumber" ,count -1 )); } AmsStudentQueueCheckIn amsStudentQueueCheckIns = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInListMax(amsStudentQueueCheckIn.getStudentGender()); Long queuePosition = 0L; if (Objects.nonNull(amsStudentQueueCheckIns)){ queuePosition = amsStudentQueueCheckIns.getQueuePosition() } amsStudentQueueCheckIn.setStudentStatus(1).setQueuePosition(queuePosition + 1L).setStudentUid(uid); amsStudentQueueCheckIn.setStudentStatus(1).setQueuePosition(amsStudentQueueCheckIns.getQueuePosition() + 1L).setStudentUid(uid); int i = amsStudentQueueCheckInService.insertAmsStudentQueueCheckIn(amsStudentQueueCheckIn); if (i != 1){ return AjaxResult.error("排队预约失败!" ); } stringBuilder.append("排队预约成功" ) .append(", 预约编号为 " ).append(amsStudentQueueCheckIn.getQueuePosition()) .append(", 前面还有 " ).append(count).append( " 人" ); return AjaxResult.success(stringBuilder.toString(),ImmutableMap.of("queuePosition" ,amsStudentQueueCheckIn.getQueuePosition(),"beforePeopleBumber" ,count)); }
逻辑比较简单,拿到数据,获取编号最大值累加,数据落表,但是上面的代码存在一个问题,因为是 Springboot
项目,使用 tomcat
部署,Spring Boot 嵌入的 Tomcat 默认启用 Http11NioProtocol
,可以切换日志级别为 Debug 可看到
Http11NioProtocol
表示多线程非阻塞模式的HTTP协议的通信
(web 服务端网络IO处理模型包括:单(多)线程阻塞(非阻塞)IO模型)。
1 2 3 4 5 logging: level: root: debug
1 2 3 4 5 6 7 8 9 10 11 12 13 14 11:42:51.810 [restartedMain] INFO o.a.c.h.Http11NioProtocol - [log ,173] - Initializing ProtocolHandler ["http-nio-8080" ] 11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log ,173] - Setting state for [Connector[HTTP/1.1-8080]] to [INITIALIZED] 11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log ,173] - Setting state for [StandardService[Tomcat]] to [INITIALIZED] 11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log ,173] - Setting state for [StandardServer[-1]] to [INITIALIZED] 11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log ,173] - Setting state for [StandardServer[-1]] to [STARTING_PREP] 11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log ,173] - Setting state for [StandardServer[-1]] to [STARTING] 11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log ,173] - Setting state for [org.apache.catalina.deploy.NamingResourcesImpl@1dc49001] to [STARTING_PREP] 11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log ,173] - Setting state for [org.apache.catalina.deploy.NamingResourcesImpl@1dc49001] to [STARTING] 11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log ,173] - Setting state for [org.apache.catalina.deploy.NamingResourcesImpl@1dc49001] to [STARTED] 11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log ,173] - Setting state for [StandardService[Tomcat]] to [STARTING_PREP] 11:42:51.812 [restartedMain] INFO o.a.c.c.StandardService - [log ,173] - Starting service [Tomcat] 11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log ,173] - Setting state for [StandardService[Tomcat]] to [STARTING] 11:42:51.813 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log ,173] - Setting state for [StandardEngine[Tomcat]] to [STARTING_PREP] 11:42:51.813 [restartedMain] INFO o.a.c.c.StandardEngine - [log ,173] - Starting Servlet engine: [Apache Tomcat/9.0.75]
可以看到 spring-boot-starter-web
嵌入的 9.0.75
版本的 tomcat
,Tomcat 从 8.5
版本开始移除了 BIO
,默认启用 NIO
下图为从套接字连接接收、处理请求、响应客户端的整个过程
所以当多个排队请求并发调用接口时,不同的线程会分别进入方法,这个时候有可能会从数据库获取相同的排队编号进行累加,同时生成相同新编号,所以这里需要考虑方法线程安全
,
最简单的方式是使用同步方法
,保证只有一个线程获取锁,但是这不是最优的方式,这里不做考虑
1 2 public synchronized AjaxResult checkInQueue ( @RequestHeader("UID") String uid, @RequestBody AmsStudentQueueCheckIn amsStudentQueueCheckIn) {....................
使用同步方法
的方式解决了上面的问题,但是如果当前项目是在 k8s
集群上面部署,以分布式的方式,就需要考虑多个 Pod 的数据同步问题。
假设两个排队请求被负载到两个不同的 Pod,这个时候同时查询数据会获取相同的最大编号,生成相同的编号,考虑使用分布式锁
。下面为对方法的改进,这里如果使用分布式锁
的方式,那么上面的同步方法
即可以去掉了,因为获取锁的方法是原子操作。
分布式锁实现很简单,就是进来一个线程先占位,当别的线城进来操作时,发现已经有人占位了,就会放弃或者稍后再试。这里的占位状态是全局的,相对整个集群而言,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 String token = UUID.randomUUID().toString();if (redisCache.tryAcquireLock("checkInQueue" , token, 2 , 10 )){ AmsStudentQueueCheckIn amsStudentQueueCheckIns = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInListMax(amsStudentQueueCheckIn.getStudentGender()); Long queuePosition = 0L ; if (Objects.nonNull(amsStudentQueueCheckIns)){ queuePosition = amsStudentQueueCheckIns.getQueuePosition(); } amsStudentQueueCheckIn.setStudentStatus(1 ).setQueuePosition(queuePosition + 1L ).setStudentUid(uid); int i = amsStudentQueueCheckInService.insertAmsStudentQueueCheckIn(amsStudentQueueCheckIn); redisCache.unlock("checkInQueue" , token); if (i != 1 ){ return AjaxResult.error("排队预约失败!请重新填写" ); } }else { return AjaxResult.error("系统繁忙,请稍后提交!" ); }
tryAcquireLock
和 tryLock
以及 unlock
的方法实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 public class RedisCache { private static final Logger log = LoggerFactory.getLogger(RedisCache.class); private static final String REDIS_UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" ; @Autowired public RedisTemplate redisTemplate; public boolean tryLock (String key, String token, long expireInSeconds) { Boolean res = redisTemplate.opsForValue().setIfAbsent(key, token, expireInSeconds, TimeUnit.SECONDS); log.info("获取分布式锁:" + key + ":" + token); return Objects.equals(res, true ); } public void unlock (String key, String token) { try { DefaultRedisScript<Long> redisScript = new DefaultRedisScript <>(REDIS_UNLOCK_SCRIPT, Long.class); Long res = (Long) redisTemplate.execute(redisScript, Collections.singletonList(key), token); log.info("释放分布式锁:" + key + ":" + token); if (!Objects.equals(res, 1L )) { log.warn("redis unlock wrong:key=[{}],token=[{}],res=[{}]" , key, token, res); } } catch (Exception e) { log.error("redis unlock error:key=[{}],token=[{}]" , key, token, e); } } public boolean tryAcquireLock (String key, String token, long lockTimeout, long acquireTimeout) { try { long end = System.currentTimeMillis() + acquireTimeout; while (System.currentTimeMillis() < end) { Boolean res = redisTemplate.opsForValue().setIfAbsent(key, token, lockTimeout, TimeUnit.MILLISECONDS); if (Boolean.TRUE.equals(res)) { log.info("获取分布式锁:" + key + ":" + token); return true ; } try { Thread.sleep(100 ); } catch (Exception e) { log.error("thread sleep error" , e); Thread.currentThread().interrupt(); } } } catch (Exception e) { log.error("try acquire lock error, " , e); } return false ; } }
tryAcquireLock
和 tryLock
都用于获取分布式锁,unlock
用于释放分布式锁,逻辑简单,这里不做说明,关注以下几点:
tryAcquireLock
和 tryLock
的区别在于,前者在没有获取到锁之后会在限定的时间进行重复尝试获取,后者只尝试获取一次。
防止业务代码在执行的时候抛出异常,每一个锁添加了一个超时时间
,超时之后,锁会被自动释放,考虑获取锁和设置过期时间之间如果服务器突然挂掉了,这个时候锁被占用,无法及时得到释放,也会造成死锁所以,所以要保证这个操作是原子的
,所以使用 Redis 提供的原子操作 setIfAbsent(检查指定的键是否存在,如果不存在则设置键值对)
如果当前线程执行业务较耗时,超时时间会自动释放锁,其他线程会获取锁,当前线程执行完释放锁或释放到其他线程的锁,会出现混乱,所以需要锁相对线程唯一,自己的锁只能自己释放,使用 key+token
的机制
使用 key+token
的机制,每次释放锁都要判断 value, 一致才释放,但是这样的话,要去查看锁的 value,比较 value 的值是否正确,释放锁, 多个操作不保证原子性,所以unlock
需要引入 lua脚本
,Lua 脚本可以在 Redis 服务端原子的执行多个 Redis 命令
上面的实现是最简单的 redis
实现分布式锁,如果要进一步增强分布式锁的可靠性和性能,可以考虑使用更复杂的方案,如 RedLock 算法
(redis 集群)、基于 Redis 的 Pub/Sub
机制等。这些方案可以提供更强的分布式锁功能,并解决一些特殊情况下的竞态条件和故障恢复问题。
博文部分内容参考 © 文中涉及参考链接内容版权归原作者所有,如有侵权请告知,这是一个开源项目,如果你认可它,不要吝啬星星哦 :)
https://liruilong.blog.csdn.net/article/details/107076223
http://www.gxitsky.com/2022/02/12/SpringBoot-60-tomcat-nio/
© 2018-2024 liruilonger@gmail.com , All rights reserved. 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)