> 文档中心 > 高性能网络通信框架Netty一套就够(作者原创)

高性能网络通信框架Netty一套就够(作者原创)


个人简介

作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门。

文章目录

    • 个人简介
    • Netty
      • Netty入门
        • Netty著名项目
        • Netty的优势
        • Netty Maven
        • 第一个Netty应用
        • Netty组件
          • EventLoop
          • Channel
          • Future&Promise
          • Handler&Pipeline
          • ByteBuf
          • ByteBuf日志工具类
      • Netty进阶
        • 粘包和半包/拆包问题
        • 粘包和半包/拆包解决方案
          • 短连接
          • 定长解码器
          • 行解码器(推荐)
          • 自定义分隔符解码器
        • Netty协议解析
          • Redis协议
          • Http协议
        • 群聊

Netty

Netty入门

Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序

Netty著名项目

由Netty开发的开源框架:

  • dubbo
  • Zookeeper
  • RocketMQ

Netty的优势

  • 不需要自己构建协议,Netty自带了多种协议,例如HTTP协议
  • 解决了TCP传输问题,如粘包、半包
  • 解决了一个epoll空轮询的JDK bug。(作者遇到过),即selector的select方法默认是阻塞的,但是并没有阻塞会一直空轮询。
  • Netty对JDK的NIO API进行增强,如下:
    • ThreadLocal==>FastThreadLocal
    • ByteBuffer==>ByteBuf(重要),支持动态扩容,不像原厂的JDK的ByteBuffer超过缓存就报错

Netty Maven

 <dependency>     <groupId>io.netty</groupId>     <artifactId>netty-all</artifactId>     <version>4.1.65.Final</version> </dependency>

依赖说明:暂时不推荐使用Netty5,使用Netty4即可

第一个Netty应用

服务器端:

  private static final Logger log = LoggerFactory.getLogger(NettyServer.class);  public static void main(String[] args) {    // Netty的服务器端启动器,装配Netty组件    new ServerBootstrap() // NioEventLoopGroup底层就是线程池+selector .group(new NioEventLoopGroup()) // 通道 .channel(NioServerSocketChannel.class) //“每一个”SocketChannel客户端连接上服务器端“都会”执行这个初始化器ChannelInitializer //但是每一个SocketChannel只能够让这个初始化器执行一次 .childHandler(     new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {    log.info("initChannel start......");    //往处理器流水线pipeline添加处理器    //因为'客户端'发送数据会进行'字符串的编码'再发送到服务器端,所以这里要'创建一个字符串解码器'StringDecoder    nioSocketChannel.pipeline().addLast(new StringDecoder());    //添加接收数据需要的处理器适配器    nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //重写通道的‘’读‘’方法,msg就是接收到的数据 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {     log.warn(msg.toString()); //打印数据     super.channelRead(ctx, msg); }    });    log.info("initChannel end......");}     }) .bind(8082);  }

客户端:

  private static final Logger log = LoggerFactory.getLogger(NettyClient.class);  public static void main(String[] args) throws InterruptedException {      //创建Netty客户端的启动器,装配Netty组件    new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) //一旦执行这个应用立刻初始化,这个和childHandler有所不同 //childHandler是需要socket连接上在初始化,这个不需要。。。。。 .handler(     new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {    log.info("initChannel start......");    //由于发送的数据需要进行编码再发送,所以需要一个字符串编码器    //往通道流水线添加一个字符串编码器    channel.pipeline().addLast(new StringEncoder());    log.info("initChannel end......");}     }) // connect方法是“”异步“”的 .connect("localhost", 8082) //坑点:由于connect方法是异步的,所以要同步。。。。。 //由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。 //一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步 .sync() // 获取通道。然后发送数据 .channel() .writeAndFlush("hello你好");  }

Netty组件

查看CPU最大核心数

int hx = NettyRuntime.availableProcessors(); //cpu核心数
EventLoop

事件循环对象EventLoop

EventLoop本质是一个单线程执行器(同时维护了一个 Selector),里面有run方法处理一个或多个Channel上源源不断的io事件

事件循环组EventLoopGroup

EventLoopGroup是一组EventLoop,而每一个EventLoop都维护着一个selector,Channel 一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop。

      int count=3;      EventLoopGroup ev=new NioEventLoopGroup(count);      System.out.println(ev.next().hashCode());//1      System.out.println(ev.next().hashCode());//2      System.out.println(ev.next().hashCode());//3      System.out.println(ev.next().hashCode());//4

通过上面的代码可以看出1和4是同一个对象,因为他们的hashCode相同。得出EventLoopGroup是一个线程池,里面装载着>1个的EventLoop,
EventLoop底层维护了一个线程和selector,而count可以指定EventLoopGroup的线程池大小。

EventLoop普通任务与定时任务

      EventLoopGroup ev=new NioEventLoopGroup(3);      //普通任务      ev.next().submit(()->{   System.out.println("111");      });      System.out.println("222");      //定时任务      ev.next().scheduleAtFixedRate(()->{   System.out.println("333");      },0,1,TimeUnit.SECONDS);

关闭EventLoopGroup

      EventLoopGroup eventLoopGroup = new NioEventLoopGroup();      eventLoopGroup.shutdownGracefully(); //优雅的关闭EventLoopGroup

分工

      // Netty的服务器端启动器,装配Netty组件      new ServerBootstrap() //******NioEventLoopGroup的分工合作,第一个NioEventLoopGroup处理accept事件//第二个NioEventLoopGroup处理读写事件.group(new NioEventLoopGroup(),new NioEventLoopGroup())// 通道.channel(NioServerSocketChannel.class)//“每一个”SocketChannel客户端连接上服务器端“都会”执行这个初始化器ChannelInitializer//但是每一个SocketChannel只能够让这个初始化器执行一次.childHandler( new ChannelInitializer<NioSocketChannel>() {     @Override     protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {  log.info("initChannel start......");  //往处理器流水线pipeline添加处理器  //因为'客户端'发送数据会进行'字符串的编码'再发送到服务器端,所以这里要'创建一个字符串解码器'StringDecoder  nioSocketChannel.pipeline().addLast(new StringDecoder());  //添加接收数据需要的处理器适配器  nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){      //重写通道的‘’读‘’方法,msg就是接收到的数据      @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {   log.warn(msg.toString()); //打印数据   super.channelRead(ctx, msg);      }  });  log.info("initChannel end......");     } }).bind(8082);
Channel

Channel常用方法:

  • close()
    • 可以用来关闭Channel
  • closeFuture()
    • 用来处理 Channel 的关闭
  • pipeline()
    • 添加处理器
  • write()
    • 写入数据,只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
  • writeAndFlush()
    • 立即发送数据,相当于同时调用write和flush方法,好处是不用等缓存满了才能发出数据的问题

ChannelFuture

获取ChannelFuture

      //创建Netty客户端的启动器,装配Netty组件      ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class)//一旦执行这个应用立刻初始化,这个和childHandler有所不同//childHandler是需要socket连接上在初始化,这个不需要。。。。。.handler( new ChannelInitializer<Channel>() {     @Override     protected void initChannel(Channel channel) throws Exception {  //由于发送的数据需要进行编码再发送,所以需要一个字符串编码器  //往通道流水线添加一个字符串编码器  channel.pipeline().addLast(new StringEncoder());     } })// connect方法是“”异步“”的.connect("localhost", 8082);

发送数据的两种方式

  • sync同步channelFuture再发送数据
  • channelFuture添加监听器

这两种方法本质上都是为了让channelFuture成功创建也就是connect方法完成调用之后才发送数据

      //创建Netty客户端的启动器,装配Netty组件      ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class)//一旦执行这个应用立刻初始化,这个和childHandler有所不同//childHandler是需要socket连接上在初始化,这个不需要。。。。。.handler( new ChannelInitializer<Channel>() {     @Override     protected void initChannel(Channel channel) throws Exception {  //由于发送的数据需要进行编码再发送,所以需要一个字符串编码器  //往通道流水线添加一个字符串编码器  channel.pipeline().addLast(new StringEncoder());     } })// connect方法是“”异步“”的.connect("localhost", 8082);      //"方法一":      //由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。      //一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步//      channelFuture.sync();//      Channel channel = channelFuture.channel();//      channel.writeAndFlush("你好");      //方法二:使用监听器,监听channelFuture是否完成连接。因为channelFuture只有connect完成之后才会创建      //使用这种监听器方法就不需要sync进行同步了      channelFuture.addListener(new ChannelFutureListener() {   //当connect成功连接之后就会进入这个方法   @Override   public void operationComplete(ChannelFuture future) throws Exception {Channel channel = future.channel();channel.writeAndFlush("operationComplete");   }      });

关闭通道channel

      EventLoopGroup eventLoopGroup = new NioEventLoopGroup();      //创建Netty客户端的启动器,装配Netty组件      ChannelFuture channelFuture = new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class)//一旦执行这个应用立刻初始化,这个和childHandler有所不同//childHandler是需要socket连接上在初始化,这个不需要。。。。。.handler( new ChannelInitializer<Channel>() {     @Override     protected void initChannel(Channel channel) throws Exception { //日志  channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));//由于发送的数据需要进行编码再发送,所以需要一个字符串编码器  //往通道流水线添加一个字符串编码器  channel.pipeline().addLast(new StringEncoder());     } })// connect方法是“”异步“”的.connect("localhost", 8082);      //"方法一":      //由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。      //一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步//      channelFuture.sync();//      Channel channel = channelFuture.channel();//      channel.writeAndFlush("你好");      //方法二:使用监听器,监听channelFuture是否完成连接。因为channelFuture只有connect完成之后才会创建      //使用这种监听器方法就不需要sync进行同步了      channelFuture.addListener(new ChannelFutureListener() {   //当connect成功连接之后就会进入这个方法   @Override   public void operationComplete(ChannelFuture future) throws Exception {Channel channel = future.channel();channel.writeAndFlush("operationComplete");//只有close之后才会调用下面的关闭监听器channel.close(); //关闭channel,这个关闭方法也是**异步**的,所以也需要进行监听ChannelFuture closeFuture = channel.closeFuture();//关闭通道监听器closeFuture.addListener(new ChannelFutureListener() {    @Override    public void operationComplete(ChannelFuture future) throws Exception { log.info("已经关闭channel"); //关闭group eventLoopGroup.shutdownGracefully();    }});   }      });
Future&Promise

Future都是用线程池去返回得到的,所以JDK Future需要依赖线程池,Netty Future需要依赖于EventLoopGroup

JDK Futhure和Netty Future、Netty Promise区别:

Netty的Future继承与JDK的Future,Netty Promise又对Netty Future进行扩展

  • JDK Future只能同步等待任务结束(或成功、或失败)才能得到结果,例如JDK Future的get是阻塞的获取结果
  • Netty Future既阻塞的获取结果,也可以非阻塞的获取结果,阻塞就是get,非阻塞就是getNow。
  • Netty Promise有Netty Future所有的功能且增加了几个方法,setSuccess、setFailure,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。

JDK Future

      ExecutorService executorService = Executors.newFixedThreadPool(2); //创建一个固定大小的线程池      //Callable有返回值。      Future<String> future = executorService.submit(new Callable<String>() {   @Override   public String call() throws Exception {Thread.sleep(1000);return "hello";   }      });      String res = future.get(); //get方法会阻塞,直到线程池的submit执行完毕,返回了future对象才会解除阻塞      System.out.println(res);      executorService.shutdown(); //关闭线程池

Netty Future

      EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);      Future<String> future = eventLoopGroup.next().submit(new Callable<String>() {   @Override   public String call() throws Exception {Thread.sleep(1000);return "Netty Future";   }      });//      String s1 = future.get(); //阻塞方法,这个方法和jdk的future一样//      System.out.println(s1);      String s2 = future.getNow(); //非阻塞方法,如果future没有立刻返回值则不会等待,直接返回null      System.out.println(s2);

Netty Promise

Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果

      NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();      EventLoop executors = eventLoopGroup.next();      DefaultPromise<Integer> promise = new DefaultPromise<>(executors);      new Thread(()->{   try {Thread.sleep(1000);   } catch (InterruptedException e) {e.printStackTrace();   }   promise.setSuccess(100);      }).start();      Integer res = promise.get();      System.out.println(res);
Handler&Pipeline

服务端:

new ServerBootstrap().group(new NioEventLoopGroup(),new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {    //pipeline结构    //head->handle1->handle2->handle3->handle4->handle5->handle6->tail    //且为‘双向链表’,触发Inbound事件则会从head->tail一直走Inbound方法。    //触发Outbound事件则会从tail->head一直走Outbound方法。只有触发了对应事件才会走对应的方法。。。。。。    @Override    protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringDecoder()); //Inbound处理器 //为处理器取名字 socketChannel.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  log.warn(Thread.currentThread().getName()+"==>"+"handle1");  super.channelRead(ctx, msg); //向下传递     } }); socketChannel.pipeline().addLast("handle2",new ChannelInboundHandlerAdapter(){     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  log.warn(msg.toString());  log.warn(Thread.currentThread().getName()+"==>"+"handle2");  super.channelRead(ctx, msg); //向下传递     } }); socketChannel.pipeline().addLast("handle3",new ChannelInboundHandlerAdapter(){     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  //***不能用这种方法,client会收不到//  ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello world");  //***用这种,记住*****一定要指定字符类型UTF-8***  ByteBuf byteBuf = ctx.alloc().buffer().writeBytes("hello".getBytes("utf-8"));  //发送数据,触发OutBound事件  socketChannel.writeAndFlush(byteBuf);  log.warn(Thread.currentThread().getName()+"==>"+"handle3");  super.channelRead(ctx, msg); //向下传递     } }); //Outbound处理器 socketChannel.pipeline().addLast("handle4",new ChannelOutboundHandlerAdapter(){     @Override     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  log.warn(Thread.currentThread().getName()+"==>"+"handle4");  super.write(ctx, msg, promise);     } }); socketChannel.pipeline().addLast("handle5",new ChannelOutboundHandlerAdapter(){     @Override     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  log.warn(Thread.currentThread().getName()+"==>"+"handle5");  super.write(ctx, msg, promise);     } }); socketChannel.pipeline().addLast("handle6",new ChannelOutboundHandlerAdapter(){     @Override     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  log.warn(Thread.currentThread().getName()+"==>"+"handle6");  super.write(ctx, msg, promise);     } });    }}).bind(8080);

客户端:

      NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();      ChannelFuture channelFuture = new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {    @Override    protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  System.out.println("--------------"+msg.toString());  super.channelRead(ctx, msg);     } });    }}).connect("localhost", 8080);      channelFuture.addListener(new ChannelFutureListener() {   @Override   public void operationComplete(ChannelFuture future) throws Exception {Channel channel = future.channel();channel.writeAndFlush("client-----");//channel.close();//ChannelFuture closeFuture = channel.closeFuture();//closeFuture.addListener(new ChannelFutureListener() {//    @Override//    public void operationComplete(ChannelFuture future) throws Exception {// eventLoopGroup.shutdownGracefully();//    }//});   }      });

通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler.

handler需要放入通道的pipeline中,才能根据放入顺序来使用handler:

  • pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler处理器
    • 要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
  • 当有入站(Inbound)操作时,会从head开始向tail方向调用handler,直到handler不是处理Inbound操作为止
  • 当有出站(Outbound)操作时,会从tail开始向head方向调用handler,直到handler不是处理Outbound操作为止

结构图:

高性能网络通信框架Netty一套就够(作者原创)

ByteBuf

创建ByteBuf

      //创建ByteBuf      ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);      log(byteBuf);      StringBuffer stringBuffer = new StringBuffer();    for (int i = 0; i < 50; i++) {      stringBuffer.append('1');    }    byteBuf.writeBytes(stringBuffer.toString().getBytes("utf-8"));    log(byteBuf);

运行结果:

read index:0 write index:0 capacity:10read index:0 write index:50 capacity:64  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111||00000010| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111||00000020| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111||00000030| 31 31 |11|+--------+-------------------------------------------------+----------------+Process finished with exit code 0

根据打印的capacity可知ByteBuf是会自动扩容的,而NIO的ByteBuffer是不能超出容量的。

public abstract class AbstractByteBufAllocator implements ByteBufAllocator {    static final int DEFAULT_INITIAL_CAPACITY = 256; //默认初始化容量    static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE; //最大容量    static final int DEFAULT_MAX_COMPONENTS = 16;

ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小

如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建

3种创建池化的ByteBuf方式

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf      ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.heapBuffer(10);//指定创建‘’堆内存‘’的ByteBuf      ByteBuf byteBuf3 = ByteBufAllocator.DEFAULT.directBuffer(10);//指定创建‘’直接内存‘’的ByteBuf

查看当前ByteBuf对象类型

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf      ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.heapBuffer(10);//指定创建‘’堆内存‘’的ByteBuf      ByteBuf byteBuf3 = ByteBufAllocator.DEFAULT.directBuffer(10);//指定创建‘’直接内存‘’的ByteBuf      System.out.println(byteBuf1.getClass());      System.out.println(byteBuf2.getClass());      System.out.println(byteBuf3.getClass());

输出结果:

class io.netty.buffer.PooledUnsafeDirectByteBufclass io.netty.buffer.PooledUnsafeHeapByteBufclass io.netty.buffer.PooledUnsafeDirectByteBuf

池化和非池化

  • Netty4.1之前默认是非池化
  • Netty4.1之后默认是池化,但是Android平台默认是非池化

池化优点:

  • 本质上池化的意义就是可重用ByteBuf
    • 没有池化的话每次需要使用ByteBuf都要重新申请内存。即使是堆内存,释放内存也会增大GC的压力
    • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
    • 高并发下,池化更节约内存,减少内存溢出的可能。

IDEA IDE如何设置为非池化

只需要在IDEA IDE的VM options里面设置下面一段代码即可:

-Dio.netty.allocator.type={unpooled|pooled}

ByteBuf组成

  • 最大容量与当前容量
    • 在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE
    • 当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出java.lang.IndexOutOfBoundsException异常
  • 读写操作不同于ByteBuffer只用position进行控制,ByteBuf分别由读指针写指针两个指针控制。进行读写操作时,无需进行模式的切换
    • 读指针前的部分被称为废弃部分,是已经读过的内容
    • 读指针与写指针之间的空间称为可读部分
    • 写指针与当前容量之间的空间称为可写部分

高性能网络通信框架Netty一套就够(作者原创)

ByteBuf写入

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf     byteBuf1.writeBytes("hello".getBytes("utf-8"));

write和set方法的区别:

ByteBuf中set开头的一系列方法,也可以写入数据,但不会改变写指针位置

ByteBuf的扩容机制

当ByteBuf中的当前容量无法容纳写入的数据时,会自动进行扩容

触发扩容:

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf      log(byteBuf1);      byteBuf1.writeBytes("helloaaaaaaaa".getBytes("utf-8"));      log(byteBuf1);

结果:

read index:0 write index:0 capacity:10read index:0 write index:13 capacity:16  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 61 61 61 61 61 61 61 61   |helloaaaaaaaa   |+--------+-------------------------------------------------+----------------+

扩容机制如下:

有两种情况:

  • 写入后的数据小于512字节
    • 这种情况会选择使用16的整数倍进行扩容,比如写入后的数据是14字节,则16*1为最小整数倍,则会扩容到16字节
  • 写入后的数据大于512字节
    • 这种情况会以2的n次方扩容,例如写入后的数据是600字节,此时大于512字节,那么容纳它的容量为2的10次方,因为2的9次方是512容纳不了,所以会扩容到1024字节
    • 如果扩容后的大小大于maxCapacity,则会抛出java.lang.IndexOutOfBoundsException异常

ByteBuf读取

读取后会移动读指针

      ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);      byteBuf.writeBytes("hello".getBytes("utf-8"));      byte b[]=new byte[5];      byteBuf.readBytes(b);     System.out.println(Arrays.toString(b));

ByteBuf以get开头的方法,这些方法不会改变读指针的位置

ByteBuf日志工具类
    public class ByteBufferUtil {    private static final char[] BYTE2CHAR = new char[256];    private static final char[] HEXDUMP_TABLE = new char[256 * 4];    private static final String[] HEXPADDING = new String[16];    private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];    private static final String[] BYTE2HEX = new String[256];    private static final String[] BYTEPADDING = new String[16];    static { final char[] DIGITS = "0123456789abcdef".toCharArray(); for (int i = 0; i < 256; i++) {     HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];     HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F]; } int i; // Generate the lookup table for hex dump paddings for (i = 0; i < HEXPADDING.length; i++) {     int padding = HEXPADDING.length - i;     StringBuilder buf = new StringBuilder(padding * 3);     for (int j = 0; j < padding; j++) {  buf.append("   ");     }     HEXPADDING[i] = buf.toString(); } // Generate the lookup table for the start-offset header in each row (up to 64KiB). for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {     StringBuilder buf = new StringBuilder(12);     buf.append(StringUtil.NEWLINE);     buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));     buf.setCharAt(buf.length() - 9, '|');     buf.append('|');     HEXDUMP_ROWPREFIXES[i] = buf.toString(); } // Generate the lookup table for byte-to-hex-dump conversion for (i = 0; i < BYTE2HEX.length; i++) {     BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i); } // Generate the lookup table for byte dump paddings for (i = 0; i < BYTEPADDING.length; i++) {     int padding = BYTEPADDING.length - i;     StringBuilder buf = new StringBuilder(padding);     for (int j = 0; j < padding; j++) {  buf.append(' ');     }     BYTEPADDING[i] = buf.toString(); } // Generate the lookup table for byte-to-char conversion for (i = 0; i < BYTE2CHAR.length; i++) {     if (i <= 0x1f || i >= 0x7f) {  BYTE2CHAR[i] = '.';     } else {  BYTE2CHAR[i] = (char) i;     } }    }    /**     * 打印所有内容     * @param buffer     */    public static void debugAll(ByteBuffer buffer) { int oldlimit = buffer.limit(); buffer.limit(buffer.capacity()); StringBuilder origin = new StringBuilder(256); appendPrettyHexDump(origin, buffer, 0, buffer.capacity()); System.out.println("+--------+-------------------- all ------------------------+----------------+"); System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit); System.out.println(origin); buffer.limit(oldlimit);    }    /**     * 打印可读取内容     * @param buffer     */    public static void debugRead(ByteBuffer buffer) { StringBuilder builder = new StringBuilder(256); appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position()); System.out.println("+--------+-------------------- read -----------------------+----------------+"); System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit()); System.out.println(builder);    }    private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) { if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {     throw new IndexOutOfBoundsException(      "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length+ ") <= " + "buf.capacity(" + buf.capacity() + ')'); } if (length == 0) {     return; } dump.append(  "  +-------------------------------------------------+" +   StringUtil.NEWLINE + "  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +   StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+"); final int startIndex = offset; final int fullRows = length >>> 4; final int remainder = length & 0xF; // Dump the rows which have 16 bytes. for (int row = 0; row < fullRows; row++) {     int rowStartIndex = (row << 4) + startIndex;     // Per-row prefix.     appendHexDumpRowPrefix(dump, row, rowStartIndex);     // Hex dump     int rowEndIndex = rowStartIndex + 16;     for (int j = rowStartIndex; j < rowEndIndex; j++) {  dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);     }     dump.append(" |");     // ASCII dump     for (int j = rowStartIndex; j < rowEndIndex; j++) {  dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);     }     dump.append('|'); } // Dump the last row which has less than 16 bytes. if (remainder != 0) {     int rowStartIndex = (fullRows << 4) + startIndex;     appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);     // Hex dump     int rowEndIndex = rowStartIndex + remainder;     for (int j = rowStartIndex; j < rowEndIndex; j++) {  dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);     }     dump.append(HEXPADDING[remainder]);     dump.append(" |");     // Ascii dump     for (int j = rowStartIndex; j < rowEndIndex; j++) {  dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);     }     dump.append(BYTEPADDING[remainder]);     dump.append('|'); } dump.append(StringUtil.NEWLINE +  "+--------+-------------------------------------------------+----------------+");    }    private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) { if (row < HEXDUMP_ROWPREFIXES.length) {     dump.append(HEXDUMP_ROWPREFIXES[row]); } else {     dump.append(StringUtil.NEWLINE);     dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));     dump.setCharAt(dump.length() - 9, '|');     dump.append('|'); }    }    public static short getUnsignedByte(ByteBuffer buffer, int index) { return (short) (buffer.get(index) & 0xFF);    }    public static void log(ByteBuf buffer) { int length = buffer.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4; StringBuilder buf = new StringBuilder(rows * 80 * 2)  .append("read index:").append(buffer.readerIndex())  .append(" write index:").append(buffer.writerIndex())  .append(" capacity:").append(buffer.capacity())  .append(NEWLINE); io.netty.buffer.ByteBufUtil.appendPrettyHexDump(buf, buffer); System.out.println(buf.toString());    }}

ByteBuf的释放

由于ByteBuf中有堆外内存(直接内存)的实现,堆外内存最好是手动来释放,而不是等GC来进行垃圾回收。

  • UnpooledHeapByteBuf使用的是JVM内存,只需等GC回收内存即可。
  • UnpooledDirectByteBuf使用的是直接内存,需要特殊的方法来回收内存
  • PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口

具体如下:

  • 新创建的ByteBuf默认计数为1
  • 调用release方法会使计数-1如果计数为0,则内存将会被回收
  • 调用retain方法会使计数+1,表示调用者没用完之前,其它handler即使调用了release也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用。

ByteBuf内存释放规则是:谁最后使用这块内存,谁就要调用release方法进行释放。

  • 入站(Inbound处理器链)ByteBuf处理原则:
    • 可以遵循谁最后使用内存谁就release。也可以让尾释放内存。
      • 我们知道Inbound是从head->tail,所以tail是入站的终点,TailContext也会处理内存释放的问题。
  • 出站(Outbound处理器链)ByteBuf处理原则
    • 可以遵循谁最后使用内存谁就release。也可以让头释放内存。
    • 我们知道Outbound是从tail->head,所以head是出站的终点,HeadContext也会处理内存释放的问题。
  • 有时候不清楚ByteBuf的计数是多少次,但又必须彻底释放,可以循环调用release直到返回true
while (!buffer.release()) {}

内存释放源码

public interface ReferenceCounted {    ReferenceCounted retain();    /**  * Decreases the reference count by {@code 1} and deallocates this object if the reference count reaches at  * {@code 0}.  *  * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated  */    boolean release();}

从注释可以看出,让release成功释放内存后将会返回true。

头尾释放内存源码:

    /**     * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user     * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible     * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.     */    protected void onUnhandledInboundMessage(Object msg) { try {     logger.debug(      "Discarded inbound message {} that reached at the tail of the pipeline. " +"Please check your pipeline configuration.", msg); } finally {     ReferenceCountUtil.release(msg); }    }
    /**     * Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.     * If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.     */    public static boolean release(Object msg) { if (msg instanceof ReferenceCounted) {     return ((ReferenceCounted) msg).release(); } return false;    }

使用被释放的内存会怎样

     ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);      byteBuf.writeBytes("helloWorld".getBytes("utf-8"));      byteBuf.release(); //释放内存      ByteBufferUtil.log(byteBuf);

结果:

Exception in thread "main" io.netty.util.IllegalReferenceCountException: refCnt: 0

注意:一旦ByteBuf的计数到0,再进行retain也没用

      ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);      byteBuf.writeBytes("helloWorld".getBytes("utf-8"));      byteBuf.release(); //-1      byteBuf.retain(); //+1      ByteBufferUtil.log(byteBuf);

结果

Exception in thread "main" io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1

内存切片slice

public abstract ByteBuf slice(int index, int length);
  • ByteBuf的内存切片也是零拷贝的体现之一,切片后的内存还是原来ByteBuf的内存,过程没有发生过内存复制,切片后的 ByteBuf 维护独立的 read,write 指针.
  • 切片后的ByteBuf需要调用retain使计数+1,防止原来的ByetBuf调用release释放内存导致切片的内存不可用。
  • 修改原ByteBuf中的值,也会影响切片后得到的ByteBuf。

代码案例:

      ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);      byteBuf.writeBytes("helloWorld".getBytes("utf-8"));      ByteBufferUtil.log(byteBuf);      ByteBuf buf = byteBuf.slice(0, 5); //内存分片      ByteBufferUtil.log(buf);      System.out.println("---------------");      buf.setByte(1,'g'); //修改分片内存的值      //重新打印      ByteBufferUtil.log(byteBuf);      ByteBufferUtil.log(buf);

结果:

read index:0 write index:10 capacity:10  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 57 6f 72 6c 64     |helloWorld      |+--------+-------------------------------------------------+----------------+read index:0 write index:5 capacity:5  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f      |hello    |+--------+-------------------------------------------------+----------------+---------------read index:0 write index:10 capacity:10  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 67 6c 6c 6f 57 6f 72 6c 64     |hglloWorld      |+--------+-------------------------------------------------+----------------+read index:0 write index:5 capacity:5  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 67 6c 6c 6f      |hgllo    |+--------+-------------------------------------------------+----------------+Process finished with exit code 0

结论:可以看出修改分片的内存的值,原内存也会受到影响,因为他们都是用同一块内存。

ByteBuf优势

  • 池化思想-可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如slice、duplicate、CompositeByteBuf

Netty进阶

粘包和半包/拆包问题

粘包问题演示

服务器端:

  private static final Logger log= LoggerFactory.getLogger(NettyServer.class);  public static void main(String[] args) {      NioEventLoopGroup boss = new NioEventLoopGroup(1);      NioEventLoopGroup worker = new NioEventLoopGroup(6);      new ServerBootstrap().group(boss,worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {    @Override    protected void initChannel(NioSocketChannel ch) throws Exception { //不进行加解密不然展示不出粘包效果// ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {  log.info("客户端已成功连接服务器");  super.channelActive(ctx);     }     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  log.info("msg={}",msg);  super.channelRead(ctx, msg);     } });    }}).bind(8080);  }

客户端:

  private static final Logger log= LoggerFactory.getLogger(NettyClient.class);  public static void main(String[] args) {    NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();    ChannelFuture channelFuture = new Bootstrap()     .group(eventLoopGroup)     .channel(NioSocketChannel.class)     .handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {  //不进行加解密不然展示不出粘包效果//  ch.pipeline().addLast(new StringEncoder());  ch.pipeline().addLast(new LoggingHandler());}     }).connect("localhost", 8080);    channelFuture.addListener(new ChannelFutureListener() {      @Override      public void operationComplete(ChannelFuture future) throws Exception { Channel channel = future.channel(); ByteBuf byteBuf = channel.alloc().buffer(16); for (int i=0;i<10;i++){   byteBuf.retain();   byteBuf.writeBytes(("hello").getBytes("utf-8"));   channel.writeAndFlush(byteBuf);   byteBuf.clear(); } channel.close(); ChannelFuture closeFuture = channel.closeFuture(); closeFuture.addListener(new ChannelFutureListener() {   @Override   public void operationComplete(ChannelFuture future) throws Exception {     eventLoopGroup.shutdownGracefully();   } });      }    });  }

服务器端输出结果:

16:00:37.869 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xa191631f, L:/127.0.0.1:8080 - R:/127.0.0.1:53693] READ: 50B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 |hellohellohelloh||00000010| 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 |ellohellohellohe||00000020| 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c |llohellohellohel||00000030| 6c 6f |lo|+--------+-------------------------------------------------+----------------+

可以看出原来我们是在客户端分10次发送,而服务器端却一下把10次的数据都粘在一起了,这就是粘包问题。

半包问题展示

服务器端:

  private static final Logger log= LoggerFactory.getLogger(NettyServer.class);  public static void main(String[] args) {      NioEventLoopGroup boss = new NioEventLoopGroup(1);      NioEventLoopGroup worker = new NioEventLoopGroup(6);    new ServerBootstrap() .group(boss, worker) .channel(NioServerSocketChannel.class) // 半包问题:例如,发送方发送100字节数据,而接收方最多只能接收30字节数据,这就是半包问题     //option(ChannelOption.SO_RCVBUF,10),调整接收缓冲区大小(滑动窗口) .option(ChannelOption.SO_RCVBUF,10) .childHandler(     new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {  // 不进行加解密不然展示不出粘包效果  // ch.pipeline().addLast(new StringDecoder());  ch.pipeline().addLast(new LoggingHandler());  ch.pipeline()      .addLast(   new ChannelInboundHandlerAdapter() {     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客户端已成功连接服务器");super.channelActive(ctx);     }     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg)  throws Exception {log.info("msg={}", msg);super.channelRead(ctx, msg);     }   });}     }) .bind(8080);  }

只需使用这个方法即可

  • option(ChannelOption.SO_RCVBUF,10)

option(ChannelOption.SO_RCVBUF,10),调整接收缓冲区大小。由于接收缓存区的大小<发送方发送的数据大小,所以产生了半包问题。

现象分析

粘包:

  • 产生现象
    • 第一次发送abc,第二次发送def,接收到的是一整个abcdef
  • 原因
    • Netty层
      • 接收方的接收缓冲区太大,Netty的接收缓冲区默认是1024字节
    • 网络层
      • TCP滑动窗口:假如发送方发送100字节数据,而滑动窗口缓冲区可容纳>100字节数据,这时候就会出现粘包问题。
      • Nagle 算法:会造成粘包

半包/拆包:

  • 产生现象
    • 发送abcdef数据,接收方第一次收到ab,第二次收到cd,第三次收到ef
  • 原因
    • Netty层
      • 接收方的接收缓冲区太小,发送方的数据过大,导致接收方无法一次接收下所有数据,就会半包/拆包
    • 网络层
      • 滑动窗口:假设接收方的窗口只剩了128bytes,发送方的报文大小是256bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前128bytes,等待ack后才能发送剩余部分,这就造成了半包
    • 数据链路层
      • MSS 限制:当发送的数据超过MSS限制后,会将数据切分发送,就会造成半包

发送这些问题的本质:因为 TCP 是流式协议,消息无边界

粘包和半包/拆包解决方案

短连接

短连接:即每发送一条数据就重新连接再次发送,反复此操作。

短连接的缺点是显而易见的,每次发送一条数据都要重新连接,这样会大大的浪费时间,因为连接是需要时间的。

客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。
这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。
但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象

采用短连接解决粘包代码

服务端:

  private static final Logger log = LoggerFactory.getLogger(NettyServer.class);  public static void main(String[] args) {    NioEventLoopGroup boss = new NioEventLoopGroup(1);    NioEventLoopGroup worker = new NioEventLoopGroup(6);    new ServerBootstrap() .group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(     new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {  ch.pipeline().addLast(new LoggingHandler());  ch.pipeline()      .addLast(   new ChannelInboundHandlerAdapter() {     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客户端已成功连接服务器");super.channelActive(ctx);     }@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  log.info("msg="+msg);  super.channelRead(ctx, msg);}   });}     }) .bind(8080);  }

客户端:

  private static final Logger log= LoggerFactory.getLogger(NettyClient.class);  public static void main(String[] args) {    //采用短连接解决“”粘包“”问题,无法解决半包问题    for (int i = 0; i < 10; i++) {      sendMessage("hello");    }  }  public static void sendMessage(String msg){     NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();     ChannelFuture channelFuture = new Bootstrap()     .group(eventLoopGroup)     .channel(NioSocketChannel.class)     .handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {  //不进行加解密不然展示不出粘包效果//  ch.pipeline().addLast(new StringEncoder());  ch.pipeline().addLast(new LoggingHandler());  ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {      ByteBuf buffer = ctx.alloc().buffer(16);      buffer.writeBytes(msg.getBytes("utf-8"));      ch.writeAndFlush(buffer);      ch.close();      ChannelFuture closeFuture = ch.closeFuture();      closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception {   eventLoopGroup.shutdownGracefully(); }      });    }  });}     }).connect("localhost", 8080);  }
定长解码器

客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度。
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder对数据进行定长解码

行解码器(推荐)

对于其他解码器,我还是更喜欢行解码器。行解码器主要是靠分隔符\n来判断行进行解码,不过需要进行限制长度,以免服务器一直搜索\n造成卡死。

改造前的粘包代码

服务端:

      NioEventLoopGroup boss = new NioEventLoopGroup(1);      NioEventLoopGroup worker = new NioEventLoopGroup(6);      new ServerBootstrap().group(boss,worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {    @Override    protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  log.info("msg={}",msg);  super.channelRead(ctx, msg);     } });    }}).bind(8080);

客户端:

     NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();      try{      new Bootstrap().group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {    @Override    protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {  ByteBuf buffer = ctx.alloc().buffer(16);  for(int i=0;i<10;i++){      buffer.retain();      buffer.writeBytes("hello world".getBytes("utf-8"));      ctx.channel().writeAndFlush(buffer);  }  ch.close();//关闭Channel  ChannelFuture closeFuture = ch.closeFuture();  closeFuture.addListener(new ChannelFutureListener() {      @Override      public void operationComplete(ChannelFuture future) throws Exception {   nioEventLoopGroup.shutdownGracefully();      }  });     } });    }}).connect("localhost",8080);      }catch (Exception e){   e.printStackTrace();      }

结果:

13:37:15.286 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x36ce6c5f, L:/127.0.0.1:8080 - R:/127.0.0.1:64550] READ: 110B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f |hello worldhello||00000010| 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c | worldhello worl||00000020| 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c |dhello worldhell||00000030| 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 |o worldhello wor||00000040| 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c |ldhello worldhel||00000050| 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f |lo worldhello wo||00000060| 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c 64|rldhello world  |+--------+-------------------------------------------------+----------------+

接收方使用行解码器改造后

服务端:

  private static final Logger log= LoggerFactory.getLogger(NettyServer.class);  public static void main(String[] args) {      NioEventLoopGroup boss = new NioEventLoopGroup(1);      NioEventLoopGroup worker = new NioEventLoopGroup(6);      new ServerBootstrap().group(boss,worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {    @Override    protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024));//配置行解码器  ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  log.info("msg={}",msg);  super.channelRead(ctx, msg);     } });    }}).bind(8080);

客户端:

 //把消息加工成可以被行解码器识别的消息    private static String getMsg(String oldMsg){ oldMsg+='\n'; return oldMsg;    }  public static void main(String[] args) {      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();      try{      new Bootstrap().group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {    @Override    protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {  ByteBuf buffer = ctx.alloc().buffer(16);  for(int i=0;i<10;i++){      buffer.retain();      String msg = getMsg("hello world");      buffer.writeBytes(msg.getBytes("utf-8"));      ctx.channel().writeAndFlush(buffer);      //清理缓存,防止数据堆叠      buffer.clear();  }  ch.close();//关闭Channel  ChannelFuture closeFuture = ch.closeFuture();  closeFuture.addListener(new ChannelFutureListener() {      @Override      public void operationComplete(ChannelFuture future) throws Exception {   nioEventLoopGroup.shutdownGracefully();      }  });     } });    }}).connect("localhost",8080);      }catch (Exception e){   e.printStackTrace();      }  }

输出结果:

13:47:15.199 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] REGISTERED13:47:15.199 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] ACTIVE13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64  |hello world     |+--------+-------------------------------------------------+----------------+13:47:15.224 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 12, cap: 2048))13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 12, cap: 2048)) that reached at the tail of the pipeline. Please check your pipeline configuration.13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64  |hello world     |+--------+-------------------------------------------------+----------------+13:47:15.224 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 2048))13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 2048)) that reached at the tail of the pipeline. Please check your pipeline configuration.13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64  |hello world     |+--------+-------------------------------------------------+----------------+13:47:15.224 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 2048))13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 2048)) that reached at the tail of the pipeline. Please check your pipeline configuration.13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE13:47:15.224 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64  |hello world     |+--------+-------------------------------------------------+----------------+13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 1024))13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64  |hello world     |+--------+-------------------------------------------------+----------------+13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 1024))13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64  |hello world     |+--------+-------------------------------------------------+----------------+13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 36, cap: 1024))13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 36, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64  |hello world     |+--------+-------------------------------------------------+----------------+13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 36, cap: 1024))13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 36, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64  |hello world     |+--------+-------------------------------------------------+----------------+13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 36, widx: 36, cap: 1024))13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 36, widx: 36, cap: 1024)) that reached at the tail of the pipeline. Please check your pipeline configuration.13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ COMPLETE13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64  |hello world     |+--------+-------------------------------------------------+----------------+13:47:15.225 [nioEventLoopGroup-3-4] INFO com.netty.netty.high.demo3.NettyServer - msg=PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 512))13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledSlicedByteBuf(ridx: 0, widx: 11, cap: 11/11, unwrapped: PooledUnsafeDirectByteBuf(ridx: 12, widx: 24, cap: 512)) that reached at the tail of the pipeline. Please check your pipeline configuration.13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LineBasedFrameDecoder#0, LoggingHandler#0, NettyServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835].13:47:15.225 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x2596d7b1, L:/127.0.0.1:8080 - R:/127.0.0.1:64835] READ: 11B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64  |hello world     |+--------+-------------------------------------------------+----------------+

可以看出已经解决了粘包问题。

自定义分隔符解码器

核心类DelimiterBasedFrameDecoder

    /**     * Creates a new instance.     *     * @param maxFrameLength  the maximum length of the decoded frame.     *   A {@link TooLongFrameException} is thrown if     *   the length of the frame exceeds this value.     * @param delimiter  the delimiter     */    public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) { this(maxFrameLength, true, delimiter);    }

服务端:

  private static final Logger log= LoggerFactory.getLogger(NettyServer.class);  public static void main(String[] args) {      NioEventLoopGroup boss = new NioEventLoopGroup(1);      NioEventLoopGroup worker = new NioEventLoopGroup(6);      new ServerBootstrap().group(boss,worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {    @Override    protected void initChannel(NioSocketChannel ch) throws Exception { ByteBuf delimiter = ch.alloc().buffer(6); delimiter.writeBytes("\r".getBytes("utf-8")); //自定义分隔符 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  log.info("msg={}",msg);  super.channelRead(ctx, msg);     } });    }}).bind(8080);  }

客户端:

 //把消息加工成可以被行解码器识别的消息    private static String getMsg(String oldMsg){ oldMsg+='\r'; return oldMsg;    }  public static void main(String[] args) {      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();      try{      new Bootstrap().group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {    @Override    protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {  ByteBuf buffer = ctx.alloc().buffer(16);  for(int i=0;i<10;i++){      buffer.retain();      String msg = getMsg("hello world");      buffer.writeBytes(msg.getBytes("utf-8"));      ctx.channel().writeAndFlush(buffer);      //清理缓存,防止数据堆叠      buffer.clear();  }  ch.close();//关闭Channel  ChannelFuture closeFuture = ch.closeFuture();  closeFuture.addListener(new ChannelFutureListener() {      @Override      public void operationComplete(ChannelFuture future) throws Exception {   nioEventLoopGroup.shutdownGracefully();      }  });     } });    }}).connect("localhost",8080);      }catch (Exception e){   e.printStackTrace();      }  }

Netty协议解析

Redis协议

我们要用netty执行Redis命令就需要遵循Redis协议。

Redis协议格式

* \r\n$ \r\n \r\n...$ \r\n \r\n

使用Netty搭建一个Redis client

/** * @author 游政杰 * @date 2022/1/13 * 模拟 Redis client */public class RedisSender {    //true为继续循环,false是退出循环  private static ThreadLocal<Boolean> threadLocal=new ThreadLocal<Boolean>();  private static final Logger log= LoggerFactory.getLogger(RedisSender.class);  public static void main(String[] args) {      //Netty“”客户端“”执行Redis命令      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();      try{      new Bootstrap().group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {    @Override    protected void initChannel(Channel ch) throws Exception { threadLocal.set(true); //默认是继续循环 ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){     //连接成功之后调用     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {  Scanner sc = new Scanner(System.in);  for (;;){      if(!threadLocal.get()){   System.out.println("退出成功");   break;      }      printInfo();      String sel = sc.next();      switch (sel)      {   case "a":sc.nextLine(); //***上面会传下来字符,导致无法输入字符串,所以要加上这句,目的是吸收上面传下来的多余字符串System.out.println("请输入Redis命令[以单空格分隔]:");String redis_cmd = sc.nextLine();String decodeProtocol = decodeProtocol(redis_cmd);ByteBuf buffer = ctx.alloc().buffer(16);buffer.writeBytes(decodeProtocol.getBytes("utf-8"));ctx.writeAndFlush(buffer);buffer.clear();break;   case "q":ch.close();ChannelFuture closeFuture = ch.closeFuture();closeFuture.addListener(new ChannelFutureListener() {    @Override    public void operationComplete(ChannelFuture future) throws Exception { nioEventLoopGroup.shutdownGracefully(); threadLocal.set(false); //退出只需要设置为false即可    }});break;   default:System.out.println("无该选项");break;      }  }     } });    }}).connect("localhost",6379);      }catch (Exception e){   e.printStackTrace();   nioEventLoopGroup.shutdownGracefully();      }  }  private static void printInfo()  {    System.out.println("请输入以下字符选项:");    System.out.println("输入a:执行Redis命令");    System.out.println("输入q:退出");  }    /**     * 协议解析     * @param redis_cmd 命令     * @return     */    //set myname abc    //del key    //get key  private static synchronized String decodeProtocol(String redis_cmd){      String delimiter1="*";      String delimiter2="$";      String delimiter3="\r\n";      StringBuffer decodeCmd = new StringBuffer();//使用线程安全的StringBuffer      List<String> cmd = Arrays.asList(redis_cmd.split(" "));      decodeCmd.append(delimiter1+cmd.size()+delimiter3);      cmd.forEach((e)->{   decodeCmd.append(delimiter2+e.length()+delimiter3);   decodeCmd.append(e+delimiter3);      });      return decodeCmd.toString();  }}
Http协议

http服务端:

public class HttpServer {    //http服务器  public static void main(String[] args) {      NioEventLoopGroup boss = new NioEventLoopGroup(1);      NioEventLoopGroup worker = new NioEventLoopGroup(6);      new ServerBootstrap().group(boss,worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {    @Override    protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); //netty自带的http协议转换 ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  //msg有两种类型  //class io.netty.handler.codec.http.DefaultHttpRequest  //class io.netty.handler.codec.http.LastHttpContent$1  if(msg instanceof HttpRequest){      HttpRequest request=(HttpRequest)msg;      //输出响应DefaultFullHttpResponse(HttpVersion version, HttpResponseStatus status)      DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);      String s="hello 2022";      byte b[]=s.getBytes("utf-8");      //从请求头设置响应数据长度,以免浏览器空转      //content_length是io.netty.handler.codec.http包下的类      response.headers().setInt(CONTENT_LENGTH,b.length);      //输出内容      response.content().writeBytes(b);      ctx.channel().writeAndFlush(response);  }  super.channelRead(ctx, msg);     } });    }}).bind("localhost",8080);  }}

高性能网络通信框架Netty一套就够(作者原创)

输出结果:

16:03:05.785 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] REGISTERED16:03:05.785 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] ACTIVE16:03:05.788 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] READ: 756B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 47 45 54 20 2f 20 48 54 54 50 2f 31 2e 31 0d 0a |GET / HTTP/1.1..||00000010| 48 6f 73 74 3a 20 6c 6f 63 61 6c 68 6f 73 74 3a |Host: localhost:||00000020| 38 30 38 30 0d 0a 55 73 65 72 2d 41 67 65 6e 74 |8080..User-Agent||00000030| 3a 20 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 57 |: Mozilla/5.0 (W||00000040| 69 6e 64 6f 77 73 20 4e 54 20 31 30 2e 30 3b 20 |indows NT 10.0; ||00000050| 57 69 6e 36 34 3b 20 78 36 34 3b 20 72 76 3a 39 |Win64; x64; rv:9||00000060| 35 2e 30 29 20 47 65 63 6b 6f 2f 32 30 31 30 30 |5.0) Gecko/20100||00000070| 31 30 31 20 46 69 72 65 66 6f 78 2f 39 35 2e 30 |101 Firefox/95.0||00000080| 0d 0a 41 63 63 65 70 74 3a 20 74 65 78 74 2f 68 |..Accept: text/h||00000090| 74 6d 6c 2c 61 70 70 6c 69 63 61 74 69 6f 6e 2f |tml,application/||000000a0| 78 68 74 6d 6c 2b 78 6d 6c 2c 61 70 70 6c 69 63 |xhtml+xml,applic||000000b0| 61 74 69 6f 6e 2f 78 6d 6c 3b 71 3d 30 2e 39 2c |ation/xml;q=0.9,||000000c0| 69 6d 61 67 65 2f 61 76 69 66 2c 69 6d 61 67 65 |image/avif,image||000000d0| 2f 77 65 62 70 2c 2a 2f 2a 3b 71 3d 30 2e 38 0d |/webp,*/*;q=0.8.||000000e0| 0a 41 63 63 65 70 74 2d 4c 61 6e 67 75 61 67 65 |.Accept-Language||000000f0| 3a 20 7a 68 2d 43 4e 2c 7a 68 3b 71 3d 30 2e 38 |: zh-CN,zh;q=0.8||00000100| 2c 7a 68 2d 54 57 3b 71 3d 30 2e 37 2c 7a 68 2d |,zh-TW;q=0.7,zh-||00000110| 48 4b 3b 71 3d 30 2e 35 2c 65 6e 2d 55 53 3b 71 |HK;q=0.5,en-US;q||00000120| 3d 30 2e 33 2c 65 6e 3b 71 3d 30 2e 32 0d 0a 41 |=0.3,en;q=0.2..A||00000130| 63 63 65 70 74 2d 45 6e 63 6f 64 69 6e 67 3a 20 |ccept-Encoding: ||00000140| 67 7a 69 70 2c 20 64 65 66 6c 61 74 65 0d 0a 43 |gzip, deflate..C||00000150| 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 70 2d |onnection: keep-||00000160| 61 6c 69 76 65 0d 0a 43 6f 6f 6b 69 65 3a 20 48 |alive..Cookie: H||00000170| 6d 5f 6c 76 74 5f 62 33 39 33 64 31 35 33 61 65 |m_lvt_b393d153ae||00000180| 62 32 36 62 34 36 65 39 34 33 31 66 61 62 61 66 |b26b46e9431fabaf||00000190| 30 66 36 31 39 30 3d 31 36 32 31 39 30 35 38 35 |0f6190=162190585||000001a0| 30 3b 20 49 64 65 61 2d 33 35 32 63 36 33 39 66 |0; Idea-352c639f||000001b0| 3d 31 32 37 64 31 61 65 35 2d 34 34 31 37 2d 34 |=127d1ae5-4417-4||000001c0| 61 62 36 2d 61 61 64 33 2d 36 32 36 62 66 38 36 |ab6-aad3-626bf86||000001d0| 34 62 62 62 33 3b 20 55 4d 5f 64 69 73 74 69 6e |4bbb3; UM_distin||000001e0| 63 74 69 64 3d 31 37 61 61 65 35 65 65 63 33 34 |ctid=17aae5eec34||000001f0| 35 31 30 2d 30 39 39 37 35 66 34 36 64 62 65 63 |510-09975f46dbec||00000200| 33 38 38 2d 34 63 33 65 32 35 37 61 2d 31 34 34 |388-4c3e257a-144||00000210| 30 30 30 2d 31 37 61 61 65 35 65 65 63 33 36 33 |000-17aae5eec363||00000220| 61 62 3b 20 43 4e 5a 5a 44 41 54 41 31 32 35 38 |ab; CNZZDATA1258||00000230| 35 36 36 39 36 33 3d 31 36 31 32 35 36 38 34 34 |566963=161256844||00000240| 38 2d 31 36 32 36 34 32 32 37 30 36 2d 25 37 43 |8-1626422706-%7C||00000250| 31 36 32 36 34 32 38 31 35 30 0d 0a 55 70 67 72 |1626428150..Upgr||00000260| 61 64 65 2d 49 6e 73 65 63 75 72 65 2d 52 65 71 |ade-Insecure-Req||00000270| 75 65 73 74 73 3a 20 31 0d 0a 53 65 63 2d 46 65 |uests: 1..Sec-Fe||00000280| 74 63 68 2d 44 65 73 74 3a 20 64 6f 63 75 6d 65 |tch-Dest: docume||00000290| 6e 74 0d 0a 53 65 63 2d 46 65 74 63 68 2d 4d 6f |nt..Sec-Fetch-Mo||000002a0| 64 65 3a 20 6e 61 76 69 67 61 74 65 0d 0a 53 65 |de: navigate..Se||000002b0| 63 2d 46 65 74 63 68 2d 53 69 74 65 3a 20 6e 6f |c-Fetch-Site: no||000002c0| 6e 65 0d 0a 53 65 63 2d 46 65 74 63 68 2d 55 73 |ne..Sec-Fetch-Us||000002d0| 65 72 3a 20 3f 31 0d 0a 43 61 63 68 65 2d 43 6f |er: ?1..Cache-Co||000002e0| 6e 74 72 6f 6c 3a 20 6d 61 78 2d 61 67 65 3d 30 |ntrol: max-age=0||000002f0| 0d 0a 0d 0a  |....     |+--------+-------------------------------------------------+----------------+16:03:05.789 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] WRITE: 49B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.||00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:||00000020| 20 31 30 0d 0a 0d 0a 68 65 6c 6c 6f 20 32 30 32 | 10....hello 202||00000030| 32    |2 |+--------+-------------------------------------------------+----------------+16:03:05.789 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] FLUSH16:03:05.789 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)GET / HTTP/1.1Host: localhost:8080User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8Accept-Language: zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2Accept-Encoding: gzip, deflateConnection: keep-aliveCookie: Hm_lvt_b393d153aeb26b46e9431fabaf0f6190=1621905850; Idea-352c639f=127d1ae5-4417-4ab6-aad3-626bf864bbb3; UM_distinctid=17aae5eec34510-09975f46dbec388-4c3e257a-144000-17aae5eec363ab; CNZZDATA1258566963=1612568448-1626422706-%7C1626428150Upgrade-Insecure-Requests: 1Sec-Fetch-Dest: documentSec-Fetch-Mode: navigateSec-Fetch-Site: noneSec-Fetch-User: ?1Cache-Control: max-age=0 that reached at the tail of the pipeline. Please check your pipeline configuration.16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LoggingHandler#0, HttpServerCodec#0, HttpServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486].16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message EmptyLastHttpContent that reached at the tail of the pipeline. Please check your pipeline configuration.16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [LoggingHandler#0, HttpServerCodec#0, HttpServer$1$1#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486].16:03:05.790 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] READ COMPLETE16:03:05.809 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] READ: 693B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico||00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:||00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.||00000030| 0a 55 73 65 72 2d 41 67 65 6e 74 3a 20 4d 6f 7a |.User-Agent: Moz||00000040| 69 6c 6c 61 2f 35 2e 30 20 28 57 69 6e 64 6f 77 |illa/5.0 (Window||00000050| 73 20 4e 54 20 31 30 2e 30 3b 20 57 69 6e 36 34 |s NT 10.0; Win64||00000060| 3b 20 78 36 34 3b 20 72 76 3a 39 35 2e 30 29 20 |; x64; rv:95.0) ||00000070| 47 65 63 6b 6f 2f 32 30 31 30 30 31 30 31 20 46 |Gecko/20100101 F||00000080| 69 72 65 66 6f 78 2f 39 35 2e 30 0d 0a 41 63 63 |irefox/95.0..Acc||00000090| 65 70 74 3a 20 69 6d 61 67 65 2f 61 76 69 66 2c |ept: image/avif,||000000a0| 69 6d 61 67 65 2f 77 65 62 70 2c 2a 2f 2a 0d 0a |image/webp,*/*..||000000b0| 41 63 63 65 70 74 2d 4c 61 6e 67 75 61 67 65 3a |Accept-Language:||000000c0| 20 7a 68 2d 43 4e 2c 7a 68 3b 71 3d 30 2e 38 2c | zh-CN,zh;q=0.8,||000000d0| 7a 68 2d 54 57 3b 71 3d 30 2e 37 2c 7a 68 2d 48 |zh-TW;q=0.7,zh-H||000000e0| 4b 3b 71 3d 30 2e 35 2c 65 6e 2d 55 53 3b 71 3d |K;q=0.5,en-US;q=||000000f0| 30 2e 33 2c 65 6e 3b 71 3d 30 2e 32 0d 0a 41 63 |0.3,en;q=0.2..Ac||00000100| 63 65 70 74 2d 45 6e 63 6f 64 69 6e 67 3a 20 67 |cept-Encoding: g||00000110| 7a 69 70 2c 20 64 65 66 6c 61 74 65 0d 0a 43 6f |zip, deflate..Co||00000120| 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 70 2d 61 |nnection: keep-a||00000130| 6c 69 76 65 0d 0a 52 65 66 65 72 65 72 3a 20 68 |live..Referer: h||00000140| 74 74 70 3a 2f 2f 6c 6f 63 61 6c 68 6f 73 74 3a |ttp://localhost:||00000150| 38 30 38 30 2f 0d 0a 43 6f 6f 6b 69 65 3a 20 48 |8080/..Cookie: H||00000160| 6d 5f 6c 76 74 5f 62 33 39 33 64 31 35 33 61 65 |m_lvt_b393d153ae||00000170| 62 32 36 62 34 36 65 39 34 33 31 66 61 62 61 66 |b26b46e9431fabaf||00000180| 30 66 36 31 39 30 3d 31 36 32 31 39 30 35 38 35 |0f6190=162190585||00000190| 30 3b 20 49 64 65 61 2d 33 35 32 63 36 33 39 66 |0; Idea-352c639f||000001a0| 3d 31 32 37 64 31 61 65 35 2d 34 34 31 37 2d 34 |=127d1ae5-4417-4||000001b0| 61 62 36 2d 61 61 64 33 2d 36 32 36 62 66 38 36 |ab6-aad3-626bf86||000001c0| 34 62 62 62 33 3b 20 55 4d 5f 64 69 73 74 69 6e |4bbb3; UM_distin||000001d0| 63 74 69 64 3d 31 37 61 61 65 35 65 65 63 33 34 |ctid=17aae5eec34||000001e0| 35 31 30 2d 30 39 39 37 35 66 34 36 64 62 65 63 |510-09975f46dbec||000001f0| 33 38 38 2d 34 63 33 65 32 35 37 61 2d 31 34 34 |388-4c3e257a-144||00000200| 30 30 30 2d 31 37 61 61 65 35 65 65 63 33 36 33 |000-17aae5eec363||00000210| 61 62 3b 20 43 4e 5a 5a 44 41 54 41 31 32 35 38 |ab; CNZZDATA1258||00000220| 35 36 36 39 36 33 3d 31 36 31 32 35 36 38 34 34 |566963=161256844||00000230| 38 2d 31 36 32 36 34 32 32 37 30 36 2d 25 37 43 |8-1626422706-%7C||00000240| 31 36 32 36 34 32 38 31 35 30 0d 0a 53 65 63 2d |1626428150..Sec-||00000250| 46 65 74 63 68 2d 44 65 73 74 3a 20 69 6d 61 67 |Fetch-Dest: imag||00000260| 65 0d 0a 53 65 63 2d 46 65 74 63 68 2d 4d 6f 64 |e..Sec-Fetch-Mod||00000270| 65 3a 20 6e 6f 2d 63 6f 72 73 0d 0a 53 65 63 2d |e: no-cors..Sec-||00000280| 46 65 74 63 68 2d 53 69 74 65 3a 20 73 61 6d 65 |Fetch-Site: same||00000290| 2d 6f 72 69 67 69 6e 0d 0a 43 61 63 68 65 2d 43 |-origin..Cache-C||000002a0| 6f 6e 74 72 6f 6c 3a 20 6d 61 78 2d 61 67 65 3d |ontrol: max-age=||000002b0| 30 0d 0a 0d 0a      |0....    |+--------+-------------------------------------------------+----------------+16:03:05.810 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] WRITE: 49B  +-------------------------------------------------+  |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.||00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:||00000020| 20 31 30 0d 0a 0d 0a 68 65 6c 6c 6f 20 32 30 32 | 10....hello 202||00000030| 32    |2 |+--------+-------------------------------------------------+----------------+16:03:05.810 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x924b66ea, L:/127.0.0.1:8080 - R:/127.0.0.1:52486] FLUSH16:03:05.810 [nioEventLoopGroup-3-3] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)GET /favicon.ico HTTP/1.1Host: localhost:8080User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0Accept: image/avif,image/webp,*/*Accept-Language: zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2Accept-Encoding: gzip, deflateConnection: keep-aliveReferer: http://localhost:8080/Cookie: Hm_lvt_b393d153aeb26b46e9431fabaf0f6190=1621905850; Idea-352c639f=127d1ae5-4417-4ab6-aad3-626bf864bbb3; UM_distinctid=17aae5eec34510-09975f46dbec388-4c3e257a-144000-17aae5eec363ab; CNZZDATA1258566963=1612568448-1626422706-%7C1626428150Sec-Fetch-Dest: imageSec-Fetch-Mode: no-corsSec-Fetch-Site: same-origin

群聊

服务端:

private static final Logger log= LoggerFactory.getLogger(NettyServer.class);   //维护所有channel,key=名称,value为channel对象    private static Map<String, NioSocketChannel> sessions=new ConcurrentHashMap<>();    public static Map<String, NioSocketChannel> getSessions() { return sessions;    }    public static void putSession(String name,NioSocketChannel channel){ sessions.put(name,channel);    }    public static void removeSession(String name){ sessions.remove(name);    }  public static void main(String[] args) {      NioEventLoopGroup boss = new NioEventLoopGroup(1);      NioEventLoopGroup worker = new NioEventLoopGroup(6);      new ServerBootstrap().group(boss,worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {    @Override    protected void initChannel(NioSocketChannel ch) throws Exception {// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));//配置行解码器 ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  super.exceptionCaught(ctx, cause);     }     //读消息     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  ByteBuf byteBuf=(ByteBuf)msg;  byte b[]=new byte[byteBuf.readableBytes()];  byteBuf.readBytes(b);  String str=new String(b,"utf8");  JSONObject jsonObject = JSONObject.parseObject(str);  int state = (int) jsonObject.get("state");  String username = (String) jsonObject.get("username");  switch (state)      {   case 0: //上线NettyServer.putSession(username, ch);if(username.equals("client4")){ //如果是client4用户登录则群发    sessions.forEach((k,v)->{ ByteBuf buffer = ctx.alloc().buffer(16); try {     buffer.writeBytes("群发hhhh".getBytes("utf8"));     v.writeAndFlush(buffer);//     buffer.clear(); } catch (UnsupportedEncodingException e) {     e.printStackTrace(); }    });}System.out.println("当前在线人数:"+NettyServer.getSessions().size());break;   case 1: //下线NettyServer.removeSession(username);NettyServer.getSessions().forEach((k,v)->{    System.out.println(k);    System.out.println(v.hashCode());});System.out.println("当前在线人数:"+NettyServer.getSessions().size());break;   default:break;      }  super.channelRead(ctx, msg);     } });    }}).bind(8080);  }

客户端1:

public static void main(String[] args) { String name="client1";      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();      try{      new Bootstrap().group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {    @Override    protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {  User user = new User();  user.setUsername(name);  user.setState(0);  String jsonString = JSON.toJSONString(user);  ByteBuf buffer = ctx.alloc().buffer(16);  buffer.writeBytes(jsonString.getBytes("utf8"));  ch.writeAndFlush(buffer);  super.channelActive(ctx);     }     @Override     public void channelInactive(ChannelHandlerContext ctx) throws Exception {  User user = new User();  user.setUsername(name);  user.setState(1);  String jsonString = JSON.toJSONString(user);  ByteBuf buffer = ctx.alloc().buffer(16);  buffer.writeBytes(jsonString.getBytes("utf8"));  ch.writeAndFlush(buffer);  super.channelInactive(ctx);     }     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  System.out.println(msg);  super.channelRead(ctx, msg);     } });    }}).connect("localhost",8080);      }catch (Exception e){   e.printStackTrace();      }  }

客户端2:

 public static void main(String[] args) { String name="client2";      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();      try{      new Bootstrap().group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {    @Override    protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {  User user = new User();  user.setUsername(name);  user.setState(0);  String jsonString = JSON.toJSONString(user);  ByteBuf buffer = ctx.alloc().buffer(16);  buffer.writeBytes(jsonString.getBytes("utf8"));  ch.writeAndFlush(buffer);  super.channelActive(ctx);     }     @Override     public void channelInactive(ChannelHandlerContext ctx) throws Exception {  User user = new User();  user.setUsername(name);  user.setState(1);  String jsonString = JSON.toJSONString(user);  ByteBuf buffer = ctx.alloc().buffer(16);  buffer.writeBytes(jsonString.getBytes("utf8"));  ch.writeAndFlush(buffer);  super.channelInactive(ctx);     }     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  System.out.println(msg);  super.channelRead(ctx, msg);     } });    }}).connect("localhost",8080);      }catch (Exception e){   e.printStackTrace();      }  }

客户端3:

 public static void main(String[] args) { String name="client3";      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();      try{      new Bootstrap().group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {    @Override    protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {  User user = new User();  user.setUsername(name);  user.setState(0);  String jsonString = JSON.toJSONString(user);  ByteBuf buffer = ctx.alloc().buffer(16);  buffer.writeBytes(jsonString.getBytes("utf8"));  ch.writeAndFlush(buffer);  super.channelActive(ctx);     }     @Override     public void channelInactive(ChannelHandlerContext ctx) throws Exception {  User user = new User();  user.setUsername(name);  user.setState(1);  String jsonString = JSON.toJSONString(user);  ByteBuf buffer = ctx.alloc().buffer(16);  buffer.writeBytes(jsonString.getBytes("utf8"));  ch.writeAndFlush(buffer);  super.channelInactive(ctx);     }     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  System.out.println(msg);  super.channelRead(ctx, msg);     } });    }}).connect("localhost",8080);      }catch (Exception e){   e.printStackTrace();      }  }

客户端4:

public static void main(String[] args) { String name="client4";      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();      try{      new Bootstrap().group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {    @Override    protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {  User user = new User();  user.setUsername(name);  user.setState(0);  String jsonString = JSON.toJSONString(user);  ByteBuf buffer = ctx.alloc().buffer(16);  buffer.writeBytes(jsonString.getBytes("utf8"));  ch.writeAndFlush(buffer);  super.channelActive(ctx);     }     @Override     public void channelInactive(ChannelHandlerContext ctx) throws Exception {  User user = new User();  user.setUsername(name);  user.setState(1);  String jsonString = JSON.toJSONString(user);  ByteBuf buffer = ctx.alloc().buffer(16);  buffer.writeBytes(jsonString.getBytes("utf8"));  ch.writeAndFlush(buffer);  super.channelInactive(ctx);     }     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  System.out.println(msg);  super.channelRead(ctx, msg);     } });    }}).connect("localhost",8080);      }catch (Exception e){   e.printStackTrace();      }  }

实体类:

public class User implements Serializable {    private String username;    private int state; //用户状态,0在线,1下线    public String getUsername() { return username;    }    public void setUsername(String username) { this.username = username;    }    public int getState() { return state;    }    public void setState(int state) { this.state = state;    }    @Override    public String toString() { return "User{" +  "username='" + username + '\'' +  ", state=" + state +  '}';    }}