我看远山,远山悲悯
写在前面
- 项目中用到,简单整理,为什么选择这种方式,主要是考虑
SSE
简单轻量
- 和 HTTP 耦合度高,不需要单独考虑限流,加密、认证鉴权等,方便维护
- 博文内容为通过
SSE + MQ
实现分布式广播推送
,
- 当然考虑成本问题,这里的
MQ
也可以使用 redis
发布订阅模式
- 理解不足小伙伴帮忙指正 :),生活加油
我看远山,远山悲悯
持续分享技术干货,感兴趣小伙伴可以关注下 ^_^
实现逻辑简单说明,分布式系统,当前项目有一个局部刷新
的业务场景,后端处理完数据需要实时推送到前端,之前的处理办法是 WebSocket + redis
,但是 WebSocket
老断,后面考虑做一些报文加密之类的,考虑 WebSocket
协议升级全双工通信之后Servlet
过滤器之类不再适用,而且也没有客户端推送的需求,所以考虑使用 SSE
加 MQ
的方式,可以基于当前Web安全框架
,不需要额外编码
实现方式也比较简单,后端业务处理完数据,通过 MQ
广播交换机发布广播,分布式节点收到订阅后,通过之前建立的 SSE 长连接推送刷新数据ID,前端根据事件进行更新渲染操作
SSE 部分
SSE 由三个方法分别负责建立SSE连接
、发送房间状态更新
和维持心跳连接
,每个方法都结合了SseEmitter
的功能和Spring
的异步处理机制,确保实时数据推送的可靠性和效率。
streamStockPrice
建立SSE连接:该方法通过@GetMapping
注解定义了一个SSE端点/stream/{hotelId}
,用于为指定酒店建立实时数据推送连接。核心逻辑包括:
- 创建SseEmitter:为每个酒店生成唯一标识
sId
(基于SnowFlake算法),并创建SseEmitter
实例,设置超时时间为最大值以保持长连接。
- 事件处理:通过
onCompletion
、onTimeout
和onError
回调处理连接关闭、超时和错误场景,自动从sseEmitterMap
中移除失效连接。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
@SneakyThrows @GetMapping (value ="/stream/{hotelId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter streamStockPrice(@HotelValidator @PathVariable("hotelId") Long hotelId) { Future<SseEmitter> future1 = executor.submit(() -> { final String sId = SnowFlake.No (hotelId+"SSE"); SseEmitter emitter = new SseEmitter(0L); try { emitter.send(SseEmitter.event().reconnectTime(1000).name("connectionEstablished_"+ hotelId ).data("连接已建立完成")); emitter.onCompletion(() -> { sseEmitterMap.remove(sId); log.info("========================= SSE连接完成 ===================================:"+sId); }); emitter.onTimeout(() -> { log.warn("========================= SSE连接超时 ====================================:"+sId); sseEmitterMap.remove(sId); }); emitter.onError ((e) ->{ log.warn("========================= SSE连接错误 ====================================:"+sId); sseEmitterMap.remove(sId); }); log.info("=========================新建连接 ===================================:"+hotelId); sseEmitterMap.put (sId,emitter); return emitter; } catch ( IOException e) { sseEmitterMap.remove(sId); emitter.completeWithError(e); } return emitter; }); return future1.get (5, TimeUnit.SECONDS); }
|
sendRoomStatus
推送房间状态更新 该方法接收房间状态数据,向指定酒店的所有SSE连接推送更新。实现细节包括:
- 事件定制:通过
SseEmitter.event().name("roomStatus_...")
定义自定义事件类型,携带房间状态数据(如roomStatus
Map)。
- 精准推送:遍历
sseEmitterMap
,仅向与传入hotelId
匹配的连接发送数据,避免跨酒店干扰。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
public void sendRoomStatus(Map roomStatus){ executor.submit (() -> { sseEmitterMap.forEach ((key,emitter) ->{ final long hotelId =Long.parseLong (roomStatus.get ("hotelId").toString ()); final long SSEhotelId = Long.parseLong (key.split ("SSE")[0]); if (hotelId == SSEhotelId){ try { emitter.send(SseEmitter.event() .name("roomStatus_"+ key.split ("SSE")[0]) .reconnectTime(1000) .data(roomStatus)); log.debug("sendRoomStatus sent to emitter: {}", emitter); } catch (IOException | IllegalStateException e) { log.error("Emitter failed [ID:{}] - {}", emitter.hashCode(), e.getMessage()); } } }); }); }
|
sendHeartbeat
维持长连接活跃: 该方法通过@Scheduled
注解每5秒执行一次,用于发送心跳事件以保持SSE连接活跃:
- 心跳事件:使用
SseEmitter.event().comment("heartbeat")
发送注释型心跳数据,包含当前时间和连接标识sId
,防止浏览器因无数据传输而关闭连接。
- 连接监控:日志输出当前活跃连接数,便于监控系统状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
@Scheduled (fixedRate = 10000) public void sendHeartbeat () { executor.submit (() -> { try { final int size = sseEmitterMap.size (); log.info ("======================================== 当前连接数:" + size); sseEmitterMap.forEach ((key, emitter) -> { try { emitter.send (SseEmitter.event () .comment ("heartbeat") .reconnectTime (1000) .data (ImmutableMap.of ("sId", key, "time", System.currentTimeMillis ())));
if (log.isDebugEnabled ()) { log.debug ("Heartbeat sent to emitter: {}", emitter); } } catch (IOException | IllegalStateException e) { log.error ("Emitter failed [ID:{}] - {}", emitter.hashCode (), e.getMessage ()); } }); } catch (Exception e) { log.error ("+++++++++++++++++++++++++++++++++++++++++++++++++ee"); } }); }
|
涉及到的容器一个是线程池,一个是线程安全的 Map
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Slf4j @RestController @RequestMapping("/demo/webs/sse") public class SSEController {
private final ExecutorService executor = Executors.newCachedThreadPool();
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
@SneakyThrows @GetMapping (value ="/stream/{hotelId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter streamStockPrice(@HotelValidator @PathVariable("hotelId") Long hotelId) { ...................................... }
public void sendRoomStatus(Map roomStatus){ ..................................... }
@Scheduled (fixedRate = 5000) public void sendHeartbeat () { .............................. } }
|
MQ 部分
基于Spring Boot
的RabbitMQ
配置类,通过@Bean
注解创建了广播交换机(FanoutExchange)
和持久化队列
(Queue),并建立两者的无路由键绑定关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Configuration public class RabbitMQConfig {
public static final String ROOM_STATUS_EXCHANGE = "room_status_exchange"; public static final String ROOM_STATUS_QUEUE = "room_status_queue";
@Bean public FanoutExchange roomStatusExchange() { return new FanoutExchange(ROOM_STATUS_EXCHANGE, true, false); }
@Bean public Queue roomStatusQueue() { return new Queue(ROOM_STATUS_QUEUE, true); } @Bean public Binding roomStatusBinding(FanoutExchange roomStatusExchange, Queue roomStatusQueue) { return BindingBuilder.bind(roomStatusQueue).to(roomStatusExchange); } }
|
广播消息发送
sendBroadcast
方法接收房态数据Map
,将其序列化为JSON格式并封装为RabbitMQ
消息,通过ROOM_STATUS_EXCHANGE
广播交换机(Fanout模式)发送,路由键留空确保所有绑定该交换机的队列均能接收消息。消息头设置x-retry-count=0
表示禁用自动重试机制,由业务层自行处理异常重试逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13
|
public void sendBroadcast(Map<String, Object> msg) { log.info("广播时间:{},内容:{}", LocalDateTime.now(), msg); MessageProperties props = new MessageProperties(); props.setHeader("x-retry-count", 0); Message message = new Message(new Gson().toJson(msg).getBytes(), props); rabbitTemplate.convertAndSend(ROOM_STATUS_EXCHANGE, "", message); }
|
消息消费处理
handleRoomStatus
方法通过 @RabbitListener
监听ROOM_STATUS_QUEUE
队列,接收房态变更消息后,将消息体反序列化为Map对象,并调用sseController.sendRoomStatus()
方法将房态数据推送给前端SSE长连接
。处理完成后通过basicAck
手动确认消息消费成功,避免消息重复投递。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
@RabbitListener(queues = RabbitMQConstants.ROOM_STATUS_QUEUE,ackMode = "MANUAL") public void handleRoomStatus(Message message, Channel channel) throws IOException { try { String msg = new String(message.getBody()); ObjectMapper objectMapper = new ObjectMapper(); Map map = objectMapper.readValue(msg, Map.class); sseController.sendRoomStatus (map); } catch (Exception e) { log.error("消息处理异常:", e); } channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
|
业务部分
添加埋点,在业务数据处理完成之后,推送数据ID 到MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| ................................ final ImmutableMap<String, ? extends Serializable> stringImmutableMap = ImmutableMap.of ("hotelId", hotelId , "action", BusinessTypeConstants.ORDER_CHANGE_LOG_LIVE, "roomCodeNew", amsHotelRoom.getParentCode ()); notifySendOnBlockingTaskAsync (stringImmutableMap); ...................................
private static final ExecutorService executor = Executors.newFixedThreadPool(5, r -> new Thread(r, "Notify-Thread"));
public void notifySendOnBlockingTaskAsync (Map map) { executor.submit (() -> { delayedMessageService.sendBroadcast (map); });
}
|
博文部分内容参考
© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知 :)
© 2018-至今 liruilonger@gmail.com, 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)