> 文档中心 > Netty实战《原理》

Netty实战《原理》

Netty介绍

      • 官网说明
      • Netty优点
      • Netty工作原理示意图
      • Netty异步模型
      • Netty核心组件模块
        • Bootstrap和ServerBootstrap
        • Future和ChannelFuture
        • Channel
        • Selector
        • ChannelHandler 及其实现类
        • Pipeline 和 ChannelPipeline
        • ChannelHandlerContext
        • ChannelOption
        • Unpooled 类

官网说明

1)netty是由JBOSS提供的一个java开源框架。Netty提供异步的,基于事件驱动的网络应用程序框架,用于快速开发高性能,高可靠的网络IO程序
2)netty可以帮助你快速,简单的开发一个网络应用,相当于简化和流程化NIO的开发流程
3)netty目前最流行的NIO框架,在互联网,大数据分布式计算领域,游戏行业,通信行业等有广泛的应用,知名的Es,Dubbo等框架内部都采用netty

Netty优点

1)设计优雅
2)使用方便
3)高性能,吞吐量高,延迟低,减少资源损耗,减少不必要的内存复制
4)安全,完整的ssl/tls 和start TLS支持
5)社区活跃

Netty工作原理示意图

在这里插入图片描述
说明
1) netty抽象出两组线程池 BossGroup专门负责接受客户端请求,WorkerGroup专门负责网络读写
2) BossGroup和WorkerGroup 类型都是NioEventLoopGroup
3)NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环 ,每个循环相当于NioEventLoop
4)NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯
5)NioEventLoopGroup可以有多个NioEventLoop
6)每个Boss NioEventLoop循环执行的步骤

  • 轮询accept事件
  • 处理accept事件,与client建立连接后,生成NioSocketChannel,并将其注册到某个worker NioEventLoop上的selector
  • 处理任务队列的任务,即runAllTasks
  1. 每个Worker NIOEventLoop循环执行的步骤
  • 轮询read,write事件
  • 处理io事件,在对应的NioSocketChannel处理
  • 处理任务队列的任务,即runAllTasks

8)每个Worker NioEventLoop处理业务的时候,会使用pipline,pipline中包含lchannel,即可以通过pipline可以获取对应的通道,管道中维护多个处理器。

9)NioEventLoop内部采用串行化设计,消息的读取-》解码-》处理-》编码-》发送,始终由IO线程的NioEventLoop负责

代码实例1 简单的聊天
实例要求:使用IDEA 创建Netty项目
1)Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 “hello, 服务器~”
2)服务器可以回复消息给客户端 “hello, 客户端~”

server

/** * @author gusteu * @date 2022/04/16 21:50:33 */public class NettyServer {    public static void main(String[] args) throws InterruptedException { //创建NioEventLoopGroup NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); //创建netty 引导类 ServerBootstrap serverBootstrap = new ServerBootstrap(); //配置 serverBootstrap.group(bossGroup, workerGroup)  .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端通道的实现  .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列中等待连接的个数  .childOption(ChannelOption.SO_KEEPALIVE, true)//保持活动的连接  .childHandler(new ChannelInitializer<SocketChannel>() {      @Override      protected void initChannel(SocketChannel serverSocketChannel) throws Exception {   //添加自定义处理器   serverSocketChannel.pipeline().addLast(new NettyServerHandler());      }  }); System.out.println("......Server is ready......"); ChannelFuture cf = serverBootstrap.bind(6666).sync();  //绑定端口 bind方法是异步的  sync方法是同步阻塞的 System.out.println("......Server is starting......"); //11. 关闭通道,关闭线程组 cf.channel().closeFuture().sync(); //异步 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();    }}

serverHandler

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/** * @author gusteu * @date 2022/04/16 21:51:10 */public class NettyServerHandler extends ChannelInboundHandlerAdapter {    //读取客户端消息    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务端:" + ctx); ByteBuf byteBuffer = (ByteBuf) msg; System.out.println("客户端发送的消息:" + byteBuffer.toString(CharsetUtil.UTF_8));    }    //读取完成后返回个信息    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("hello client(>^ω^<)喵", CharsetUtil.UTF_8));    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close();    }}

client

import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;/** * @author gusteu * @date 2022/04/16 21:40:23 */public class NettyClient {    public static void main(String[] args) throws InterruptedException { //创建一个线程组 NioEventLoopGroup group = new NioEventLoopGroup(); //创建客户端启动对象 Bootstrap bootstrap = new Bootstrap(); //客户端配置 bootstrap.group(group)  .channel(NioSocketChannel.class)  .handler(new ChannelInitializer<SocketChannel>() {      @Override      protected void initChannel(SocketChannel socketChannel) throws Exception {   //pipline添加处理器   socketChannel.pipeline().addLast(new NettyClientHandler());      }  }); //绑定服务器端口 //7.启动客户端去连接服务器端  connect方法是异步的   sync方法是同步阻塞的 ChannelFuture cf = bootstrap.connect("127.0.0.1", 6666).sync(); //8.关闭连接(异步非阻塞) cf.channel().closeFuture().sync();    }}

clientHandler

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/** * @author gusteu * @date 2022/04/16 21:33:11 * 客户端业务处理类 */public class NettyClientHandler extends ChannelInboundHandlerAdapter {    //通道就绪事件    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client: " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Server", CharsetUtil.UTF_8));    }    /**     * 读取数据事件     *     * @param ctx     * @param msg     * @throws Exception     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuffer = (ByteBuf) msg; System.out.println("服务端返回的数据: " + byteBuffer.toString(CharsetUtil.UTF_8));    }}

Netty异步模型

基本介绍
1)Netty中的IO操作时异步的,包括Bind,Write,Connect等操作会简单的返回一个ChannelFuture
2)Netty异步模型是建立在Future和callback的之上的,callback就是回调,核心思想:假设一个方法fun,计算过程非常耗时,等待fun返回显然不可以,那么可以在调用fun方法的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程(Future-Listener机制)
3)ChannelFuture是一个接口public interface ChannelFuture extends Future 我们可以添加监听器,当监听的事件发送时候,就会通知监听器

Future-Listener机制
1)当Future对象刚刚创建的时候,处于非完成的状态,调用者可以通过返回ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作

在这里插入图片描述

serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {    System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");} else{    System.err.println("端口["+ port + "]绑定失败!");}   });

小结:相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住, 直到操作完成;异步处理的好处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量

代码实例2 Http服务
实例要求:使用IDEA 创建Netty项目
Netty 服务器在 6668 端口监听,浏览器发出请求 "http://localhost:6668/ "
服务器可以回复消息给客户端 "Hello! 我是服务器 5 " , 并对特定请求资源进行过滤.

server

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * @author gusteu * @date 2022/04/16 22:37:14 */public class HttpNettyServer {    public static void main(String[] args) { /**  * 事件循环组,就是死循环  */ //仅仅接受连接,转给workerGroup,自己不做处理 EventLoopGroup bossGroup = new NioEventLoopGroup(); //真正处理 EventLoopGroup workerGroup = new NioEventLoopGroup(); try {     //很轻松的启动服务端代码     ServerBootstrap serverBootstrap = new ServerBootstrap();     //childHandler子处理器,传入一个初始化器参数TestServerInitializer(这里是自定义)     //TestServerInitializer在channel被注册时,就会创建调用     serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).      childHandler(new HttpServerInitializer());     //绑定一个端口并且同步,生成一个ChannelFuture对象     ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();     //对关闭的监听     channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) {     e.printStackTrace(); } finally {     //循环组优雅关闭     bossGroup.shutdownGracefully();     workerGroup.shutdownGracefully(); }    }}

ChildChannel

import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpServerCodec;/** * @author gusteu * @date 2022/04/16 22:37:41 */public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {    @Override    protected void initChannel(SocketChannel socketChannel) throws Exception { //添加解码 socketChannel.pipeline().addLast(new HttpServerCodec()); //添加自定义处理器 socketChannel.pipeline().addLast(new HttpServerHandler());    }}

自定义Handler

/** * @author gusteu * @date 2022/04/16 22:39:49 *   继承InboundHandler类,代表处理进入的请求,还有OutboundHandler,处理出去请求 *  其中里面的泛型表示msg的类型,如果指定了HttpObject,表明相互通讯的数据被封装成HttpObject */public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject>  {    int count = 4; // 通过这个可以看到在服务器 每一个客户端对应一个 独立的handler    //channelRead0读取客户端请求,并返回响应的方法    @Override    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { //判断是不是HttpRequest if(msg instanceof HttpRequest){     System.out.println(msg.getClass());     System.out.println(ctx.channel().remoteAddress());     HttpRequest httpRequest= (HttpRequest) msg;     URI uri=new URI(httpRequest.uri());     System.out.println("uri----"+uri.toString());     System.out.println("请求方法名:"+httpRequest.method().name());     //ByteBuf,neety中极为重要的概念,代表响应返回的数据     ByteBuf content = Unpooled.copiedBuffer("Hello! 我是服务器" + (++count), CharsetUtil.UTF_8);     //构造一个http响应,HttpVersion.HTTP_1_1:采用http1.1协议,HttpResponseStatus.OK:状态码200     FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,      HttpResponseStatus.OK, content);     response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");     response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());     //如果只是调用write方法,他仅仅是存在缓冲区里,并不会返回客户端     //调用writeAndFlush可以     ctx.writeAndFlush(response); }    }    /**     * 处理顺序如下:     * handlerAdded     * channelRegistered     * channelActive     * 请求方法名:GET(channelRead0)     * (下面的表示的是断开连接后)     * 1.如果是使用curl :连接会立刻关闭     * 2.如果是浏览器访问,http1.0:它是短连接,会立刻关闭。http1.1,是长连接,连接保持一段时间     * channelInactive     * channelUnregistered     * @param ctx     * @throws Exception     */    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); super.channelActive(ctx);    }    @Override    public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered"); super.channelRegistered(ctx);    }    @Override    public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded"); super.handlerAdded(ctx);    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive"); super.channelInactive(ctx);    }    @Override    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelUnregistered"); super.channelUnregistered(ctx);    }}

Netty核心组件模块

Bootstrap和ServerBootstrap

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类

常见的方法有
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端,用来设置两个 EventLoop
public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
public B channel(Class channelClass),该方法用来设置一个服务器端的通道实现
public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置
public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置
public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的 handler)
public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连接服务器端

Future和ChannelFuture

Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件

常见的方法有
Channel channel(),返回当前正在进行 IO 操作的通道
ChannelFuture sync(),等待异步操作执行完毕

Channel

1)Netty 网络通信的组件,能够用于执行网络 I/O 操作。
2)通过Channel 可获得当前网络连接的通道的状态
3)通过Channel 可获得 网络连接的配置参数 (例如接收缓冲区大小)
4)Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成
5)调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方
6)支持关联 I/O 操作与对应的处理程序
7)不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,常用的 Channel 类型:
NioSocketChannel,异步的客户端 TCP Socket 连接。
NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
NioDatagramChannel,异步的 UDP 连接。
NioSctpChannel,异步的客户端 Sctp 连接。
NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

Selector

1)Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
2)当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel

ChannelHandler 及其实现类

1)ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
2)ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类
在这里插入图片描述

Pipeline 和 ChannelPipeline

ChannelPipeline 是一个重点:

1)ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的链。(也可以这样理解:ChannelPipeline 是 保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作)
2)ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互
3)在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下

在这里插入图片描述
说明
1) 一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler
2)入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰

常用方法:
ChannelPipeline addFirst(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的第一个位置
ChannelPipeline addLast(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的最后一个位置

ChannelHandlerContext

1)保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象,即ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler , 同 时ChannelHandlerContext 中2)也绑定了对应的 pipeline 和 Channel 的信息,方便对 ChannelHandler进行调用.

ChannelOption

一般参数如下
ChannelOption.SO_BACKLOG
对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服
务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户
端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定
了队列的大小。

ChannelOption.SO_KEEPALIVE
一直保持连接活动状态

Unpooled 类

Netty 提供一个专门用来操作缓冲区(即Netty的数据容器)的工具类
常用方法:
//通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 但有区别)
public static ByteBuf copiedBuffer(CharSequence string, Charset charset)

未完待续。。。。。。