> 文档中心 > [消息队列]-Kafka

[消息队列]-Kafka


Kafka

概述

Kafka是一个分布式基于发布/订阅模式消息队列

消息队列

消息就是要传输的各种形式的数据。数据传输的过程由发出消息的生产者以及接收消息的消费者组成。

消息队列的好处

生产者直接把消息传输给消费者然后等待消费者给出相应,以串行的方式完成一系列连续的操作,等所有操作都完成后才返回给客户端的方式,称为同步处理;在两者之间加一个用于存放消息的消息队列,生产者把消息写入消息队列,自己可以先对客户端做出回应,避免一次请求中过多的等待时间,这是异步处理的方式,异步处理的方式起到了解耦削峰异步的好处
在实际应用场景中,一个操作往往是跟许多操作连在一起的,涉及到的操作越多,不同模块之间的耦合就会更多。引入了消息队列,可以让不同模块之间解耦,使队列两端相互独立;让处于后面的一部分操作变成异步执行,也减少客户端请求的耗时;有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况;消息队列的转储消息作用(异步通信)也能帮助服务器缓解压力,即削峰

消息队列的两种模式
  • 点对点模式,生产者将消息发送到队列中,消费者主动从队列中取出并消费消息,消息被消费之后队列中不再存储,也就是说其它消费者不可能消费到已经被消费的消息。可以存在多个消费者,但对于一个消息而言,只会被一个消费者消费
  • 发布/订阅模式,消费者消费之后不会清除消息。一个消息可以被多个消费者订阅,发送到队列的消息会被所有订阅者消费。消息的保留时间也是有限制的。该模式下还有两种模式,一种是消费者主动去拉取消息,坏处是为了得到消息,需要隔一段时间就去轮询,查看队列中的消息是否更新,这样可能会出现浪费资源的情况;一种是队列主动把消息推送给消费者,坏处是不同消费者消费的速度可能不一样,但队列推送的速度肯定是一样的,这样可能就会导致消费者消费能力不足或资源浪费等问题
    Kafka是基于发布/订阅模式中消费者主动拉取消息的方式

Kafka基础架构

Kafka架构
1.Producer:消息生产者,向Kafka broker发消息的客户端
2.Consumer:消息消费者,从Kafka broker取消息的客户端
3.Consumer Group:消费者组,消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响,所有消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
4.Broker:一台Kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic
5.Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic
6.Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition都是一个有序的队列
7.Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且Kafka仍然能够继续工作,Kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower
8.leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader
9.follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步,leader发生故障时,某个follower会成为新的leader。follower与leader不会在同一个broker中

  • Kafka中消息是以topic进行分类的,生产者跟消费者都是面向topic的。topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的是生产者生产的数据。生产者生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset(类似为标识)。消费组中的每个消费者都会实时记录自己消费到了哪个offset,以便出错恢复时从上次的位置继续消费。每个partition中的offset数列都是独立的,并没有全局的对所有数据的offset列

Kafka架构深入

文件存储

由于生产者生产的消息会不断追加到log文件末尾,为防止文件过大导致数据定位效率低下,Kafka采取了分片索引机制,一个partition分为多个segment,每个segment对应两个文件 .index.log,这些文件位于一个文件夹下,该文件夹的命名为:topic名称-分区序号。如first这个topic有三个分区,则其对应文件夹为first-0,first-1,first-2。index文件和log文件以当前segment的第一条消息的offset命名。log文件是存放实际数据的(与日志无关),该文件最大可以达到一个G,达到该大小就会出现一个新的文件。index文件存储的就是log文件中数据的索引:Kafka的文件存储
这里我们模拟一下查找offset为3的数据的过程来了解index文件:
由于offset数列是有序的,因此Kafka使用二分查找的方法找到offset为3的数据的索引是在0000…00.index文件中,index文件中每条数据的大小是一样的,因此可以方便找出offset对应目标索引信息所在的数据,该数据中除了存放offset的值以外还存放了该offset对应的数据在log文件中的物理偏移量以及该数据的大小等其他信息,如图中该数据的物理偏移量为756,假设他的数据大小为1000,那么可以该数据在log文件中的位置就是756~1756,根据这个信息就可以快速找到目标数据的位置

生产者分区策略

1.分区的原因:

  • 方便在集群中扩展,提高负载能力
  • 可以提高并发,因为可以以partition为单位进行读写
    2.分区的原则:
    我们需要把生产者发送的数据封装为一个ProduceRecord对象。在构造函数中:
  • 指明partition的情况下,直接把指明的值作为partition值
  • 没有指明partition值但有key的情况下,将key的哈希值与topic的partition数进行取余得到partition值
  • partition值和key值都没有的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用partition总数取余得到partition值,这也就是round-robin算法

数据可靠性保证

ack机制

为保证生产者发送的数据能可靠地发送到指定的topic,topic的每个partition收到数据后都需要向生产者发送ack(acknowledgement 确认收到),如果生产者收到了ack,就会进行下一轮的发送否则重新发送数据
何时发送ack:确保有follower与leader同步完成,leader才发送ack,才能保证leader挂掉之后能在follower中选举出新的leader
要多少个follower同步完成:两种方案:

方案 优点 缺点
半数以上 延迟低 选举新的leader时,容忍n台节点故障的话需要2n+1个副本
全部完成 选举新的leader时,容忍n台节点故障的话需要n+1个副本 延迟高

Kafka选择了第二种方案,原因:
1.同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka每个分区都有大量的数据,第一种方案会造成大量数据的冗余
2.网络延迟对Kafka的影响较小

ISR

采取了第二种方案后,面临一个问题:假设leader收到数据,所有follower都开始同步数据,但有一个follower因为某种故障迟迟不能进行同步,那leader就要一直等下去直至它完成同步才发送ack。为此,leader维护了一个动态的in-sync replica set(ISR),意为和leader保持同步的follower集合(包含leader),当ISR中的follower完成数据的同步之后,leader就会给follower发送ack,如果follower长时间未向leader同步数据,该follower就会被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定,选择进入ISR还有一个参数是follower已同步的数据与leader数据的条数的差值,相差小的也具备进入ISR的资格。0.9版本开始只保留时间这个参数,不再参考数据条数差值,因为生产者发送消息是批量发送的,假设ISR设定的最大条数差值是10,而生产者每次批量发送消息12条,发送到leader后,此时所有follower中数据与leader的差值都大于10,都会被踢出ISR,但在时间阈值内他们又可以完成同步,又满足了时间阈值和数据差值两个条件,又被加入ISR,另一方面,ISR是需要维护的,它被保存在内存以及zookeeper(集群环境下共享信息)中,频繁地将follower踢出以及加入ISR会导致频繁地访问,读写zookeeper,使得效率低下,因此移除了数据条数这个限制,只保留时间限制
leader发生故障之后就会从ISR中选举新的leader
ISR + OSR(OutSynvRepli) = AR(AllRepli)

ack应答机制

对于某些不太重要的数据,对数据得可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡。选择一下的配置:
acks参数:

  • 0,生产者不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据
  • 1,生产者等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障那么将会丢失数据
  • -1(all),生产者等待broker的ack,partition的leader和follower(ISR)全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复,因为此时follower和leader已经拥有相同的数据,此时还未发送ack的时候leader挂了不能发送ack,那么生产者就会再次发送相同的数据,此时由于旧的leader已经挂了,有一个follower成为新的leader,那么这个新的leader就会接收到完全相同的一份数据。当ISR中只有leader的时候,此时相当于退化到1的情况,也会出现丢失数据,实际中这种情况很少,是一种极限情况
数据一致性
  • 消费数据一致性
    LEO:Log End Offset,每个副本最大的offset
    HW:high watermark,ISR队列中最小的LEO,指的是消费者能见到的最大的offset
    HW的作用:如果没有HW,假设leader的LEO为19,当一个消费者消费leader时消费到18时leader挂了,此时一个LEO为12的follower成为了leader,要从offset为18开始继续消费,然而新的leader中最大offset也只有12,找不到对应18的数据,因此消费者能见到的是HW,只在HW的范围内消费,这就保持了消费数据的一致性
  • 存储数据一致性(副本间数据一致性)
    1.follower故障:follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截掉,从HW开始向leader进行同步,待该follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了
    2.leader故障:leader发生故障后会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据
    这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复,ack解决的才是丢失和重复的问题

Exactly Once语义

将服务器ack级别设置为-1可以保证生产者到服务器之间不会丢失数据,即At Least Once语义,它可以保证数据不丢失,但不能保证数据不重复;相对的,将ack级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义,可以保证数据不重复,但不能保证数据不丢失
对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即Exactly Once语义。在0.11版本之前的Kafka只能保证数据不丢失,再在下游消费者对数据做全局去重,对于多个下游应用的情况,每个都需要单独去做去重,这就对性能造成了很大影响。0.11版本引入了幂等性,所谓幂等性就是指生产者不论向服务端发送多少次重复数据,服务端都只会持久化一条,幂等性结合At Least Once语义就构成了Exactly Once语义
要启用幂等性,只需要将Producer的参数中enable.idompotence设置为true(此时ack级别也会被设为-1)即可,Kafka幂等性的实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化时会被分配一个PID,发往同一partition的消息会附带Sequence Number,而broker端会对做缓存,当具有相同主键的消息提交时,broker只会持久化一条。但是PID重启就会变化,同时不同的partition也具有不同主键,所以幂等性无法保证跨分区跨会话(生产者挂掉重新启动)的Exactly Once

消费者

消费方式

消费者采用pull模式从broker中读取数据。push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的,它的目标是尽可能以最快速度传递消息,但是这样很容易造成消费者来不及处理消息,典型的表现就是拒绝服务以及网络拥塞,而pull模式则可以根据消费者的消费能力以适当的速率消费消息
pull模式不足之处是,如果Kafka没有数据,消费者可能会陷入循环中,一致返回空数据,针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,消费者会等待一段时间之后再返回,这段时长即为timeout

消费分区分配策略

每个topic中不同的partition只会由一个消费者组中的一个消费者来消费,所以要确定哪个partition由哪个消费者消费者。两种策略:

  • RoundRobin
    按消费者组来分配。是一种轮询的方式,前提是消费者组中的消费者订阅的topic是一样的。当只有一个主题时,假设它有7个分区,消费者组中有三个消费者,那么具体分配方式就是分区0给消费者1,分区1给消费者2,分区2给消费者3,分区3给消费者1,分区4给消费者2,分区5给消费者3,以此类推,最后每个消费者分到的区差值最多为1;当有两个以上主题时,所有主题的所有分区会被当成一个整体,每个分区就是TopicAndPartition类对象,然后对这些对象进行排序,然后再以与只有一个主题的情况相同的方法进行分区。由于并不能保证前提条件一定成立,默认的分区策略不是这一种而是下面这一种,如果前提条件不成立的话,是可能会导致某个消费者被分配到自己没有订阅的主题分区,因为分配的时候是把一个组内所有消费者所有订阅的主题来分配给组内所有消费者,不管消费者是否订阅了正在被分配的主题
  • Range
    按主题进行划分,假设一个主题中有n个partition,一个消费组中订阅该主题的消费者有m个,n/m=p,n%m=q,则这m个消费者中前q个消费者分到p+1个分区,剩下的消费者分到p个分区。由于是面向主题的,这种策略可能会导致不同消费者间消费分区个数差别过大
    当消费者个数发生变化就会触发分区策略进行分区
offset的维护

由于消费者在消费过程中可能会出现断电宕机等故障,消费者恢复后需要从故障前的位置继续消费,所以消费者需要实时记录自己消费到了哪个offset以便故障恢复之后继续消费。在一个消费者组内,一个消费者挂掉了,另一个消费者接手时应该接着offset,因此由消费者组+主题+partition唯一确定一个offset。offset既保存在Kafka本地,也保存在zookeeper,Kafka0.9版本之前,消费者默认将offset保存在zookeeper中,从0.9版本开始默认将offset保存在Kafka一个内置的topic中,该topic为_consumer_offsets

高效读写数据的原因

  • 顺序写
    Kafka的生产者生产数据写入log文件,写的过程是一直追加到log文件末端,为顺序写的方式。同样的磁盘,顺序写的速度比随机写速度更快,因为顺序写省去了大量磁头寻址的时间
  • 零拷贝
  • 分布式(分区),并发读写(如果只有单台当然就跟分布式无关)

zookeeper在Kafka中的作用

Kafka集群中有一个broker会被选为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。controller的管理工作都是依赖于zookeeper的

事务

Kafka从0.11版本开始引入事务支持,事务可以保证Kafka在Exactly Once语义上实现生产和消费可以跨分区和会话,要么全部成功,要么全部失败。主要指的是生产者事务,目的是完善Exactly Once语义:
为了实现跨分区跨会话的事务,引入了一个全局唯一的TransactionID,并将Producer和TransactionID绑定,这样当Producer重启后就可以通过正在进行的TransactionID获取原来的PID(所以ID应该是来自客户端而不是由broker来随机生成)
为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator,Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态Transaction Coordinator还负责将事务所有写入Kafka的一个内部topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复从而继续进行

API

Producer API

发送流程

Kafka的Producer发送消息采用的是异步发送(在消息量上分批次地发送消息,但发送时间上各批次间是独立的,一个批次来了就发送,不受其它批次成功与否的影响,一个批次对应一个ack,如果该批次发送失败了就重发该批次的数据)的方式,在消息发送的过程中,涉及到了两个线程-main线程和Sender线程,以及一个线程共享变量RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker
生产者客户端整体结构:
Kafka消息发送的流程
相关参数:
batch.size:只有数据积累到batch.size之后,Sender才会发送数据
linger.ms:如果数据迟迟未达到batch.size,Sender等待linger.ms之后就会发送数据

代码示例
<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>2.8.0</version></dependency>
public static void main(String[] args) {    //1.创建Kafka生产者的配置信息,以下的配置参数都可以在ProducerConfigs中找到,无需记忆。其中部分参数的值都有默认值不用自己设定    Properties properties = new Properties();    //指定连接的Kafka集群    properties.put("boostrap.servers","hadoop102:9092");    //ack应答级别    properties.put("acks","all");    //重试次数    properties.put("retries",3);    //批次大小,字节    properties.put("batch.size",16384);    //等待时间,ms    properties.put("linger.ms",1);    //RecordAccumulator缓冲区大小,字节    properties.put("buffer.memory",33554432);    //序列化器    properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");    properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");    //2.创建生产者对象    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);    //3.发送数据    for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>("first","record" + i));    }    //4.切记关闭资源关闭资源,调用该方法会直接把数据发送出去(就算不满足batch.size等参数)。该方法除了把producer自己的资源关了,还会关闭其它跟producer相关的资源,比如分区器和拦截器,如果有资源的话也会关掉    //也就是说这个close()方法会去调分区器或拦截器等的close()方法,没有这个close(),那些类对象中的close()方法也不会被调用    producer.close();}

带回调函数的生产者:

for (int i = 0; i < 10; i++) {    producer.send(new ProducerRecord<String, String>("first", "record" + i), new Callback() { //成功返回recordMetadata,失败返回异常 public void onCompletion(RecordMetadata recordMetadata, Exception e) {     if(e == null){  System.out.println(recordMetadata.partition());     } }    });}

自定义分区器:

public class MyPartition implements Partitioner {...
//修改生产者的配置信息中的分区器类properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.xxx.www.partition.MyPartition");

同步发送的方式发送数据:
通过在sender线程的时候阻塞main线程实现。具体做法:由于Producer的send方法返回的是Future对象,该类与线程有关,调用该类对象的get()方法时除了会获取对象的值外还会阻塞前面的线程,借此实现线程阻塞,同步发送

producer.send(new ProducerRecord<String, String>("first","record" + i)).get();

消费者

public static void main(String[] args) {    Properties properties = new Properties();    //集群    properties.put("boostrap.servers","hadoop102:9092");    //开启自动提交offset    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);    //自动提交的延时,1000ms(只有开启自动提交时才会生效)    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");    //反序列化器    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");    //消费者组    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group");    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);    //订阅主题(单个元素变为list使用这个方法而不使用Arrays.asList())    //可以订阅一个不存在的主题,不会报错,会给警告,但不影响运行以及其它主题获取数据    consumer.subscribe(Collections.singletonList("first"));    //消费者打开就不再关闭,持续获取数据。除了强制kill掉或强制退出    while (true){ //获取数据(会获取到多个) ConsumerRecords<String, String> consumerRecords = consumer.poll(100); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {     //key能获取到,说明key也会被存到集群,而不是只为了分区用的     System.out.println(consumerRecord.key()); } consumer.close();    }}
消费者重置offset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

关于该参数的文档如下:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: throw exception to the consumer.

可以看出,只有消费者换组了,或者 offset过期(7天)了,这个参数配置才会生效,否则写了这条配置也不会生效。该参数默认是latest

offset的提交
  • 如果没有提交offset

如果没有开启offset自动提交也没有手动提交到本地,假设本次开启消费者时内存中存储的offset为90,本次消费到了100,关闭消费者后,本地保存的offset还是90,下一次打开,内存中的offset是从本地获取的,所以还是90,虽然每次运行时内存中的offset会随着消费进度而更新,但每次开始运行都是从本地获取的offset值作为本次消费的初始offset值

  • 手动提交offset

自动提交虽然十分便利,但由于是基于时间延迟提交的,开发人员难以把握offset提交的时机,因此还可以选择手动提交offset
手动提交有两种,commitSync同步提交和commitAsync异步提交。两者的相同点是都会将本次poll的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败),而异步提交没有失败重试机制,有可能提交失败

  • 自定义存储offset
    逻辑处理和offset提交是一起执行的,如果是先完成逻辑处理再提交offset有可能出现重复消费的问题;如果先提交offset再进行逻辑处理就可能出现漏消费的问题。Kafka提供了自定义存储offset的方式
    PS:消费者提交消费位移时提交的是当前消费到的最新消息的offset+1
    注意区分重复消费,漏消费以及数据重复,数据丢失的不同,一个是消费者这边的,一个是生产者这边的

自定义拦截器

Producer拦截器是在Kafka0.10版本引入的,主要用于实现clients端的定制化控制逻辑。对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链

代码示例

需求:实现一个双拦截器的拦截链。第一个拦截器用于在消息发送前在消息前添加时间戳信息;第二个拦截器用于在消息发送后更新成功发送消息数及失败发送消息数

public class TimeInterceptor implements ProducerInterceptor<String,String> {    //获取配置信息和初始化数据时调用    public void configure(Map<String, ?> configs) {    }    //==========循环调用以下两个方法,每一条数据都会经过==========    //该方法封装进KafkaProducer.send()方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前    //调用该方法。用户可以在方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return new ProducerRecord<String, String>(record.topic(),record.partition(), record.key(), System.currentTimeMillis() + record.value());    }    //该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前    //onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {    }    //关闭interceptor,主要用于执行一些资源清理工作。如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全    //另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,    //并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非再向上传递。    public void close() {    }}
public class CounterInterceptor implements ProducerInterceptor<String,String> {    int success;    int error;    public void configure(Map<String, ?> configs) {    }    //这个函数不用添加逻辑,但要把参数的record返回,返回null的话所有数据就都被过滤掉了    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return record;    }    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if(metadata != null){     success++; }else {     error++; }    }    public void close() { System.out.println("success:" + success); System.out.println("error:" + error);    }}

生产者配置拦截器

//多个拦截器用集合写在一起,一个一个设的话会覆盖properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList("com.xxx.www.interceptor.TimeInterceptor","com.xxx.www.interceptor.CounterInterceptor"));

这里注意最后,生产者的close()方法必须调用,否则拦截器的close()方法也不会被执行。实际场景中,生产者跟消费者一样是一直开着的,通常是把producer发送数据的代码用try-catch包起来,在finally代码块中调用close()方法

其他问题

1.当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka内部会执行什么逻辑?
a.会在zookeeper中的/brokers/topics节点下创建一个新的topic节点,如/brokers/topics/first
b.触发Controller的监听程序
c.Kafka Controller负责topic的创建工作,并更新metadata cache
2.topic的分区数可以增加,但不能减少,因为已经存在的数据无法处理
3.kafka中需要选举的地方有Controller跟leader,Controller是通过抢夺资源选出来的
4.kafka日志保存时间:7天
Kafka硬盘大小:每天的数据量*7天
5.Kafka消息数据积压,Kafka消费能力不足怎么处理?
a.如果是kafka消费能力不足,可以考虑增加topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数(两者缺一不可)
b.如果是下游的数据处理不及时,提高每批次拉取的数量,批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压
方案a比b提高得要更靠谱一点