Netty实战:优雅的创建TCP客户端(附源码)
文章目录
前言
Springboot使用Netty优雅、快速的创建TCP客户端。
本博客项目源码地址:
- 项目源码github地址
- 项目源码国内gitee地址
1. 前置准备
pom.xml
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.75.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.80</version> </dependency>
- 客户端配置
application.yml
server: port: 10000spring: application: name: tcp-client# tcpnetty: # 客户端监听端口 client-port: 19999 # 默认连接的服务器ip server-ip: 127.0.0.1 # 默认连接的服务器端口 server-port: 20000# 日记配置logging: level: com.netty: debug
- 导入我们自定义的配置
/** * 读取YML中的服务配置 * * @author ding */@Configuration@ConfigurationProperties(prefix = ClientProperties.PREFIX)@Datapublic class ClientProperties { public static final String PREFIX = "netty"; /** * 客户端ip */ private Integer clientPort; /** * 默认连接的服务器ip */ private String serverIp; /** * 默认连接的服务器端口 */ private Integer serverPort;}
2. 消息处理器
MessageHandler.java
import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.mqtt.MqttMessage;import io.netty.handler.timeout.IdleStateEvent;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;/** * 消息处理,单例启动 * * @author qiding */@Slf4j@Component@ChannelHandler.Sharable@RequiredArgsConstructorpublic class MessageHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception { log.debug("\n"); log.debug("channelId:" + ctx.channel().id()); } @Override public void channelInactive(ChannelHandlerContext ctx) { log.debug("\n"); log.debug("开始连接"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("\n"); log.info("成功建立连接,channelId:{}", ctx.channel().id()); super.channelActive(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.info("心跳事件时触发"); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; // 当我们长时间没有给服务器发消息时,发送ping消息,告诉服务器我们还活跃 if (event.state().equals(IdleState.WRITER_IDLE)) { log.debug("发送心跳"); ctx.writeAndFlush("ping"); } } else { super.userEventTriggered(ctx, evt); } }}
3. 重写通道初始化类
添加我们需要的解码器,这里添加了String解码器和编码器
ChannelInit.java
import com.netty.server.handler.MessageHandler;import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.handler.timeout.IdleStateHandler;import lombok.RequiredArgsConstructor;import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/** * Netty 通道初始化 * * @author qiding */@Component@RequiredArgsConstructorpublic class ChannelInit extends ChannelInitializer<SocketChannel> { private final MessageHandler messageHandler; @Override protected void initChannel(SocketChannel channel) { channel.pipeline() // 每隔60s的时间触发一次userEventTriggered的方法,并且指定IdleState的状态位是WRITER_IDLE,事件触发给服务器发送ping消息 .addLast("idle", new IdleStateHandler(0, 60, 0, TimeUnit.SECONDS)) // 添加解码器 .addLast(new StringDecoder()) // 添加编码器 .addLast(new StringEncoder()) // 添加消息处理器 .addLast("messageHandler", messageHandler); }}
4. 核心服务
- 接口
ITcpClient.java
/** * 启动 Client * * @author qiding */@Component@Slf4j@RequiredArgsConstructorpublic class TcpClient implements ITcpClient { private final ChannelInit channelInit; private final ClientProperties clientProperties; private static final EventLoopGroup WORKER_GROUP = new NioEventLoopGroup(); private SocketChannel socketChannel; private Bootstrap bootstrap; /** * 记录当前连接的服务器ip(用于重连) */ public String connectedIp; /** * 记录当前连接的服务器端口(用于重连) */ public Integer connectedPort; @Override public void start() throws Exception { log.info("初始化 Client ..."); this.tcpClient(); } @Override public void reconnect() throws Exception { if (socketChannel != null && socketChannel.isActive()) { socketChannel.close(); this.connect(clientProperties.getServerIp(), clientProperties.getServerPort()); } } public void disconnect() { if (socketChannel != null && socketChannel.isActive()) { socketChannel.close(); } } /** * Client初始化 */ private void tcpClient() { try { bootstrap = new Bootstrap() .remoteAddress("127.0.0.1", clientProperties.getClientPort()) .handler(channelInit) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true); bootstrap.group(WORKER_GROUP); this.connect(clientProperties.getServerIp(), clientProperties.getServerPort()); } catch (Exception e) { e.printStackTrace(); WORKER_GROUP.shutdownGracefully(); } } /** * 连接服务器 */ public void connect(String ip, Integer port) throws InterruptedException { this.disconnect(); this.connectedIp = ip; this.connectedPort = port; ChannelFuture future = bootstrap.connect(connectedIp, connectedPort).sync(); if (future.isSuccess()) { socketChannel = (SocketChannel) future.channel(); log.info("connect server success"); } } /** * 销毁 */ @PreDestroy @Override public void destroy() { WORKER_GROUP.shutdownGracefully(); socketChannel.closeFuture(); } /** * 获取频道 */ public SocketChannel getSocketChannel() { return this.socketChannel; }}
5. 效果预览
- 主启动类添加启动方法
/** * @author qiding */@SpringBootApplication@RequiredArgsConstructorpublic class NettyClientApplication implements ApplicationRunner { private final TcpClient tcpClient; public static void main(String[] args) { SpringApplication.run(NettyClientApplication.class, args); } @Override public void run(ApplicationArguments args) throws Exception { tcpClient.start(); }}
- 运行
6. 编写http测试接口
为了方便测试,我们编写几个http接口进行测试
HttpApi.java
import com.alibaba.fastjson.JSONObject;import com.netty.client.server.TcpClient;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RestController;/** * 模拟发送api * * @author qiding */@RequiredArgsConstructor@RestController@Slf4jpublic class HttpApi { private final TcpClient tcpClient; /** * 消息发布 */ @GetMapping("/send") public String send(String message) { tcpClient.getSocketChannel().writeAndFlush(message); return "发送成功"; } /** * 消息发布 */ @PostMapping("/send/json") public String send(@RequestBody JSONObject body) { tcpClient.getSocketChannel().writeAndFlush(body.toJSONString()); return "发送成功"; } /** * 连接 */ @GetMapping("connect") public String connect(String ip, Integer port) throws Exception { tcpClient.connect(ip, port); return "重启指令发送成功"; } /** * 重连 */ @GetMapping("reconnect") public String reconnect() throws Exception { tcpClient.reconnect(); return "重启指令发送成功"; }}
http端口为我们开头yml配置的端口,默认9999,建议使用postman或apifox进行http调用
-
postman示例,http调用发送接口,向服务器发送消息
-
或直接浏览器输入
http://localhost:9999/send?message=hello
7. 接口列表
- 消息发送
http://localhost:10000/send?message=hello
- 连接服务器
http://localhost:10000/connect?ip=127.0.0.1&port=20000
- 重连
http://localhost:10000/reconnect?ip=127.0.0.1&port=20000
- json消息发送示例
http://localhost:9999/send/json
Request URL: http://localhost:9999/send/jsonRequest Method: POSTRequest Headers:{ "Content-Type":"application/json"}Request Body:{ "test": "hello"}
8. 源码分享
- Springboot-cli开发脚手架,集合各种常用框架使用案例,完善的文档,致力于让开发者快速搭建基础环境并让应用跑起来。
- 项目源码国内gitee地址
- 项目源码github地址