> 技术文档 > WebSoket的简单使用_websocket使用

WebSoket的简单使用_websocket使用

一、WebSocket简介

1.1、双向通信/全双工

客户端和服务器之间同时双向传输,全双工通信允许客户端和服务器随时互相发送消息,不需等一方发送请求后另一方才进行响应。
适用要低延迟/实时交互的场景,如在线游戏、即时通讯、股票行情等。

1.2、开发中的问题

1、处理客户端断网的策略。
心跳检测:定期发送心跳消息以确保连接的有效性。
超时处理:在特定时间内未收到客户端的消息或心跳回应时关闭连接。
异常处理:捕获并处理连接异常,如断网错误
2、连接管理
连接断开和重连:客户端可能会由于网络波动、服务器重启等原因导致连接断开。需要实现自动重连机制。
  解决方案:客户端实现自动重连,服务器端实现连接状态的监控和重连处理。
连接数量限制:服务器可能会面临大量的并发连接,需要管理连接的生命周期。
  解决方案:使用连接池、限制单个用户的最大连接数、负载均衡。
3、数据传输
消息顺序:WebSocket 是全双工通信,消息可能会乱序到达。
  解决方案:在消息中添加序列号,客户端根据序列号重排消息。
消息大小:某些应用可能需要传输大数据,WebSocket 本身对消息大小有一定限制。
  解决方案:将大消息分割成小块发送,在客户端重新组装。
4、安全性
数据加密:WebSocket 传输的数据可以被中间人截获。
  解决方案:使用 wss:// 协议(基于 TLS 加密的 WebSocket)确保传输安全。
身份验证和授权:需要确保只有经过认证和授权的客户端才能建立 WebSocket 连接。
  解决方案:在握手阶段进行身份验证,使用 JWT 或其他令牌机制。
跨站脚本攻击 (XSS):WebSocket 可能成为 XSS 攻击的目标。
  解决方案:在服务器端验证和过滤输入数据,确保数据格式和内容安全。
5、性能优化
延迟:需要尽量减少消息传输的延迟。
  解决方案:优化网络路径、使用更快的服务器、减少数据量。
带宽消耗:频繁的消息传输会消耗大量带宽。
  解决方案:压缩消息、优化数据结构。
6、服务器架构
扩展性:需要确保 WebSocket 服务器能处理大量并发连接。
  解决方案:使用集群和负载均衡,将连接分配到多个服务器上。
高可用性:需要确保服务器在出现故障时能迅速恢复。
  解决方案:使用容错和故障转移机制,配置多个冗余服务器。

1.3、JavaScript 中 WebSocket 对象的属性和方法

WebSocket 对象:WebSocket 对象表示一个新的 WebSocket 连接。
WebSocket.onopen 事件处理程序:当 WebSocket 连接打开时触发。
WebSocket.onmessage 事件处理程序:当接收到来自 WebSocket 的消息时触发。
WebSocket.onerror 事件处理程序:当 WebSocket 发生错误时触发。
WebSocket.onclose 事件处理程序:当 WebSocket 连接关闭时触发。
WebSocket.send 方法:向 WebSocket 发送数据。
WebSocket.close 方法:关闭 WebSocket 连接。

1.4、WebSocket 的错误处理

WebSocket is not supported:当浏览器不支持 WebSocket 时,会出现此错误。解决方法是在浏览器兼容性列表中检查是否支持 WebSocket。
WebSocket connection closed:当 WebSocket 连接被关闭时,会出现此错误。解决方法是在 WebSocket.onclose 事件处理程序中进行错误处理。
WebSocket error:当 WebSocket 发生错误时,会出现此错误。解决方法是在 WebSocket.onerror 事件处理程序中进行错误处理。
WebSocket timeout:当 WebSocket 连接超时时,会出现此错误。解决方法是在 WebSocket.ontimeout 事件处理程序中进行错误处理。
WebSocket handshake error:当 WebSocket 握手失败时,会出现此错误。解决方法是在 WebSocket.onerror 事件处理程序中进行错误处理。
WebSocket closed by server:当 WebSocket 连接被服务器关闭时,会出现此错误。解决方法是在 WebSocket.onclose 事件处理程序中进行错误处理。
WebSocket closed by protocol:当 WebSocket 连接被协议错误关闭时,会出现此错误。解决方法是在 WebSocket.onclose 事件处理程序中进行错误处理。
WebSocket closed by network:当 WebSocket 连接被网络错误关闭时,会出现此错误。解决方法是在 WebSocket.onclose 事件处理程序中进行错误处理。
WebSocket closed by server:当 WebSocket 连接被服务器错误关闭时,会出现此错误。解决方法是在 WebSocket.onclose 事件处理程序中进行错误处理。

二、需求

使用人数500~1000人,时时使用不间断。

2.1、一对一单独聊天

2.1.1、聊天会话保存聊天记录(头像、图片、视频、表情包)。
2.1.2、支持发送视频、图片、表情包。
2.1.3、发出去的消息支持限时撤回(保存历史记录)。
2.1.4、会话支持删除,支持聊天信息清屏等。
2.1.5、支持聊天信息的摘取、转发。
2.1.6、支持艾特人、引用消息。

2.2、群聊

2.2.1、聊天会话保存聊天记录(头像、图片、视频、表情包)。
2.2.2、支持发送视频、图片、表情包。
2.2.3、发出去的消息支持限时撤回(保存历史记录)。
2.2.4、会话支持删除,支持聊天信息清屏等。
2.2.5、支持艾特人、引用消息。
2.2.6、有群组、管理员功能。
2.2.7、可剔除群员、支持禁言及全部禁言。

2.3、聊天会话管理

2.3.1、会话制顶、会话加特别关注、会话消息免打扰。
2.3.2、会话删除。

三、设计

3.1、系统架构(单WebSocket连接占10KB内存,单连1.2w)

3.1.1、三台服务器构建 WebSocket 服务。
3.1.2、用 Redis 存储全局会话、分布式锁

3.2、通信

3.2.1、SpringMVC: 创建/编辑/查询聊天会话,群成员的的增减等操作。
3.2.2、WebSocket: 聊天内容的传递、群公告的发布。

3.3、数据存储

3.3.1、Mysql: 用户表、用户—会话关联表、会话(单聊/群聊)表、群-成员关联表。
3.3.2、MongoDB: 聊天记录集、群公告、备份会话详情表。
3.3.3、Redis: 储存全局会话。

3.4、交互设计

3.4.1、查: 用户登录查询自己的参与的会话列表、点击会话查看聊天信息、查看群成员信息。
3.4.2、增/改:新增会话(单聊/群聊)、关闭单聊/退出群聊、修改群聊名称、屏蔽/关注群聊。
3.4.3、单聊/群聊时先创建会话,然后用SessionID去进行聊天交互。

3.5、问题点

3.5.1.MongoDB嵌套数组分页查询。
3.5.2.聊天信息的修改、撤回更新到 MongoDB中。
3.5.3.多人聊天会话很多。
3.5.4.会话是否有关闭。
3.5.5.一个会话聊天过长怎么办。

四、代码实现

4.1、引入依赖及配置

org.springframework.bootspring-boot-starter-websocketorg.projectlomboklombokorg.springframework.bootspring-boot-starter-data-mongodborg.projectlomboklombokorg.mapstructmapstruct1.4.1.Finalorg.mapstructmapstruct-processor1.4.1.Finalcom.alibaba.fastjson2fastjson22.0.50
spring.data.mongodb.host= 127.0.0.1spring.data.mongodb.database= testspring.data.mongodb.port= 27017

4.2、MongoDB端实体类

/** * 聊天信息 */@Data@Accessors(chain = true)public class ChatMessageMg { /** * 会话 ID */ private String sessionId; /** * 消息体 ID */ @Id private String messageId; /** * 发送人ID */ private String userId; /** * 发送人工号 */ private String userNo; /** * 发送时间 */ private Date sendTime; /** * 聊天内容 */ private String chatMessage; /** * 引用内容 */ private String refMessage; /** * 序号 */ private Integer sortNo; /** * 标识(0: 原信息, 1: 已撤回 , 2: 已修改) */ private Integer messageType; /** * 版本(默认0,累加) */ private Integer messageVersion;}/** * 群聊框-会话 */@Data@Accessors(chain = true)public class ChatSessionGroupMg { /** * 聊天-会话 ID */ @Id private String sessionId; /** * 会话框类型(0单聊,1群聊) */ private Integer chatType; /** * 群名称 */ private String chatName; /** * 创建时间 */ private Date chatCreateTime; /** * 最后活跃时间 */ private Date lastActiveTime; /** * 会话创建人(或群主工号) */ private String chatCreateUserNo; /** * 历史聊天集合 */ private List messageList;}/** * 单聊框-会话 */@Data@Accessors(chain = true)public class ChatSessionSingleMg { /** * 聊天-会话 ID */ @Id private String sessionId; /** * 会话框类型(0单聊,1群聊) */ private Integer chatType; /** * 创建时间 */ private Date chatCreateTime; /** * 最后活跃时间 */ private Date lastActiveTime; /** * 会话甲(创建人)名称 */ private String userAName; /** * 会话甲(创建人)工号 */ private String userANo; /** * 会话甲(创建人)头像 */ private String userAUrl; /** * 会话乙 名称 */ private String userBName; /** * 会话乙 工号 */ private String userBNo; /** * 会话乙 头像 */ private String userBUrl; /** * 历史聊天集合 */ private List messageList;}/** * 群公告 */@Data@Accessors(chain = true)public class NoticeMg {}

4.3、Mysql端实体类

/** * 群聊框-会话 */@Data@Accessors(chain = true)public class ChatSessionGroup { /** * 聊天-会话 ID */ @Id private String sessionId; /** * 会话框类型(0单聊,1群聊) */ private Integer chatType; /** * 群名称 */ private String chatName; /** * 创建时间 */ private Date chatCreateTime; /** * 最后活跃时间 */ private Date lastActiveTime; /** * 会话创建人(或群主工号) */ private String chatCreateUserNo;}/** * 单聊框-会话 */@Data@Accessors(chain = true)public class ChatSessionSingle { /** * 聊天-会话 ID */ @Id private String sessionId; /** * 会话框类型(0单聊,1群聊) */ private Integer chatType; /** * 创建时间 */ private Date chatCreateTime; /** * 最后活跃时间 */ private Date lastActiveTime; /** * 会话甲(创建人)名称 */ private String userAName; /** * 会话甲(创建人)工号 */ private String userANo; /** * 会话甲(创建人)头像 */ private String userAUrl; /** * 会话乙 名称 */ private String userBName; /** * 会话乙 工号 */ private String userBNo; /** * 会话乙 头像 */ private String userBUrl;}/** * 会话 - 成员 */@Data@Accessors(chain = true)public class ChatSessionUser { /** * ID */ private String id; /** * 会话ID */ private String sessionId; /** * 会话框类型(0单聊,1群聊) */ private Integer chatType; /** * 用户工号 */ private String userNo; /** * 用户名称 */ private String userName; /** * 用户头像 */ private String userUrl; /** * 入群时间 */ private Date inGroupTime; /** * 最后活跃时间 */ private Date lastActiveTime; /** * 是否群主(0否,1是) */ private Integer isGroupLeader; /** * 是否管理员(0否,1是) */ private Integer isAdmin; /** * 是否禁言(0否,1是) */ private Integer isSilence; /** * 禁言时长 */ private Integer silenceDuration; /** * 禁言开启时间 */ private Date openSpeakTime;}/** * 用户 */@Data@Accessors(chain = true)public class User { private String name; private Integer age;}

4.4、封装参数类

@Datapublic class ChatSessionParam { /** * 会话ID */ private String sessionId; /** * 群名称 */ private String chatName; /** * 是否 废弃 (0否 , 1是) */ private Integer isDiscard; /** * 是否 屏蔽 (0否 , 1是) */ private Integer isShield; /** * 是否 关注 (0否 , 1是) */ private Integer isWatch; /** * 群员工号 */ private List userNos;}@Data@Accessors(chain = true)public class SocketMessageParam { /** * 用户ID */ private String userId; /** * 用户工号 */ private String userNo; /** * 此聊天会话 ID */ private String chatSessionId; /** * 消息 */ private String message;}

4.5、MapStruct 工具类(不建议用)

import com.example.demo.bean.mongo.ChatSessionGroupMg;import com.example.demo.bean.mongo.ChatSessionSingleMg;import com.example.demo.bean.mysql.ChatSessionGroup;import com.example.demo.bean.mysql.ChatSessionSingle;import org.mapstruct.Mapper;import org.mapstruct.factory.Mappers;/** * MapStruct 实体类 转换工具 */@Mapperpublic interface SourceDestinationMapper { SourceDestinationMapper INSTANCE = Mappers.getMapper(SourceDestinationMapper.class); ChatSessionSingle ChatSessionSingleMg_ChatSessionSingle(ChatSessionSingleMg chatSessionSingleMg); ChatSessionGroup ChatSessionGroupMg_ChatSessionGroup(ChatSessionGroupMg chatSessionGroupMg);}

4.6、WebSocket端代码(Demo)

import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;/** * @Description: WebSocket配置类。开启WebSocket的支持 */@Configurationpublic class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}import com.alibaba.fastjson2.JSON;import com.example.demo.bean.mongo.ChatMessageMg;import com.example.demo.bean.query.SocketMessageParam;import com.example.demo.service.WebSocketService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.mongodb.core.MongoTemplate;import org.springframework.stereotype.Component;import javax.websocket.OnClose;import javax.websocket.OnError;import javax.websocket.OnMessage;import javax.websocket.OnOpen;import javax.websocket.Session;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.util.Date;import java.util.List;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.atomic.AtomicInteger;/** * WebSocket 服务器类,用于处理WebSocket连接、消息、错误等事件。 */@Component@Slf4j@ServerEndpoint(\"/websocket/{userId}\")public class WebSocketServer { @Autowired private WebSocketService webSocketService; @Autowired private MongoTemplate mongoTemplate; // 记录当前在线客户端的数量 private static AtomicInteger onlineSessionClientCount = new AtomicInteger(0); // 存储在Redis中 private static Map userIdSessionMap= new ConcurrentHashMap(); /** * 当WebSocket连接打开时调用的方法 * @param userId 用户唯一ID * @param session 当前WebSocket会话 */ @OnOpen public void onOpen(@PathParam(\"userId\") String userId, Session session) { log.info(\"连接建立中 ==> session_id = {}, sid = {}\", session.getId(), userId); // 将当前会话添加到在线会话映射表中 userIdSessionMap.put(userId, session); // 在线数量+1 onlineSessionClientCount.incrementAndGet(); // 发送消息给当前连接 sendToOne(userId, \"连接成功\"); // 输出当前在线数量以及连接信息 log.info(\"连接建立成功,当前在线数为:{} ==> 开始监听新连接:session_id = {}, sid = {},。\", onlineSessionClientCount, session.getId(), userId); } /** * 当 WebSocket 连接关闭时调用的方法 * @param userId 客户端的唯一标识符 * @param session 当前WebSocket会话 */ @OnClose public void onClose(@PathParam(\"userId\") String userId, Session session) { // 在线数量-1 onlineSessionClientCount.decrementAndGet(); // 输出当前在线数量以及关闭的连接信息 log.info(\"连接关闭成功,当前在线数为:{} ==> 关闭该连接信息:userId = {}, sid = {},。\", onlineSessionClientCount, session.getId(), userId); } /** * 当从客户端接收到消息时调用的方法。 * @param socketMessage 接收到的消息 * @param session 当前WebSocket会话 */ @OnMessage public void onMessage(String socketMessage, Session session) { log.info(\"服务端收到客户端消息 ==> fromSid = {}, socketMessage = {}\", session.getId(), socketMessage); SocketMessageParam messageParam = JSON.parseObject(socketMessage,SocketMessageParam.class); System.out.println(\"----messageParam: \"+messageParam); if(messageParam!=null){ String chatSessionId = messageParam.getChatSessionId(); String userNo = messageParam.getUserNo(); String message= messageParam.getMessage(); List userIdList = webSocketService.getChatUserByUserId(userNo); if(userIdList==null||userIdList.isEmpty()){ return; } if(userIdList.size()==1){ String userId = userIdList.get(0); Session otherSession = userIdSessionMap.get(userId); otherSession.getAsyncRemote().sendText(message); ChatMessageMg chatMessageMg = new ChatMessageMg().setSessionId(chatSessionId).setUserId(userId).setUserNo(userNo) .setMessageVersion(0).setMessageType(0).setSendTime(new Date()).setChatMessage(message); webSocketService.saveSingleChatMessage(chatMessageMg); }else { for(String userId : userIdList){  Session otherSession = userIdSessionMap.get(userId);  if(otherSession!=null) { otherSession.getAsyncRemote().sendText(message); ChatMessageMg chatMessageMg = new ChatMessageMg().setSessionId(chatSessionId).setUserId(userId).setUserNo(userNo) .setMessageVersion(0).setMessageType(0).setSendTime(new Date()).setChatMessage(message); webSocketService.saveGroupChatMessage(chatMessageMg);  } } } } } /** * 当WebSocket发生错误时调用的方法。 * @param session 当前WebSocket会话 * @param error 错误信息 */ @OnError public void onError(Session session, Throwable error) { log.error(\"WebSocket发生错误,错误信息为:\" + error.getMessage()); error.printStackTrace(); } /** * 向所有在线客户端发送消息。 * @param message 要发送的消息 */ public void sendToAll(String message) { // 遍历在线会话映射表 userIdSessionMap.forEach((onlineSid, toSession) -> { try { log.info(\"服务端给客户端群发消息 ==> toSid = {}, message = {}\", onlineSid, message); toSession.getAsyncRemote().sendText(message); } catch (Exception e) { log.error(\"发送消息给客户端时发生错误:{}\", onlineSid, e); userIdSessionMap.remove(onlineSid); } }); } /** * 向单个客户端发送消息。 * @param toSid 目标客户端的唯一标识符 * @param message 要发送的消息 */ public void sendToOne(String toSid, String message) { Session toSession = userIdSessionMap.get(toSid); if (toSession == null) { log.error(\"服务端给客户端发送消息 ==> toSid = {} 不存在, message = {}\", toSid, message); return; } // 使用异步方式发送消息 log.info(\"服务端给客户端发送消息 ==> toSid = {}, message = {}\", toSid, message); toSession.getAsyncRemote().sendText(message); }}

4.7、MVC端的代码(Demo)

import com.example.demo.bean.mongo.ChatSessionGroupMg;import com.example.demo.bean.mongo.ChatSessionSingleMg;import com.example.demo.bean.mysql.ChatSessionGroup;import com.example.demo.bean.mysql.ChatSessionSingle;import com.example.demo.bean.mysql.User;import com.example.demo.bean.mysql.ChatSessionUser;import com.example.demo.bean.query.ChatSessionParam;import com.example.demo.mapper.SourceDestinationMapper;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.mongodb.core.MongoTemplate;import org.springframework.web.bind.annotation.ModelAttribute;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;import java.util.Date;import java.util.HashSet;import java.util.List;import java.util.Set;@Slf4j@RestController@RequestMapping(\"/sessionController/\")public class WebSessionController { @Autowired private MongoTemplate mongoTemplate; /** * 创建会话 */ @PostMapping(\"createSession\") public String createSession(@RequestBody ChatSessionParam sessionParam) { if(sessionParam==null||sessionParam.getUserNos()==null||sessionParam.getUserNos().isEmpty()){ return \"参数、聊天人员不能为空\"; } List userNos = sessionParam.getUserNos(); // 获取当前登录用户信息 String userANo =\"NO123654\"; // 去重处理 Set userNoSets = new HashSet(userNos); // 当前时间 Date nowTime = new Date(); // 会话 与 用户关联 集合 List sessionUserList = new ArrayList(); if(userNoSets.size()==1){ /* 单聊情况 */ // 1.判断是否有之前的 会话记录,有则获取之前的会话聊天记录。 // 获取单聊人员工号。 String userBNo =userNoSets.iterator().next(); if(userANo.equals(userBNo)){ return \"不能和自己聊天\"; } // 创建单聊会话 ChatSessionSingleMg chatSessionMg = new ChatSessionSingleMg().setChatType(0)  .setChatCreateTime(nowTime).setLastActiveTime(nowTime)  .setUserANo(userANo).setUserBNo(userBNo).setMessageList(new ArrayList()); ChatSessionSingle chatSessionSingle = SourceDestinationMapper.INSTANCE.ChatSessionSingleMg_ChatSessionSingle(chatSessionMg); // 持久化到 MonGoDB 并返回 SessionId mongoTemplate.insert(chatSessionMg); String sessionId = chatSessionMg.getSessionId(); chatSessionSingle.setSessionId(sessionId); // chatSession 持久化到 Mysql ... // 会话 与 用户关联 ChatSessionUser chatSessionUserA = new ChatSessionUser().setSessionId(sessionId).setChatType(0).setUserNo(userANo)  .setOpenSpeakTime(nowTime).setLastActiveTime(nowTime).setIsGroupLeader(0).setIsAdmin(0).setIsGroupLeader(0); ChatSessionUser chatSessionUserB = new ChatSessionUser().setSessionId(sessionId)  .setChatType(0).setUserNo(userBNo).setIsGroupLeader(0).setIsAdmin(0).setIsGroupLeader(0); sessionUserList.add(chatSessionUserA); sessionUserList.add(chatSessionUserB); // sessionUserList 持久化到 Mysql、Redis ... return sessionId; }else { /* 群聊情况 */ // 创建群聊对象 ChatSessionGroupMg chatSessionGroupMg =new ChatSessionGroupMg().setChatType(1)  .setChatCreateTime(nowTime).setLastActiveTime(nowTime)  .setChatName(sessionParam.getChatName()).setChatCreateUserNo(userANo).setMessageList(new ArrayList()); mongoTemplate.insert(chatSessionGroupMg); String sessionId = chatSessionGroupMg.getSessionId(); ChatSessionGroup chatSessionGroup = SourceDestinationMapper.INSTANCE.ChatSessionGroupMg_ChatSessionGroup(chatSessionGroupMg); chatSessionGroup.setSessionId(sessionId); // 群主 ChatSessionUser leadSenssionUser = new ChatSessionUser()  .setChatType(1).setSessionId(sessionId).setUserNo(userANo).setSessionId(sessionId)  .setIsGroupLeader(1).setIsAdmin(1).setIsSilence(0).setInGroupTime(nowTime).setLastActiveTime(nowTime); sessionUserList.add(leadSenssionUser); for(String usNo : userNoSets){ if(userANo.equals(usNo)){  continue; } // 创建群成员 ChatSessionUser senssionUser = new ChatSessionUser() .setChatType(1).setSessionId(sessionId).setUserNo(usNo).setSessionId(sessionId) .setIsGroupLeader(1).setIsAdmin(1).setIsSilence(0).setInGroupTime(nowTime).setLastActiveTime(nowTime); sessionUserList.add(senssionUser); } // chatSessionGroup、sessionUserList 持久化到 Mysql、Redis ... return sessionId; } } /** * 会话的(置顶、屏蔽、特别关注) */ @PostMapping(\"sessionEdit\") public String sessionEdit(@RequestBody ChatSessionParam chatSessionParam) { // 获取当前登录用户信息,查询此用户下有效的会话信息。 String userNo =\"NO123654\"; return \"Success\"; } /** * 获取当前用户会话列表 */ @RequestMapping(\"getSessionList\") public List getSessionList() { // 获取当前登录用户信息,查询此用户下有效的会话信息。 return new ArrayList(); } // http://127.0.0.1:8080/html @RequestMapping(\"/html\") public String html() { return \"index.html\"; } @ModelAttribute public void parseUser(@RequestParam(name = \"name\", defaultValue = \"unknown user\") String name , @RequestParam(name = \"age\", defaultValue = \"12\") Integer age, User user) { user.setName(\"zhangsan\"); user.setAge(18); }}import com.example.demo.bean.mongo.ChatMessageMg;import com.example.demo.bean.mongo.ChatSessionGroupMg;import com.example.demo.bean.mongo.ChatSessionSingleMg;import com.example.demo.service.WebSocketService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.mongodb.core.MongoTemplate;import org.springframework.data.mongodb.core.query.Criteria;import org.springframework.data.mongodb.core.query.Query;import org.springframework.data.mongodb.core.query.Update;import org.springframework.stereotype.Service;import java.util.ArrayList;import java.util.List;@Slf4j@Servicepublic class WebSocketServiceImpl implements WebSocketService { @Autowired private MongoTemplate mongoTemplate; /** * 通过聊天会话Id 获取会话成员集合 * @param chatSessionId */ public List getChatUserByUserId(String chatSessionId){ System.out.println(\"----chatSessionId: \"+chatSessionId); // 查询 会话成员关系 获取此会话下全部成员信息 List userIdList = new ArrayList(); userIdList.add(\"111\"); userIdList.add(\"222\"); userIdList.add(\"333\"); return userIdList; } /** * 记录聊天内容 * @param chatMessageMg */ public boolean saveSingleChatMessage(ChatMessageMg chatMessageMg){ if(chatMessageMg==null||chatMessageMg.getChatMessage()==null){ return false; } String chatSessionId = chatMessageMg.getSessionId(); Query query = Query.query(Criteria.where(\"_id\").is(chatSessionId)); Update update = new Update(); update.push(\"messageList\", chatMessageMg); mongoTemplate.updateFirst(query, update, ChatSessionSingleMg.class); return true; } /** * 记录聊天内容 * @param chatMessageMg */ public boolean saveGroupChatMessage(ChatMessageMg chatMessageMg){ if(chatMessageMg==null||chatMessageMg.getChatMessage()==null){ return false; } String chatSessionId = chatMessageMg.getSessionId(); Query query = Query.query(Criteria.where(\"_id\").is(chatSessionId)); Update update = new Update(); update.push(\"messageList\", chatMessageMg); mongoTemplate.updateFirst(query, update, ChatSessionGroupMg.class); return true; }}

4.8、页面

  WebSocket Demo  
var websocket = null; var userId = Math.random().toString(36).substr(2); if(\'WebSocket\' in window){ // 连接 WebSocket 节点 websocket = new WebSocket(\"ws://localhost:8080/websocket/\"+userId); } else{ alert(\'Not support websocket\') } //连接发生错误的回调方法 websocket.onerror = function(){ setMessageInnerHTML(\"error\"); }; //连接成功建立的回调方法 websocket.onopen = function(){ setMessageInnerHTML(\"连接成功\"); } //接收到消息的回调方法 websocket.onmessage = function(event){ setMessageInnerHTML(event.data); } //连接关闭的回调方法 websocket.onclose = function(){ setMessageInnerHTML(\"close\"); } //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 window.onbeforeunload = function(){ websocket.close(); } //将消息显示在网页上 function setMessageInnerHTML(innerHTML){ document.getElementById(\'message\').innerHTML += innerHTML + \'
\'; } //发送消息 function send(){ var message = document.getElementById(\'text\').value; websocket.send(message); } //关闭连接 function closeWebSocket() { websocket.close(); }