> 技术文档 > 【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


本节重点


上一节我们已经完成了团队空间的创建、成员管理和权限控制等功能。为了提高项目的商业价值,本节来完成本项目的亮点功能 —— 图片协同编辑

大纲:

  • 图片协同编辑需求分析
  • 图片协同编辑方案设计
  • 图片协同编辑后端开发

通过本节,你将学习到多人实时协作功能的设计开发,涉及 WebSocket、事件驱动设计、Disruptor 无锁队列等技术知识。学会后再去开发聊天室之类的业务,都会轻松很多。


一、需求分析


现在很多产品都有多人协作功能,比如协同文档、协同素材设计、协同代码编辑器等等,可以提高协作的效率。

对于我们的项目,所谓的图片协同编辑功能,是在图片编辑的基础上增加了 “协同” 的概念。

当用户编辑某张图片时,其他用户可以 实时 看到编辑效果和操作提示。

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

注意,因为只有团队空间才会有多个用户编辑同一张图片,所以该功能只对团队空间开放,需要成员具有编辑权限。协同的图片编辑操作包括左旋、右旋、放大、缩小。


二、方案设计


虽然需求介绍很简单,但是涉及到多人协作的业务,有很多问题需要考虑,比如:

  • 多个用户之间如何进行交互?
  • 如何防止协作编辑时出现冲突?
  • 如何提高协作的实时性?

协作交互流程


多人协作时,每个用户的动作都需要通知到其他用户,收到通知消息的用户需要进行相应的处理。

比如用户 A 放大了图片,就需要给其他正在编辑的用户发送 “图片放大” 消息,其他用户收到这个消息后,需要同步放大自己界面上的图片。

这其实是一种 事件驱动 的架构设计思想,协作编辑中的每个用户动作本质上是一个 事件,执行动作时会产生事件并提交给服务器;服务器收到事件后,会转发给其他用户;其他用户收到事件后,就要作为事件的消费者来处理事件。

流程如图:

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

相比于生产者直接调用消费者,事件驱动模型的主要优点在于 解耦 异步性

在事件驱动模型中,生产者和消费者不需要直接依赖于彼此的实现,生产者只需触发事件并将其发送到事件分发器,消费者则根据事件类型处理逻辑。这样多个消费者可以独立响应同一事件(比如一个用户旋转了图片,其他用户都能同步),系统更加灵活,可扩展性更强。

此外,事件驱动还可以提升系统的 并发性实时性,可以理解为多引入了一个中介来帮忙,通过异步消息传递,减少了阻塞和等待,能够更高效地处理多个并发任务。

老师找一个课代表,就不需要自己一个一个地收作业了,可以做其他事情。


下面我们按照·事件驱动的设计·,来详细列举协作编辑的交互流程:

事件触发者(用户 A 的动作) 事件类型(发送消息) 事件消费者(其他用户的处理) 用户 A 建立连接,加入编辑 INFO 显示“用户 A 加入编辑”的通知 用户 A 执行编辑操作 EDIT_ACTION 放大 / 缩小 / 左旋 / 右旋当前图片 用户 A 断开连接,离开编辑 INFO 显示“用户 A 离开编辑”的通知

解决协作冲突


1、解决方案


假设这样一种场景:小雷和李蛋同时快速点击了十次旋转,最终的结果会是怎样的呢?

如果所有事件都是按顺序处理的,那结果就很清晰了,但事实上,为了提高性能和响应速度,事件通常是 并发 的,而不是严格的顺序执行

这种并发操作会引发 协作冲突,导致其他用户看到的旋转效果是乱序的。那么你会怎么解决协作冲突的问题呢?

我们可以通过业务设计来减少开发成本,比如约定 同一时刻只允许一位用户进入编辑图片的状态,此时其他用户只能实时浏览到修改效果,但不能参与编辑;

进入编辑状态的用户可以退出编辑,其他用户才可以进入编辑状态。类似于给图片编辑这个动作加了一把锁,直接从源头上解决了编辑冲突的问题。

此时,协作编辑的交互流程又要增加 2 个动作 —— 进入编辑状态和退出编辑状态

事件触发者(用户 A 的动作) 事件类型(发送消息) 事件消费者(其他用户的处理) 用户 A 建立连接,加入编辑 INFO 显示“用户 A 加入编辑”的通知 用户 A 进入编辑状态 ENTER_EDIT 其他用户界面显示“用户 A 开始编辑图片”,锁定编辑状态 用户 A 执行编辑操作 EDIT_ACTION 放大 / 缩小 / 左旋 / 右旋当前图片 用户 A 退出编辑状态 EXIT_EDIT 解锁编辑状态,提示其他用户可以进入编辑状态 用户 A 断开连接,离开编辑 INFO 显示“用户 A 离开编辑”的通知,并释放编辑状态 用户 A 发送了错误的消息 ERROR 显示错误消息的通知

其实核心流程是前 5 行,但是考虑到前端传递了错误参数的情况,我们新增一种 ERROR 事件类型,可用于展示错误提示信息。

在本项目中,我们就采用这种方案,不仅实现简单、流程清晰,也尽最大可能减少了编辑冲突的风险。但这种方案的缺点也很明显,减少了实时协作的便利性,对于协作设计、协作编码、协作文档的场景,同一时间只能有一个用户编辑,提高的效率有限。

所以这里再分享另外一种实时协同算法作为扩展知识。


2、扩展知识 - OT 算法实时协同


OT 算法(Operational Transformation)是一种支持分布式系统中,多个用户实时协作编辑的核心算法,广泛应用于在线文档协作等场景。OT 算法的主要功能是解决并发编辑冲突,确保编辑结果在所有用户终端一致

OT 算法其实很好理解,先看下 3 个核心概念:

  • 操作 (Operation) :表示用户对协作内容的修改,比如插入字符、删除字符等。
  • 转化 (Transformation) :当多个用户同时编辑内容时,OT 会根据操作的上下文将它们转化,使得这些操作可以按照不同的顺序应用,而结果保持一致。
  • 因果一致性 :OT 算法确保操作按照用户看到的顺序被正确执行,即每个用户的操作基于最新的内容状态。

其中,最重要的就是 转化 步骤了,相当于有一个负责人统一收集大家的操作,然后按照设定的规则和信息进行排序与合并,最终给大家一个统一的结果。

举一个简单的例子:

假设初始内容是 \"abc\",用户 A 和 B 同时进行编辑:

  • 用户 A 在位置 1 插入 \"x\"
  • 用户 B 在位置 2 删除 \"b\"

如果不使用 OT 算法,结果是:

  • 用户 A 操作后,内容变为 \"axbc\"
  • 用户 B 操作后,内容变为 \"ac\"

如果直接应用 B 的操作到 A 的结果,得到的是 \"ac\",对于 A 来说,相当于删除了 \"b\",A 会感到一脸懵逼。


如果使用 OT 算法,结果是:

  • 用户 A 的操作,应用后内容为 \"axbc\"
  • 用户 B 的操作经过 OT 转化为删除 \"b\"\"axbc\" 中的新位置
  • 最终用户 A 和 B 的内容都一致为 \"axc\",符合预期。

OT 算法确保无论用户编辑的顺序如何,最终内容是一致的。

当然,具体的 OT 算法还是要根据需求来设计了,协作密度越高,算法设计难度越大。

此外,还有一种与 OT 类似的协同算法 CRDT(Conflict-free Replicated Data Type),其通过数学模型,实现无需中心化转化的冲突解决,在离线协作场景中更具优势,感兴趣的同学可以自行了解。


3、提高协作实时性


在实时通讯的业务场景中,常用的技术方案包括长轮询、SSE 和 WebSocket

由于我们的业务需求需要实现频繁且高效的双向通信,因此我们选用 WebSocket 来实现即时通

讯。


1、什么是 WebSocket?

WebSocket 是一种 全双工通信协议,让客户端(比如浏览器)和服务器之间能够保持实时、持续的连接。和传统的 HTTP 请求 - 响应模式不同,WebSocket 是一条 “常开的隧道”,连接的双方可以随时发送和接收数据,而不需要不断建立和关闭连接。

打个比方:

  • HTTP 就像点外卖:每次下单(请求) - 到货(响应)都是一次独立的操作,完成后连接关闭。
  • WebSocket 像是打电话:你打通了电话(建立连接),可以随时聊天(双向通信),直到挂断(关闭连接)。

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序


2、WebSocket 的应用场景

WebSocket 的主要作用是 实现实时数据传输,适用于需要频繁交互或者实时更新数据的场景。比如:

  • 即时通讯(聊天软件、实时协作工具)
  • 实时数据更新(股票行情、体育比赛比分)
  • 在线游戏(多人实时互动)
  • 物联网(设备状态实时传输)
  • 协同编辑(像语雀这样的多人协作编辑)

通过 WebSocket,客户端与服务器之间能够显著减少消息传输的延迟,提高通信效率,同时降低数据传输的开销


3、WebSocket HTTP 的关系

WebSocket 和 HTTP 是两种不同的通信协议,但它们是紧密相关的,都是基于 TCP 协议、都可以在同样的端口上工作(比如 80 和 443)。首先要明确,WebSocket 是建立在 HTTP 基础之上的!

WebSocket 的连接需要通过 HTTP 协议发起一个握手(称为 HTTP Upgrade 请求),这个握手请求是 WebSocket 建立连接的前提,表明希望切换协议;服务器如果支持 WebSocket,会返回一个 HTTP 101 状态码,表示协议切换成功。

握手完成后,HTTP 协议的作用结束,通信会切换为 WebSocket 协议,双方可以开始全双工通信。


二者的区别如下,大家了解一下就好:

对比项 HTTP WebSocket 通信模式 请求 - 响应(单向) 全双工通信(双向) 连接状态 每次请求创建新的连接 握手后保持持续连接 数据传输效率 每次通信都需要带完整头部,开销大 数据帧小,传输高效 适用场景 静态网页加载、API 调用等非实时场景 实时交互场景,如聊天、游戏、直播等
4、WebSocket 协作编辑的流程

通过 WebSocket 实时通信的能力,可以将用户的编辑操作发给 WebSocket 服务器,再由服务器转发给其他连接服务器的用户前端,前端就可以根据操作处理图片。

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

具体的业务流程:

  1. 建立连接之前,先进行用户权限校验;校验通过后,将``登录用户信息、要编辑的图片信息保存到要建立的 WebSocket 连接的会话属性`中。
  2. 建立连接成功后,将 WebSocket 会话保存到该图片对应的会话集合中,便于后续分发消息给其他会话。
  3. 前端将消息发送到后端,后端根据消息类型分发到对应的处理器
  4. 处理器处理消息,将处理结果作为消息发送给需要的 WebSocket 客户端
  5. 当前端断开连接时,删除会话集合中的 WebSocket 会话,释放资源。

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

和 HTTP 请求一样,前端和 WebSocket 服务器之间传输信息时,也可以通过 JSON 格式对数据进行序列化


5、WebSocket 的实现方式

对于 Java Spring 项目,主要有原生 WebSocket(基于 WebSocketHandler 实现)、STOMP、WebFlux 这 3 种实现方式。

它们之间的对比如下:

实现方式 特点 优点 缺点 适用场景 原生 WebSocket 低层 API,手动管理连接与消息 轻量、灵活、适用于简单点对点通信 需要手动管理会话和分发,不支持 STOMP 简单的实时推送,低并发场景 WebSocket + STOMP + SockJS 基于 STOMP,支持发布 / 订阅模式 支持 STOMP、消息代理、适配 SockJS 依赖外部代理,配置较复杂 聊天室、多人协作,高级实时应用 WebFlux + Reactive WebSocket 基于 WebFlux 的响应式实现 高并发、非阻塞、适用于大流量场景 学习曲线高,不支持 STOMP 高并发场景、大数据流推送

选择建议:对于大多数简单实时推送,选用原生 WebSocket;对于复杂的聊天室和协同系统,选用 WebSocket + STOMP + SockJS;对于高并发、低延迟数据流推送,选用 WebFlux + Reactive WebSocket。

对于我们的项目,并发要求不高,选择 Spring 原生的 WebSocket 来降低开发成本。明确方案后,我们进入后端开发。


三、后端开发


1、引入 WebSocket 依赖


引入依赖:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId></dependency>

新建 manager.websocket 包,所有和 WebSocket 相关的代码都放到该包下。


2、定义数据模型


新建 websocket.model 包,存放数据模型,包括请求类、响应类、枚举类。

1. 定义图片编辑请求消息,也就是前端要发送给后端的参数:

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

@Data@NoArgsConstructor@AllArgsConstructorpublic class PictureEditRequestMessage { /** * 消息类型,例如 \"ENTER_EDIT\", \"EXIT_EDIT\", \"EDIT_ACTION\" */ private String type; /** * 执行的编辑动作 */ private String editAction;}

2. 定义图片编辑响应消息,也就是后端要发送给前端的信息:

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

@Data@NoArgsConstructor@AllArgsConstructorpublic class PictureEditResponseMessage { /** * 消息类型,例如 \"INFO\", \"ERROR\", \"ENTER_EDIT\", \"EXIT_EDIT\", \"EDIT_ACTION\" */ private String type; /** * 信息 */ private String message; /** * 执行的编辑动作 */ private String editAction; /** * 用户信息 */ private UserVO user;}

3. 定义图片编辑消息类型枚举,便于后续根据消息类型进行相应的处理:

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

@Getterpublic enum PictureEditMessageTypeEnum { INFO(\"发送通知\", \"INFO\"), ERROR(\"发送错误\", \"ERROR\"), ENTER_EDIT(\"进入编辑状态\", \"ENTER_EDIT\"), EXIT_EDIT(\"退出编辑状态\", \"EXIT_EDIT\"), EDIT_ACTION(\"执行编辑操作\", \"EDIT_ACTION\"); private final String text; private final String value; PictureEditMessageTypeEnum(String text, String value) { this.text = text; this.value = value; } /** * 根据 value 获取枚举 */ public static PictureEditMessageTypeEnum getEnumByValue(String value) { if (value == null || value.isEmpty()) { return null; } for (PictureEditMessageTypeEnum typeEnum : PictureEditMessageTypeEnum.values()) { if (typeEnum.value.equals(value)) { return typeEnum; } } return null; }}

4. 定义图片编辑操作类型枚举:

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

@Getterpublic enum PictureEditActionEnum { ZOOM_IN(\"放大操作\", \"ZOOM_IN\"), ZOOM_OUT(\"缩小操作\", \"ZOOM_OUT\"), ROTATE_LEFT(\"左旋操作\", \"ROTATE_LEFT\"), ROTATE_RIGHT(\"右旋操作\", \"ROTATE_RIGHT\"); private final String text; private final String value; PictureEditActionEnum(String text, String value) { this.text = text; this.value = value; } /** * 根据 value 获取枚举 */ public static PictureEditActionEnum getEnumByValue(String value) { if (value == null || value.isEmpty()) { return null; } for (PictureEditActionEnum actionEnum : PictureEditActionEnum.values()) { if (actionEnum.value.equals(value)) { return actionEnum; } } return null; }}

3、WebSocket 拦截器 - 权限校验


在 WebSocket 连接前需要进行权限校验,如果发现用户没有团队空间内编辑图片的权限,则拒绝握手,可以通过定义一个 WebSocket 拦截器实现这个能力。

此外,由于 HTTP 和 WebSocket 的区别,我们不能在后续收到前端消息时直接从 request 对象中获取到登录用户信息,因此也需要通过 WebSocket 拦截器,为即将建立连接的 WebSocket 会话指定一些属性,比如登录用户信息、编辑的图片 id 等。

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

编写拦截器的代码,需要实现 HandshakeInterceptor 接口:

/** * Websocket 拦截器, 建立连接前需要先进行校验 */@Slf4jpublic class WsHandshakeInterceptor implements HandshakeInterceptor { @Resource private UserService userService; @Resource private PictureService pictureService; @Resource private SpaceService spaceService; @Resource private SpaceUserAuthManager spaceUserAuthManager; // 开发团队空间时, 开发的公共功能对象, 里面有方法, 可以根据不同情况, 返回当前登录用户的权限列表 // 1. 实现 HandshakeInterceptor 接口, 重写握手前、握手后的两个方法的逻辑 /** * 当前方法是建立连接前的校验, 如果校验信息没通过, 返回 false 连接失败即可, 无需抛异常, 通过打日志确定错误位置 * @param request * @param response * @param wsHandler * @param attributes 给 WebSocketSession 会话设置属性 * @return * @throws Exception */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { // 2. 握手前的逻辑, 在这个方法中完成建立连接前的校验, 步骤如下: // 获取登录用户 // 校验用户是否有编辑当前图片的权限 // 如果是团队空间, 并且有编辑者权限, 才能建立连接 // 设置用户登录信息等属性到 WebSocket 会话中 if(request instanceof ServletServerHttpRequest){ // 3. 从 ServletServerHttpRequest 中拿到 ServletRequest HttpServletRequest httpServletRequest = ((ServletServerHttpRequest) request).getServletRequest(); // 4. 从请求中获取信息, 如用户信息, 正在编辑的图片信息 String pictureId = httpServletRequest.getParameter(\"pictureId\"); if(StrUtil.isBlank(pictureId)){ // 找不到当前编辑的图片 log.error(\"缺少体图片参数, 拒绝握手\"); return false; } // 5. 获取当前登录用户信息 User loginUser = userService.getLoginUser(httpServletRequest); if(ObjUtil.isEmpty(loginUser)){ log.error(\"用户未登录, 拒绝握手\"); return false; } // 6. 查到当前图片信息, 由于校验当前登录用户是否有编辑权限 Picture picture = pictureService.getById(pictureId); if(ObjUtil.isEmpty(picture)){ log.error(\"图片不存在, 拒绝握手\"); return false; } Long spaceId = picture.getSpaceId(); Space space = null; // 8. 校验图片所在空间是否存在, 以及所在空间是否为团队空间 if(spaceId != null){ space = spaceService.getById(spaceId); if(ObjUtil.isEmpty(space)){  log.error(\"图片所在空间不存在, 拒绝握手\");  return false; } if(space.getSpaceType() != SpaceTypeEnum.TEAM.getValue()){  log.error(\"图片所在空间不是团队空间, 拒绝握手\");  return false; } } // 9. spaceId 为空, 表示当前空间是公共图库 // 公共图库理论上不支持协同编辑, 但是为了拓展多个管理员协同编辑公共空间图片, 前面的代码把 space 抽出来 List<String> permissionList = spaceUserAuthManager.getPermissionList(space, loginUser); // 引入开发团队空间时的 bean: SpaceUserAuthManager, 可以调用校验方法 // getPermissionList() 有逻辑, 如果 space == null && 是管理员, 返回管理员权限列表 // 10. 校验编辑权限, 如果当前权限列表没有编辑权限, 拒绝握手 if (!permissionList.contains(SpaceUserPermissionConstant.PICTURE_EDIT)){ log.error(\"用户没有编辑图片的权限, 拒绝握手\"); return false; } // 11. 校验通过, 可以握手, 设置登录用户等信息到 WebSocket 会话中 attributes.put(\"user\", loginUser); attributes.put(\"userId\", loginUser.getId()); // 不一定用得上, 但是可以传 attributes.put(\"pictureId\", Long.valueOf(pictureId)); // 一定要传, 一会要维护图片ID对应的会话集合 // 从请求中获取到的 pictureId 是 String 类型的, 需要转为 Long 类型 // 在握手通过的位置, 向参数 Map attributes 插入键值对, 即可把这些信息预置到即将创建的 WebSocket 会话中 } return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { }}

4、WebSocket 处理器


(1) 定义 WebSocket 处理器类


我们需要定义 WebSocket 处理器类,在连接成功、连接关闭、接收到客户端消息时进行相应的处理。

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

@Componentpublic class PictureEditHandler extends TextWebSocketHandler {}

可以继承 TextWebSocketHandler 抽象类,这样就能以字符串的方式发送和接受消息了;

因为后续都是使用 JSON 进行前后端的数据交互,因此继承 WebSocketHandler 衍生的 TextWebSocketHandler;


接下来,我们要重写抽象类中的三个方法:

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序


(2) 定义全局集合常量


首先在处理器类中定义 2 个常量,分别为:

  • 保存当前正在编辑的用户 id,执行编辑操作、进入或退出编辑时都会校验。
  • 保存参与编辑图片的用户 WebSocket 会话的集合。由于每个图片的协作编辑都是相互独立的,所以需要用 Map 来区分每个图片 id 对应的数据

代码如下:

@Componentpublic class PictureEditHandler extends TextWebSocketHandler { // 每张图片的编辑状态,key: pictureId, value: 当前正在编辑的用户 ID private final Map<Long, Long> pictureEditingUsers = new ConcurrentHashMap<>(); // 保存所有连接的会话,key: pictureId, value: 用户会话集合 private final Map<Long, Set<WebSocketSession>> pictureSessions = new ConcurrentHashMap<>(); // .....}

为保证线程安全,全局常量集合pictureSessionspictureEditingUsers 都必须使用 ConcurrentHashMap

  1. pictureSessions 的 key 为 pictureId,value 为 Set
    同一个 PictureEditHandler 实例会被所有 WebSocket 连接共享,多个线程(连接)会并发读写该 Map;由于可能同时有多个 WebSocket 客户端建立连接和发送消息,集合要使用并发包(JUC)中的 ConcurrentHashMap,来保证线程安全。只有 ConcurrentHashMap 才能避免竞态条件和数据丢失

  2. pictureEditingUsers 的 key 为 pictureId,value 为当前正在编辑该图片的 userId。 该 Map 同样会被并发访问,用于判断某张图片是否已有用户正在编辑;必须使用线程安全的 ConcurrentHashMap 来确保状态一致。


(3) 定义公共方法:广播编辑信息


由于接下来很多消息,都需要传递给所有协作者,所以先编写一个 广播消息 的方法。

  • 该方法会根据 pictureId,将响应消息发送给编辑该图片的所有会话。
  • 考虑到可能会有消息,不需要发送给编辑者本人的情况,该方法还可以接受 excludeSession 参数,支持排除掉向某个会话发送消息。

代码如下:

private void broadcastToPicture(Long pictureId, PictureEditResponseMessage pictureEditResponseMessage,WebSocketSession excludeSession) throws Exception { // 12. 加上参数 WebSocketSession excludeSession, 表示不需要接收广播信息的 session // 也就是 Map value 的会话集合中, excludeSession 指的是编辑操作对应的编辑者的 session // 编辑操作, 需要对一个 session 集合中, 除编辑者本人 session 外的所有 session 广播信息 // 1. 根据全局常量集合 pictureSessions 获取编辑当前图片对应的会话集合 Set<WebSocketSession> sessionSet = pictureSessions.get(pictureId); // 2. 集合不为空, 播放广播 if(!ObjUtil.isNotEmpty(sessionSet)){ // 3. 遍历会话集合中的每一个会话 for(WebSocketSession session : sessionSet){ // 8. 创建 ObjectMapper 对象, 注意包是 jackson ObjectMapper objectMapper = new ObjectMapper(); // 9. 配置序列化: 将 Long 类型转为 String , 解决精度丢失的问题 SimpleModule module = new SimpleModule(); module.addSerializer(Long.class, ToStringSerializer.instance); module.addSerializer(Long.TYPE, ToStringSerializer.instance); objectMapper.registerModule(module); // 10. 将 pictureEditResponseMessage 的所有属性序列化为 JSON 字符串 String message = objectMapper.writeValueAsString(pictureEditResponseMessage); // 可以处理: pictureEditResponseMessage 的 UserVO 的 Long 转为 String 的情况, Hutool 不行或者很麻烦 // 11. 注释掉 5、6, 将完全转换为 String 的编辑请求传入会话信息中 TextMessage textMessage = new TextMessage(message); // 5. 将当前编辑的响应信息, 从 JSON 转为 JSON 格式的 String // String str = JSONUtil.toJsonStr(pictureEditResponseMessage); // 6. 创建一个 TextMessage 对象, 将编辑信息字符串作为参数, 调用对应的构造方法 // TextMessage textMessage = new TextMessage(str); // 4. 判断当前会话元素是否为开启的状态 if(session.isOpen()){ // 13. 对会话集合中, 除了编辑者本人 session 外的所有 session, 广播当前编辑者编辑操作的信息 if(excludeSession != null && session.equals(excludeSession)){  // 当前编辑者不发送信息  continue; } // 7. 将 TextMessage 对象(当前编辑者编辑操作的信息)作为参数, 传入会话元素的信息属性中 session.sendMessage(textMessage); // sendMessage() 需要抛异常 } } }}// 14. 编写一个重载方法, 支持不传 WebSocketSession excludeSession 参数private void broadcastToPicture(Long pictureId,PictureEditResponseMessage pictureEditResponseMessage) throws Exception { // 调用写好的方法, excludeSession 传 null, 能处理更多种情况 broadcastToPicture(pictureId, pictureEditResponseMessage, null);}

上述代码中有个小细节(注释 8~11):

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

前端 JS 无法接收 Long 类型的属性(第一期编写 config.JsonConfig 解决 Long 类型精度丢失问题, 也就是将 HTTP 响应的信息从 Long 转 String 再返回给前端):

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

@JsonComponentpublic class JsonConfig { /** * 添加 Long 转 json 精度丢失的配置 */ @Bean public ObjectMapper jacksonObjectMapper(Jackson2ObjectMapperBuilder builder) { ObjectMapper objectMapper = builder.createXmlMapper(false).build(); SimpleModule module = new SimpleModule(); module.addSerializer(Long.class, ToStringSerializer.instance); module.addSerializer(Long.TYPE, ToStringSerializer.instance); objectMapper.registerModule(module); return objectMapper; }}

由于前端 JS 的长整数可能会丢失精度,所以使用 Jackson 自定义序列化器,在将对象转换为 JSON 字符串时,将 Long 类型转换为 String 类型;

HTTP 请求需要经过这样的处理,才可以返回响应给前端,我们当前的 WebSocket 请求也需要处理 Long 类型精度丢失的问题;

否则无法拿到正在编辑图片的用户正确的 ID,就不好判断该用户是否有权限编辑该图片了。


(4) 实现连接建立成功后执行的方法


实现连接建立成功后执行的方法,保存会话到集合中,并且给其他会话发送消息:

@Resourceprivate UserService userService;@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception { // 1. 在 WebSocket 拦截器权限校验中, 我们在 attributes(WebSocketSession 参数) 中设置了登录用户、图片 id // 2. 从 session 属性中获取到当前登录用户消息, 当前编辑图片的 id User user = (User) session.getAttributes().get(\"user\"); Long pictureId = (Long) session.getAttributes().get(\"pictureId\"); // 3. 保存会话到集合中 // 4. 如果是首次将会话保存到 pictureSessions 中,此时需要初始化 pictureSessions 空集合  pictureSessions.putIfAbsent(pictureId, ConcurrentHashMap.newKeySet()); // 5. 保存会话到集合中 pictureSessions.get(pictureId).add(session); // 6. 构造编辑响应消息 PictureEditResponseMessage pictureEditResponseMessage = new PictureEditResponseMessage(); pictureEditResponseMessage.setType(\"ENTER_EDIT\"); // 7. 构造返回给前端的消息 String message = String.format(\"用户 %s 加入编辑\", user.getUserName()); pictureEditResponseMessage.setMessage(message); // 8. 编辑动作不用设置, 因为当前设置的响应是用户加入编辑操作 // pictureEditResponseMessage.setEditAction(); // 9. 设置脱敏后的用户消息到响应中 pictureEditResponseMessage.setUser(userService.getUserVO(user)); // 10. 广播\"加入编辑信息\"给当前会话集合的所有会话元素. 包括自己 broadcastToPicture(pictureId, pictureEditResponseMessage);}

(5) 编写接收客户端消息的方法


编写接收客户端消息的方法,根据消息类别执行不同的处理:

@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { // 1. 从 TextMessage(JSON 类型字符串) 获取前端发送的消息内容 , 将 JSON 字符串转换为 PictureEditMessage PictureEditRequestMessage pictureEditRequestMessage = JSONUtil.toBean(message.getPayload(), PictureEditRequestMessage.class); // 这里不需要配置自定义 JSON 序列化, 因为前端发送的消息(包括 userId)本来就是 String 类型了, 正常解析即可 // 2. 将 PictureEditMessage 中的 type 属性, 转为我们自定义的枚举类 String type = pictureEditRequestMessage.getType(); PictureEditMessageTypeEnum pictureEditMessageTypeEnum = PictureEditMessageTypeEnum.getEnumByValue(type); // 4. 从 session 中获取公共参数 User user = (User) session.getAttributes().get(\"user\"); Long pictureId = (Long) session.getAttributes().get(\"pictureId\"); // 3. 根据 PictureEditMessageTypeEnum 的值, 也就是根据消息类型, 进行对应的处理 switch (pictureEditMessageTypeEnum){ case ENTER_EDIT: handleEnterEditMessage(pictureEditRequestMessage, session, user, pictureId); break; case EXIT_EDIT: handleEditActionMessage(pictureEditRequestMessage, session, user, pictureId); break; case EDIT_ACTION: handleExitEditMessage(pictureEditRequestMessage, session, user, pictureId); break; default: // 收到 info 、error 类型的消息, 如果是 error, 说明用户参数传递错误, 只给当前用户发送信息 PictureEditResponseMessage pictureEditResponseMessage = new PictureEditResponseMessage(); pictureEditResponseMessage.setType(PictureEditMessageTypeEnum.ERROR.getValue()); pictureEditResponseMessage.setMessage(\"消息类型错误\"); pictureEditResponseMessage.setUser(userService.getUserVO(user)); // 这里严格来说也需要处理 Long 精度丢失, 但是这个 switch 情况只是一个错误消息, 就不转了 session.sendMessage(new TextMessage(JSONUtil.toJsonStr(pictureEditRequestMessage))); break; }}

(6) 编写每个处理消息的方法


1. 接下来依次编写每个处理消息的方法。首先是用户进入编辑状态,要设置当前用户为编辑用户,并且向其他客户端发送消息:

/*** 进入编辑状态* @param pictureEditRequestMessage* @param session* @param user* @param pictureId*/public void handleEnterEditMessage(PictureEditRequestMessage pictureEditRequestMessage, WebSocketSession session, User user, Long pictureId) throws Exception {// 1. 没有用户正在编辑图片(编辑列表中没有该图片, 说明没有其他用户正在编辑), 才能进入编辑 if(!pictureEditingUsers.containsKey(pictureId)){ // 2. 设置当前用户正在编辑该图片 pictureEditingUsers.put(pictureId, user.getId()); // 3. 构造响应, 发送加入编辑的消息通知 PictureEditResponseMessage pictureEditResponseMessage = new PictureEditResponseMessage(); pictureEditResponseMessage.setType(PictureEditMessageTypeEnum.ENTER_EDIT.getValue()); String message = String.format(\"用户 %s 开始编辑图片\", user.getUserName()); pictureEditResponseMessage.setMessage(message); pictureEditResponseMessage.setUser(userService.getUserVO(user)); // 4. 广播给所有用户 broadcastToPicture(pictureId, pictureEditResponseMessage); }}

2. 用户执行编辑操作时,将该操作同步给 除了当前用户之外 的其他客户端,也就是说编辑操作不用再同步给自己:

/** * 用户正在编辑 * @param pictureEditRequestMessage * @param session * @param user * @param pictureId * @throws Exception */public void handleEditActionMessage(PictureEditRequestMessage pictureEditRequestMessage, WebSocketSession session, User user, Long pictureId) throws Exception { // 1. 如果当前图片的编辑用户, 不是正在编辑的人, 不能编辑 Long editingUserId = pictureEditingUsers.get(pictureId); // 2. 获取编辑操作 String editAction = pictureEditRequestMessage.getEditAction(); PictureEditActionEnum actionEnum = PictureEditActionEnum.getEnumByValue(editAction); // 3. 编辑操作不存在, 打日志 if(actionEnum == null){ log.error(\"无效的编辑动作\"); return; } // 4. 确认当前用户是图片的编辑者 if(editingUserId != null && editingUserId.equals(user.getId())){ // 5. 构造响应信息, 告知其他编辑者当前用户的编辑操作 PictureEditResponseMessage pictureEditResponseMessage = new PictureEditResponseMessage(); pictureEditResponseMessage.setType(PictureEditMessageTypeEnum.EDIT_ACTION.getValue()); String message = String.format(\"%s 执行了 %s\", user.getUserName(), actionEnum.getText()); pictureEditResponseMessage.setMessage(message); pictureEditResponseMessage.setEditAction(editAction); pictureEditResponseMessage.setUser(userService.getUserVO(user)); // 6. 广播除了当前编辑者外的其他用户, 否则会造成重复编辑 broadcastToPicture(pictureId, pictureEditResponseMessage, session); }}

3. 用户退出编辑操作时,移除当前用户的编辑状态,并且向其他客户端发送消息:

/** * 用户退出编辑 * @param pictureEditRequestMessage * @param session * @param user * @param pictureId */public void handleExitEditMessage(PictureEditRequestMessage pictureEditRequestMessage, WebSocketSession session, User user, Long pictureId) throws Exception { // 1. 获取当前图片对应的编辑者 ID  Long editingUserId = pictureEditingUsers.get(pictureId); // 2. 确认当前编辑者是当前登录用户 if(editingUserId != null && editingUserId.equals(user.getId())){ // 3. 从 pictureEditingUsers 移除该 entry pictureEditingUsers.remove(pictureId); // 4. 构造响应信息 PictureEditResponseMessage pictureEditResponseMessage = new PictureEditResponseMessage(); pictureEditResponseMessage.setType(PictureEditMessageTypeEnum.EXIT_EDIT.getValue()); String message = String.format(\"%s 退出编辑\", user.getUserName()); pictureEditResponseMessage.setMessage(message); pictureEditResponseMessage.setUser(userService.getUserVO(user)); // 5. 广播退出编辑的消息通知 broadcastToPicture(pictureId, pictureEditResponseMessage); }}

(7) 断开 WebSocket 时的逻辑


WebSocket 连接关闭时,需要移除当前用户的编辑状态、并且从集合中删除当前会话,还可以给其他客户端发送消息通知:

/** * 关闭连接 * @param session * @param status * @throws Exception */@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { // 1. 从 session 中获取公共参数 User user = (User) session.getAttributes().get(\"user\"); Long pictureId = (Long) session.getAttributes().get(\"pictureId\"); // 2. 移除当前用户的编辑状态 handleExitEditMessage(null, session, user, pictureId); // 3. 删除会话 Set<WebSocketSession> sessionSet = pictureSessions.get(pictureId); if(sessionSet != null){ // 4. 从会话集合中删除会话(从 value 元素: 会话集合中, 移除单个会话) sessionSet.remove(session); // 5. 如果 value: 会话集合已经没有会话了, 移除掉 pictureId 这个 key if(sessionSet.isEmpty()){ pictureSessions.remove(pictureId); } } // 6. 通知其他用户, 该用户已离开编辑 PictureEditResponseMessage pictureEditResponseMessage = new PictureEditResponseMessage(); // INFO 类型表示发送通知 pictureEditResponseMessage.setType(PictureEditMessageTypeEnum.INFO.getValue()); String message = String.format(\"用户 %s 退出编辑\", user.getUserName()); pictureEditResponseMessage.setMessage(message); pictureEditResponseMessage.setUser(userService.getUserVO(user)); // 7. 广播退出消息给所有用户 broadcastToPicture(pictureId, pictureEditResponseMessage);}

💡 由于处理器的代码并不复杂,而且处理逻辑中使用到了当前类的全局变量,所以没有选择将每个处理器封装为单独的类。

大家也可以将每个处理器封装为单独的类(相当于设计模式中的策略模式),并且根据消息类别调用不同的处理器类。


5、WebSocket 配置


类似于编写 Spring MVC 的 Controller 接口,可以为指定的路径配置处理器和拦截器:

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

@Configuration@EnableWebSocketpublic class WebSocketConfig implements WebSocketConfigurer { @Resource private PictureEditHandler pictureEditHandler; // 刚刚开发的图片消息处理器类, 已经设置为 bean @Resource private WsHandshakeInterceptor wsHandshakeInterceptor; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { // websocket registry.addHandler(pictureEditHandler, \"/ws/picture/edit\") .addInterceptors(wsHandshakeInterceptor) .setAllowedOrigins(\"*\"); }}

之后,前端就可以通过 WebSocket 连接项目启动端口的 /ws/picture/edit 路径了。


6、Disruptor 优化 WebSocket 长连接高并发场景下的阻塞、顺序保证与低延迟问题


1、现存的系统问题


WebSocket 通常是长连接,每个客户端都需要占用服务器资源。

在 Spring WebSocket 中,每个 WebSocket 连接(客户端)对应一个独立的 WebSocketSession,消息的处理是在该 WebSocketSession 所属的线程中执行。

如果 同一个 WebSocket 连接(客户端)连续发送多条消息,服务器会 按照接收的顺序依次同步处理,而不是并发执行。这是为了保证每个客户端的消息处理是线程安全的。

可以在 handleTextMessage 方法中增加 Thread.sleep 来测试一下。连续点击多次编辑操作,会发现每隔一段时间方法才会执行一次。


虽然多个客户端的消息处理是可以并发执行的,但是接受消息和具体处理某个消息,使用的是 同一个线程

如果处理消息的耗时比较长,并发量又比较高,可能会导致系统响应时间变长,甚至因为资源耗尽而服务崩溃。

💡 为了便于理解,可以类比一下调用 Spring MVC 的某个接口时,如果该接口内部的耗时较长,请求线程就会一直阻塞,最终导致 Tomcat 请求连接数(默认值是 200 个连接)耗尽

怎么解决这个问题呢?最简单的方法就是开一个线程专门来异步处理消息

但是我们还要保证操作是按照顺序同步给其他客户端的,因此还需要引入一个队列,将任务按照顺序放到队列中,交给线程去处理。

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

其实上述的异步操作 + 从任务队列取任务执行,使用线程池就可以实现了。

但对于协同编辑场景,需要尽可能地保证低延迟,因此我们选用一种高级技术 Disruptor 无锁队列减少线程上下文的切换,能够在高并发场景下保持低延迟和高吞吐量

此外,使用 Disruptor 还有一个优点,可以将任务放到队列中,通过优雅停机机制,在服务停止前执行完所有的任务,再退出服务,防止消息丢失


2、Disruptor 介绍


Disruptor 是一种高性能的并发框架,由 LMAX(一个金融交易系统公司)开发,它是一种 无锁的环形队列 数据结构,用于解决高吞吐量和低延迟场景中的并发问题。

支持生产者 - 消费者模式,可作为消息队列使用,适用于金融交易、实时数据处理、游戏事件等对并发和实时性要求较高的场景。

它最大的特点就是快、延迟低,非常低!

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

Disruptor 的核心思想是基于固定大小环形缓冲区(Ring Buffer),并通过序列化控制访问,以避免传统队列中常见的锁竞争问题


它主要通过以下几点实现高性能的消息传递机制:

  • 环形缓冲区 :使用固定大小的数组,可以复用内存,避免了频繁的内存分配和垃圾回收。
  • 无锁设计 :依赖 CAS(Compare - And - Swap)和内存屏障,而不是传统的锁,降低了线程切换的开销。
  • 缓存友好 :最大化利用 CPU 的缓存局部性,提高访问速度
  • 序列号机制 :通过序列号管理生产者和消费者的访问,保证数据一致性。
  • 多消费者模式 :支持多消费者共享同一环形缓冲区,并能配置不同的消费策略(如依赖关系、并行消费等)。

Disruptor 与传统队列对比:

特性 Disruptor BlockingQueue 并发控制 无锁(CAS + 内存屏障) 基于锁(ReentrantLock) 内存管理 固定长度的环形数组 动态数组或链表 性能 极高(百万级别消息 / 秒) 较低(数万消息 / 秒) 延迟 纳秒级别 毫秒级别 GC 压力 极低(数据复用) 较高(频繁创建新对象) 适用场景 高频实时消息处理、金融系统 一般生产者消费者模型

3、Disruptor 核心概念与工作流程


先了解 Disruptor 的核心概念:

  • RingBuffer(环形缓冲区) :固定大小的循环数组,用于存储数据项,生产者和消费者共享该数据结构。
  • Event(事件) :存储在 RingBuffer 中的数据对象,用于表示要传递的消息或数据。
  • Producer(生产者) :负责向 RingBuffer 写入数据的角色。
  • Consumer(消费者) :从 RingBuffer 中读取并处理数据的角色。
  • Sequencer(序列器) :管理生产者与消费者的索引,确保并发安全的序列管理。
  • SequenceBarrier(序列屏障) :控制消费者等待数据可用的机制,确保数据完整性。
  • WaitStrategy(等待策略) :定义消费者如何等待新的数据(如自旋、自适应等待等)。
  • EventProcessor(事件处理器) :集成了 Consumer 和 SequenceBarrier,用于更高级的消费控制。

而 Disruptor 是封装了 RingBuffer、Producer 和 Consumer 的核心管理类,用于协调所有组件的运行


下面我举例来说明 Disruptor 的工作流程:

  1. 环形队列初始化 :创建一个固定大小为 8 的 RingBuffer(索引范围 0 - 7),每个格子存储一个可复用的事件对象,序号初始为 0。
  2. 生产者写入数据 :生产者通过取模,申请索引 0(序号 0),将数据 “A” 写入事件对象,提交后序号递增为 1,下一个写入索引变为 1。
  3. 消费者读取数据 :消费者检查索引 0(序号 0),读取数据 “A”,处理后提交,序号递增为 1,下一个读取索引变为 1。
  4. 环形队列循环使用 :当生产者写入到索引 7(序号 7)后,索引回到 0(序号 8),形成循环存储,但序号会持续自增以区分数据的先后顺序
  5. 防止数据覆盖 :如果生产者追上消费者,消费者尚未处理完数据,生产者会等待(策略是不继续生产,而不是扩容),确保数据不被覆盖。

下图是一个 Disruptor 生产者的模型,仅供参考,了解一下即可:

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

其实对大家来说,先将 Disruptor 当做一个高性能的队列来使用就可以了,可以向队列中添加事件并定义处理方式。

  • 上面的内容重点记忆 Disruptor 的无锁,CAS,环形队列,消息生产时序号的控制,以及防止数据覆盖的机制
  • 感兴趣的同学可以阅读 这篇文章 深入了解 Disruptor 性能高的原因。

下面我们来引入 Disruptor 来优化代码。


4、Disruptor 实战


(1) 引入 Disruptor 依赖

<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version></dependency>

(2) 定义图片编辑事件

引入 Disruptor 后,第一件事是把“收到消息 → 处理消息”这一整条链路拆成「事件」

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序


Disruptor 只认识事件(Event),所以必须把原来的同步处理逻辑(handleTextMessage 及 3 个 handleXxx 方法)改造成「生产事件 → 消费事件」模型:

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序


websocket.disruptor 包中新建 PictureEditEvent 类,充当了上下文容器,所有处理消息所需的数据都被封装在其中

也就是,这三个消息处理方法的参数,单独提取出来作为事件类的属性

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

@Datapublic class PictureEditEvent { /** * 消息 */ private PictureEditRequestMessage pictureEditRequestMessage; /** * 当前用户的 session */ private WebSocketSession session; /** * 当前用户 */ private User user; /** * 图片 id */ private Long pictureId;}

(3) 定义事件处理器(消费者)

这里基本上是把 PictureEditHandler 分发消息的逻辑搬了过来,它的作用就是将不同类型的消息分发到对应的处理器中。

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

/** * 图片编辑事件处理器(消费者) */@Component@Slf4jpublic class PictureEditEventWorkHandler implements WorkHandler<PictureEditEvent> { // 1. 定义事件处理器(消费者), 实现 WorkHandler 接口, T 表示生产者事件 @Resource private PictureEditHandler pictureEditHandler; @Resource private UserService userService; // 2. 实现 WorkHandler 接口的方法, 参数就是接口的泛型, 也就是处理生产者事件的方法  // 3. 消费者事件的逻辑, 就是刚刚编写的处理图片的逻辑, 复制过来改造为消费者事件即可 @Override public void onEvent(PictureEditEvent pictureEditEvent) throws Exception { // 4. user, pictureId 不需要从 session 中获取, 直接从参数(生产者事件)中获取 PictureEditRequestMessage pictureEditRequestMessage = pictureEditEvent.getPictureEditRequestMessage(); WebSocketSession session = pictureEditEvent.getSession(); User user = pictureEditEvent.getUser(); Long pictureId = pictureEditEvent.getPictureId(); // 5. 从图片编辑请求的 message 属性中获取到类别 String type = pictureEditRequestMessage.getType(); // 6. 从类别获取枚举 PictureEditMessageTypeEnum pictureEditMessageTypeEnum = PictureEditMessageTypeEnum.getEnumByValue(type); // 7. 根据消息类型的枚举, 进行对应的处理(需要引入图片处理的 Bean: PictureEditHandler) switch (pictureEditMessageTypeEnum){ case ENTER_EDIT: pictureEditHandler.handleEnterEditMessage(pictureEditRequestMessage, session, user, pictureId); break; case EXIT_EDIT: pictureEditHandler.handleEditActionMessage(pictureEditRequestMessage, session, user, pictureId); break; case EDIT_ACTION: pictureEditHandler.handleExitEditMessage(pictureEditRequestMessage, session, user, pictureId); break; default: // 收到 info 、error 类型的消息, 如果是 error, 说明用户参数传递错误, 只给当前用户发送信息 PictureEditResponseMessage pictureEditResponseMessage = new PictureEditResponseMessage(); pictureEditResponseMessage.setType(PictureEditMessageTypeEnum.ERROR.getValue()); pictureEditResponseMessage.setMessage(\"消息类型错误\"); pictureEditResponseMessage.setUser(userService.getUserVO(user)); // 这里严格来说也需要处理 Long 精度丢失, 但是这个 switch 情况只是一个错误消息, 就不转了 session.sendMessage(new TextMessage(JSONUtil.toJsonStr(pictureEditRequestMessage))); break; } }}

(4) 添加 Disruptor 配置类

将我们刚定义的事件及处理器关联到 Disruptor 实例中:

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

/** * 图片编辑事件 Disruptor 配置 */@Configurationpublic class PictureEditEventDisruptorConfig { // 1. 注册消费者 @Resource private PictureEditEventWorkHandler pictureEditEventWorkHandler; // 2. 初始化 Disruptor 对象, 将该对象交给 spring 管理, 并且命名为 PictureEditEventDisruptor @Bean(\"PictureEditEventDisruptor\") public Disruptor<PictureEditEvent> messageModelRingBuffer(){ // 3. 定义环形缓冲区 ringBuffer 大小(类似于我们定义一个消息队列大小) int bufferSize = 1024 * 256; // Disruptor 队列在高并发, 大吞吐量下的延迟优化效果最明显, 所以我们定义百万级别的 ringBuffer 大小 // 如果定义得太小, 任务又不能被覆盖, 会造成严重的阻塞, 生产者无法继续添加任务 // 4. 实例 Disruptor Disruptor<PictureEditEvent> disruptor = new Disruptor<>( PictureEditEvent::new, bufferSize, ThreadFactoryBuilder.create() .setNamePrefix(\"pictureEditEventDisruptor\") .build() ); // 第一个参数表示事件工厂, PictureEditEvent::new 会创建一个事件对象, 用于指定每次放入缓冲区的事件类型  // 第二个参数为队列的缓冲区大小 // 第三个参数为线程工厂, 引入的包为 Hutool, 通过线程工厂创建一个线程, 并指定线程名的前缀 // 5. 为 Disruptor 绑定消费者 disruptor.handleEventsWithWorkerPool(pictureEditEventWorkHandler); // 使用 disruptor 的工作线程, 来执行我们的消费者事件 // 5. 开启 Disruptor disruptor.start(); // 6. 返回当前的 Disruptor 对象, 便于后续调用 return disruptor; // 后续在消息生产者中, 使用这个方法返回的 disruptor 对象, 往该对象的缓冲区塞任务  }}

(5) 定义事件生产者

生产者负责将数据(事件)发到 Disruptor 的环形缓冲区中。

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

为了保证在停机时所有的消息都能够被处理,我们通过 shutdown 方法完成 Disruptor 的优雅停机。

@Component // 1. 定义为一个 bean@Slf4jpublic class PictureEditEventProducer { // 2. 引入 Disruptor 队列 bean @Resource private Disruptor<PictureEditEvent> pictureEditEventDisruptor; // 3. 编写发布事件的方法, 参数为生产者事件类的属性 public void publishEvent(PictureEditRequestMessage pictureEditRequestMessage, WebSocketSession session, User user, Long pictureId){ // 4. 获取缓冲区对象 RingBuffer<PictureEditEvent> ringBuffer = pictureEditEventDisruptor.getRingBuffer(); // 5. 获取环形缓冲区下一个可以放置事件的位置 long next = ringBuffer.next(); // 6. 根据下一个放置事件的位置, 获取这个位置的事件对象 PictureEditEvent pictureEditEvent = ringBuffer.get(next); // 7. 将外层传入的参数, 设置到环形缓冲区下一个位置的事件对象是属性中 pictureEditEvent.setPictureEditRequestMessage(pictureEditRequestMessage); pictureEditEvent.setSession(session); pictureEditEvent.setUser(user); pictureEditEvent.setPictureId(pictureId); // 8. 发布事件 ringBuffer.publish(next); } // 9. 事件未处理完, 队列进行优雅停机 @PreDestroy public void destroy(){ pictureEditEventDisruptor.shutdown(); // shutdown() 是 Disruptor 提供的现成的方法, 让队列默认处理完所有的让任务后, 再关闭 // shutdown() 可以传递事件参数, 表示默认多少秒后关闭 // 注意: 关闭可以类似于终止服务, 关闭程序 }}

(6) 修改 PictureEditHandler 的原有逻辑

修改 PictureEditHandler 的原有逻辑(去掉 switch),改为使用事件生产者

【智能协同云图库】智能协同云图库第十一弹:基于 WebSocket 实现协同编辑功能、基于 Disruptor 优化 WebSocket 长连接、高并发场景下的阻塞、顺序保证与低延迟问题_websocket 保证顺序

@Resource@Lazyprivate PictureEditEventProducer pictureEditEventProducer;@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { // 将消息解析为 PictureEditMessage PictureEditRequestMessage pictureEditRequestMessage = JSONUtil.toBean(message.getPayload(), PictureEditRequestMessage.class); // 从 Session 属性中获取公共参数 Map<String, Object> attributes = session.getAttributes(); User user = (User) attributes.get(\"user\"); Long pictureId = (Long) attributes.get(\"pictureId\"); // 根据消息类型处理消息(生产消息到 Disruptor 环形队列中) pictureEditEventProducer.publishEvent(pictureEditRequestMessage, session, user, pictureId);}

这样,我们就实现了基于 Disruptor 的异步消息处理机制,将原有的同步消息分发逻辑,改造为高效解耦的异步处理模型,也更有利于代码的扩展。


7、扩展


  1. 为防止消息丢失,可以使用 Redis 等高性能存储保存执行的操作记录。目前如果图片已经被编辑了,新用户加入编辑时没办法查看到已编辑的状态,这一点也可以利用 Redis 保存操作记录来解决,新用户加入编辑时读取 Redis 的操作记录即可。
  2. 每种类型的消息处理可以封装为独立的 Handler 处理器类,也就是采用策略模式。
  3. 支持分布式 WebSocket。实现思路很简单,只需要保证要编辑同一图片的用户,连接的是相同的服务器即可,和游戏分服务器大区、聊天室分房间是类似的原理。
  4. 一些小问题的优化:比如 WebSocket 连接建立之后,如果用户退出了登录,这时 WebSocket 的连接是没有断开的。不过影响并不大,大家可以思考下怎么处理。

在这里插入图片描述

在这里插入图片描述