> 文档中心 > RocketMQ(七)RocketMQ消息生产及消息储存机制

RocketMQ(七)RocketMQ消息生产及消息储存机制

目录

  • 1、消息生产
    • 1.1 消息的生产过程
    • 1.2 Queue选择算法
  • 2、消息储存
    • 2.1 存储介质
    • 2.2 消息的存储和发送
    • 2.3 消息存储结构
    • 2.4 刷盘机制

1、消息生产

1.1 消息的生产过程

Producer可以将消息写入到某Broker中的某Queue中,其经历了如下过程:

  • Producer发送消息之前,会先向NameServer发出获取消息Topic的路由信息的请求。
  • NameServer返回该Topic的路由表及Broker列表。
  • Producer根据代码中指定的Queue选择策略,从Queue列表中选出一个队列,用于后续存储消息。
  • Produer对消息做一些特殊处理,例如,消息本身超过4M,则会对其进行压缩。
  • Producer向选择出的Queue所在的Broker发出RPC请求,将消息发送到选择出的Queue中。

路由表:
实际是一个Map,key为Topic名称,value是一个QueueData实例列表。QueueData并不是一个Queue对应一个QueueData,而是一个Broker中该Topic的所有Queue对应一个QueueData。简单来说,路由表的key为Topic名称,value则为所有涉及该Topic的BrokerName列表。

Broker列表:
实际是一个Map,key为brokerName,value为BrokerData。brokerName名称相同的Master-Slave集群对应一BrokerData。BrokerData中包含brokerName及一个map。该map的key为brokerId,value为该broker对应的地址。brokerId为0表示该broker为Master,非0表示Slave。

1.2 Queue选择算法

对于无序消息,其Queue选择算法,也称为消息投递算法,常见的有两种:轮询算法、最小投递延迟算法

  • 轮询算法
    默认选择算法。该算法保证了每个Queue中可以均匀的获取到消息。

    该算法存在一个问题:由于某些原因,在某些Broker上的Queue可能投递延迟较严重。从而导致Producer的缓存队列中出现较大的消息积压,影响消息的投递性能。

  • 最小投递延迟算法
    该算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的Queue。如果延迟相同,则采用轮询算法投递。

    该算法可以有效提升消息的投递性能。但是也存在一个问题:消息在Queue上的分配不均匀。投递延迟小的Queue可能会存在大量的消息。而对该Queue的消费者压力会增大,降低消息的消费能力,可能会导致MQ中消息的堆积。

2、消息储存

分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。
在这里插入图片描述

  1. 消息生成者发送消息
  2. MQ收到消息,将消息进行持久化,在存储中新增一条记录
  3. 返回ACK给生产者
  4. MQ push 消息给对应的消费者,然后等待消费者返回ACK
  5. 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
  6. MQ删除消息

2.1 存储介质

  • 关系型数据库DB

Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障

目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

2.2 消息的存储和发送

Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。

一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。

这两个看似简单的操作,实际进行了4 次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存;

  2. 从内核态内存复制到用户态内存;

  3. 然后从用户态内存复制到网络驱动的内核态内存;

  4. 最后是从网络驱动的内核态内存复制到网卡中进行传输。在这里插入图片描述
    通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的

    RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。

    这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了

2.3 消息存储结构

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
在这里插入图片描述

  • CommitLog:存储消息的元数据
  • ConsumerQueue:存储消息在CommitLog的索引
  • IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

1)CommitLog:

commitlog目录中存放着很多的mappedFile文件,当前Broker中的消息的元数据都是落盘到这些mappedFile文件中的,一个Broker中仅包含一个commitlog目录。mappedFile文件大小为1G(小于等于1G),文件名由20位十进制数构成,表示当前文件的第一条消息的起始位移偏移量。

第一个文件名一定是200构成的。因为第一个文件的第一条消息的偏移量commitlog offset为0当第一个文件放满时,则会自动生成第二个文件继续存放消息。假设第一个文件大小是1073741820字节(1G = 1073741824字节),则第二个文件名就是00000000001073741824。以此类推,第n个文件名应该是前n-1个文件大小之和。一个Broker中所有mappedFile文件的commitlog offset是连续的

2)ConsumeQueue:
为了提高效率,会为每个Topic在~/store/consumequeue中创建一个目录,目录名为Topic名称。在该Topic目录下,会再为每个该Topic的Queue建立一个目录,目录名为queueId。每个目录中存放着若干consumequeue文件,consumequeue文件是commitlog的索引文件,可以根据consumequeue定位到具体的消息。

consumequeue文件名也由20位数字构成,表示当前文件的第一个索引条目的起始位移偏移量。

在这里插入图片描述
每个consumequeue文件可以包含30w个索引条目,每个索引条目包含了三个消息重要属性:消息在mappedFile文件中的偏移量CommitLog Offset、消息长度、消息Tag的hashcode值。这三个属性占20个字节,所以每个文件的大小是固定的30w * 20字节。
在这里插入图片描述
3)IndexFile:

RocketMQ提供了根据key进行消息查询的功能。该查询是通过store目录中的index子目录中的indexFile进行索引实现的快速查询。当然,这个indexFile中的索引数据是在包含了key的消息被发送到Broker时写入的。如果消息中没有包含key,则不会写入,key是发送消息时手动设置的消息标识。

4)对文件的读写:

消息写入
一条消息进入到Broker后经历了以下几个过程才最终被持久化。

  • Broker根据queueId,获取到该消息对应索引条目要在consumequeue目录中的写入偏移量,即
    QueueOffset
  • 将queueId、queueOffset等数据,与消息一起封装为消息单元
  • 将消息单元写入到commitlog
  • 同时,形成消息索引条目
  • 将消息索引条目分发到相应的consumequeue

消息拉取

当Consumer来拉取消息时会经历以下几个步骤:

  • Consumer获取到其要消费消息所在Queue的消费偏移量offset,计算出其要消费消息的消息offset
  • Consumer向Broker发送拉取请求,其中会包含其要拉取消息的Queue、消息offset及消息Tag
  • Broker计算在该consumequeue中的queueOffset
  • 从该queueOffset处开始向后查找第一个指定Tag的索引条目
  • 解析该索引条目的前8个字节,即可定位到该消息在commitlog中的commitlog offset
  • 从对应commitlog offset中读取消息单元,并发送给Consumer

2.4 刷盘机制

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
在这里插入图片描述
1)同步刷盘:

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

2)异步刷盘:

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

3)配置:

同步刷盘与异步刷盘,都是通过Broker配置文件里的flushDiskType参数设置的,这个参数被配置成SYNC_FLUSH(同步)、ASYNC_FLUSH(异步)中的 一个。