> 文档中心 > RocketMQ基于Netty长连接

RocketMQ基于Netty长连接


1 Reactor主线程与长短连接

Broker的 “Reactor” 线程,负责监听网络端口,如监听2888,39150这样的端口。

1.1 短连接

短连接,若你要给别人发送一个请求,须建立连接 -> 发送请求 -> 接收响应 -> 断开连接,下一次你要发送请求时,这过程得重来一遍。

每次建立一个连接后,使用这个连接发送请求的时间很短,很快就会断开这个连接,所以他存在时间太短,就是短连接。

1.2 长连接

长连接,建立一个连接 -> 发送请求 -> 接收响应 -> 发送请求 -> 接收响应。

整个过程是,当你建立好一个长连接后,可不停发送请求和接收响应,连接不会断开,等你不需要时再断开即可,该连接会存在很长时间,即长连接。

1.3 TCP长连接

基于TCP协议建立的长连接。

2 Pro和Broker建立一个长连接

此时有个Pro要跟Broker建立一个TCP长连接,则Broker上的这个Reactor主线程,它会在端口上监听到该Pro建立连接的请求。

img

接着该Reactor主线程就专职跟这Pro按TCP协议规定的一系列步骤和规范,建立好一个长连接。

2.1 SocketChannel

Pro里有个SocketChannel,Broker里也有个SocketChannel,这两个SocketChannel代表他们两建立好的这个长连接。

两个SocketChannel配对代表一个连接。

RocketMQ基于Netty长连接

Pro就是通过SocketChannel去发消息给Broker。

3 基于Reactor线程池监听连接中的请求

3.1 Reactor线程池

在Broker中有个概念,即Reactor线程池。该线程池默认3个线程。

当Reactor主线程建立好的每个连接SocketChannel,都会交给这个Reactor线程池里的其中一个线程去监听请求。

RocketMQ基于Netty长连接

整理分析

有了Reactor线程池后,即可让Pro发请求过来,它发送一个消息过来到达Broker的SocketChannel,此时Reactor线程池里的一个线程会监听到这个SocketChannel中有请求到达。

4 基于Worker线程池完成一系列准备工作

接着Reactor线程从SocketChannel读取出来一个请求,该请求在正式处理前,须先进行一些准备工作和预处理,如SSL加密验证、编码解码、连接空闲检查、网络连接管理等。

4.1 Worker线程池

Worker线程池,默认8个线程。Reactor线程收到的请求会叫个Worker线程池中的一个线程进行处理,来完成上述的一系列准备工作。

RocketMQ基于Netty长连接

5 基于业务线程池完成请求的处理

经过Worker线程完成预处理后,就需对请求进行正式业务处理。即将请求转交给业务线程池。

5.1 SendMessage线程池

业务线程池的一种。

场景:对于处理发送消息请求而言,就会把请求转交给SendMessage线程池。SendMessage线程是可以配置的,配置的越多,处理消息的吞吐量越高。

业务处理逻辑

当Pro发消息过来,Broker接收到消息,肯定要写入CommitLog文件,后续要有一些ConsumeQueue之类需处理,类似这种操作,就是业务处理逻辑。

这一系列预处理后的请求都要转发给业务线程池。

RocketMQ基于Netty长连接

6 为何这套网络通信框架会是高性能及高并发的?

由于专门分配了一个Reactor主线程,和各种Pro、Con建立长连接。

连接建立好之后,大量长连接均匀分配给Reactor线程池里的多个线程。

每个Reactor线程负责监听一部分连接的请求,通过多线程并发的监听不同连接的请求,有效提升网络框架并发力。

接着后续对大量并发过来的请求都是基于Worker线程池预处理,当Worker线程池预处理多个请求的时候,Reactor线程还可有条不紊继续监听和接收大量连接请求是否到达。

而且最终的读写磁盘文件之类的操作都交给业务线程池处理,当他并发执行多个请求的磁盘读写操作的时候,不影响其他线程池同时接收请求、预处理请求,没任何的影响。

所以最终效果:

  • Reactor主线程在端口上监听Producer建立连接的请求,建立长连接
  • Reactor线程池并发的监听多个连接的请求是否到达
  • Worker请求并发的对多个请求进行预处理
  • 业务线程池并发的对多个请求进行磁盘读写业务操作

通过这样一套网络通信架构,最终实现可以高并发、高吞吐的对大量网络连接发送过来的大量请求进行预处理,从而保证Broker实现高吞吐。

总结

NIO概念同步非阻塞,每个请求对应一个socketchannel通道数据通过bytebuffer来传输,所有的socketchannel注册到selector选择器上reactor线程池从轮询来处理请求调用select poll epoll函数来获取数据,获取后的数据交给worker线程池来进行参数验证和信息封装,业务线程再去读取数据进行写入,每个线程池负责不同的内容相互不影响来提升并发。