> 文档中心 > Netty实战:Netty+protobuf客户端(附源码)

Netty实战:Netty+protobuf客户端(附源码)


Springboot-cli 开发脚手架系列

Netty系列:Springboot使用Netty集成protobuf开发客户端


文章目录

  • Springboot-cli 开发脚手架系列
  • 前言
    • 1. 下载protoc.exe
    • 2. 编写.proto文件
    • 3. 生成.java 的协议包
    • 4. 引入协议文件
    • 5. 编写http测试接口
    • 6. 效果演示
    • 6. 源码分享

前言

首先我们需要使用Netty搭建基础的tcp客户端框架,参考Springboot使用Netty优雅的创建TCP客户端(附源码下载),接下来我们开始集成protubuf。
本博客项目源码地址:

  • 项目源码github地址
  • 项目源码国内gitee地址

1. 下载protoc.exe

官网下载地址

2. 编写.proto文件

编写规则参考官网中文文档
创建message.proto
注意,这里的协议文件需和服务器的协议保持一致,否则服务器是无法解码的

// 声明使用proto3syntax = "proto3";// 包名option java_package = "com.netty.client.procotol";// 类名option java_outer_classname = "MessageBuf"; // 消息整合,便于netty导入编码解码器message Message {  // 包类型  PackType packType = 1;  // 根据包类型多选1  oneof Pack{      LoginRequest loginRequest = 2;  LoginResponse loginResponse = 3;  MessageRequest messageRequest = 4;  MessageResponse messageResponse = 5;  }  // 包类型枚举  enum PackType {     LOGIN_REQ = 0; LOGIN_RESP = 1; MESSAGE_REQ = 2; MESSAGE_RESP = 3;  }} // 登录请求,包含用户名message LoginRequest {  string username = 1;  string password = 2;}// 登录响应message LoginResponse {  int32 code = 1;  string message = 2;} // 消息请求message MessageRequest {  int32 messageId = 1;  int32 type = 2;  string data = 3;}// 消息响应message MessageResponse {  int32 messageId = 1;  int32 code = 2;  string message = 3;}

3. 生成.java 的协议包

找到第一步下载的protoc.exe,在protoc.exe同目录下执行

# --java_out 输出路径、message.proto 要执行的文件protoc.exe --java_out=E:\lqd\protoc-3.20.1-rc-1-win64\bin message.proto

Netty实战:Netty+protobuf客户端(附源码)

4. 引入协议文件

这里的项目开头有介绍搭建教程和源码下载地址,点开连接根据教程搭建基础框架即可,非常简单,这里就不重复搭建了。

  • 在我们tcp项目的基础上加入protobuf依赖
    pom.xml
 <dependency>     <groupId>com.google.protobuf</groupId>     <artifactId>protobuf-java</artifactId>     <version>3.20.1-rc-1</version> </dependency>

复制第三步生成的.java文件到我们的tcp项目中,把netty的编码器换成我们第三步生成的协议包即可。
Netty实战:Netty+protobuf客户端(附源码)

  • 我们修改channel包下的ChannelInit .java,编码解码器改为Protubuf
@Component@RequiredArgsConstructorpublic class ChannelInit extends ChannelInitializer<SocketChannel> {    private final MessageHandler messageHandler;    @Override    protected void initChannel(SocketChannel channel) { channel.pipeline()  // 每隔5s的时间触发一次userEventTriggered的方法,并且指定IdleState的状态位是WRITER_IDLE,事件触发给服务器发送ping消息  .addLast("idle", new IdleStateHandler(0, 60, 0, TimeUnit.SECONDS)) // 添加编码解码器  .addLast(new ProtobufVarint32FrameDecoder())  // 此处引用我们生成的协议类MessageBuf  .addLast(new ProtobufDecoder(MessageBuf.Message.getDefaultInstance()))  .addLast(new ProtobufVarint32LengthFieldPrepender())  .addLast(new ProtobufEncoder())  // 添加消息处理器  .addLast("messageHandler", messageHandler);    }}
  • 新增消息构建器,方便我们构建消息体,MessageBuilder .java
/** * protobuf 消息构建 * * @author qiding */public class MessageBuilder {    /**     * 登录请求     *     * @param username 用户名     * @param password 密码     */    public static MessageBuf.Message.Builder newLogin(String username, String password) { MessageBuf.LoginRequest.Builder loginMes = MessageBuf.LoginRequest.newBuilder().setUsername(username).setPassword(password); return MessageBuf.Message.newBuilder().setLoginRequest(loginMes).setPackType(MessageBuf.Message.PackType.LOGIN_REQ);    }    /**     * 登录响应     *     * @param msg  提示消息     * @param code 错误码     */    public static MessageBuf.Message.Builder newLoginResp(String msg, Integer code) { MessageBuf.LoginResponse.Builder loginResp = MessageBuf.LoginResponse.newBuilder().setMessage(msg).setCode(code); return MessageBuf.Message.newBuilder().setLoginResponse(loginResp).setPackType(MessageBuf.Message.PackType.LOGIN_RESP);    }    /**     * 业务消息请求     *     * @param msgId 消息id 请求的id和响应的id需一致,便于判断服务器响应的是哪个业务请求     * @param data  请求内容     * @param type  业务类型     */    public static MessageBuf.Message.Builder newMessageReq(Integer msgId, String data, Integer type) { MessageBuf.MessageRequest.Builder messageReq = MessageBuf.MessageRequest.newBuilder().setMessageId(msgId).setData(data).setType(type); return MessageBuf.Message.newBuilder().setMessageRequest(messageReq).setPackType(MessageBuf.Message.PackType.MESSAGE_REQ);    }    /**     * 业务消息响应     *     * @param msgId 消息id 请求的id和响应的id需一致,便于判断服务器响应的是哪个业务请求     * @param msg   提示消息     * @param code  错误码     */    public static MessageBuf.Message.Builder newMessageResp(Integer msgId, String msg, Integer code) { MessageBuf.MessageResponse.Builder messageResp = MessageBuf.MessageResponse.newBuilder().setMessageId(msgId).setMessage(msg).setCode(code); return MessageBuf.Message.newBuilder().setMessageResponse(messageResp).setPackType(MessageBuf.Message.PackType.MESSAGE_RESP);    }}
  • 修改handle包下的消息处理器,MessageHandler .java
/** * 消息处理,单例启动 * * @author qiding */@Slf4j@Component@ChannelHandler.Sharable@RequiredArgsConstructorpublic class MessageHandler extends SimpleChannelInboundHandler<MessageBuf.Message> {    @Override    protected void channelRead0(ChannelHandlerContext ctx, MessageBuf.Message message) throws Exception { log.debug("\n"); log.debug("channelId:" + ctx.channel().id()); log.debug("消息类型:{}", message.getPackType().name()); switch (message.getPackType()) {     case LOGIN_RESP:  log.debug("收到登录回复\n{}", message.getLoginResponse());  // 回复客户端  break;     case MESSAGE_RESP:  log.debug("收到消息回复\n{}", message.getMessageResponse());  // 回复客户端  break;     default:  log.error("不支持的消息类型"); }    }    @Override    public void channelInactive(ChannelHandlerContext ctx) { log.debug("\n"); log.debug("开始连接");    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("\n"); log.debug("成功建立连接,channelId:{}", ctx.channel().id()); super.channelActive(ctx);    }    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.debug("心跳事件时触发"); if (evt instanceof IdleStateEvent) {    // 发送心跳包给服务器 } else {     super.userEventTriggered(ctx, evt); }    }}

5. 编写http测试接口

  • 加入web依赖
 <dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-web</artifactId></dependency>

新增HttpProtobufApi.java,用于线上测试

** * 模拟发送api * * @author qiding */@RequiredArgsConstructor@RestController@Slf4jpublic class HttpProtobufApi {    private final TcpClient tcpClient;    /**     * 消息发布     */    @GetMapping("/login")    public String send(String username, String password) { tcpClient.getSocketChannel().writeAndFlush(MessageBuilder.newLogin(username, password)); return "发送成功";    }    /**     * 消息发布     */    @GetMapping("/send")    public String send(Integer msgId, String data, Integer type) { tcpClient.getSocketChannel().writeAndFlush(MessageBuilder.newMessageReq(msgId, data, type)); return "发送成功";    }    /**     * json消息发布     */    @PostMapping("/send/json")    public String send(@RequestBody JSONObject body) { Integer msgId = body.getInteger("msgId"); JSONObject data = body.getJSONObject("data"); Integer type = body.getInteger("type"); tcpClient.getSocketChannel().writeAndFlush(MessageBuilder.newMessageReq(msgId, data.toJSONString(), type)); return "发送成功";    }    /**     * 连接服务器     */    @GetMapping("connect")    public String connect(String ip, Integer port) throws Exception { tcpClient.connect(ip, port); return "重启指令发送成功";    }    /**     * 重连     */    @GetMapping("reconnect")    public String reconnect() throws Exception { tcpClient.reconnect(); return "重启指令发送成功";    }}
  • 接口列表
  1. 登录
    /login?username=admin&password=123456
  2. 发送消息
    /send?msgId=1&type=1&data=hello
  3. 连接
    /connect?ip=192.168.0.99&port=20000
  4. 重连
    /reconnect
  5. 发送json
Request URL:  http://localhost:9999/send/jsonRequest Method: POSTRequest Headers:{   "Content-Type":"application/json"}Request Body:{   "msgId": 1,   "type": 1,   "data": {     "message":"hello"    }}

6. 效果演示

这里需要配合服务器进行测试
参考Springboot使用Netty集成protobuf开发高性能服务器 (附源码下载)

调用http接口进行测试
Netty实战:Netty+protobuf客户端(附源码)

Netty实战:Netty+protobuf客户端(附源码)

6. 源码分享

  • Springboot-cli开发脚手架,集合各种常用框架使用案例,完善的文档,致力于让开发者快速搭建基础环境并让应用跑起来。
  • 项目源码github地址
  • 项目源码国内gitee地址