> 文档中心 > 分布式-MQ-07 kafka高级特性及常见问题调优

分布式-MQ-07 kafka高级特性及常见问题调优

kafka高级特性及常见问题调优

    • 一、kafka高级特性
      • 1.1 producer发布消息机制
      • 1.2 Controller及partition选举机制
        • 1.2.1 Controller选举机制(zk控制)
        • 1.2.2 partition选举Leader机制
      • 1.3 Rebalance再平衡机制
        • 1.3.1 rebalance的前提
        • 1.3.2 什么情况下会发生rebalance
        • 1.3.3 Rebalance的过程
        • 1.3.4 Rebalance分配策略
      • 1.4 日志存储
      • 1.5 分区数如何选择
    • 二、kafka常见问题及调优
      • 2.1 kafka启动慢及消费者重新订阅慢
        • 2.1.1 kafka重启慢问题
        • 2.1.2 kafka消费者重启后重新订阅很慢
      • 2.2 kafka消息顺序性分析
      • 2.3 kafka如何保证不丢消息
      • 2.4 kafka能否保证消息幂等性
      • 2.4 kafka高性能的原因
    • 三、kafka集群的CAP问题
      • 3.1 一致性
        • 3.1.1 数据写一致性控制
        • 3.1.2 数据读一致性控制
      • 3.2 可用性
      • 3.3 分区扩展性

一、kafka高级特性

1.1 producer发布消息机制

  • 消息写入方式
    producer采用push模式将消息发送到broker,消息将被顺序写入对应的partition中,因为顺序写的极高,保证了kafka的吞吐效率。
  • 消息路由机制
    producer发消息到时,因为在多分区中只能写入唯一分区,所以消息写入分区号是需要计算的。
    • 指定分区号,则直接使用该分区
      public ProducerRecord(String topic, Integer partition, K key, V value)
    • 未指定分区,指定key[hash取模指定分区]
      public ProducerRecord(String topic, K key, V value) //对当前key进行hash,取模分区数Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;    
    • 分区号及key均未指定[RoundRobinPartitioner 轮询发送指定分区]
      public ProducerRecord(String topic, V value)//轮询RoundRobinint nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (!availablePartitions.isEmpty()) {    int part = Utils.toPositive(nextValue) % availablePartitions.size();    return availablePartitions.get(part).partition();} else {    // no partitions are available, give a non-available partition    return Utils.toPositive(nextValue) % numPartitions;}
  • 消息写入过程
    在这里插入图片描述

1.2 Controller及partition选举机制

在这里插入图片描述

1.2.1 Controller选举机制(zk控制)

kafka集群启动时,所有broker向zookeeper发送创建**/Controller临时节点的请求,由zookeeper来保证只有1个broker节点能创建成功,该broker则当选为kafka集群中的Controller,负责管理集群分区和副本状态信息。
当controller节点的broker宕机,则zookeeper中的
/Controller**临时节点消失,所有存活的broker将再次竞争创建临时节点,由zookeeper保证唯一的新broker成为Controller。

具备控制器身份的broker需要比其他普通的broker多一份职责,负责监听整个集群所有分区和副本的状态。具体细节如下:

  • 监听broker相关的变化。为Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker增减的变化。

  • 监听topic相关的变化。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作。

  • 从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。对于所有topic
    所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化。

  • 更新集群的元数据信息,同步到其他普通的broker节点中。

1.2.2 partition选举Leader机制

  • unclean.leader.election.enable=false 不选非Isr集合中的broker
    选择Isr集合中,第一个broker作为leader。(第一个broker最先放进ISR列表,可能是同步数据最多的副本)
  • unclean.leader.election.enable=true 如果Isr列表为空,将选择AR中可用的broker作为partition的lerder

副本进入ISR列表有两个条件:

  • 副本节点不能产生分区,必须能与zookeeper保持会话以及跟leader副本网络连通
  • 副本能复制leader上的所有写操作,并且不能落后太多。(与leader副本同步滞后的副本,是由
    replica.lag.time.max.ms 配置决定的,超过这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)

1.3 Rebalance再平衡机制

1.3.1 rebalance的前提

消费者通过subscribe不指定分区消费的情况下会发生rebalance,assign这种指定分区消费的方式则不会再平衡。

1.3.2 什么情况下会发生rebalance

  • 消费者组的consumer数量发生增减(重启某台消费者服务器时势必也会造成rebalance)
  • 动态给topic增加分区
  • 消费者组订阅了更多的topic

1.3.3 Rebalance的过程

  • 第一阶段:选择组协调器(选小组长)
  • 第二阶段:加入消费组Join Group
  • 第三阶段:Sync Group

1.3.4 Rebalance分配策略

rebalance共有3中再平衡策略:range、round-robin、sticky

  • range(范围分配) 假设有10个分区,消费者组内有3个消费者,则:

    消费者0被分配:partition-0、partition-1、partition-2、partition-3

    消费者1被分配:partition-4、partition-5、partition-6

    消费者2被分配:partition-7、partition-8、partition-9

  • round-robin(轮询分配) 假设有9个分区,消费者组内有3个消费者,则:

    消费者0被分配:partition-0、partition-3、partition-6

    消费者1被分配:partition-1、partition-4、partition-7

    消费者2被分配:partition-2、partition-5、partition-8

  • sticky(稳态分配)

    sticky与round-robin的分配方案比较类似,但显著的区别时,sticky尽可能保证本次分配与上次相同。

    即原有分配方案,尽可能不变。

    sticky发生rebalance的两个原则:

    • 1)分区分配尽可能均匀
    • 2)再平衡尽可能与上次保持相同

1.4 日志存储

kafka消息存储在指定的log.dir目录下,topic对应的日志信息以Topic+分区号命名。

  • 为避免日志文件过大,kafka为.log文件指定分段大小,以避免文件过大影响读写效率

    单个日志分段大小

    log.segment.bytes: The maximum size of a single log file

    Default: 1073741824 = 1GB

  • 存储文件消息内容的文件.log文件

  • 为提高查询效率,.log文件的内容以每次4K(可配置)的offset作为索引值记录到.index文件中。存储索引offset的文件,可以快速定位消息的offset进而查找到.log真是消息内容的位置。

  • .timeindex索引文件,同样在.log文件中每次以4K发送一次来到.timeindex中。

在这里插入图片描述

1.5 分区数如何选择

bin/kafka-producer-perf-test.sh --topic my_local_topic --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.server=192.168.149.128:9092 acks=1

二、kafka常见问题及调优

2.1 kafka启动慢及消费者重新订阅慢

2.1.1 kafka重启慢问题

  • kafka-server-start.sh 修改默认初始化堆内存及最大堆内存
#原值if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"fi#根据物理内存情况32G内存可置为16Gexport KAFKA_HEAP_OPTS="‐Xmx16G ‐Xms16G ‐Xmn10G ‐XX:MetaspaceSize=256M ‐XX:+UseG1GC ‐XX:MaxGCPauseMillis=50 ‐XX:G1He apRegionSize=16M"#最大STW50ms,使用G1垃圾回收器,单元格大小16M,元空间256M,初始化内存16G.最大内存16G,年轻代10G
  • 修改启动实例或关闭实例时梳理日志文件线程数
    在启动kafka时,需要后台线程将log.dir目录中topic的分区segment log读取到内核空间,当partition分区中的数据很多(或许日志删除hour过大,保留日志量很大),后台默认的num.recovery.threads.per.data=1将忙不过来,可以试着将该线程数放大

2.1.2 kafka消费者重启后重新订阅很慢

当有消费者宕机或重启,这是生产中非常常见的状态,将导致coordinator认为分区与消费者的对应关系发生变化,需要重新平衡消费者于分区关系。

  • session.timeout.ms 根据服务需求,可大可小,当该值更小时,更容易触发rebalance,当该值更大时,可能消费者已宕机而broker未发现。
  • max.poll.interval.ms 如果消费者处理消息的时间,超过该设置值,将视为消费者处理能力差,或被踢出ConsumerGroup
  • rebalance策略,可以选择sticky以减少已有稳定分区的重新分配。

2.2 kafka消息顺序性分析

能否保证所有消息有序?不能,只能保证分区内有序(局部有序)

2.3 kafka如何保证不丢消息

1、kafka能否保证消息不丢? 不能 why?什么情况下会丢消息?
2、消息丢失及处理(消息保障)
3、消费确认机制等

2.4 kafka能否保证消息幂等性

是否支持消息幂等?producer可调整配置支持幂等,消费者端需要业务自行实现

2.4 kafka高性能的原因

  • 磁盘顺序读写
    kafka消息不能修改及不会在文件中间删除内容,保证了每次写内容都是在文件末尾的追加,从而保证写文件的顺序行;每次读内容时有.index文件的索引查找,及.log文件的分段,又能极大结合磁道读取内容的顺序行,从而提高读取效率。

  • 数据传输零拷贝

  • 读写数据的批量batch处理及压缩传输

三、kafka集群的CAP问题

什么是CAP,当分布式系统同时涉及读写时,需要保证一致性(Consistence)、可用性(Availabiliy)、分区容错性(Partition Tolerance)的平衡。成熟的分布式系统一般是在一致性、可用性方面找到平衡点,依据服务的需求设计出更贴近使用的CAP系统。

当强调一致性时,需保证所有的ISR必须完成数据的同步才算完成生产者数据的写入,而此时从可用性来看,只要Leader存活,是可以保证可用性的。除非Leader宕机,Follower才需要从ISR中靠前的broker选择接替Leader,短时间出现不可用。
当强调可用性时,也可以通过保证最终一致性(如ack=-1,> nums/2个副本完成数据同步),使集群其他节点最终数据一致性,从而实现可用性和一致性得以兼得。

3.1 一致性

分布式系统一般存在多个副本,Leader副本和多个Follower副本,如果系统能够保证线性一致性当然极好,但如果不能保证线性一致,进而保证顺序一致性也能满足需要。

  • 什么是线性一致性?
    线性一致性(Linearizability),也称原子一致性(atomic consistency)、强一致性(strong consistency)。从字面意思就可以理解,多个副本之间的同步状态像是集群只有1个副本。所有的操作都是原子的,当针对leader进行更新时,多个follower副本的状态也需要同步更新完成才能让该条更新结果对客户端可见。
    显而易见,线性一致性对消费者客户端是十分友好的,我和朋友小张分别拿着手机看NBA总决赛最后10s,当小张看到Ray Allen三分绝杀比赛结束时,我看到的却还是比分持平,等待发球,我的内心是无法接收的。
    但做到线性一致性是需要付出更多代价的,比如在kafka中消息生产者可以将 ACKS=ALL或-1,同时 min.insync.replicas(最少同步副本数量)设置为AR(所有副本数),此时生产者发送消息必须所有副本写入完成才算成功。但这毫无疑问将导致集群可用性的降低。
  • 什么是顺序一致性(Sequence Consistency)
    假设分布式系统中,X的原值为0,ClientA对X的值修改为1,那么因为网络的延迟等原因,虽然B0、C1的执行事件与A0存在先后顺序,但在服务端真正执行的顺序却是不确定的。所以B0、C1由于与A0存在并行关系,所以B0、C1得到X=0或X=1都是可能的。
    顺序一致性,需要保证的就是:当一个Client从服务端得到最新的结果后,后续其他Client得到的结果也必须是最新的。既:当B0如果读到X=1,那么C1也只能得到X=1。这样就保证了有序的一致性。
    分布式-MQ-07 kafka高级特性及常见问题调优
    我们从kafka的角度再详细的看读写一致性问题。

3.1.1 数据写一致性控制

更新的数据只能由Leader写入,然后同步超过半数以上副本算写入成功,其他未立即同步的副本会在后续完成写入,可得数据写入是线性一致、强一致的。

Acks=-1 或Acks=Allmin.insync.replicas >= num/2+num%2 (保证同步副本超过半数)

3.1.2 数据读一致性控制

kafka官网中介绍,Consumer客户端连接不同的Partition分区,虽然有了HW高水位的存在保证消费者能读取到的消息水位是相同的,但由于不同分区的log-end-offset偏移量是不同的(显然Leader的最大偏移量是最大的),当有ClientA、B分别读取Follower1和Follower2,此时X=3尚未同步到Follower2。
分布式-MQ-07 kafka高级特性及常见问题调优
如果此时出现Rebalance,则可能出现ClientA、B再平衡到Follower2、Follower1,但由于zookeeper的存在,客户端和Zookeeper分别维护了zxid和lastZxid值(下面细讲),当客户端的zxid>zookeeper维护节点的lastZxid时,也就是ClientA连接到Follower2,但是Follower的lastZxid尚未更新到3,此时如果连接将导致消息丢失。所以zk将保证重连时ClientA不会连接到Follower2。
分布式-MQ-07 kafka高级特性及常见问题调优
上面两幅图的动态过程,大致讲解了zk在保证消息顺序一致性所做的事。那么什么是zxid?

  • zxid
    就是事务id,由64位二进制数组成,是为了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。
    其中高32位为epoch值(eopch值主要用来区分leader的版本,当前leader宕机选出下个leader时则需要epoch+1,以此区别leader朝代的更迭),低32位则是当前leader通知下依次递增的计数号码。
    分布式-MQ-07 kafka高级特性及常见问题调优
    看到这个zxid有没有让你联想到什么?我的感受是:
  • 从外形上看类似java对象头Object Header中Markword,甚至在偏向锁中也有epoch值概念。
  • 从高低位搭配上看,酷似ReentrantReadWriteLock的sync内部类中用高16位表示共享锁状态,低16位表示独占写锁状态。

3.2 可用性

可用性指的是在集群中,一部分节点故障后,集群整体是否还能响应客户端的读写请求。
提到可用性,势必会和一致性做兼容。

  • WARO(Write All Read One)
    所有副本写入完成才算成功,读取数据只从一个节点读。当Write All时保证了原子性、强一致性,所以读取节点时无疑会读取最新数据。但此时可用性并不高。
  • Quorum(NWR) 权衡一定的一致性和可用性。
    • N:总的副本数
    • W:更新数据需要保证完成写入的节点数
    • R:读取数据时,需要读取的副本节点数
  • N=5,W=5,R=1(追求强一致性,需要保证所有节点均写入成功,读取时数据是最新的)
  • N=5,W=3,R=3(写入时需要保证超过半数节点写入成功,读取超过半数节点的数据,未完成同步的节点可以通过zxid比较事物号大小,保证读取时顺序一致性)
  • N=5,W=3,R=1(当追求可用性更高时,该种配置可以在存在2个节点故障时仍然可用,且可以保证顺序一致性。)

3.3 分区扩展性

由于kafka是分布式部署在不同机器上的,并且由于网络不可靠,Leader也可能存在网络异常的场景,当Controller认为当前Leader不可用,broker将会再次选举(broker首先会推选自己为leader,再经由与其他broker投票生成新的leader),若老的leader恢复后,新leader也生成了,将出现新旧Leader并存的问题。此时zookeeper将再次通过上述的epoch值比较,控制版本保证数据一致性。

松山湖网站