> 技术文档 > 04 | Kafka创建主题_kafka创建topic

04 | Kafka创建主题_kafka创建topic


2.3 创建主题

Topic主题是Kafka中消息的逻辑分类,但是这个分类不应该是固定的,而是应该由外部的业务场景进行定义(注意:Kafka中其实是有两个固定的,用于记录消费者偏移量和事务处理的主题),所以Kafka提供了相应的指令和客户端进行主题操作。

2.3.1 相关概念

2.3.1.1 主题:Topic

Kafka是分布式消息传输系统,采用的数据传输方式为发布,订阅模式,也就是说由消息的生产者发布消息,消费者订阅消息后获取数据。为了对消费者订阅的消息进行区分,所以对消息在逻辑上进行了分类,这个分类我们称之为主题:Topic。消息的生产者必须将消息数据发送到某一个主题,而消费者必须从某一个主题中获取消息,并且消费者可以同时消费一个或多个主题的数据。Kafka集群中可以存放多个主题的消息数据。

为了防止主题的名称和监控指标的名称产生冲突,官方推荐主题的名称中不要同时包含下划线和点。

2.3.1.2 分区:Partition

Kafka消息传输采用发布、订阅模式,所以消息生产者必须将数据发送到一个主题,假如发送给这个主题的数据非常多,那么主题所在broker节点的负载和吞吐量就会受到极大的考验,甚至有可能因为热点问题引起broker节点故障,导致服务不可用。一个好的方案就是将一个主题从物理上分成几块,然后将不同的数据块均匀地分配到不同的broker节点上,这样就可以缓解单节点的负载问题。这个主题的分块我们称之为:分区partition。默认情况下,topic主题创建时分区数量为1,也就是一块分区,可以指定参数--partitions改变。Kafka的分区解决了单一主题topic线性扩展的问题,也解决了负载均衡的问题。

topic主题的每个分区都会用一个编号进行标记,一般是从0开始的连续整数数字。Partition分区是物理上的概念,也就意味着会以数据文件的方式真实存在。每个topic包含一个或多个partition,每个partition都是一个有序的队列。partition中每条消息都会分配一个有序的ID,称之为偏移量:Offset

2.3.1.3 副本:Replication

分布式系统出现错误是比较常见的,只要保证集群内部依然存在可用的服务节点即可,当然效率会有所降低,不过只要能保证系统可用就可以了。咱们Kafka的topic也存在类似的问题,也就是说,如果一个topic划分了多个分区partition,那么这些分区就会均匀地分布在不同的broker节点上,一旦某一个broker节点出现了问题,那么在这个节点上的分区就会出现问题,那么Topic的数据就不完整了。所以一般情况下,为了防止出现数据丢失的情况,我们会给分区数据设定多个备份,这里的备份,我们称之为:副本Replication。

Kafka支持多副本,使得主题topic可以做到更多容错性,牺牲性能与空间去换取更高的可靠性。

注意:这里不能将多个备份放置在同一个broker中,因为一旦出现故障,多个副本就都不能用了,那么副本的意义就没有了。

2.3.1.4 副本类型:Leader & Follower

假设我们有一份文件,一般情况下,我们对副本的理解应该是有一个正式的完整文件,然后这个文件的备份,我们称之为副本。但是在Kafka中,不是这样的,所有的文件都称之为副本,只不过会选择其中的一个文件作为主文件,称之为:Leader(主导)副本,其他的文件作为备份文件,称之为:Follower(追随)副本。在Kafka中,这里的文件就是分区,每一个分区都可以存在1个或多个副本,只有Leader副本才能进行数据的读写,Follower副本只做备份使用。

2.3.1.5 日志:Log

Kafka最开始的应用场景就是日志场景或MQ场景,更多的扮演着一个日志传输和存储系统,这是Kafka立家之本。所以Kafka接收到的消息数据最终都是存储在log日志文件中的,底层存储数据的文件的扩展名就是log。

主题创建后,会创建对应的分区数据Log日志。并打开文件连接通道,随时准备写入数据。

2.3.2 创建第一个主题

创建主题Topic的方式有很多种:命令行,工具,客户端API,自动创建。在server.properties文件中配置参数auto.create.topics.enable=true时,如果访问的主题不存在,那么Kafka就会自动创建主题,这个操作不在我们的讨论范围内。由于我们学习的重点在于学习原理和基础概念,所以这里我们选择比较基础的命令行方式即可。

我们首先创建的主题,仅仅指明主题的名称即可,其他参数暂时无需设定。

2.3.2.1 执行指令

[atguigu@kafka-broker1 ~]$ cd /opt/module/kafka

[atguigu@kafka-broker1 kafka]$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic first-topic

2.3.2.2 ZooKeeper节点变化

指令执行后,当前Kafka会增加一个主题,因为指令中没有配置分区和副本参数,所以当前主题分区数量为默认值1,编号为0,副本为1,编号为所在broker的ID值。为了方便集群的管理,创建topic时,会同时在ZK中增加子节点,记录主题相关配置信息:

  • /config/topics节点中会增加first-topic节点。

  • /brokers/topics节点中会增加first-topic节点以及相应的子节点。

节点

节点类型

数据名称

数据值

说明

/topics/first-topic

持久类型

removing_replicas

partitions

{\"0\":[3]}

分区配置

topic_id

随机字符串

adding_replicas

version

3

/topics/first-topic/partitions

持久类型

主题分区节点,每个主题都应该设置分区,保存在该节点

/topics/first-topic/partitions/0

持久类型

主题分区副本节点,因为当前主题只有一个分区,所以编号为0

/topics/first-topic/partitions/0/state

持久类型

controller_epoch

7

主题分区副本状态节点

leader

3

Leader副本所在的broker Id

version

1

leader_epoch

0

isr

[3]

副本同步列表,因为当前只有一个副本,所以副本中只有一个副本编号

2.3.2.3 数据存储位置

主题创建后,需要找到一个用于存储分区数据的位置,根据上面ZooKeeper存储的节点配置信息可以知道,当前主题的分区数量为1,副本数量为1,那么数据存储的位置就是副本所在的broker节点,从当前数据来看,数据存储在我们的第三台broker上。

[atguigu@kafka-broker3 ~]$ cd /opt/module/kafka/datas

[atguigu@kafka-broker3 datas]$ ll

[atguigu@kafka-broker3 datas]$ cd first-topic-0

[atguigu@kafka-broker3 first-topic-0]$ ll

路径中的00000000000000000000.log文件就是真正存储消息数据的文件,文件名称中的0表示当前文件的起始偏移量为0,index文件和timeindex文件都是数据索引文件,用于快速定位数据。只不过index文件采用偏移量的方式进行定位,而timeindex是采用时间戳的方式。

2.3.3 创建第二个主题

接下来我们创建第二个主题,不过创建时,我们需要设定分区参数 --partitions,参数值为3,表示创建3个分区

2.3.3.1 执行指令

[atguigu@kafka-broker1 ~]$ cd /opt/module/kafka

[atguigu@kafka-broker1 kafka]$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic second-topic --partitions 3

2.3.3.2 ZooKeeper节点变化

指令执行后,当前Kafka会增加一个主题,因为指令中指定了分区数量(--partitions 3),所以当前主题分区数量为3,编号为[0、1、2],副本为1,编号为所在broker的ID值。为了方便集群的管理,创建Topic时,会同时在ZK中增加子节点,记录主题相关配置信息:

  • /config/topics节点中会增加second-topic节点。

  • /brokers/topics节点中会增加second-topic节点以及相应的子节点。

节点

节点类型

数据名称

数据值

说明

/topics/second-topic

持久类型

removing_replicas

partitions

{\"2\":[3],\"1\":[2],\"0\":[1]}

分区配置

topic_id

随机字符串

adding_replicas

version

3

/topics/second-topic/partitions

持久类型

主题分区节点,每个主题都应该设置分区,保存在该节点

/topics/second-topic/partitions/0

持久类型

主题分区副本节点,因为当前主题有3个分区,第一个分区编号为0

/topics/second-topic/partitions/0/state

持久类型

controller_epoch

7

主题分区副本状态节点

leader

1

Leader副本所在的broker Id

version

1

leader_epoch

0

isr

[1]

副本同步列表,因为当前只有一个副本,所以副本中只有一个副本编号

/topics/second-topic/partitions/1

持久类型

主题分区副本节点,因为当前主题有3个分区,当前为第2个分区,所以编号为1

/topics/second-topic/partitions/1/state

持久类型

controller_epoch

7

主题分区副本状态节点

leader

2

Leader副本所在的broker Id

version

1

leader_epoch

0

isr

[2]

副本同步列表,因为当前只有一个副本,所以副本中只有一个副本编号

/topics/second-topic/partitions/2

持久类型

主题分区副本节点,因为当前主题有3个分区,当前为第3个分区,所以编号为2

/topics/second-topic/partitions/2/state

持久类型

controller_epoch

7

主题分区副本状态节点

leader

3

Leader副本所在的broker Id

version

1

leader_epoch

0

isr

[3]

副本同步列表,因为当前只有一个副本,所以副本中只有一个副本编号

2.3.3.3 数据存储位置

主题创建后,需要找到一个用于存储分区数据的位置,根据上面ZooKeeper存储的节点配置信息可以知道,当前主题的分区数量为3,副本数量为1,那么数据存储的位置就是每个分区Leader副本所在的broker节点。

[atguigu@kafka-broker1 ~]$ cd /opt/module/kafka/datas

[atguigu@kafka-broker1 datas]$ ll

[atguigu@kafka-broker1 datas]$ cd second-topic-0

[atguigu@kafka-broker1 second-topic-0]$ ll

[atguigu@kafka-broker2 ~]$ cd /opt/module/kafka/datas

[atguigu@kafka-broker2 datas]$ ll

[atguigu@kafka-broker2 datas]$ cd second-topic-1

[atguigu@kafka-broker2 second-topic-1]$ ll

[atguigu@kafka-broker3 ~]$ cd /opt/module/kafka/datas

[atguigu@kafka-broker3 datas]$ ll

[atguigu@kafka-broker3 datas]$ cd second-topic-2

[atguigu@kafka-broker3 second-topic-2]$ ll

2.3.4 创建第三个主题

接下来我们创建第三个主题,不过创建时,我们需要设定副本参数 --replication-factor,参数值为3,表示每个分区创建3个副本。

2.3.4.1 执行指令

[atguigu@kafka-broker1 ~]$ cd /opt/module/kafka

[atguigu@kafka-broker1 kafka]$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic third-topic --partitions 3 --replication-factor 3

2.3.4.2 ZooKeeper节点变化

指令执行后,当前Kafka会增加一个主题,因为指令中指定了分区数量和副本数量(--replication-factor 3),所以当前主题分区数量为3,编号为[0、1、2],副本为3,编号为[1、2、3]。为了方便集群的管理,创建Topic时,会同时在ZK中增加子节点,记录主题相关配置信息:

  • /config/topics节点中会增加third-topic节点。

  • /brokers/topics节点中会增加third-topic节点以及相应的子节点。

节点

节点类型

数据名称

数据值

说明

/topics/third-topic

持久类型

removing_replicas

partitions

{\"2\":[1,2,3],\"1\":[3,1,2],\"0\":[2,3,1]}

分区配置

topic_id

随机字符串

adding_replicas

version

3

/topics/third-topic/partitions

持久类型

主题分区节点,每个主题都应该设置分区,保存在该节点

/topics/third-topic/partitions/0

持久类型

主题分区副本节点,因为当前主题有3个分区,第一个分区编号为0

/topics/third-topic/partitions/0/state

持久类型

controller_epoch

7

主题分区副本状态节点

leader

2

Leader副本所在的broker Id

version

1

leader_epoch

0

isr

[2,3,1]

副本同步列表,因为当前有3个副本,所以列表中的第一个副本就是Leader副本,其他副本均为follower副本

/topics/third-topic/partitions/1

持久类型

主题分区副本节点,因为当前主题有3个分区,当前为第2个分区,所以编号为1

/topics/third-topic/partitions/1/state

持久类型

controller_epoch

7

主题分区副本状态节点

leader

3

Leader副本所在的broker Id

version

1

leader_epoch

0

isr

[3,1,2]

副本同步列表,因为当前有3个副本,所以列表中的第一个副本就是Leader副本,其他副本均为follower副本

/topics/third-topic/partitions/2

持久类型

主题分区副本节点,因为当前主题有3个分区,当前为第3个分区,所以编号为2

/topics/third-topic/partitions/2/state

持久类型

controller_epoch

7

主题分区副本状态节点

leader

1

Leader副本所在的broker Id

version

1

leader_epoch

0

isr

[1,2,3]

副本同步列表,因为当前有3个副本,所以列表中的第一个副本就是Leader副本,其他副本均为follower副本

2.3.4.3 数据存储位置

主题创建后,需要找到一个用于存储分区数据的位置,根据上面ZooKeeper存储的节点配置信息可以知道,当前主题的分区数量为3,副本数量为3,那么数据存储的位置就是每个分区副本所在的broker节点。

[atguigu@kafka-broker1 ~]$ cd /opt/module/kafka/datas

[atguigu@kafka-broker1 datas]$ ll

[atguigu@kafka-broker1 datas]$ cd third-topic-2

[atguigu@kafka-broker1 third-topic-2]$ ll

[atguigu@kafka-broker2 ~]$ cd /opt/module/kafka/datas

[atguigu@kafka-broker2 datas]$ ll

[atguigu@kafka-broker2 datas]$ cd third-topic-0

[atguigu@kafka-broker2 third-topic-0]$ ll

[atguigu@kafka-broker3 ~]$ cd /opt/module/kafka/datas

[atguigu@kafka-broker3 datas]$ ll

[atguigu@kafka-broker3 datas]$ cd third-topic-1

[atguigu@kafka-broker3 third-topic-1]$ ll

2.3.5 创建主题流程

Kafka中主题、分区以及副本的概念都和数据存储相关,所以是非常重要的。前面咱们演示了一下创建主题的具体操作和现象,那么接下来,我们就通过图解来了解一下Kafka是如何创建主题,并进行分区副本分配的。

2.3.5.1 命令行提交创建指令

  1. 通过命令行提交指令,指令中会包含操作类型(--create)、topic的名称(--topic)、主题分区数量(--partitions)、主题分区副本数量(--replication-facotr)、副本分配策略(--replica-assignment)等参数。
  2. 指令会提交到客户端进行处理,客户端获取指令后,会首先对指令参数进行校验。
    1. 操作类型取值:create、list、alter、describe、delete,只能存在一个。
    2. 分区数量为大于1的整数。
    3. 主题是否已经存在
    4. 分区副本数量大于1且小于Short.MaxValue,一般取值小于等于Broker数量。
  3. 将参数封装主题对象(NewTopic)。
  4. 创建通信对象,设定请求标记(CREATE_TOPICS),查找Controller,通过通信对象向Controller发起创建主题的网络请求。
2.3.5.2 Controller接收创建主题请求

  1. Controller节点接收到网络请求(Acceptor),并将请求数据封装成请求对象放置在队列(requestQueue)中。
  2. 请求控制器(KafkaRequestHandler)周期性从队列中获取请求对象(BaseRequest)。
  3. 将请求对象转发给请求处理器(KafkaApis),根据请求对象的类型调用创建主题的方法。
2.3.5.3 创建主题

  1. 请求处理器(KafkaApis)校验主题参数。
  • 如果分区数量没有设置,那么会采用Kafka启动时加载的配置项:num.partitions(默认值为1
  • 如果副本数量没有设置,那么会采用Kafka启动时记载的配置项:

default.replication.factor(默认值为1

  1. 在创建主题时,如果使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;如果没有指定replica-assignment参数,那么就按照Kafka内部逻辑来分配,内部逻辑按照机架信息分为两种策略:【未指定机架信息】和【指定机架信息】。当前课程中采用的是【未指定机架信息】副本分配策略
  • 分区起始索引设置0
  • 轮询所有分区,计算每一个分区的所有副本位置:

副本起始索引 = (分区编号 + 随机值) %  BrokerID列表长度。

其他副本索引 = 。。。随机值(基本算法为使用随机值执行多次模运算)

##################################################################

# 假设

#     当前分区编号 : 0

#     BrokerID列表 :1234

#     副本数量 : 4

#     随机值(BrokerID列表长度): 2

#     副本分配间隔随机值(BrokerID列表长度): 2

##################################################################

# 第一个副本索引:(分区编号 + 随机值)% BrokerID列表长度 =0 + 2% 4 = 2

# 第一个副本所在BrokerID : 3

# 第二个副本索引(第一个副本索引 + 1 +(副本分配间隔 + 0% BrokerID列表长度 - 1))) % BrokerID列表长度 = 2 +1+2+0%3))% 4 = 1

# 第二个副本所在BrokerID2

# 第三个副本索引:(第一个副本索引 + 1 +(副本分配间隔 + 1% BrokerID列表长度 - 1))) % BrokerID列表长度 = 2 +1+2+1%3))% 4 = 3

# 第三个副本所在BrokerID4

# 第四个副本索引:(第一个副本索引 + 1 +(副本分配间隔 + 2% BrokerID列表长度 - 1))) % BrokerID列表长度 = 2 +1+2+2%3))% 4 = 0

# 第四个副本所在BrokerID1

# 最终分区0的副本所在的Broker节点列表为【3241

# 其他分区采用同样算法

  • 通过索引位置获取副本节点ID
  • 保存分区以及对应的副本ID列表。
  1. 通过ZK客户端在ZK端创建节点:
  • 在 /config/topics节点下,增加当前主题节点,节点类型为持久类型。
  • 在 /brokers/topics节点下,增加当前主题及相关节点,节点类型为持久类型。
  1. Controller节点启动后,会在/brokers/topics节点增加监听器,一旦节点发生变化,会触发相应的功能:
  • 获取需要新增的主题信息
  • 更新当前Controller节点保存的主题状态数据
  • 更新分区状态机的状态为:NewPartition
  • 更新副本状态机的状态:NewReplica
  • 更新分区状态机的状态为:OnlinePartition,从正常的副本列表中的获取第一个作为分区的Leader副本,所有的副本作为分区的同步副本列表,我们称之为ISR( In-Sync Replica)在ZK路径/brokers/topics/主题名上增加分区节点/partitions及状态/state节点。
  • 更新副本状态机的状态:OnlineReplica
  1. Controller节点向主题的各个分区副本所属Broker节点发送LeaderAndIsrRequest请求,向所有的Broker发送UPDATE_METADATA请求,更新自身的缓存。
  • Controller向分区所属的Broker发送请求
  • Broker节点接收到请求后,根据分区状态信息,设定当前的副本为Leader或Follower,并创建底层的数据存储文件目录和空的数据文件。

文件目录名:主题名 + 分区编号

文件名

说明

0000000000000000.log

数据文件,用于存储传输的小心

0000000000000000.index

索引文件,用于定位数据

0000000000000000.timeindex

时间索引文件,用于定位数据