Netty实战:Springboot+Netty+websocket优雅的高性能服务器 (附源码下载)
Springboot-cli 开发脚手架系列
Netty系列:Springboot+Netty优雅的开发websocket高性能服务器
文章目录
前言
首先我们需要使用Netty搭建基础的tcp框架,参考Springboot使用Netty优雅的创建高性能TCP服务器,接下来我们开始集成websocket。
本博客项目源码地址:
- 项目源码github地址
- 项目源码国内gitee地址
1. 环境
pom.xml
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>${netty-all.version}</version> </dependency>
- yml开启日记debug级别打印
# 日记配置logging: level: # 开启debug日记打印 com.netty: debug
2. 引入websocket编码解码器
这里我们需要加入websocket编码解码器,因为websocket的握手是通过http完成的,所以我们还需要加入http的编码器。
/** * Netty 通道初始化 * * @author qiding */@Component@RequiredArgsConstructorpublic class ChannelInit extends ChannelInitializer<SocketChannel> { private final MessageHandler messageHandler; @Override protected void initChannel(SocketChannel channel) { channel.pipeline() // 心跳时间 .addLast("idle", new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)) // 对http协议的支持. .addLast(new HttpServerCodec()) // 对大数据流的支持 .addLast(new ChunkedWriteHandler()) // 聚合 Http 将多个requestLine、requestHeader、messageBody信息转化成单一的request或者response对象 .addLast(new HttpObjectAggregator(8192)) // 聚合 websocket 的数据帧,因为客户端可能分段向服务器端发送数据 .addLast(new WebSocketFrameAggregator(1024 * 62)) // 添加消息处理器 .addLast("messageHandler", messageHandler); }}
3. 编写websocket处理器
WebsocketMessageHandler
处理器主要负责处理握手协议和文本交互
/** * Websocket 消息处理器 * * @author qiding */@Slf4j@Componentpublic class WebsocketMessageHandler { /** * 对webSocket 首次握手进行解析 */ public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) { // 首次握手进行校验 this.isFullHttpRequest(ctx, request); // 获取请求uri String uri = request.uri(); // 参数分别是 (ws地址,子协议,是否扩展,最大frame长度) WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(getWebSocketLocation(request), null, true, 5 * 1024 * 1024); WebSocketServerHandshaker handShaker = factory.newHandshaker(request); if (handShaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handShaker.handshake(ctx.channel(), request); } WebSocketSession.setChannelShaker(ctx.channel().id(), handShaker); } /** * 处理消息 */ public void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 获取webSocket 会话 WebSocketServerHandshaker handShaker = WebSocketSession.getChannelShaker(ctx.channel().id()); // 关闭 if (frame instanceof CloseWebSocketFrame) { log.debug("收到关闭请求"); handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 握手PING/PONG if (frame instanceof PingWebSocketFrame) { ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain())); return; } // 文本接收和回复 if (frame instanceof TextWebSocketFrame) { log.debug("收到消息:\n{}", ((TextWebSocketFrame) frame).text()); ctx.writeAndFlush(new TextWebSocketFrame("服务器接收成功!")); return; } // 二进制文本 if (frame instanceof BinaryWebSocketFrame) { ctx.writeAndFlush(frame.retain()); } } /** * 判断是否是正确的websocket 握手协议 */ private void isFullHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) { if (!request.decoderResult().isSuccess()) { log.error("非webSocket请求"); this.sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.BAD_REQUEST, ctx.alloc().buffer())); ctx.close(); return; } if (!HttpMethod.GET.equals(request.method())) { log.error("非GET请求"); this.sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer())); ctx.close(); } } /** * SSL支持采用wss: */ private String getWebSocketLocation(FullHttpRequest request) { return "ws://" + request.headers().get(HttpHeaderNames.HOST) + "/websocket"; } /** * http 握手通用响应 */ private void sendResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse resp) { HttpResponseStatus status = resp.status(); if (status != HttpResponseStatus.OK) { ByteBufUtil.writeUtf8(resp.content(), status.toString()); HttpUtil.setContentLength(req, resp.content().readableBytes()); } boolean keepAlive = HttpUtil.isKeepAlive(req) && status == HttpResponseStatus.OK; HttpUtil.setKeepAlive(req, keepAlive); ChannelFuture future = ctx.write(resp); if (!keepAlive) { future.addListener(ChannelFutureListener.CLOSE); } }}
- 修改主消息处理器
MessageHandler.java
,加入握手过程处理
/** * 消息处理,单例启动 * * @author qiding */@Slf4j@Component@ChannelHandler.Sharable@RequiredArgsConstructorpublic class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private final WebsocketMessageHandler websocketHandler; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { websocketHandler.handleHttpRequest(ctx, (FullHttpRequest) msg); log.debug("\n"); log.debug("客户端{}握手成功!", ctx.channel().id()); } super.channelRead(ctx, msg); } @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { log.debug("\n"); log.debug("channelId:" + ctx.channel().id()); websocketHandler.handleWebSocketFrame(ctx, frame); } @Override public void channelInactive(ChannelHandlerContext ctx) { log.debug("\n"); log.debug("断开连接"); // 释放缓存 ChannelStore.closeAndClean(ctx); WebSocketSession.clear(ctx.channel().id()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("\n"); log.debug("成功建立连接,channelId:{}", ctx.channel().id()); super.channelActive(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.debug("心跳事件时触发"); }}
4. 效果演示
服务类和启动类基础模块的搭建参考开头提供的连接进行搭建即可,这里就不重复了
这里启动项目,以postman测试为例
-
postman模拟客户端连接服务器
-
服务器
5. 源码分享
- Springboot-cli开发脚手架,集合各种常用框架使用案例,完善的文档,致力于让开发者快速搭建基础环境并让应用跑起来。
- 项目源码github地址
- 项目源码国内gitee地址