使用rabbitmq广播模式来处理集群下的websocket消息推送
websocket属于长连接,当客户端连接上服务端后,将保持于服务端的连接。
而当websocket服务端存在集群的情况,如果需要将某个消息发送到客户端时,通过接口调用发送,这种情况只能将消息发送到与这台服务端连接的客户端,会存在部分客户无法接收消息的情况。
后续通过搜集资料选择采用了用rabbitmq来做websocket的集群,即通过使用mq的广播交换机,然后结合服务端启动创建动态队列来绑定同一个交换机,这样便能使集群中的每一个服务端都能收到消息,然后再去往连在当前服务端的websocket推送消息,不在的则跳过不发。
交换机以及队列设置
/ * @author peng * @program * @description 交换机以及队列设置 * @create 2022/03/30 22:08 /@Configurationpublic class BroadcastMqConfig { private final static String BROADCAST_EXCHANGE = "broadcast_exchange"; @Value("${server.port}") private String serverPort; public static String getQueueName() { return "broadcast:" + getLocalIP() + ":"; } / * websocket 交换机 * * @return */ @Bean public FanoutExchange fanoutDirectExchange() { return new FanoutExchange(BROADCAST_EXCHANGE); } / * websocket 队列 * * @return */ @Bean public Queue queueFanoutQueue() { return QueueBuilder.durable(BROADCAST_EXCHANGE + ":" + getLocalIP() + ":" + serverPort).build(); } / * websocket 队列交换机绑定 * * @param callQueueFanoutQueue * @param broadcastmsgFanoutDirectExchange * @return */ @Bean public Binding broadcastFanoutBinding(Queue callQueueFanoutQueue, FanoutExchange broadcastmsgFanoutDirectExchange) { return BindingBuilder .bind(callQueueFanoutQueue) .to(broadcastmsgFanoutDirectExchange); } / * 取当前系统站点本地地址 linux下 和 window下可用 * * @return */ public static String getLocalIP() { String ipStr = ""; InetAddress ip = null; try { // 如果是Windows操作系统 if (isWindowsOS()) { ip = InetAddress.getLocalHost(); } else { boolean findIp = false; Enumeration<NetworkInterface> netInterfaces = NetworkInterface .getNetworkInterfaces(); while(netInterfaces.hasMoreElements()) { if (findIp) { break; } NetworkInterface ni = netInterfaces.nextElement(); // 跳过docker网卡 if (ni.getName().contains("docker")) { continue; } // 遍历所有ip Enumeration<InetAddress> ips = ni.getInetAddresses(); while(ips.hasMoreElements()) { ip = (InetAddress) ips.nextElement(); // 127.开头的都是lookback地址 if (ip.isSiteLocalAddress() && !ip.isLoopbackAddress() && !ip.getHostAddress().contains(":")) {findIp = true;break; } } } } } catch (Exception e) { e.printStackTrace(); } if (null != ip) { ipStr = ip.getHostAddress(); } return ipStr; } / * 判断当前系统是否windows * * @return boolean */ public static boolean isWindowsOS() { boolean isWindowsOS = false; String osName = System.getProperty("os.name"); if (osName.toLowerCase().contains("windows")) { isWindowsOS = true; } return isWindowsOS; } }
上面主要是通过动态获取当前服务器的ip以及应用启动的端口来作为队列名字,但是绑定交换机是同一个,由于交换机使用的是广播交换机,此模式下会将消息发送到所有队列。
当应用启动后可自动创建以 当前应用的的ip + 端口 为名称的队列
生产者
/ * @author peng * @program * @description 生产者设置 * @create 2022/03/30 22:08 /@Component@Slf4jpublic class BroadcastmsgProducer { @Autowired private RabbitTemplate rabbitTemplate; / * 发送推送websocket消息 * * @param param */ public void sendMsg(BroadcastSendDto param) { rabbitTemplate.convertAndSend(BroadcastMqConfig.BROADCAST_EXCHANGE, "", param); }}
以上为生产者代码,因为是广播模式,所以路由key为空也可以广播到所有队列。
消费者
/ * @author peng * @program * @description 消费者 * @create 2022/03/30 22:08 /@Component@Slf4jpublic class BroadcastmsgReceiver { @Autowired private BoadcastWebSocketServer webSocketServer; @RabbitListener(queues = {"#{T(com.server.mq.BroadcastMqConfig).getBroadcastmsgQueueName()}${server.port}"}) public void pushMessageToUser(Channel channel, BroadcastSendDto param, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { try { log.info("接收推送至具体用户websocket mq :{}", param); // 发送websocket消息 webSocketServer.sendMessageToUser(param); } catch (Exception e) { log.error("获取推送至具体用户websocket 异常 ...", e); } finally { try { channel.basicAck(tag, false); } catch (IOException e) { log.error("---------消息确认异常---------", e); } } }}
以上为消费者设置,消费者使用动态队列名称监听,“#T(com.server.mq.BroadcastMqConfig).getBroadcastmsgQueueName()}*” 是表示使用代码块获取返回字符串,“${server.port}”是动态获取配置文件参数值,获取端口。
当应用启动后可监听以 当前应用的的ip + 端口 为名称的队列消息
调用入口
/ * 发送消息至某个用户 * @param sendDto * @return * @throws IOException */ @PostMapping("/sendMsg") public R sendMsg(@RequestBody @Validated BroadcastSendDto sendDto) { log.info("发送消息:{}", JSON.toJSONString(sendDto)); broadcastmsgProducer.sendMsg(sendDto); return R.ok(); }
以上为通过接口调用的入口,我们此部分由于是微服务的存在,所以对外提供接口调用,便在接口处修改为发送mq消息。
如若处于设计阶段且是单机应用,可考虑直接需发送消息代码处直接发送mq消息。
以上就是我使用的处理集群的方式,这种属于比较简单粗暴的,看到网上有通过记录所有连接与哪一台服务端连接的信息,然后再通过redis等中间来获取精准通知到具体某一台服务端来推送消息。但是由于时间有限,就没有仔细研究了。