Ruoyi(若依)整合websocket实现信息推送功能(消息铃铛)_若依 websocket
实现消息推送功能
来了,来了,大家做系统应该是最关心这个功能。
【思路】
需求:对全系统【所有的业务操作】进行消息推送,有【群发】、【私发】功能、处理【消息状态(未读/已读)】,websocket持续链接防止因其他故障中断【心跳机制】
【后端篇】
1、确定自己系统的需求,先做数据表
通过代码生成,对后续推送的信息进行保存,通过is_read字段来对消息进行已读未读操作
添加mapper
/** * 设为已读 * @param id 消息的id * @return 结果 * */ public int updateWbNoticeMessageReadStatus(Long id);
添加service
/** * 设为已读 * @param id 消息的id * @return 结果 * */ public int updateWbNoticeMessageReadStatus(Long id);
添加serviceImpl
/** * 更新消息的阅读状态 * @param id 消息的id * @return */ @Override public int updateWbNoticeMessageReadStatus(Long id) { return wbNoticeMessageMapper.updateWbNoticeMessageReadStatus(id); }
添加mapper.xml下的方法
update wb_notice_message set is_read = \'1\' where id = #{id}
2、明确websocket链接
消息的推送,肯定是有推送人和被推送人,根据如何获取这些数据来确定你的websocket链接
// const token // 需要鉴权const currentUserId = this.$store.state.user.id;const currentUserNickName = this.$store.state.user.nickName;const wsUrl = `ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`; // 替换为你的 WebSocket 地址this.socket = new WebSocket(wsUrl);
这是我的websocket链接,可以看出我是通过前端拼接的userId和userName来获取到推送人信息的。
ps:实际开发过程中最好是通过token来获取,并解析出用户,进行后续的操作,此处是为了方便理解和通用
3、配置WebSocketConfig
package com.ruoyi.websocket.config;import com.ruoyi.websocket.handler.ChatRoomSessionIdWebsocketHandler;import com.ruoyi.websocket.handler.ChatRoomUserIdWebsocketHandler;import com.ruoyi.websocket.handler.ConnectWebsocketHandler;import com.ruoyi.websocket.handler.PushMessageWebsocketHandler;import com.ruoyi.websocket.interceptor.WebSocketInterceptor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.config.annotation.*;@Configuration@EnableWebSocketpublic class WebSocketConfig implements WebSocketConfigurer { @Autowired private PushMessageWebsocketHandler pushMessageWebsocketHandler; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { //连接websocket测试 registry.addHandler(new ConnectWebsocketHandler(), \"/websocket\") .setAllowedOrigins(\"*\"); // 允许跨域 //聊天室 -- sessionId版 registry.addHandler(new ChatRoomSessionIdWebsocketHandler(), \"/websocket/chatRoomSessionId\") .setAllowedOrigins(\"*\"); // 允许跨域 //聊天室 -- UserId版 registry.addHandler(new ChatRoomUserIdWebsocketHandler(), \"/websocket/chatRoomUserId\") .addInterceptors(new WebSocketInterceptor())//拦截器用来获取前端传递过来的userid .setAllowedOrigins(\"*\"); // 允许跨域 //消息推送 registry.addHandler(pushMessageWebsocketHandler, \"/websocket/pushMessage\") .addInterceptors(new WebSocketInterceptor())//拦截器用来获取前端传递过来的userid .setAllowedOrigins(\"*\"); // 允许跨域 }}
4、添加拦截器 WebSocketInterceptor 来获取到webocket链接携带的userId和nickName
package com.ruoyi.websocket.interceptor;import org.springframework.http.server.ServerHttpRequest;import org.springframework.http.server.ServerHttpResponse;import org.springframework.stereotype.Component;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.server.HandshakeInterceptor;import java.net.URI;import java.util.Map;@Componentpublic class WebSocketInterceptor implements HandshakeInterceptor { @Override public boolean beforeHandshake( ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { URI uri = request.getURI(); String query = uri.getQuery(); // userId=xxx&nickName=yyy if (query == null) return false; Map paramMap = parseQuery(query); String userId = paramMap.get(\"userId\"); String nickName = paramMap.get(\"nickName\"); if (userId == null || nickName == null) { return false; // 拒绝握手 } // 放入 WebSocketSession attributes,后面 WebSocketHandler 可取 attributes.put(\"userId\", userId); attributes.put(\"nickName\", nickName); return true; // 允许握手 } @Override public void afterHandshake( ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { // 握手完成后进行的操作 } //拆分传递的参数 private Map parseQuery(String query) { Map map = new java.util.HashMap(); if (query == null || query.isEmpty()) return map; String[] pairs = query.split(\"&\"); for (String pair : pairs) { int idx = pair.indexOf(\'=\'); if (idx > 0) { String key = pair.substring(0, idx); String value = pair.substring(idx + 1); map.put(key, value); } } return map; }}
5、添加 PushMessageWebsocketHandler 来处理推送信息
package com.ruoyi.websocket.handler;import com.alibaba.fastjson2.JSONObject;import com.ruoyi.common.core.domain.entity.SysUser;import com.ruoyi.system.mapper.SysUserMapper;import com.ruoyi.websocket.domain.WbNoticeMessage;import com.ruoyi.websocket.service.IWbNoticeMessageService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.web.socket.*;import org.springframework.web.socket.handler.TextWebSocketHandler;import java.util.Collections;import java.util.HashSet;import java.util.List;import java.util.Set;/** * 消息推送 WebSocket Handler */@Componentpublic class PushMessageWebsocketHandler extends TextWebSocketHandler { @Autowired private IWbNoticeMessageService wbNoticeMessageService; @Autowired private SysUserMapper userMapper; // 存储所有连接的会话 private final Set sessions = Collections.synchronizedSet(new HashSet()); @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { sessions.add(session); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { //获取前端发送的message String payload = message.getPayload(); // 解析整个 JSON 对象 JSONObject jsonObject = JSONObject.parseObject(payload); // 心跳检测 String type = jsonObject.getString(\"type\"); if (\"ping\".equalsIgnoreCase(type)) { session.sendMessage(new TextMessage(\"{\\\"type\\\":\\\"pong\\\"}\")); return; } //获取websocket携带的参数的userId和nickName // todo 前端可以通过token携带参数,然后使用ruoyi封装的token方法获取到当前用户,这里方便演示和通用性直接使用前端传递的UserId和nickName String userId = (String) session.getAttributes().get(\"userId\"); String nickName = (String) session.getAttributes().get(\"nickName\"); // 提取 data 对象--从这里添加前端所需要推送的字段 JSONObject data = jsonObject.getJSONObject(\"data\"); String title = data.getString(\"title\"); String content = data.getString(\"content\"); Long receiverId = data.getLong(\"receiverId\"); String receiverName = data.getString(\"receiverName\"); // 1. 如果receiverId为空则是群发,否则是单发,保存消息到数据库 // todo 可以自行根据前端传递的type来判断是群发还是单发,这里为了方便演示直接通过receiverId是否为空来判断 if (receiverId != null) { WbNoticeMessage wbNoticeMessage = new WbNoticeMessage(); wbNoticeMessage.setTitle(title); wbNoticeMessage.setContent(content); wbNoticeMessage.setSenderId(Long.parseLong(userId)); wbNoticeMessage.setSenderName(nickName); wbNoticeMessage.setReceiverId(receiverId); wbNoticeMessage.setReceiverName(receiverName); wbNoticeMessageService.insertWbNoticeMessage(wbNoticeMessage); } else { SysUser user = new SysUser(); List userList = userMapper.selectUserList(user); for (SysUser sysUser : userList) { WbNoticeMessage wbNoticeMessage = new WbNoticeMessage(); wbNoticeMessage.setTitle(title); wbNoticeMessage.setContent(content); wbNoticeMessage.setSenderId(Long.parseLong(userId)); wbNoticeMessage.setSenderName(nickName); wbNoticeMessage.setReceiverId(sysUser.getUserId()); wbNoticeMessage.setReceiverName(receiverName); wbNoticeMessageService.insertWbNoticeMessage(wbNoticeMessage); } } // 2. 给所有在线客户端广播消息 for (WebSocketSession s : sessions) { if (s.isOpen()) { s.sendMessage(new TextMessage(payload)); } } // todo 3.重要的信息还可以通过邮件等其他方式通知用户 } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { sessions.remove(session); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { exception.printStackTrace(); sessions.remove(session); if (session.isOpen()) { session.close(); } }}
【前端篇】
1、创建消息铃铛样式,封装成组件
InfoBell.vue代码
0 }\"> import { listWbNoticeMessage } from \"@/api/websocket/WbNoticeMessage\"; export default { name: \"InfoBell\", props: { refreshNoticeCount: { type: Boolean, default: false } }, data() { return { noticeContent: \"\", // 通知内容 noticeCount: 0, // 通知数量 socket: null, // WebSocket 实例 // 查询参数 queryParams: { pageNum: 1, pageSize: 10, title: null, content: null, type: null, senderId: null, senderName: null, receiverId: this.$store.state.user.id, receiverName: null, isRead: null, readTime: null, priority: null, targetUrl: null, bizType: null, bizId: null }, }; }, created() { this.getList(); }, mounted() { this.initWebSocket(); }, beforeDestroy() { this.closeWebSocket(); }, watch: { refreshNoticeCount(val) { if (val) { this.getList(); } } }, methods: { /**---------------------websocket专栏-------------------- */ /** 初始化/连接 WebSocket */ initWebSocket() { // const token // 需要鉴权 const currentUserId = this.$store.state.user.id; const currentUserNickName = this.$store.state.user.nickName; const wsUrl = `ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`; // 替换为你的 WebSocket 地址 this.socket = new WebSocket(wsUrl); this.socket.onopen = () => { console.log(\"头部导航消息铃铛-WebSocket 连接已建立\"); this.startHeartbeat();//启用心跳机制 }; this.socket.onmessage = (event) => { try { const msg = JSON.parse(event.data); if (msg.type === \"pong\") { console.log(\"收到心跳 pong\"); return; } } catch (e) { // 非 JSON 消息,继续执行 } this.getList(); }; this.socket.onerror = (error) => { console.error(\"头部导航消息铃铛-WebSocket 发生错误:\", error); }; this.socket.onclose = () => { console.log(\"头部导航消息铃铛-WebSocket 已关闭\"); this.stopHeartbeat(); this.tryReconnect(); }; }, /** 关闭 WebSocket */ closeWebSocket() { if (this.socket) { this.socket.close(); this.socket = null; } this.stopHeartbeat(); if (this.reconnectTimer) { clearInterval(this.reconnectTimer); this.reconnectTimer = null; } }, /** 启动心跳 */ startHeartbeat() { this.heartbeatTimer = setInterval(() => { if (this.socket && this.socket.readyState === WebSocket.OPEN) { this.socket.send(JSON.stringify({ type: \"ping\" })); console.log(\"发送心跳 ping\"); } }, 30000); // 每 30 秒 }, /** 停止心跳 */ stopHeartbeat() { if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = null; } }, /** 尝试重连 */ tryReconnect() { if (this.reconnectTimer) return; this.reconnectTimer = setInterval(() => { console.log(\"尝试重连 InfoBell-WebSocket...\"); this.initWebSocket(); if (this.socket && this.socket.readyState === WebSocket.OPEN) { clearInterval(this.reconnectTimer); this.reconnectTimer = null; } }, 5000); // 每 5 秒重连一次 }, /** -------------------------- 业务处理专栏---------------------- */ /** 查询通知信息框列表 */ getList() { this.queryParams.isRead = 0; listWbNoticeMessage(this.queryParams).then(response => { this.noticeCount = response.total; this.noticeContent = `您有${this.noticeCount}条未读的信息`; }) }, /** 跳转到通知页面 */ toNoticePage() { this.$router.push(\"/websocket/pushMessage\"); }, }, }; ::v-deep .el-badge__content { margin-top: 9px; margin-right: 1px; } .badge-custom { animation: blink-animation 0.5s infinite alternate; } @keyframes blink-animation { 0% { opacity: 1; } 100% { opacity: 0.1; } }
2、在顶部导航引用消息铃铛组件(InfoBell)
引入组件后,页面就完成了
3、创建推送信息查看页面
pushMessage.vue代码
消息推送(快捷创建)
PS:不填接受人id则视为群发 推送消息 消息 {{ index + 1 }} {{ msg }} {{ parseTime(scope.row.readTime, \'{y}-{m}-{d}\') }} 设为已读 0\" :total=\"total\" :page.sync=\"queryParams.pageNum\" :limit.sync=\"queryParams.pageSize\" @pagination=\"getList\" /> {{ parseTime(scope.row.readTime, \'{y}-{m}-{d}\') }} 0\" :total=\"total\" :page.sync=\"queryParams.pageNum\" :limit.sync=\"queryParams.pageSize\" @pagination=\"getList\" /> import { listWbNoticeMessage,updateReadStatus} from \"@/api/websocket/WbNoticeMessage\"import InfoBell from \"@/components/InfoBell\";export default { name:\"pushMesage\", components: { InfoBell }, data() { return { ws: null, message: \'\', messages: [], loading: true, total: 0, WbNoticeMessageList: [], form:{}, // 查询参数 queryParams: { pageNum: 1, pageSize: 10, title: null, content: null, type: null, senderId: null, senderName: null, receiverId: this.$store.state.user.id, receiverName: null, isRead: null, readTime: null, priority: null, targetUrl: null, bizType: null, bizId: null }, activeName: \'unread\', isRefreshNoticeCount:false,//是否刷新通知数量 }; }, methods: { connectWebSocket() { // 连接 WebSocket,地址根据后端实际情况调整 const currentUserId = this.$store.state.user.id; const currentUserNickName = this.$store.state.user.nickName; this.ws = new WebSocket(`ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`); this.ws.onopen = () => { console.log(\"推送信息-WebSocket 已连接\"); this.addMessage(\"推送信息-WebSocket 已连接\"); }; this.ws.onmessage = event => { console.log(\"收到消息:\", event.data); this.addMessage(event.data); }; this.ws.onclose = () => { this.addMessage(\"推送信息-WebSocket 已关闭\"); }; this.ws.onerror = error => { this.addMessage(\"推送信息-WebSocket 发生错误\"); }; }, sendMessage() { if (!this.form.content.trim()) { this.$message.warning(\"请输入消息内容\"); return; } if (this.ws && this.ws.readyState === WebSocket.OPEN) { // 发送整个表单内容 this.ws.send(JSON.stringify({ data: this.form })); this.$message.success(\"消息发送成功\"); // 因为websocket发送请求是异步的,为了方便显示这里使用了延时,实际情况还是要在后端通过返回值来显示getList setTimeout(() => { this.getList(); }, 500); } else { this.$message.error(\"WebSocket 未连接\"); } }, addMessage(msg) { this.messages.push(msg); this.$nextTick(() => { // 自动滚动到底部 const container = this.$el.querySelector(\"div[style*=\'overflow-y\']\"); if (container) container.scrollTop = container.scrollHeight; }); }, /** --------------------------------- 信息模块 --------------------- */ handleClick(){ this.getList(); }, /** 查询通知信息框列表 */ getList() { this.loading = true this.queryParams.isRead = this.activeName === \'unread\' ? 0 : 1; console.log(this.queryParams); listWbNoticeMessage(this.queryParams).then(response => { this.WbNoticeMessageList = response.rows this.total = response.total this.loading = false }) }, handleUpdateReadStatus(row){ if (row.id != null) { updateReadStatus(row.id).then(response => { this.isRefreshNoticeCount = true; console.log(this.$store); this.$modal.msgSuccess(\"该信息已标记为已读~\") this.getList(); }) } } }, created() { this.getList(); }, mounted() { this.connectWebSocket(); }, beforeDestroy() { if (this.ws) { this.ws.close(); } }};
以下是快捷创建推送信息的页面
4、详解【心跳机制】
一、详解
WebSocket 的心跳机制,是一种保持连接活跃、防止断线、检测对方是否存活的机制。特别是在使用 WebSocket 建立了长连接之后,如果网络设备(如代理、网关、防火墙)或者服务端/客户端本身在长时间没有数据传输时自动断开连接,就会导致推送失败、消息丢失的问题。
二、为什么要使用心跳机制?
1、防止连接被中间设备断开
很多中间设备(比如 Nginx、CDN、防火墙)会在一段时间内没有数据传输时,主动断开“看起来闲置”的连接。
2、检测对方是否在线
如果客户端意外断线(如:网络断了、电脑睡眠、浏览器崩溃),服务器端并不知道,继续保留 WebSocket 会话资源,浪费内存。
3、实现自动重连
通过心跳,可以判断连接是否断开,如果断了,客户端就能自动发起重连。
三、心跳机制怎么工作?
通常的设计方式如下:
{ \"type\": \"ping\" }
ping
后立即回复 { \"type\": \"pong\" }
,表示“我还活着”pong
,说明可能断线,可以发起重连四、代码实操
【浏览器】,每隔30秒向【后端】发送ping信号,后端接收到了返回pong信号表示通信正常,不做任何业务处理。
可以理解成这是一个地震的救援过程:
遇难者被埋在了地底下,救援人员在进行挖地救援,遇难者每隔30秒向救援人员叫喊一声:ping!,救援人员听到了遇难者的声音得知遇难者还活着,随之回复一声:pong!。表示别怕,我正在救援。表示通信正常。
【前端发起心跳】
/** 启动心跳 */ startHeartbeat() { this.heartbeatTimer = setInterval(() => { if (this.socket && this.socket.readyState === WebSocket.OPEN) { this.socket.send(JSON.stringify({ type: \"ping\" })); console.log(\"发送心跳 ping\"); } }, 30000); // 每 30 秒 }, /** 停止心跳 */ stopHeartbeat() { if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = null; } }, /** 尝试重连 */ tryReconnect() { if (this.reconnectTimer) return; this.reconnectTimer = setInterval(() => { console.log(\"正在尝试重连 InfoBell-WebSocket...\"); this.initWebSocket(); if (this.socket && this.socket.readyState === WebSocket.OPEN) { clearInterval(this.reconnectTimer); this.reconnectTimer = null; } }, 5000); // 每 5 秒重连一次 },
【后端接收心跳】
// 心跳检测 String type = jsonObject.getString(\"type\"); if (\"ping\".equalsIgnoreCase(type)) { session.sendMessage(new TextMessage(\"{\\\"type\\\":\\\"pong\\\"}\")); return; }
代码将整理成 ruoyi-vue-websocket上传到git~
git链接
https://gitee.com/chen_peng_wei/ruoyi-vue-websocket.git