WebSocket-java篇_java websocket
问题引入
消息推送的方式
我们要实现,服务器把消息推送到客户端,可以轮训,长轮训
还有sse
WebSocket理论
WebSocket 的由来与核心价值
-
诞生背景:解决 HTTP 协议在实时通信中的固有缺陷(单向请求-响应模式)
-
核心驱动力:
-
替代低效轮询(Polling)和长轮询(Comet)
-
满足实时应用需求(聊天、金融行情、游戏等)
-
-
核心优势:
-
全双工通信:客户端/服务器可同时发送数据
-
低延迟:从 HTTP 的数百 ms 降至 10-50ms
-
高效传输:头部开销仅 2-14 字节(vs HTTP 的数百字节)
-
-
标准化:2011 年 RFC 6455 成为正式标准
WebSocket 协议核心组成
101 Switching Protocols
)切换到 WebSocket 协议Spring Boot 深度集成方案
基础架构
核心组件详解
-
Client(客户端)
-
作用:发起连接、订阅频道、收发消息
-
为什么需要:作为通信的终端用户界面
-
解决问题:
-
提供用户交互入口
-
实现跨平台通信(Web/App/桌面)
-
-
技术实现:
const socket = new WebSocket(\"ws://yourdomain/ws-endpoint\");socket.onmessage = (event) => { console.log(\"收到消息:\", event.data);};
-
-
Endpoint(连接端点)
-
作用:处理握手请求,建立持久连接
-
为什么需要:作为WebSocket连接的入口网关
-
解决问题:
-
协议升级(HTTP→WebSocket)
-
连接生命周期管理
-
跨域处理(CORS)
-
-
Spring Boot实现:
@Configuration@EnableWebSocketMessageBrokerpublic class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint(\"/ws-endpoint\") .setAllowedOrigins(\"*\") .withSockJS(); // 浏览器兼容方案 }}
-
-
WebSocket Connection(连接管道)
-
作用:维护全双工通信通道
-
为什么需要:突破HTTP的无状态限制
-
解决问题:
-
避免频繁握手(单次握手持久连接)
-
支持双向实时通信
-
降低延迟(从HTTP的300ms+降至30ms内)
-
-
-
Message Broker(消息代理)
-
作用:消息路由、分发、存储
-
为什么需要:解耦生产者和消费者
-
解决问题:
-
海量连接下的消息分发
-
分布式系统扩展
-
消息持久化与重试
-
-
配置示例:
@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) { // 使用外部消息中间件 registry.enableStompBrokerRelay(\"/topic\", \"/queue\") .setRelayHost(\"rabbitmq-host\") .setRelayPort(61613);}
-
-
频道系统(路由核心)
频道类型 前缀 作用 解决的问题 消息流向 广播频道 /topic
公共消息广播 1:N 消息分发 (如聊天室公告) 发布者 → 所有订阅者 私有队列 /queue
点对点通信 1:1 精准投递 (如订单通知) 发布者 → 特定订阅者 用户频道 /user
用户级隔离 多设备同步 (如微信网页/App同时在线) 发布者 → 用户所有会话 -
@MessageMapping Controller(业务处理器)
-
作用:处理业务逻辑,生成响应
-
为什么需要:分离通信协议与业务逻辑
-
解决问题:
-
业务逻辑集中管理
-
消息验证与转换
-
数据库/服务集成
-
-
示例:
@MessageMapping(\"/trade\")@SendTo(\"/topic/stock-updates\")public StockUpdate handleTrade(Order order) { // 1. 验证订单 // 2. 执行交易 // 3. 生成市场数据更新 return tradingService.execute(order);}
-
架构演进价值
-
协议层优化
-
替代方案对比:
方案 延迟 开销 双向通信 频道支持 HTTP轮询 300ms+ 高 ❌ ❌ WebSocket基础 50ms 低 ✔️ ❌ WS+STOMP 30ms 中 ✔️ ✔️
-
-
工程化价值
-
业务场景适配
-
广播场景:
/topic/news
(新闻推送) -
私有场景:
/queue/user-123/notifications
(个人通知) -
混合场景:
/topic/room-{id}
+/user/queue/private
(在线教育平台)
-
总结:为什么需要此架构
-
连接管理 通过
Endpoint
统一处理握手/断开,解决连接生命周期管理混乱问题 -
消息路由 频道系统实现
发布-订阅
模式,解决海量消息精准投递问题 -
业务解耦 控制器隔离业务逻辑与通信协议,解决代码维护困难问题
-
水平扩展 消息代理支持集群部署,解决单点性能瓶颈问题
-
安全管控 频道级权限控制,解决敏感数据泄露风险
终极价值:此架构在协议层实现高效实时通信,在架构层通过频道机制解决复杂业务场景的消息路由问题,在工程层通过Spring Boot实现企业级标准化,是构建现代实时应用的基石。
原理流程
在我的E盘的WebSocket文件夹
消息执行流程(Flow)概览
建立连接(connect,连接)
Client(客户端)发起到 /ws-endpoint
的 WebSocket 握手(handshake,握手),Endpoint(端点)完成升级后建立 WebSocket Connection(WebSocket 连接)。
订阅频道(subscribe,订阅)
Client 通过 STOMP 向 broker 发送 SUBSCRIBE Frame(订阅帧),表示“我要订阅 /topic/greetings
”。
发送消息到 Controller(SEND Frame)
Client 发送 SEND Frame(发送帧),destination(目的地)为 /app/hello
。
Broker 根据 setApplicationDestinationPrefixes(\"/app\")
,将消息路由(route,路由)给匹配 @MessageMapping(\"/hello\")
的方法
Controller(控制器)处理
GreetingController.handleHello(...)
被调用(invoke,调用),执行业务逻辑,返回 Greeting
对象。
Broker(代理)转发
因为方法上有 @SendTo(\"/topic/greetings\")
,返回值被封装成 MESSAGE Frame(消息帧)发送给 Broker(消息代理)。
Broker 将该消息分发(dispatch,分发)给所有订阅(subscription,订阅)了 /topic/greetings
的客户端 session。
Client(客户端)接收(receive,接收)
Client 在订阅回调(callback,回调)中拿到服务器推送(push,推送)的消息并渲染到页面。
这就是完整的一次流程。
API
客户端
websocket对象创建
let ws = new WebSocket(URL);
URL说明
格式:协议://ip地址:端口/访问路径 协议:协议名称为 ws
websocket对象相关事件
websocket对象提供的方法
简单示例
let ws = new WebSocket(\"ws://localhost/chat\");ws.onopen = function() {};ws.onmessage = function(evt) { // 通过 evt.data 可以获取服务器发送的数据};ws.onclose = function() {};
服务端
Tomcat的7.0.5版本开始支持WebSocket,并且实现了Java WebSocket规范。
Java WebSocket应用由一系列的Endpoint组成。Endpoint是一个java对象,代表WebSocket链接的一端,对于服务端,我们可以视为处理具体WebSocket消息的接口。
我们可以通过两种方式定义Endpoint:
-
第一种是编程式,即继承类javax.websocket.Endpoint并实现其方法。
-
第二种是注解式,即定义一个POJO,并添加@ServerEndpoint相关注解。
Endpoint实例在WebSocket握手时创建,并在客户端与服务端链接过程中有效,最后在链接关闭时结束。在Endpoint接口中明确了与其生命周期相关的方法,规范实现者确保生命周期的各个阶段调用实例的相关方法。生命周期方法如下:
服务端如何接收客户端发送的数据呢?
-
编程式 通过添加 MessageHandler 消息处理器来接收消息
-
注解式 在定义 Endpoint 时,通过 @OnMessage 注解指定接收消息的方法
服务端如何推送数据给客户端呢?
发送消息则由 RemoteEndpoint 完成,其实例由 Session 维护。
发送消息有 2 种方式发送消息
-
通过 session.getBasicRemote 获取同步消息发送的实例,然后调用其 sendXxx() 方法发送消息
-
通过 session.getAsyncRemote 获取异步消息发送实例,然后调用其 sendXxx() 方法发送消息
@ServerEndpoint(\"/chat\")@Componentpublic class ChatEndpoint { @OnOpen // 连接建立时被调用 public void onOpen(Session session, EndpointConfig config) { } @OnMessage // 接收到客户端发送的数据时被调用 public void onMessage(String message) { } @OnClose // 连接关闭时被调用 public void onClose(Session session) { }}
WebSocket 消息分发的三种常见模式
session.getAsyncRemote()(
getBasicRemote).sendXxx()
方法本身并不直接区分这些模式,而是通过 目标地址(如 Session、Broadcast) 和 应用层逻辑 来实现不同的消息分发方式。
WebSocket 消息分发的三种常见模式
1. 单播(Unicast)
-
点对点发送:消息直接发送给某个特定的客户端(Session)。
-
实现方式:通过目标客户端的
session.getAsyncRemote().sendText()
。 -
示例:
// 向特定客户端发送消息targetSession.getAsyncRemote().sendText(\"Private message\");
2. 广播(Broadcast)
-
一对多发送:消息发送给所有连接的客户端(或特定分组)。
-
实现方式:遍历所有 Session 或使用
@ServerEndpoint
的全局集合。 -
示例:
// 广播给所有客户端for (Session session : allSessions) { session.getAsyncRemote().sendText(\"Broadcast message\");}
-
注意:Java WebSocket API 本身不提供原生广播方法,需自行维护 Session 集合。
3. 组播(Multicast)
-
分组发送:消息发送给订阅了特定主题(Topic)或频道的客户端。
-
实现方式:通过应用层维护分组映射(如
Map<String, Set>
)。 -
示例:
// 向订阅了 \"news\" 频道的客户端发送消息for (Session session : channelSubscribers.get(\"news\")) { session.getAsyncRemote().sendText(\"News update\");}
总结
WebSocket 的灵活性在于:sendXxx()
是工具,分发模式由开发者通过 Session 代码管理逻辑实现。
在线聊天室实现
具体代码在learnWebSocket里面
流程分析
package com.learnwebsocket.config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;/** * @version v1.0 * @ClassName: WebsocketConfig */@Configurationpublic class WebsocketConfig { /** * 创建一个ServerEndpointExporter对象,这个对象会自动注册使用了@ServerEndpoint注解的类 * @return */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}
后端
ServerEndpointExporter
首先,由于websocket不直接归于spring管理,属于spring的扩展模块,所以为了把websocket的实例也注册到spring里面,我们需要一个spring和websocket的连接桥梁。也就是ServerEndpointExporter。这个类负责加载websocket的端点。他同时可以被spring直接管理。
package com.learnwebsocket.config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;/** * @version v1.0 * @ClassName: WebsocketConfig */@Configurationpublic class WebsocketConfig { /** * 创建一个ServerEndpointExporter对象,这个对象会自动注册使用了@ServerEndpoint注解的类 * @return */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}
端点Endpoint
然后。我们需要自己创建一个端点,供ServerEndpointExporter发现管理。
这里面我们需要实现三个方法,这个上面有讲。
这里面还有广播和单播的实现代码,仔细看看。
还有的就是,由于Endpoint不直接属于spring,若要给Endpoint去配置一些东西,我们需要手动创建一个类,实现java给我们的接口,来去配置之后给spring管理
package com.learnwebsocket.ws.pojo;import com.alibaba.fastjson.JSON;import com.learnwebsocket.config.GetHttpSessionConfig;import com.learnwebsocket.utils.MessageUtils;import org.springframework.stereotype.Component;import javax.servlet.http.HttpSession;import javax.websocket.*;import javax.websocket.server.ServerEndpoint;import java.util.Map;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;/** * @version v1.0 * @ClassName: ChatEndpoint * @Description: 端点 * @Author: 黑马程序员 */@ServerEndpoint(value = \"/chat\",configurator = GetHttpSessionConfig.class)@Componentpublic class ChatEndpoint { // 用来保存所有的用户 private static final Map onlineUsers = new ConcurrentHashMap(); //当前用户对应的session对象 private HttpSession httpSession; /** * 建立websocket连接后,被调用 * @param session */ @OnOpen public void onOpen(Session session, EndpointConfig config) { //1,将session进行保存 this.httpSession = (HttpSession) config.getUserProperties().get(HttpSession.class.getName()); String user = (String) this.httpSession.getAttribute(\"user\"); onlineUsers.put(user,session); //2,广播消息。需要将登陆的所有的用户推送给所有的用户 String message = MessageUtils.getMessage(true,null,getFriends()); broadcastAllUsers(message); } public Set getFriends() { Set set = onlineUsers.keySet(); return set; } // 广播所有用户 private void broadcastAllUsers(String message) { try { //遍历map集合 Set<Map.Entry> entries = onlineUsers.entrySet(); for (Map.Entry entry : entries) { //获取到所有用户对应的session对象 Session session = entry.getValue(); //发送消息 session.getBasicRemote().sendText(message); } } catch (Exception e) { //记录日志 } } /** * 浏览器发送消息到服务端,该方法被调用 * * 张三 --> 李四 * @param message */ @OnMessage public void onMessage(String message) { try { //将消息推送给指定的用户 Message msg = JSON.parseObject(message, Message.class); //获取 消息接收方的用户名 String toName = msg.getToName(); String mess = msg.getMessage(); //获取消息接收方用户对象的session对象 Session session = onlineUsers.get(toName); String user = (String) this.httpSession.getAttribute(\"user\"); String msg1 = MessageUtils.getMessage(false, user, mess); session.getBasicRemote().sendText(msg1); } catch (Exception e) { //记录日志 } } /** * 断开 websocket 连接时被调用 * @param session */ @OnClose public void onClose(Session session) { //1,从onlineUsers中剔除当前用户的session对象 String user = (String) this.httpSession.getAttribute(\"user\"); onlineUsers.remove(user); //2,通知其他所有的用户,当前用户下线了 String message = MessageUtils.getMessage(true,null,getFriends()); broadcastAllUsers(message); }}
配置类
上面的httpSession来自配置类的,因为登陆后我们把用户的名字存到了httpSession。但是websocket无法直接获取httpSession,所以要把它存到websocket配置文件里面。再获取。
package com.learnwebsocket.config;import javax.servlet.http.HttpSession;import javax.websocket.HandshakeResponse;import javax.websocket.server.HandshakeRequest;import javax.websocket.server.ServerEndpointConfig;/** * @version v1.0 * @ClassName: GetHttpSessionConfig */public class GetHttpSessionConfig extends ServerEndpointConfig.Configurator { @Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { //获取HttpSession对象 HttpSession httpSession = (HttpSession) request.getHttpSession(); //将httpSession对象保存起来 sec.getUserProperties().put(HttpSession.class.getName(),httpSession); }}
前端
先登陆之后,然后向后端的端点请求websocket的连接,之后绑定三个方法。
await axios.get(\"user/getUsername\").then(res => { this.username = res.data; }); //创建webSocket对象 ws = new WebSocket(\"ws://localhost:8080/chat\"); //给ws绑定事件 ws.onopen = this.onopen; //接收到服务端推送的消息后触发 ws.onmessage = this.onMessage; ws.onclose = this.onClose;