> 文档中心 > spring Boot 中搭建neety框架 亲测有效

spring Boot 中搭建neety框架 亲测有效

第一步:

在maven 中添加 neety 中的jar 包

   io.netty   netty-all  5.0.0.Alpha2

第二步

  代码

 netty 启动类

package com.jeesite.modules.websocket.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import lombok.Data;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.nio.charset.Charset;/** * @Author:lichunhua * @Date: 2021-12-17 12:00 */@Data@Slf4j@Componentpublic class NettyServer {    private EventLoopGroup boosGroup = new NioEventLoopGroup();    private EventLoopGroup workerGroup = new NioEventLoopGroup();    private Channel channel;    @Autowired    private ChannelInitializer initializer;//    public ChannelFuture init(int port) {// ChannelFuture f=null;// try {//     //用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度//     ServerBootstrap b = new ServerBootstrap();//     b.group(boosGroup, workerGroup)//      //对应JDK NIO类库中的ServerSocketChannel//      .channel(NioServerSocketChannel.class)//      //配置NioServerSocketChannel的TCP参数//      .option(ChannelOption.SO_BACKLOG, 1024)//      //绑定I/O的事件处理类//      .childHandler(initializer);//     //调用它的bind操作监听端口号,调用同步阻塞方法sync等待绑定操作完成//     f = b.bind(port).sync();//     channel = f.channel();// } catch (InterruptedException e) {//     log.error(e.getMessage());// }// return f;//    }public ChannelFuture bind(int port) throws InterruptedException{    EventLoopGroup bossGruop=new NioEventLoopGroup();//用于服务器端接受客户端的连接    EventLoopGroup workGroup=new NioEventLoopGroup();//用于网络事件的处理    ChannelFuture f=null;    try    { ServerBootstrap b=new ServerBootstrap(); b.group(bossGruop, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() {     @Override     protected void initChannel(SocketChannel arg0) throws Exception     {  arg0.pipeline().addLast(new MyServerHandler());  arg0.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));  arg0.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));     } }).option(ChannelOption.SO_BACKLOG, 9999);//指定此套接口排队的最大连接个数  f=b.bind(port).sync(); //监听端口号 f.channel().closeFuture().sync(); return f;    }    finally    { bossGruop.shutdownGracefully(); workGroup.shutdownGracefully();    }}    public void close() { if (null == channel) {     return; } channel.close(); boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully();    }}

第二步:

  事件处理类

package com.jeesite.modules.websocket.netty;import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.LineBasedFrameDecoder;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.util.CharsetUtil;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;/** * @Author:lichunhua * @Date: 2021-12-17 13:36 */@Slf4j@Componentpublic class MyServerChannelInitializer extends ChannelInitializer {    @Override    protected void initChannel(SocketChannel socketChannel) throws Exception { log.info("=========有客户端连接服务器========="); log.info("ip:" + socketChannel.localAddress().getHostString() + "port:" + socketChannel.localAddress().getPort()); // 基于换行符号 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 解码转String,注意调整自己的编码格式GBK、UTF-8 socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); // 解码转String,注意调整自己的编码格式GBK、UTF-8 socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new MyServerHandler());    }}

  第三步

  事件操作类

  

package com.jeesite.modules.websocket.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.codec.string.StringDecoder;import io.netty.util.CharsetUtil;import lombok.extern.slf4j.Slf4j;import java.io.UnsupportedEncodingException;import java.nio.charset.Charset;import java.text.SimpleDateFormat;import java.util.Date;/** * @Author:lichunhua * @Date: 2021-12-17 13:37 */@Slf4jpublic class MyServerHandler extends ChannelInboundHandlerAdapter {    /**     * 当客户端主动连接服务端,通道活跃后触发     *     * @param ctx     * @throws Exception     */    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception { //在接收到客户端连接的时候通知客户端连接成功 String msg = "与服务端建立连接成功" + new Date(); ByteBuf buf = Unpooled.buffer(msg.getBytes().length); ctx.pipeline().read(); //ctx.pipeline().fireChannelRead("9999999"); buf.writeBytes(msg.getBytes(CharsetUtil.UTF_8));    }    //    @Override//    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//// channelRead(ctx); if(1==1){     System.out.println("输出数据"); }// ctx.channel().read();// ctx.fireChannelReadComplete(); log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "接收到消息:"); log.info(msg.toString()); ctx.writeAndFlush(msg);//    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8"))); ctx.flush();    }    /**     * 通道有消息触发     *     * @param ctx     * @param msg     * @throws Exception     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// if(1==1){//     System.out.println("通道有消息");// } ctx.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8"))); log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "接收到消息:"); log.info(msg.toString()); ByteBuf buf = (ByteBuf) msg; System.out.println("输出方法后的数据"+ convertByteBufToString(buf) ); byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req,"GBK"); String name = new String(body.getBytes("ISO8859-1"), "UTF-8"); System.out.println("输出数据" + name); //输出数据 System.out.println("输出字节数据" +getSendByteBuf(name)); System.out.println("接收发送消息" + body); ctx.writeAndFlush(msg);    }    /**     * 当客户端主动断开连接,通道不活跃触发     *     * @param ctx     * @throws Exception     */    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("===================客户端:" + ctx.channel().localAddress().toString() + " 断开连接===================");    }    /**     * 当连接发生异常时触发     *     * @param ctx     * @param cause     * @throws Exception     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //在发生异常时主动关掉连接 ctx.close(); log.error("发现异常:\r\n" + cause.toString());    }    private ByteBuf getSendByteBuf(String message) throws UnsupportedEncodingException { byte[] req = message.getBytes("UTF-8"); ByteBuf pingMessage = Unpooled.buffer(); pingMessage.writeBytes(req); return pingMessage;    }    public String convertByteBufToString(ByteBuf buf) { String str; if (buf.hasArray()) { // 处理堆缓冲区     str = new String(buf.array(), buf.arrayOffset() + buf.readerIndex(), buf.readableBytes()); } else { // 处理直接缓冲区以及复合缓冲区     byte[] bytes = new byte[buf.readableBytes()];     buf.getBytes(buf.readerIndex(), bytes);     str = new String(bytes, 0, buf.readableBytes()); } return str;    }}

第三步

事件启动类

 

package com.jeesite.modules.websocket.netty;import io.netty.channel.ChannelFuture;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @Author:lichunhua * @Date: 2021-12-17 13:50 */@Slf4j@RestController@RequestMapping("/netty")public class NettyController {    @Value("${netty.port}")    private int port;    @Autowired    private NettyServer nettyServer;    @RequestMapping("/localAddress")    public String localAddress() { return "tcp服务端地址:" + nettyServer.getChannel().localAddress();    }    @RequestMapping("/isOpen")    public String isOpen() throws InterruptedException { log.info("启动tcp无法开始=================0==========="); log.info("启动tcp无法开始=================端口==========="+port); ChannelFuture init = nettyServer.bind(port); log.info("已经执行这段代码的结束=======1========="); Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.close())); log.info("已经执行这段代码的结束========2========"); init.channel().closeFuture().syncUninterruptibly(); log.info("已经执行这段代码的结束========3========"); if (nettyServer.getChannel().isOpen()){     return "tcp服务已启动"; }else {     return "tcp服务未启动"; }    }    @RequestMapping("/close")    public String close() { if (nettyServer.getChannel().isOpen()) {     nettyServer.close(); } return "服务端关闭成功";    }}

第四步

 在application.yml 添加此配置

netty:  port: 1100

如果你想在项目启动就时候启动 neety 不手动启动的话 如下代码中 加入启动类

   

@SpringBootApplicationpublic class NettyApplication implements CommandLineRunner {    public static void main(String[] args) { SpringApplication.run(NettyApplication.class, args);    }    @Value("${netty.port}")    private int port;    @Autowired    private NettyServer nettyServer;    @Override    public void run(String... args) throws Exception { ChannelFuture init = nettyServer.init(port); Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.close())); init.channel().closeFuture().syncUninterruptibly();    }}