> 技术文档 > JAVA(SpringBoot)集成Netty实现(TCP、Websocket)服务端与客户端。_java集成netty

JAVA(SpringBoot)集成Netty实现(TCP、Websocket)服务端与客户端。_java集成netty


SpringBoot集成 Netty 。

  • 一、Netty 简介
  • 二、Netty功能
    • 1. 网络通信支持
    • 2. 高性能与低资源消耗
    • 3. 易于使用和定制
    • 4. 内存管理
    • 5. 安全性
  • 三、POM依赖
  • 四、TCP
    • 1、服务端
      • 1.1 创建一个Netty服务端类,NettyTcpServer
      • 1.2 创建一个 NettyTcpServerHandler继承自 ChannelInboundHandlerAdapter,主要负责处理 Netty TCP 服务端各种事件和消息。
    • 2、客户端
      • 2.1 创建一个Netty客户端类,NettyTcpClient
      • 2.2 创建一个 NettyTcpClientHandler 客户端处理器,继承自 SimpleChannelInboundHandler 处理服务器发送的数据。
  • 五、WebSocket
    • 1、服务端
      • 1.1 创建一个Netty WebSocket 服务端类,WebSocketServer
      • 1.2 创建一个 WebSocketHandler 继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息。
  • 六、TCP 与 WebSocket 同时监听不同的端口
    • 1、 NettyConfig 用于配置和管理 Netty 服务器的启动和关闭
    • 2、 AbstractNettyServer 通过继承该抽象类,子类可以实现特定的服务器逻辑,TCP 服务器或 WebSocket 服务器。
    • 3、 TcpServer 类是一个基于 Netty 的 TCP 服务器实现类,继承自 AbstractNettyServer。
    • 4、 TcpServerHandler 类,主要负责处理 Netty TCP 服务端各种事件和消息。
    • 5、 WebSocketServer 类,继承自 AbstractNettyServer,基于 Netty 框架实现的 WebSocket 服务器类。
    • 6、 WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息

一、Netty 简介

Netty 是一款基于 Java 语言开发的高性能、异步事件驱动型网络应用框架,它为开发人员提供了对 TCP、UDP、HTTP、WebSocket 等多种网络协议的支持,极大地简化了网络编程工作,无论是开发服务器端应用还是客户端应用都十分便捷。其主要特点包括:
高性能:通过使用异步 I/O 和事件驱动模型,减少线程切换和上下文开销,提升系统性能。
易用性:提供了高层次的抽象,使得开发者可以专注于业务逻辑,而不必过多关注底层网络细节。
灵活性:支持多种协议和编解码方式,易于定制和扩展。

二、Netty功能

Netty是一个基于Java的高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能网络服务器和客户端程序。以下是其一些主要功能:

1. 网络通信支持

  • 多种协议
    • Netty支持广泛的网络协议,如TCP、UDP、HTTP、HTTP/2、WebSocket等。这使得开发人员能够轻松构建支持不同应用层协议的网络应用。例如,开发基于HTTP协议的Web服务器,或者基于WebSocket协议的实时通信应用。
    • 以HTTP协议为例,Netty提供了专门的编解码器和处理器,方便处理HTTP请求和响应,包括解析HTTP头、处理HTTP内容体等。
  • 跨平台:它能够在不同的操作系统和Java版本上运行,确保应用的可移植性。无论是在Linux、Windows还是Mac OS系统上,Netty都能发挥其高性能的优势。

2. 高性能与低资源消耗

  • 异步和事件驱动:Netty采用异步I/O和事件驱动模型,这使得它能够在处理大量并发连接时,避免线程阻塞,提高系统的吞吐量和响应能力。例如,当一个新的连接建立或者数据可读时,Netty会触发相应的事件,由预先注册的事件处理器进行处理,而不是让线程一直等待I/O操作完成。
  • 零拷贝技术:Netty通过使用零拷贝技术,减少了数据在内存中的拷贝次数,提高了数据传输的效率。例如,在文件传输场景中,Netty可以直接将文件内容从磁盘传输到网络,而不需要将文件内容先拷贝到应用程序的内存中。

3. 易于使用和定制

  • 简单的API:Netty提供了简洁直观的API,使得开发人员能够快速上手,构建复杂的网络应用。例如,通过ChannelPipeline机制,开发人员可以方便地添加和管理各种网络处理逻辑,如编解码、业务逻辑处理等。
  • 高度可定制:它的架构设计非常灵活,允许开发人员根据具体需求对其进行定制。比如,可以自定义编解码器来处理特定格式的协议数据,或者定制线程模型以适应不同的应用场景。

4. 内存管理

  • 池化内存分配:Netty提供了池化内存分配器,能够有效地管理内存,减少内存碎片,提高内存的使用效率。例如,在高并发场景下,频繁地创建和销毁对象会导致内存碎片问题,而Netty的池化内存分配器可以复用已分配的内存块,避免这种情况的发生。
  • 自动内存回收:它具备自动内存回收机制,能够根据应用的运行情况,自动调整内存的使用,降低内存泄漏的风险。

5. 安全性

  • SSL/TLS支持:Netty内置了对SSL/TLS协议的支持,使得开发人员能够轻松地为网络应用添加安全加密功能,保护数据在传输过程中的机密性和完整性。例如,在开发金融类网络应用时,可以使用Netty的SSL/TLS支持,确保用户的交易数据安全传输。

三、POM依赖

  io.netty netty-transport 4.1.94.Final   io.netty netty-codec 4.1.94.Final   io.netty netty-all 4.1.92.Final 

四、TCP

1、服务端

1.1 创建一个Netty服务端类,NettyTcpServer

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;/** * Netty TCP服务类 * * @author chenlei */@Slf4j@Componentpublic class NettyTcpServer implements CommandLineRunner { /** * 端口号 */ private int port = 8972; @Override public void run(String... args) { // 接收连接 EventLoopGroup boss = new NioEventLoopGroup(); // 处理信息 EventLoopGroup worker = new NioEventLoopGroup(); try { // 定义server ServerBootstrap serverBootstrap = new ServerBootstrap(); // 添加分组 serverBootstrap.group(boss, worker)  // 添加通道设置非阻塞  .channel(NioServerSocketChannel.class)  // 服务端可连接队列数量  .option(ChannelOption.SO_BACKLOG, 128)  // 开启长连接  .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)  // 流程处理  .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new NettyTcpServerHandler()); }  }); // 绑定端口 ChannelFuture cf = serverBootstrap.bind(port).sync(); // 优雅关闭连接 cf.channel().closeFuture().sync(); } catch (Exception e) { log.error(\"连接错误:{}\", e.getMessage()); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } }}

1.2 创建一个 NettyTcpServerHandler继承自 ChannelInboundHandlerAdapter,主要负责处理 Netty TCP 服务端各种事件和消息。

import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelId;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.timeout.IdleState;import io.netty.handler.timeout.IdleStateEvent;import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;import java.util.concurrent.ConcurrentHashMap;/** * @author chenlei */@Slf4jpublic class NettyTcpServerHandler extends ChannelInboundHandlerAdapter { /** * 保存连接到服务端的通道信息,对连接的客户端进行管理,包括连接的添加、删除、查找等操作。 */ private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>(); /** * 当有客户端连接到服务器时,此方法会被触发。 * 它会记录客户端的 IP 地址、端口号以及连接的 ChannelId,并将该连接添加到 CHANNEL_MAP 中。 * 如果连接已经存在于 CHANNEL_MAP 中,会打印相应的日志信息;如果不存在,则添加到映射中并记录连接信息。 * * @param ctx 通道处理器上下文,包含了通道的信息和操作通道的方法 */ @Override public void channelActive(ChannelHandlerContext ctx) { // 获取客户端的网络地址信息 InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); // 获取客户端的 IP 地址 String clientIp = insocket.getAddress().getHostAddress(); // 获取客户端的端口号 int clientPort = insocket.getPort(); // 获取连接通道的唯一标识 ChannelId channelId = ctx.channel().id(); // 如果该连接通道已经在映射中,打印连接状态信息 if (CHANNEL_MAP.containsKey(channelId)) { log.info(\"客户端【\" + channelId + \"】是连接状态,连接通道数量: \" + CHANNEL_MAP.size()); } else { // 将新的连接添加到映射中 CHANNEL_MAP.put(channelId, ctx); log.info(\"客户端【\" + channelId + \"】连接 netty 服务器[IP:\" + clientIp + \"--->PORT:\" + clientPort + \"]\"); log.info(\"连接通道数量: \" + CHANNEL_MAP.size()); } } /** * 当有客户端终止连接服务器时,此方法会被触发。 * 它会从 CHANNEL_MAP 中移除该客户端的连接信息,并打印相应的退出信息和更新后的连接通道数量。 * 首先检查该连接是否存在于 CHANNEL_MAP 中,如果存在则进行移除操作。 * * @param ctx 通道处理器上下文 */ @Override public void channelInactive(ChannelHandlerContext ctx) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); ChannelId channelId = ctx.channel().id(); // 检查映射中是否包含该客户端连接 if (CHANNEL_MAP.containsKey(channelId)) { // 从映射中移除连接 CHANNEL_MAP.remove(channelId); log.info(\"客户端【\" + channelId + \"】退出 netty 服务器[IP:\" + clientIp + \"--->PORT:\" + insocket.getPort() + \"]\"); log.info(\"连接通道数量: \" + CHANNEL_MAP.size()); } } /** * 当有客户端向服务器发送消息时,此方法会被触发。 * 它会打印接收到的客户端消息,并调用 channelWrite 方法将消息返回给客户端。 * 首先会打印接收到客户端报文的日志信息,然后调用 channelWrite 方法进行响应。 * * @param ctx 通道处理器上下文 * @param msg 从客户端接收到的消息对象 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info(\"加载客户端报文......\"); log.info(\"【\" + ctx.channel().id() + \"】\" + \" :\" + msg); // 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入 write 函数 // 调用 channelWrite 方法将消息返回给客户端 this.channelWrite(ctx.channel().id(), msg); } /** * 服务端给客户端发送消息的方法。 * 首先根据传入的 ChannelId 从 CHANNEL_MAP 中获取对应的 ChannelHandlerContext, * 然后检查消息是否为空以及 ChannelHandlerContext 是否存在,若存在则将消息写入通道并刷新缓冲区。 * * @param channelId 连接通道的唯一标识 * @param msg 需要发送的消息内容 */ public void channelWrite(ChannelId channelId, Object msg) { // 获取与 ChannelId 对应的 ChannelHandlerContext ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId); if (ctx == null) { log.info(\"通道【\" + channelId + \"】不存在\"); return; } if (msg == null || msg == \"\") { log.info(\"服务端响应空的消息\"); return; } // 将消息写入通道 ctx.write(msg); // 刷新通道的输出缓冲区,确保消息被发送出去 ctx.flush(); } /** * 当触发用户事件时,此方法会被调用,主要用于处理空闲状态事件。 * 根据不同的空闲状态(读、写、总超时)进行相应的处理,如断开连接。 * 首先检查触发的事件是否是 IdleStateEvent,如果是则判断具体的空闲状态并进行相应处理。 * * @param ctx 通道处理器上下文 * @param evt 触发的事件对象 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info(\"Client: \" + socketString + \" READER_IDLE 读超时\"); // 读超时,断开连接 ctx.disconnect(); } else if (event.state() == IdleState.WRITER_IDLE) { log.info(\"Client: \" + socketString + \" WRITER_IDLE 写超时\"); // 写超时,断开连接 ctx.disconnect(); } else if (event.state() == IdleState.ALL_IDLE) { log.info(\"Client: \" + socketString + \" ALL_IDLE 总超时\"); // 总超时,断开连接 ctx.disconnect(); } } } /** * 当发生异常时,此方法会被触发。 * 它会关闭通道并打印相应的错误信息,同时打印当前的连接通道数量。 * 异常发生时,会关闭通道以防止资源泄漏,并且打印异常发生的通道信息和当前的连接数量。 * * @param ctx 通道处理器上下文 * @param cause 引发异常的原因 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); log.info(ctx.channel().id() + \" 发生了错误,此连接被关闭\" + \"此时连通数量: \" + CHANNEL_MAP.size()); }}

2、客户端

2.1 创建一个Netty客户端类,NettyTcpClient

import io.netty.bootstrap.Bootstrap;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import lombok.extern.slf4j.Slf4j;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/** * Netty TCP客户端 * * @author chenlei */@Slf4j@Componentpublic class NettyTcpClient implements CommandLineRunner { /** * 最大连接次数 */ private final int maxConnectTimes = 10; /** * 地址 */ private final String host = \"192.168.2.154\"; /** * 端口 */ private final int port = 1250; @Override public void run(String... args) { // 创建一个处理 I/O 操作的 EventLoopGroup,用于处理客户端的 I/O 操作和事件 EventLoopGroup group = new NioEventLoopGroup(); try { // 创建 Bootstrap 对象,用于配置和启动 Netty 客户端 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group)  // 使用 NioSocketChannel 作为通道类型,基于 NIO 的 Socket 通道  .channel(NioSocketChannel.class)  // 设置 TCP 选项,如 TCP_NODELAY 开启无延迟模式,提高性能  .option(ChannelOption.TCP_NODELAY, true)  // 为 ChannelPipeline 添加处理器,用于处理通道的数据  .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { // 添加帧解码器,处理粘包和拆包问题,这里以 \\n 作为分隔符 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, Unpooled.copiedBuffer(\"\\n\".getBytes()))); // 添加字符串解码器,将字节数据解码为字符串 ch.pipeline().addLast(new StringDecoder()); // 添加字符串编码器,将字符串编码为字节数据 ch.pipeline().addLast(new StringEncoder()); // 添加自定义的处理器,用于处理接收到的数据 ch.pipeline().addLast(new NettyTcpClientHandler(bootstrap, host, NettyTcpClient.this)); }  }); // 连接到服务器的指定端口范围 final AtomicInteger connectTimes = new AtomicInteger(0); // 尝试连接到服务器,并添加连接结果监听器 bootstrap.connect(host, port).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) {  // 连接成功时打印日志  log.info(\"成功连接到端口: {}\", port); } else {  // 连接失败时打印日志  log.error(\"连接到服务器时出错,端口: {}\", port);  // 连接失败时,使用 eventLoop 安排重连任务,60 秒后重连  if (connectTimes.get() < maxConnectTimes) { connectTimes.incrementAndGet(); future.channel().eventLoop().schedule(() -> { // 重连逻辑,再次尝试连接 bootstrap.connect(host, port); }, 60, TimeUnit.SECONDS);  } else { log.error(\"已达到最大连接次数,停止重连,端口: {}\", port);  } } }); } catch (Exception e) { // 发生异常时打印错误日志 log.error(\"Netty 客户端出错\", e); } } /** * 定义重新连接方法 * * @param bootstrap 用于配置和启动 Netty 客户端的 Bootstrap 对象 * @param host 服务器的主机地址 * @param port 要连接的服务器端口号 * @param future 表示连接操作的 ChannelFuture 对象 */ void reconnect(Bootstrap bootstrap, String host, int port, ChannelFuture future) { final AtomicInteger connectTimes = new AtomicInteger(0); try { bootstrap.connect(host, port).addListener((ChannelFutureListener) f -> { if (f.isSuccess()) {  log.info(\"重连成功,端口: {}\", port);  connectTimes.set(0); } else {  if (connectTimes.get() < maxConnectTimes) { connectTimes.incrementAndGet(); future.channel().eventLoop().schedule(() -> reconnect(bootstrap, host, port, f), 60, TimeUnit.SECONDS);  } else { log.error(\"已达到最大连接次数,停止重连,端口: {}\", port);  } } }); } catch (Exception e) { log.error(\"重连时发生异常,端口: {}\", port); reconnect(bootstrap, host, port, future); } }}

2.2 创建一个 NettyTcpClientHandler 客户端处理器,继承自 SimpleChannelInboundHandler 处理服务器发送的数据。

import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;/** * Netty 客户端处理器类,继承自 SimpleChannelInboundHandler 以处理服务器发送的字符串数据 * * @author chenlei */@Slf4jpublic class NettyTcpClientHandler extends SimpleChannelInboundHandler<String> { /** * Bootstrap 对象,用于客户端连接的配置和启动 */ private final Bootstrap bootstrap; /** * 服务器的主机地址 */ private final String host; /** * Netty 客户端实例,用于调用重连等操作 */ private final NettyTcpClient nettyTcpClient; /** * 初始化相关参数 * * @param bootstrap 用于客户端连接的 Bootstrap 对象 * @param host 服务器的主机地址 * @param nettyClient Netty 客户端实例 */ public NettyTcpClientHandler(Bootstrap bootstrap, String host, NettyTcpClient nettyTcpClient) { this.bootstrap = bootstrap; this.host = host; this.nettyTcpClient= nettyTcpClient; } /** * 当接收到服务器发送的数据时调用此方法 * * @param ctx 通道处理器上下文,可用于执行通道操作,如发送数据、关闭通道等 * @param response 从服务器接收到的响应字符串 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String response) { log.info(\"接收处理服务器响应数据\"); //以下进行具体的业务操作 } /** * 当发生异常时调用此方法 * * @param ctx 通道处理器上下文 * @param cause 异常对象 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 打印异常信息 log.error(\"处理服务器响应时出错,异常信息: {}\", cause.getMessage(), cause); // 关闭当前通道 ctx.close(); } /** * 当通道关闭时调用此方法 * * @param ctx 通道处理器上下文 */ @Override public void channelInactive(ChannelHandlerContext ctx) { final Channel channel = ctx.channel(); // 获取远程服务器的端口号 int port = ((InetSocketAddress) channel.remoteAddress()).getPort(); log.info(\"通道处于非活动状态,正在尝试在端口上重新连接: {}\", port); // 获取 NettyTcpClientHandler 中存储的 NettyClient 实例 NettyTcpClient nettyTcpClient = ((NettyTcpClientHandler) ctx.handler()).nettyTcpClient; // 调用 nettyTcpClient 中的 reconnect 方法进行重连 nettyTcpClient .reconnect(bootstrap, host, port, ctx.channel().newFailedFuture(new RuntimeException(\"频道处于非活动状态\"))); }}

五、WebSocket

1、服务端

1.1 创建一个Netty WebSocket 服务端类,WebSocketServer

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;/** * WebSocket 服务类 * * @author chenlei */@Slf4j@Componentpublic class WebSocketServer implements CommandLineRunner { /** * 端口号 */ private int port = 897; @Override public void run(String... args) { // 接收 WebSocket 连接的主 EventLoopGroup,用于处理接收新连接的请求 EventLoopGroup webSocketBoss = new NioEventLoopGroup(); // 处理 WebSocket 信息的从 EventLoopGroup,用于处理连接建立后的读写操作 EventLoopGroup webSocketWorker = new NioEventLoopGroup(); try {  // 定义 WebSocket 服务器启动器 ServerBootstrap webSocketServerBootstrap = new ServerBootstrap(); webSocketServerBootstrap.group(webSocketBoss, webSocketWorker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) .childHandler(new ChannelInitializer<SocketChannel>() {  @Override  protected void initChannel(SocketChannel ch) { // 在通道的处理器链中添加 HttpServerCodec,用于处理 HTTP 协议的编解码 ch.pipeline().addLast(new HttpServerCodec()); // 添加 HttpObjectAggregator,用于将 HTTP 消息聚合成完整的请求或响应 ch.pipeline().addLast(new HttpObjectAggregator(65536)); // 添加 WebSocketServerProtocolHandler,用于处理 WebSocket 的握手、控制帧等 ch.pipeline().addLast(new WebSocketServerProtocolHandler(\"/websocket\")); // 添加自定义的处理器,用于处理 WebSocket 数据 ch.pipeline().addLast(new WebSocketHandler());  } }); // 绑定端口 ChannelFuture cf = serverBootstrap.bind(port).sync(); // 优雅关闭连接 cf.channel().closeFuture().sync(); } catch (Exception e) { log.error(\"连接错误:{}\", e.getMessage()); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } }}

1.2 创建一个 WebSocketHandler 继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息。

import com.casictime.system.domain.vo.DeviceInfo;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import lombok.extern.slf4j.Slf4j;/** * WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息 * * @author chenlei */@Slf4jpublic class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { /** * 处理接收到的消息 * * @param ctx 通道处理上下文,包含了与通道相关的信息,如通道本身、管道等 * @param msg 接收到的 TextWebSocketFrame 消息,包含了客户端发送的文本消息内容 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { String message = msg.text(); // 接收到客户端消息时记录日志,使用 debug 级别,方便在生产环境中调整日志输出 log.debug(\"收到客户端的消息: {}\", message); // 将信息返回给客户端 ctx.writeAndFlush(msg); // 回复消息示例(如果需要) // ctx.channel().writeAndFlush(new TextWebSocketFrame(\"服务器收到您的消息: \" + message)); } /** * 记录客户端的连接信息 * * @param ctx 通道处理上下文 */ @Override public void channelActive(ChannelHandlerContext ctx) { // 当有新的 WebSocket 客户端连接时记录日志 log.info(\"一个新的 WebSocket 客户端已连接: {}\", ctx.channel().remoteAddress()); } /** * 当 WebSocket 客户端断开连接时调用此方法 * * @param ctx 通道处理上下文 */ @Override public void channelInactive(ChannelHandlerContext ctx) { // 当 WebSocket 客户端断开连接时记录日志 log.info(\"WebSocket 客户端已断开连接: {}\", ctx.channel().remoteAddress()); } /** * 当处理 WebSocket 连接过程中发生异常时调用此方法 * 该方法主要负责记录异常信息,并关闭连接以防止异常扩散影响其他客户端 * * @param ctx 通道处理上下文 * @param cause 引发异常的原因,包含了异常的详细信息 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 当处理过程中发生异常时记录日志 log.error(\"WebSocket 连接中发生错误: {}\", cause.getMessage()); // 关闭连接以避免异常扩散影响其他客户端 ctx.close(); }}

六、TCP 与 WebSocket 同时监听不同的端口

1、 NettyConfig 用于配置和管理 Netty 服务器的启动和关闭

import com.casictime.framework.netty.server.tcp.TcpServer;import com.casictime.framework.netty.server.websocket.WebSocketServer;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;/** * NettyConfig 用于配置和管理 Netty 服务器的启动和关闭。 * 负责启动 Netty 的 TCP 服务器和 WebSocket 服务器,并使用线程池来执行启动任务。 * * @author chenlei */@Slf4j@Configurationpublic class NettyConfig { /** * TcpServer 用于启动 TCP 服务器 */ @Autowired private TcpServer tcpServer; /** * WebSocketServer 用于启动 WebSocket 服务器 */ @Autowired private WebSocketServer webSocketServer; /** * ThreadPoolTaskExecutor 执行服务器启动任务的线程池 (可根据具体情况修改,这个是我自己配置的线程池) */ @Autowired private ThreadPoolTaskExecutor taskExecutor; /** * init 方法在 Spring 容器创建并初始化该类的实例后被调用。 * 该方法使用线程池来执行启动 Netty TCP 服务器和 WebSocket 服务器的任务。 */ @PostConstruct public void init() { // 启动 Netty TCP 服务器 taskExecutor.execute(() -> { try { // 调用 TcpServer 的 start 方法启动 TCP 服务器 tcpServer.start(); } catch (Exception e) { log.error(\"启动 Netty TCP 服务器 失败,{}\", e.getMessage()); e.printStackTrace(); } }); // 启动 Netty Websocket 服务器 taskExecutor.execute(() -> { try { // 调用 WebSocketServer 的 start 方法启动 WebSocket 服务器 webSocketServer.start(); } catch (Exception e) { log.error(\"启动 Netty WebSocketServer 服务器 失败,{}\", e.getMessage()); e.printStackTrace(); } }); } /** * destroy 方法在 Spring 容器销毁该类的实例前被调用。 * 该方法负责关闭线程池,以释放资源。 */ @PreDestroy public void destroy() { // 关闭线程池 if (taskExecutor != null) { taskExecutor.shutdown(); } }}

2、 AbstractNettyServer 通过继承该抽象类,子类可以实现特定的服务器逻辑,TCP 服务器或 WebSocket 服务器。

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import java.util.ArrayList;import java.util.List;/** * 启动和管理功能。 * 通过继承该抽象类,子类可以实现特定的服务器逻辑,如 TCP 服务器或 WebSocket 服务器。 * * @author chenlei */@Slf4j@Componentpublic abstract class AbstractNettyServer { /** * 存储绑定服务器的 ChannelFuture 对象列表,以便后续对服务器启动过程中的操作和状态进行管理 */ protected final List<ChannelFuture> channelFutures = new ArrayList<>(); /** * 启动 Netty 服务器的方法 */ public void start() { // 创建一个 NioEventLoopGroup 作为服务器的 boss 线程组,用于接收客户端的连接请求 EventLoopGroup boss = new NioEventLoopGroup(); // 创建一个 NioEventLoopGroup 作为服务器的 worker 线程组,用于处理客户端的业务逻辑 EventLoopGroup worker = new NioEventLoopGroup(); // 创建一个 ServerBootstrap 实例,用于配置和启动 Netty 服务器 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker) // 设置服务器的通道类型为 NioServerSocketChannel,即使用 NIO 进行网络通信 .channel(NioServerSocketChannel.class) // 设置服务器的 TCP 选项,SO_BACKLOG 表示服务器可接收的最大连接请求队列长度,用于处理大量并发连接时的请求堆积 .option(ChannelOption.SO_BACKLOG, 128) // 设置子通道的 TCP 选项,SO_KEEPALIVE 表示启用 TCP 的 Keep-Alive 机制,保持长连接 .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 为子通道添加处理器,使用 ChannelInitializer 对每个新的 SocketChannel 进行初始化 .childHandler(new ChannelInitializer<SocketChannel>() {  @Override  // 初始化新的 SocketChannel 的处理器链  protected void initChannel(SocketChannel ch) { // 调用抽象方法,由子类实现具体的通道处理器初始化逻辑 initChannelHandlers(ch);  } }); //端口 int port = getPort(); //地址 String host = getHost(); try { // 尝试将服务器绑定到当前端口 ChannelFuture cf = serverBootstrap.bind(port); // 为绑定操作添加监听器,监听绑定操作的成功和失败状态 cf.addListener(future -> { if (future.isSuccess()) {  // 绑定成功时,记录日志,显示服务器绑定的端口和主机地址  log.info(\"服务器在端口 {} 上成功绑定,主机地址为 {}\", host, getHost()); } else {  // 绑定失败时,记录错误日志,显示绑定的端口和失败原因  log.error(\"绑定端口 {} 时发生连接错误: {}\", host, future.cause().getMessage()); } }); // 将 ChannelFuture 添加到列表中,以便后续管理 channelFutures.add(cf); } catch (Exception e) { // 处理绑定端口时发生的异常,记录错误日志 log.error(\"绑定端口 {} 时发生连接错误: {}\", port, e.getMessage()); } // 遍历已绑定的 ChannelFuture 列表,等待通道关闭 for (ChannelFuture cf : channelFutures) { try { cf.channel().closeFuture().sync(); } catch (InterruptedException e) { // 处理等待通道关闭时发生的异常,记录错误日志 log.error(\"等待通道关闭时发生错误: {}\", e.getMessage()); // 中断当前线程状态,避免潜在的异常 Thread.currentThread().interrupt(); } } // 关闭 boss 线程组,释放资源 boss.shutdownGracefully(); // 关闭 worker 线程组,释放资源 worker.shutdownGracefully(); } /** * 获取服务器的主机地址,由子类实现具体逻辑 * * @return 服务器的主机地址 */ protected abstract String getHost(); /** * 获取服务器的端口范围,由子类实现具体逻辑 * * @return 服务器的端口范围,多个端口使用逗号分隔 */ protected abstract int getPort(); /** * 初始化通道处理器,由子类实现具体逻辑,根据不同的服务器类型添加不同的处理器 * * @param ch 要初始化的 SocketChannel */ protected abstract void initChannelHandlers(SocketChannel ch);}

3、 TcpServer 类是一个基于 Netty 的 TCP 服务器实现类,继承自 AbstractNettyServer。

import io.netty.buffer.Unpooled;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * TcpServer 类是一个基于 Netty 的 TCP 服务器实现类,继承自 AbstractNettyServer。 * 启动时配置相应的通道处理器,以处理 TCP 连接和数据传输。 * 当客户端与服务器建立 TCP 连接时,该类将对连接进行一系列的处理操作。 * * @author chenlei */@Slf4j@Componentpublic class TcpServer extends AbstractNettyServer { /** * TCP 服务器的主机地址。 */ @Value(\"${tcp.host}\") private String host; /** * TCP 服务器的端口。 */ @Value(\"${tcp.port}\") private int port; /** * 获取 TCP 服务器的主机地址。 * * @return 从配置文件中获取的 TCP 服务器的主机地址 */ @Override protected String getHost() { return host; } /** * 获取 TCP 服务器的端口范围。 * * @return 从配置文件中获取的 TCP 服务器的端口范围 */ @Override protected int getPort() { return port; } /** * 初始化 TCP 服务器的通道处理器。 * 为每个新建立的 SocketChannel 配置处理器链。 * * @param ch 新建立的 SocketChannel,用于与客户端通信 */ @Override protected void initChannelHandlers(SocketChannel ch) { // 优化:可以使用更灵活的分隔符,如可配置的分隔符,提高代码的可扩展性 // 添加帧解码器,解决粘包和拆包问题,使用 \"\\n\" 作为分隔符,最大帧长度为 8192 字节 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, Unpooled.copiedBuffer(\"\\n\".getBytes()))); // 添加字符串解码器,将字节数据解码为字符串 ch.pipeline().addLast(new StringDecoder()); // 添加字符串编码器,将字符串编码为字节数据 ch.pipeline().addLast(new StringEncoder()); // 添加自定义的处理器,用于处理 TCP 数据 ch.pipeline().addLast(new TcpServerHandler()); }}

4、 TcpServerHandler 类,主要负责处理 Netty TCP 服务端各种事件和消息。

import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelId;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.timeout.IdleState;import io.netty.handler.timeout.IdleStateEvent;import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;import java.util.concurrent.ConcurrentHashMap;/** * @author chenlei */@Slf4jpublic class TcpServerHandler extends ChannelInboundHandlerAdapter { /** * 保存连接到服务端的通道信息,对连接的客户端进行管理,包括连接的添加、删除、查找等操作。 */ private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>(); /** * 当有客户端连接到服务器时,此方法会被触发。 * 它会记录客户端的 IP 地址、端口号以及连接的 ChannelId,并将该连接添加到 CHANNEL_MAP 中。 * 如果连接已经存在于 CHANNEL_MAP 中,会打印相应的日志信息;如果不存在,则添加到映射中并记录连接信息。 * * @param ctx 通道处理器上下文,包含了通道的信息和操作通道的方法 */ @Override public void channelActive(ChannelHandlerContext ctx) { // 获取客户端的网络地址信息 InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); // 获取客户端的 IP 地址 String clientIp = insocket.getAddress().getHostAddress(); // 获取客户端的端口号 int clientPort = insocket.getPort(); // 获取连接通道的唯一标识 ChannelId channelId = ctx.channel().id(); // 如果该连接通道已经在映射中,打印连接状态信息 if (CHANNEL_MAP.containsKey(channelId)) { log.info(\"客户端【\" + channelId + \"】是连接状态,连接通道数量: \" + CHANNEL_MAP.size()); } else { // 将新的连接添加到映射中 CHANNEL_MAP.put(channelId, ctx); log.info(\"客户端【\" + channelId + \"】连接 netty 服务器[IP:\" + clientIp + \"--->PORT:\" + clientPort + \"]\"); log.info(\"连接通道数量: \" + CHANNEL_MAP.size()); } } /** * 当有客户端终止连接服务器时,此方法会被触发。 * 它会从 CHANNEL_MAP 中移除该客户端的连接信息,并打印相应的退出信息和更新后的连接通道数量。 * 首先检查该连接是否存在于 CHANNEL_MAP 中,如果存在则进行移除操作。 * * @param ctx 通道处理器上下文 */ @Override public void channelInactive(ChannelHandlerContext ctx) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); ChannelId channelId = ctx.channel().id(); // 检查映射中是否包含该客户端连接 if (CHANNEL_MAP.containsKey(channelId)) { // 从映射中移除连接 CHANNEL_MAP.remove(channelId); log.info(\"客户端【\" + channelId + \"】退出 netty 服务器[IP:\" + clientIp + \"--->PORT:\" + insocket.getPort() + \"]\"); log.info(\"连接通道数量: \" + CHANNEL_MAP.size()); } } /** * 当有客户端向服务器发送消息时,此方法会被触发。 * 它会打印接收到的客户端消息,并调用 channelWrite 方法将消息返回给客户端。 * 首先会打印接收到客户端报文的日志信息,然后调用 channelWrite 方法进行响应。 * * @param ctx 通道处理器上下文 * @param msg 从客户端接收到的消息对象 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 将信息返回给客户端 ctx.writeAndFlush(msg); } /** * 当触发用户事件时,此方法会被调用,主要用于处理空闲状态事件。 * 根据不同的空闲状态(读、写、总超时)进行相应的处理,如断开连接。 * 首先检查触发的事件是否是 IdleStateEvent,如果是则判断具体的空闲状态并进行相应处理。 * * @param ctx 通道处理器上下文 * @param evt 触发的事件对象 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info(\"Client: \" + socketString + \" READER_IDLE 读超时\"); // 读超时,断开连接 ctx.disconnect(); } else if (event.state() == IdleState.WRITER_IDLE) { log.info(\"Client: \" + socketString + \" WRITER_IDLE 写超时\"); // 写超时,断开连接 ctx.disconnect(); } else if (event.state() == IdleState.ALL_IDLE) { log.info(\"Client: \" + socketString + \" ALL_IDLE 总超时\"); // 总超时,断开连接 ctx.disconnect(); } } } /** * 当发生异常时,此方法会被触发。 * 它会关闭通道并打印相应的错误信息,同时打印当前的连接通道数量。 * 异常发生时,会关闭通道以防止资源泄漏,并且打印异常发生的通道信息和当前的连接数量。 * * @param ctx 通道处理器上下文 * @param cause 引发异常的原因 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); log.info(ctx.channel().id() + \" 发生了错误,此连接被关闭\" + \"此时连通数量: \" + CHANNEL_MAP.size()); }}

5、 WebSocketServer 类,继承自 AbstractNettyServer,基于 Netty 框架实现的 WebSocket 服务器类。

import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * WebSocketServer 类,继承自 AbstractNettyServer,基于 Netty 框架实现的 WebSocket 服务器类。 * 它负责从配置文件中获取 WebSocket 服务器的配置信息(主机地址和端口范围)。 * * @author chenlei */@Slf4j@Componentpublic class WebSocketServer extends AbstractNettyServer { /** * WebSocket 服务器的主机地址 */ @Value(\"${websocket.host}\") private String host; /** * WebSocket 服务器的端口范围,多个端口使用逗号分隔。 */ @Value(\"${websocket.port}\") private int port; /** * 获取 WebSocket 服务器的主机地址。 * * @return 从配置文件中获取的 WebSocket 服务器的主机地址。 */ @Override protected String getHost() { return host; } /** * 获取 WebSocket 服务器的端口范围。 * * @return 从配置文件中获取的 WebSocket 服务器的端口范围。 */ @Override protected int getPort() { return port; } /** * 初始化 WebSocket 服务器的通道处理器。 * 方法会在服务器启动时为每个新建立的 SocketChannel 配置处理器链,以处理 WebSocket 通信的各个阶段。 * * @param ch 新建立的 SocketChannel,用于与客户端进行通信。 */ @Override protected void initChannelHandlers(SocketChannel ch) { // 在通道的处理器链中添加 HttpServerCodec,用于处理 HTTP 协议的编解码 ch.pipeline().addLast(new HttpServerCodec()); // 添加 HttpObjectAggregator,用于将 HTTP 消息聚合成完整的请求或响应 ch.pipeline().addLast(new HttpObjectAggregator(65536)); // 添加 WebSocketServerProtocolHandler,用于处理 WebSocket 的握手、控制帧等 ch.pipeline().addLast(new WebSocketServerProtocolHandler(\"/websocket\")); // 添加自定义的处理器,用于处理 WebSocket 数据 ch.pipeline().addLast(new WebSocketHandler()); }}

6、 WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息

import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import lombok.extern.slf4j.Slf4j;/** * WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息 * * @author chenlei */@Slf4jpublic class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { /** * 处理接收到的消息 * * @param ctx 通道处理上下文,包含了与通道相关的信息,如通道本身、管道等 * @param msg 接收到的 TextWebSocketFrame 消息,包含了客户端发送的文本消息内容 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { String message = msg.text(); // 创建一个新的 TextWebSocketFrame 来存储要发送回客户端的消息 TextWebSocketFrame responseFrame = new TextWebSocketFrame(message); // 将消息发送回客户端 ctx.channel().writeAndFlush(responseFrame); } /** * 记录客户端的连接信息 * * @param ctx 通道处理上下文 */ @Override public void channelActive(ChannelHandlerContext ctx) { // 当有新的 WebSocket 客户端连接时记录日志 log.info(\"一个新的 WebSocket 客户端已连接: {}\", ctx.channel().remoteAddress()); } /** * 当 WebSocket 客户端断开连接时调用此方法 * * @param ctx 通道处理上下文 */ @Override public void channelInactive(ChannelHandlerContext ctx) { // 当 WebSocket 客户端断开连接时记录日志 log.info(\"WebSocket 客户端已断开连接: {}\", ctx.channel().remoteAddress()); } /** * 当处理 WebSocket 连接过程中发生异常时调用此方法 * 该方法主要负责记录异常信息,并关闭连接以防止异常扩散影响其他客户端 * * @param ctx 通道处理上下文 * @param cause 引发异常的原因,包含了异常的详细信息 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 当处理过程中发生异常时记录日志 log.error(\"WebSocket 连接中发生错误: {}\", cause.getMessage()); // 关闭连接以避免异常扩散影响其他客户端 ctx.close(); }}