> 文档中心 > Netty实战:优雅的创建TCP客户端(附源码)

Netty实战:优雅的创建TCP客户端(附源码)

文章目录

  • 前言
    • 1. 前置准备
    • 2. 消息处理器
    • 3. 重写通道初始化类
    • 4. 核心服务
    • 5. 效果预览
    • 6. 编写http测试接口
    • 7. 接口列表
    • 8. 源码分享

前言

Springboot使用Netty优雅、快速的创建TCP客户端。
本博客项目源码地址:

  • 项目源码github地址
  • 项目源码国内gitee地址

1. 前置准备

  • pom.xml 依赖
<dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter</artifactId> </dependency> <dependency>     <groupId>org.projectlombok</groupId>     <artifactId>lombok</artifactId>     <optional>true</optional> </dependency>  <dependency>     <groupId>io.netty</groupId>     <artifactId>netty-all</artifactId>     <version>4.1.75.Final</version> </dependency>  <dependency>     <groupId>com.alibaba</groupId>     <artifactId>fastjson</artifactId>     <version>1.2.80</version> </dependency>
  • 客户端配置application.yml
server:  port: 10000spring:  application:    name: tcp-client# tcpnetty:  # 客户端监听端口  client-port: 19999  # 默认连接的服务器ip  server-ip: 127.0.0.1  # 默认连接的服务器端口  server-port: 20000# 日记配置logging:  level:    com.netty: debug
  • 导入我们自定义的配置
/** * 读取YML中的服务配置 * * @author ding */@Configuration@ConfigurationProperties(prefix = ClientProperties.PREFIX)@Datapublic class ClientProperties {    public static final String PREFIX = "netty";    /**     * 客户端ip     */    private Integer clientPort;    /**     * 默认连接的服务器ip     */    private String serverIp;    /**     * 默认连接的服务器端口     */    private Integer serverPort;}

2. 消息处理器

MessageHandler.java

import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.mqtt.MqttMessage;import io.netty.handler.timeout.IdleStateEvent;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;/** * 消息处理,单例启动 * * @author qiding */@Slf4j@Component@ChannelHandler.Sharable@RequiredArgsConstructorpublic class MessageHandler extends SimpleChannelInboundHandler<String> {    @Override    protected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception { log.debug("\n"); log.debug("channelId:" + ctx.channel().id());    }    @Override    public void channelInactive(ChannelHandlerContext ctx) { log.debug("\n"); log.debug("开始连接");    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("\n"); log.info("成功建立连接,channelId:{}", ctx.channel().id()); super.channelActive(ctx);    }    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.info("心跳事件时触发"); if (evt instanceof IdleStateEvent) {     IdleStateEvent event = (IdleStateEvent) evt;     // 当我们长时间没有给服务器发消息时,发送ping消息,告诉服务器我们还活跃     if (event.state().equals(IdleState.WRITER_IDLE)) {  log.debug("发送心跳");  ctx.writeAndFlush("ping");     } } else {     super.userEventTriggered(ctx, evt); }    }}

3. 重写通道初始化类

添加我们需要的解码器,这里添加了String解码器和编码器
ChannelInit.java

import com.netty.server.handler.MessageHandler;import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.handler.timeout.IdleStateHandler;import lombok.RequiredArgsConstructor;import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/** * Netty 通道初始化 * * @author qiding */@Component@RequiredArgsConstructorpublic class ChannelInit extends ChannelInitializer<SocketChannel> {    private final MessageHandler messageHandler;    @Override    protected void initChannel(SocketChannel channel) { channel.pipeline()  // 每隔60s的时间触发一次userEventTriggered的方法,并且指定IdleState的状态位是WRITER_IDLE,事件触发给服务器发送ping消息  .addLast("idle", new IdleStateHandler(0, 60, 0, TimeUnit.SECONDS))  // 添加解码器  .addLast(new StringDecoder())  // 添加编码器  .addLast(new StringEncoder())  // 添加消息处理器  .addLast("messageHandler", messageHandler);    }}

4. 核心服务

  • 接口
    ITcpClient.java
/** * 启动 Client * * @author qiding */@Component@Slf4j@RequiredArgsConstructorpublic class TcpClient implements ITcpClient {    private final ChannelInit channelInit;    private final ClientProperties clientProperties;    private static final EventLoopGroup WORKER_GROUP = new NioEventLoopGroup();    private SocketChannel socketChannel;    private Bootstrap bootstrap;    /**     * 记录当前连接的服务器ip(用于重连)     */    public  String connectedIp;    /**     * 记录当前连接的服务器端口(用于重连)     */    public  Integer connectedPort;    @Override    public void start() throws Exception { log.info("初始化 Client ..."); this.tcpClient();    }    @Override    public void reconnect() throws Exception { if (socketChannel != null && socketChannel.isActive()) {     socketChannel.close();     this.connect(clientProperties.getServerIp(), clientProperties.getServerPort()); }    }    public void disconnect() { if (socketChannel != null && socketChannel.isActive()) {     socketChannel.close(); }    }    /**     * Client初始化     */    private void tcpClient() { try {     bootstrap = new Bootstrap()      .remoteAddress("127.0.0.1", clientProperties.getClientPort())      .handler(channelInit)      .channel(NioSocketChannel.class)      .option(ChannelOption.TCP_NODELAY, true);     bootstrap.group(WORKER_GROUP);     this.connect(clientProperties.getServerIp(), clientProperties.getServerPort()); } catch (Exception e) {     e.printStackTrace();     WORKER_GROUP.shutdownGracefully(); }    }    /**     * 连接服务器     */    public void connect(String ip, Integer port) throws InterruptedException { this.disconnect(); this.connectedIp = ip; this.connectedPort = port; ChannelFuture future = bootstrap.connect(connectedIp, connectedPort).sync(); if (future.isSuccess()) {     socketChannel = (SocketChannel) future.channel();     log.info("connect server success"); }    }    /**     * 销毁     */    @PreDestroy    @Override    public void destroy() { WORKER_GROUP.shutdownGracefully(); socketChannel.closeFuture();    }    /**     * 获取频道     */    public SocketChannel getSocketChannel() { return this.socketChannel;    }}

5. 效果预览

  • 主启动类添加启动方法
/** * @author qiding */@SpringBootApplication@RequiredArgsConstructorpublic class NettyClientApplication implements ApplicationRunner {    private final TcpClient tcpClient;    public static void main(String[] args) { SpringApplication.run(NettyClientApplication.class, args);    }    @Override    public void run(ApplicationArguments args) throws Exception { tcpClient.start();    }}
  • 运行
    Netty实战:优雅的创建TCP客户端(附源码)

6. 编写http测试接口

为了方便测试,我们编写几个http接口进行测试
HttpApi.java

import com.alibaba.fastjson.JSONObject;import com.netty.client.server.TcpClient;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RestController;/** * 模拟发送api * * @author qiding */@RequiredArgsConstructor@RestController@Slf4jpublic class HttpApi {    private final TcpClient tcpClient;    /**     * 消息发布     */    @GetMapping("/send")    public String send(String message) { tcpClient.getSocketChannel().writeAndFlush(message); return "发送成功";    } /**     * 消息发布     */    @PostMapping("/send/json")    public String send(@RequestBody JSONObject body) { tcpClient.getSocketChannel().writeAndFlush(body.toJSONString()); return "发送成功";    }    /**     * 连接     */    @GetMapping("connect")    public String connect(String ip, Integer port) throws Exception { tcpClient.connect(ip, port); return "重启指令发送成功";    }    /**     * 重连     */    @GetMapping("reconnect")    public String reconnect() throws Exception { tcpClient.reconnect(); return "重启指令发送成功";    }}

http端口为我们开头yml配置的端口,默认9999,建议使用postman或apifox进行http调用

  • postman示例,http调用发送接口,向服务器发送消息

  • 或直接浏览器输入
    http://localhost:9999/send?message=hello
    Netty实战:优雅的创建TCP客户端(附源码)

7. 接口列表

  1. 消息发送
    http://localhost:10000/send?message=hello
  2. 连接服务器
    http://localhost:10000/connect?ip=127.0.0.1&port=20000
  3. 重连
    http://localhost:10000/reconnect?ip=127.0.0.1&port=20000
  4. json消息发送示例
    http://localhost:9999/send/json
Request URL:  http://localhost:9999/send/jsonRequest Method: POSTRequest Headers:{   "Content-Type":"application/json"}Request Body:{   "test": "hello"}

8. 源码分享

  • Springboot-cli开发脚手架,集合各种常用框架使用案例,完善的文档,致力于让开发者快速搭建基础环境并让应用跑起来。
  • 项目源码国内gitee地址
  • 项目源码github地址