> 文档中心 > 使用rabbitmq广播模式来处理集群下的websocket消息推送

使用rabbitmq广播模式来处理集群下的websocket消息推送

websocket属于长连接,当客户端连接上服务端后,将保持于服务端的连接。
而当websocket服务端存在集群的情况,如果需要将某个消息发送到客户端时,通过接口调用发送,这种情况只能将消息发送到与这台服务端连接的客户端,会存在部分客户无法接收消息的情况。

后续通过搜集资料选择采用了用rabbitmq来做websocket的集群,即通过使用mq的广播交换机,然后结合服务端启动创建动态队列来绑定同一个交换机,这样便能使集群中的每一个服务端都能收到消息,然后再去往连在当前服务端的websocket推送消息,不在的则跳过不发。

交换机以及队列设置

/** * @author peng * @program  * @description 交换机以及队列设置 * @create 2022/03/30 22:08 **/@Configurationpublic class BroadcastMqConfig {    private final static String BROADCAST_EXCHANGE = "broadcast_exchange";    @Value("${server.port}")    private String serverPort;    public static String getQueueName() { return "broadcast:" + getLocalIP() + ":";    }    /**     * websocket 交换机     *     * @return     */    @Bean    public FanoutExchange fanoutDirectExchange() { return new FanoutExchange(BROADCAST_EXCHANGE);    }    /**     * websocket 队列     *     * @return     */    @Bean    public Queue queueFanoutQueue() { return QueueBuilder.durable(BROADCAST_EXCHANGE + ":" + getLocalIP() + ":" + serverPort).build();    }    /**     * websocket 队列交换机绑定     *     * @param callQueueFanoutQueue     * @param broadcastmsgFanoutDirectExchange     * @return     */    @Bean    public Binding broadcastFanoutBinding(Queue callQueueFanoutQueue, FanoutExchange broadcastmsgFanoutDirectExchange) { return BindingBuilder  .bind(callQueueFanoutQueue)  .to(broadcastmsgFanoutDirectExchange);    }    /**     * 取当前系统站点本地地址 linux下 和 window下可用     *     * @return     */    public static String getLocalIP() { String ipStr = ""; InetAddress ip = null; try {     // 如果是Windows操作系统     if (isWindowsOS()) {  ip = InetAddress.getLocalHost();     } else {  boolean findIp = false;  Enumeration<NetworkInterface> netInterfaces = NetworkInterface   .getNetworkInterfaces();  while(netInterfaces.hasMoreElements()) {      if (findIp) {   break;      }      NetworkInterface ni = netInterfaces.nextElement();      // 跳过docker网卡      if (ni.getName().contains("docker")) {   continue;      }      // 遍历所有ip      Enumeration<InetAddress> ips = ni.getInetAddresses();      while(ips.hasMoreElements()) {   ip = (InetAddress) ips.nextElement();   // 127.开头的都是lookback地址   if (ip.isSiteLocalAddress() && !ip.isLoopbackAddress()    && !ip.getHostAddress().contains(":")) {findIp = true;break;   }      }  }     } } catch (Exception e) {     e.printStackTrace(); } if (null != ip) {     ipStr = ip.getHostAddress(); } return ipStr;    }    /**     * 判断当前系统是否windows     *     * @return boolean     */    public static boolean isWindowsOS() { boolean isWindowsOS = false; String osName = System.getProperty("os.name"); if (osName.toLowerCase().contains("windows")) {     isWindowsOS = true; } return isWindowsOS;    }    }

上面主要是通过动态获取当前服务器的ip以及应用启动的端口来作为队列名字,但是绑定交换机是同一个,由于交换机使用的是广播交换机,此模式下会将消息发送到所有队列。
当应用启动后可自动创建以 当前应用的的ip + 端口 为名称的队列

生产者

/** * @author peng * @program  * @description 生产者设置 * @create 2022/03/30 22:08 **/@Component@Slf4jpublic class BroadcastmsgProducer {    @Autowired    private RabbitTemplate rabbitTemplate;    /**     * 发送推送websocket消息     *     * @param param     */    public void sendMsg(BroadcastSendDto param) { rabbitTemplate.convertAndSend(BroadcastMqConfig.BROADCAST_EXCHANGE,  "",  param);    }}

以上为生产者代码,因为是广播模式,所以路由key为空也可以广播到所有队列。

消费者

/** * @author peng * @program  * @description 消费者 * @create 2022/03/30 22:08 **/@Component@Slf4jpublic class BroadcastmsgReceiver {    @Autowired    private BoadcastWebSocketServer webSocketServer;    @RabbitListener(queues = {"#{T(com.server.mq.BroadcastMqConfig).getBroadcastmsgQueueName()}${server.port}"})    public void pushMessageToUser(Channel channel, BroadcastSendDto param, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { try {     log.info("接收推送至具体用户websocket mq :{}", param);     // 发送websocket消息     webSocketServer.sendMessageToUser(param); } catch (Exception e) {     log.error("获取推送至具体用户websocket 异常 ...", e); } finally {     try {  channel.basicAck(tag, false);     } catch (IOException e) {  log.error("---------消息确认异常---------", e);     } }    }}

以上为消费者设置,消费者使用动态队列名称监听,“#T(com.server.mq.BroadcastMqConfig).getBroadcastmsgQueueName()}*” 是表示使用代码块获取返回字符串,“${server.port}”是动态获取配置文件参数值,获取端口。
当应用启动后可监听以 当前应用的的ip + 端口 为名称的队列消息

调用入口

/**     * 发送消息至某个用户     * @param sendDto     * @return     * @throws IOException     */    @PostMapping("/sendMsg")    public R sendMsg(@RequestBody @Validated BroadcastSendDto sendDto) { log.info("发送消息:{}", JSON.toJSONString(sendDto)); broadcastmsgProducer.sendMsg(sendDto); return R.ok();    }

以上为通过接口调用的入口,我们此部分由于是微服务的存在,所以对外提供接口调用,便在接口处修改为发送mq消息。
如若处于设计阶段且是单机应用,可考虑直接需发送消息代码处直接发送mq消息。

以上就是我使用的处理集群的方式,这种属于比较简单粗暴的,看到网上有通过记录所有连接与哪一台服务端连接的信息,然后再通过redis等中间来获取精准通知到具体某一台服务端来推送消息。但是由于时间有限,就没有仔细研究了。
在这里插入图片描述

8度云软件下载中心