> 技术文档 > Kafka学习记录_kafka的server.properyies配置文件怎么改

Kafka学习记录_kafka的server.properyies配置文件怎么改


0. 消息中间件概述

1.1 消息队列简介

消息队列(message queue)简称MQ,是一种以“先进先出”的数据结构为基础的消息服务器。

消息:两个系统间要传输的数据

作用:实现消息的传递

原始的数据传递方式:

上述的数据传输方式为同步传输【调用方必须等待被调用方执行完毕后,才可以继续向下执行】,同步传输存在的弊端:传输效率较低。

基于MQ实现消息的传输,如下图所示:

上述的数据的传输方式属于异步传输【调用方不用等待被调用方执行完毕就可以接续传递消息】,数据传输的效率较高。

1.2 消息队列的应用场景

首先先说明一下消息中间件的主要的作用:

[1]系统解耦

[2]流量消锋

[3]数据分发

上面的三点是我们使用消息中间件最主要的目的。

1.2.1 系统解耦

系统的耦合性越高,容错性【是指系统在部分组件(一个或多个)发生故障时仍能正常运作的能力】就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

如下下图所示:

使用消息队列以后,整个下单操作的架构如下图所示:

使用消息队列解耦合,系统的耦合性就会降低,容错性就提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

1.2.2 流量消锋

流量消锋:消除系统中的高峰值流量(流量可以理解为就是请求)

现有一个电商系统下单初始架构如下所示:

假设用户每秒需要发送5k个请求,而A系统每秒只能处理2K个请求,这样就会导致大量的下单请求失败。而且由于实际请求的数量远远超过系统的处理能力,此时也有可能导致系统宕机。

使用消息队列改进以后的架构如下所示:

1.2.3 异步通信

假设A系统进行了某一个业务操作以后,需要将这个业务操作结果通知给其他的系统,原始的架构如下所示:

此时B系统、C系统、D系统就需要提供对应的接口,然后让A系统进行调用。如果此时不需要通知D系统了,那么就需要更改A系统的代码,将调用D系统的代码删除掉。并且如此时项目中添加了一个新的系统E,A系统也需要将处理结果通知给E系统,那么同时也需要更改A系统的代码。这样就不利于后期的维护。

使用MQ改进以后的架构如下所示:


A系统需要将业务操作结果通知给其他的系统时,A系统只需要将结果发送到MQ中。其他的系统只需要从MQ中获取结果即可,如果不需要结果了,此时只需要取消从MQ中获取结果的操作即可。并且如果新增了一个系统需要获取结果,只需要从MQ中获取结果数据就可以了,A系统的代码不需要进行改动。这样就大大的提高了系统的维护性。

1.3 MQ的优缺点

优点

1、应用解耦提高了系统的容错性

2、流量消锋提高了系统的并发能力

3、异步通信提高了系统的可维护性

缺点:

1、系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。

2、系统复杂度提高:MQ的加入大大增加了系统的复杂度。

MQ的选择依据是什么? 调用方是否需要获取到被调用方的执行结果,如果需要获取到结果,那么就需要使用同步通信,如果不需要就可以使用异步通信。

1.概述以及环境准备

1.1 概述

1.1.1 概念

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。

1.1.2 Kafka版本迭代演进

Kafka前期项目版本似乎有点凌乱,Kafka在1.x之前的版本,是采用4位版本号;比如:0.8.2.2、0.9.0.1、0.10.0.0...等等;

在1.x之后,kafka 采用 Major.Minor.Patch 三位版本号;

Major表示大版本,通常是一些重大改变,因此彼此之间功能可能会不兼容;

Minor表示小版本,通常是一些新功能的增加;

Patch表示修订版,主要为修复一些重点Bug而发布的版本;

比如:Kafka 2.1.3,大版本就是2,小版本是1,Patch版本为3,是为修复Bug发布的第3个版本;

Kafka总共发布了8个大版本,分别是0.7.x、0.8.x、0.9.x、0.10.x、0.11.x、1.x、2.x 3.x 版本,截止目前,最新版本是Kafka 3.7.0,也是最新稳定版本;

1.1.3 Kafka运行环境前置要求

Kafka是由Scala语言编写而成,而Scala运行在Java虚拟机上,并兼容现有的Java程序,因此部署Kakfa的时候,需要先安装JDK(必须安装Java 8+以上的版本)。

1.1.4 启动Kafka

Apache Kafka可以使用ZooKeeper或KRaft启动;但只能使用其中一种方式,不能同时使用;

  • KRaft:是Kafka内置共识机制,用于取代 Apache ZooKeeper,早期kafka的运行依赖于zookeeper,现在kafka3.0之后有了KRaft,没有zookeeper也可以独立运行了;

Kafka启动基于Zookeeper(目前kafka内部已经自带了zookeeper)

1、进入到kafka家目录下的bin目录中运行命令,启动zookeeper./zookeeper-server-start.sh ../config/zookeeper.properties &

2、进入到kafka家目录下的bin目录中运行命令,启动kafka./kafka-server-start.sh ../config/server.properties &

3、进入到kafka家目录下的bin目录中运行命令,关闭Kafka:./kafka-server-stop.sh ../config/server.properties

4、进入到kafka家目录下的bin目录中运行命令,关闭zookeeper: ./zookeeper-server-stop.sh ../config/zookeeper.properties

1.1.5 使用独立的Zookeeper启动Kafka

下载好独立的zookeeper后需要先配置Zookeeper:

进入到zookeeper家目录下的conf目录下,执行命令:cp zoo_sample.cfg  zoo.cfg(zookeeper的配置文件名必须叫zoo.cfg,这里通过样例文件复制一份出来即可)

zoo.cfg 不需要修改,直接使用即可

1、必须先启动Zookeeper

 zookeeper启动默认会占用8080端口,需要其修改配置文件zoo.cfg,在结尾添加如下配置:

admin.serverPort=9089

到zookeeper家目录下的bin目录下运行命令,启动Zookeeper:zkServer.sh start

到zookeeper家目录下的bin目录下运行命令,关闭Zookeeper:zkServer.sh stop

2、进入到kafka家目录下的bin目录中运行命令,启动kafka./kafka-server-start.sh ../config/server.properties &

1.1.6 使用KRaft启动运行Kafka

1、多个kafka节点和单个都需要先,生成Cluster UUID(集群UUID):到kafka的家目录/bin目录下执行命令: ./kafka-storage.sh random-uuid

2、格式化日志目录:到kafka的家目录/bin目录下执行命令:./kafka-storage.sh format -t 刚刚生成的集群UUID -c ../config/kraft/server.properties

3、启动Kafka,到kafka的家目录/bin目录下执行命令:./kafka-server-start.sh ../config/kraft/server.properties &

4、关闭Kafka,到kafka的家目录/bin目录下执行命令:./kafka-server-stop.sh ../config/kraft/server.properties

1.1.7 使用Docker启动运行Kafka

Docker安装:

安装最新版的Docker:

1、yum install yum-utils -y

2、yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

3、yum install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin -y

查看是否安装成功,查看docker版本:docker --versiondocker version,docker -v


Docker启动:

启动:systemctl start docker 或者 service docker start

停止:systemctl stop docker 或者 service docker stop

重启:systemctl restart docker 或者 service docker restart


检查Docker进程的运行状态:systemctl status docker 或者 service docker status

查看docker进程:ps -ef | grep docker

查看docker系统信息:docker info

查看所有的帮助信息:docker --help

查看某个commond命令的帮助信息:docker commond --help


使用Docker启动kafka

1、拉取Kafka镜像:docker pull apache/kafka:3.7.0

2、启动Kafka容器:docker run -p 9092:9092 apache/kafka:3.7.0

查看已安装的镜像:docker images

删除镜像:docker rmi apache/kafka:3.7.0

2.Kafka操作

2.1 Kafka操作

2.1.1创建主题Topic

在使用Kafka之前,必须先在kafka服务器中创建主题(Topic):

        主题(Topic)类似于文件系统中的文件夹。

        主题(Topic)用于存储事件(Events)。

                事件(Events)也称为记录或消息,比如支付交易、手机地理位置更新、运输订单、物联网设备或医疗设备的传感器测量数据等等都是事件(Events)。

                事件(Events)被组织和存储在主题(Topic)中。

                简单来说,主题(Topic)类似于文件系统中的文件夹,事件(Events)是该文件夹中的文件。

2.1.2 Kafka操作

1、创建kafka中的topic

创建主题使用这个命令:kafka-topics.sh

1、不带任何参数会告知该脚本如何使用:./kafka-topics.sh

2、创建主题:./kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

3、列出所有的主题:./kafka-topics.sh --list --bootstrap-server localhost:9092

4、删除主题:./kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092

5、显示主题详细信息:./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

6、修改主题信息:./kafka-topics.sh --alter --topic quickstart-events --partitions 5  --bootstrap-server localhost:9092

2、在主题(Topic)中写入一些事件(Events

Kafka客户端通过网络与Kafka Brokers进行通信,可以 读/写 主题Topic中的事件Events;

Kafka Brokers一旦收到事件Event,就会将事件Event以持久和容错的方式存储起来,可以永久地存储;

通过 kafka-console-producer.sh 脚本写入事件Events:

  • 不带任何参数会告知该脚本如何使用:./kafka-console-producer.sh
  • ./kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
  • 每一次换行是一个事件Event
  • 使用Ctrl+C退出,停止发送事件Event到主题Topic

3、从主题Topic中读取事件Events

使用kafka-console-consumer.sh消费者客户端读取之前写入的事件Event

  • 不带任何参数会告知该脚本如何使用:./kafka-console-consumer.sh
  • ./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
  • --from-beginning 表示从kafka最早的消息开始消费
  • 使用Ctrl+C停止消费者客户端;

事件Events是持久存储在Kafka中的,所以它们可以被任意读取多次;

2.2 在另外一台机器上连接Kafka

1、先在Linux下使用docker启动Kafka容器:docker run -d -p 9092:9092 apache/kafka:3.7.0

2、在windows下安装外部连接工具;

3、使用外部连接工具连接Kafka;

这里使用idea中的kafka插件作为客户端连接kafka服务器: 

如果外部环境连接不上Kafka怎么办?

文件输入:提供一个本地kafka属性配置文件,替换docker容器中的默认配置文件;

0.先随便创建并运行一个kafka容器:docker run -p 9092:9092 apache/kafka:3.7.0

①然后执行这个命令进入到此容器的命令行中:docker exec -it 容器id /bin/bash

②把docker的kafka容器中的server.properties文件复制到linux中:

docker cp 容器id:/etc/kafka/docker/server.properties /opt/kafka/docker

③在Linux下编辑这个配置文件:server.properties

listeners=PLAINTEXT://0.0.0.0:9092(listeners用于指定kafka 监听来自于哪个网卡的请求)

advertised.listeners=PLAINTEXT://192.168.11.128:9092(指定客户端或其他broker连接kafka服务器时,实际使用连接地址,即“对外宣传”的地址)

advertise的含义表示宣称的、公布的,Kafka服务对外开放的IP和端口;

④文件映射:docker run --volume /opt/kafka/docker:/mnt/shared/config -d -p 9092:9092 apache/kafka:3.7.0

其他的Kafka图形界面连接工具:

1、Offset Explorer (以前叫 Kafka Tool),官网:Offset Explorer

        直接进入官网点击下载到window系统中即可连接linux中的kafka:

2、CMAK(以前叫 Kafka Manager) 官网:https://github.com/yahoo/CMAK

        是一个web后台管理系统,可以管理kafka,和kafka一起运行在Linux系统中,项目地址: https://github.com/yahoo/CMAK

        注意该管控台运行需要JDK11版本的支持。

        下载:https://github.com/yahoo/CMAK/releases

        下载下来是一个zip压缩包,直接 unzip解压:unzip cmak-3.0.0.6.zip,解压后即完成了安装;

CMAK客户端需要基于zookeeper的方式启动kafka才可以使用该web管理后台,否则不行;

        1、CMAK配置:

                修改CMAK家目录中的conf目录下的application.conf配置文件:

                kafka-manager.zkhosts=\"192.168.184.128:2181\"

                cmak.zkhosts=\"127.0.0.1:2181\"

        2、CMAK启动:

                切换到CMAK家目录/bin目录下执行命令:./cmak -Dconfig.file=../conf/application.conf -java-home /usr/local/jdk-11.0.22(这里必须用jdk11启动,jdk17是不行的)

                其中-Dconfig.file是指定配置文件,-java-home是指定jdk11所在位置,如果机器上已经是jdk11,则不需要指定;

        3、CMAK访问:

                启动之后CMAK默认端口为9000,在windows系统下访问:http://192.168.11.128:9000/

3、EFAK(以前叫 kafka-eagle) 官网:EFAK

EFAK概述:

        EFAK一款优秀的开源免费的Kafka集群监控工具;(国人开发并开源)

        官网:EFAK    Github:https://github.com/smartloli/EFAK

EFAK下载与安装:

                下载:https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz

                安装,需要解压两次:

                        1、tar -zxvf kafka-eagle-bin-3.0.1.tar.gz

                        2、cd kafka-eagle-bin-3.0.1

                        3、tar -zxvf efak-web-3.0.1-bin.tar.gz

                        4、cd efak-web-3.0.1

EFAK的配置:

        1、安装MySQL数据库,需要MySQL,并创建数据库ke,数据库创建好即可,其他什么都不用管;

        2、修改配置文件EFAK家目录//conf/system-config.properties

                主要修改Zookeeper配置MySQL数据库配置

                

        3、在/etc/profile文件中配置环境变量KE_HOME,在profile文件的最后添加:

                export KE_HOME=/usr/local/efak-web-3.0.1

                export PATH=$KE_HOME/bin:$PATH

                执行source让环境变量配置生效:source /etc/profile

启动EFAK(也是和kafka一起运行在Linux系统中)

                1、EFAK需要kafka采用zookeeper的方式启动才能使用;

                2、在EFAK安装目录的bin目录下执行这个命令:./ke.sh start 命令使用:ke.sh [start|status|stop|restart|stats]

访问EFAK

        1、在window系统下访问:http://192.168.11.128:8048/

        2、登录账号:admin , 密码:123456

3.Spring Boot集成Kafka开发

3.1 最基础的kafka程序

1、新建项目:新建一个空项目,然后再使用SpringBoot脚手架Spring Initializr在空项目下创建好SpringBoot项目:

下面是kafka的依赖:

  • 这个不是kafka的起步依赖,但是SpringBoot中其实也自动配置好了kafka
  org.springframework.kafka spring-kafka 

2、配置SpringBoot配置文件

spring: application: name: springnboot-01-kafka-base #配置afka相关信息 kafka: #配置kafka服务器的地址 bootstrap-servers: 192.168.184.129:9092 #生产者配置信息 #消费者配置信息

3、写代码

编写生产者(写入事件)

package com.example.producer;import jakarta.annotation.Resource;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 //KafkaTemplate类的泛型分别表示发送消息的key和value的类型 @Resource private KafkaTemplate kafkaTemplate; public void send(){ //先使用最基础的send方法发送事件:向某个主题发生某个消息 kafkaTemplate.send(\"hello-topic\",\"hello kafka\"); }}

消费者(读取事件)

package com.example.consumer;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { //采用监听的方式接收事件(消息、数据) //默认只会监听最新发到kafka的消息,消费者启动之前发送到kafka的消息是监听不到的 //@KafkaListener注解标注在方法上声明这是一个消费者,topics属性:指定此消费者要监听的主题列表 @KafkaListener(topics = \"hello-topic\",groupId = \"hello-group\") public void onEvent(String event){ System.out.println(\"读取到事件:\" + event); }}

主启动类:

package com.example;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Springnboot01KafkaBaseApplication { public static void main(String[] args) { SpringApplication.run(Springnboot01KafkaBaseApplication.class, args); }}

 测试类:

package com.example;import com.example.producer.EventProducer;import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestclass Springnboot01KafkaBaseApplicationTests { @Resource private EventProducer eventProducer; @Test void test01() { eventProducer.send(); }}

先运行主启动类,再运行测试类中的test01方法发生消息,结果为:

3.2 kafka的中几个主要概念

Kafka的几个概念:

1、生产者Producer:

2、消费者Consumer:

3、主题Topic:

4、分区Partition:在Kafka中,每个topic可以有一个或多个partition分区,当创建topic时,如果不指定该topic中partition的数量,那么默认分区数量是1。

5、偏移量Offset:offset是标识分区中消息的唯一位置,从0开始,然后顺序往上增长,每条消息进来分区都会对应一个offset

6、默认情况下,当启动一个新的消费者组(编写消费者需要指定一个消费者组)时,它会从每个分区的最新偏移量(即该分区中最后一条消息的下一个位置)开始消费消息。如果希望从第一条消息开始消费,需要在SpringBoot配置文件中将消费者的auto.offset.reset属性设置为earliest:

spring: application: name: springnboot-01-kafka-base #配置afka相关信息 kafka: #kafka的连接地址 bootstrap-servers: 192.168.184.129:9092 #生产者配置信息 #消费者配置信息 consumer: auto-offset-reset: earliest

注意: 如果之前已经用相同的消费者组ID消费过该主题中的消息,这时Kafka就已经记录了该消费者组的偏移量(也就是这个消费者组消费到哪个消息),那么即使设置了auto.offset.reset=earliest,该设置也不会生效,因为Kafka只会在找不到偏移量时使用这个配置。在这种情况下,你需要手动重置偏移量使用一个新的消费者组ID

  • 手动重置偏移量
    • 在kafka家目录/bin下执行下面命令:
      #将偏移量重置到00./kafka-consumer-groups.sh --bootstrap-server  --group  --topic  --reset-offsets --to-earliest --execute./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group-02 --topic hello-topic --reset-offsets --to-earliest --execute#将偏移量重置到最新的那条消息./kafka-consumer-groups.sh --bootstrap-server  --group  --topic  --reset-offsets --to-latest --execute

3.3 消费者消费消息时的偏移量策略配置

消息消费时偏移量策略的配置:

spring: kafka: #消费者配置信息 consumer: auto-offset-reset: earliest

取值: earliest 、latest、 none 、exception earliest 自动将偏移量重置为最早的偏移量,从0开始消费; latest: 自动将偏移量重置为最新偏移量,从该分区中的最后一条消息的下一个位置开始消费; none: 如果没有为消费者组找到以前的偏移量(也就是以前消费者组有没有消费过消息),则向消费者抛出异常; exception: 向消费者抛出异常; spring- kafka 不支持)

3.4 生产者向kafka发送消息的方法

package com.example.producer;import jakarta.annotation.Resource;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.header.Headers;import org.apache.kafka.common.header.internals.RecordHeaders;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 @Resource private KafkaTemplate kafkaTemplate; public void send(){ //先使用最基础的send(String topic, @Nullable V data)方法发送事件:向某个主题发生某个消息 kafkaTemplate.send(\"hello-topic\",\"hello kafka\"); } //使用send(Message message)方法发送事件:向某个主题发生某个消息 public void send2(){ //1.使用MessageBuilder创建消息对象,泛型表示发送的消息是什么类型的 Message message = MessageBuilder.withPayload(\"hello kafka\") //1.1调用这个方法设置要发送的消息 .setHeader(KafkaHeaders.TOPIC,\"test-topic\") //1.2调用这个方法设置消息要发送到test-topic主题 .build(); kafkaTemplate.send(message); } //使用send(ProducerRecord record)方法发送消息 public void send3(){ //Headers对象里面可以放一些信息,到时候消费者接受到消息后,可以拿到此对象中放的那些信息 Headers headers = new RecordHeaders(); headers.add(\"photo\",\"12345678901\".getBytes(StandardCharsets.UTF_8)); headers.add(\"name\",\"张三\".getBytes(StandardCharsets.UTF_8)); //1.创建ProducerRecord对象,代表生产者要发生的一条完整消息,其泛型分别表示消息的key和value的类型 //1.1:构造方法ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable
headers) //1.1.1:ProducerRecord类的构造方法的每个参数分别为向哪个主题的哪个分区发送消息,发送消息时的时间,消息的key和value,消息的headers ProducerRecord record = new ProducerRecord(\"test-topic02\", 0, System.currentTimeMillis(), \"key1\", \"hello kafka\", headers ); kafkaTemplate.send(record); } /* * 使用send(String topic, Integer partition, Long timestamp, K key, @Nullable V data)方法发送消息 * 这个send方法的每个参数代表向topic主题的partition分区发送消息,发送消息的时间,消息的key和value */ public void send4(){ kafkaTemplate.send(\"test-topic02\",0,System.currentTimeMillis(),\"key2\",\"hello kafka\"); } /* * 使用sendDefault(Integer partition, Long timestamp, K key, @Nullable V data)方法发送消息 * 这个sendDefault方法的每个参数代表向yml配置文件中配置的默认主题的partition分区发送消息,发送消息的时间,消息的key和value */ public void send5(){ kafkaTemplate.sendDefault(0,System.currentTimeMillis(),\"key3\",\"hello kafka\"); }}

 kafkaTemplate.send(...) 和 kafkaTemplate.sendDefault(...) 方法发送消息的区别?

主要区别是每次发送消息到Kafka时,是否需要指定主题topic;

1、kafkaTemplate.send(...) 方法需要指定要发送消息的目标主题topic

2、kafkaTemplate.sendDefault() 该方法不需要指定要发送消息的目标主题topic

kafkaTemplate.send(...) 方法适用于需要根据业务逻辑或外部输入动态确定消息目标topic的场景;

kafkaTemplate.sendDefault() 方法适用于总是需要将消息发送到特定默认topic的场景;kafkaTemplate.sendDefault() 是一个便捷方法,它使用配置中指定的默认主题topic来发送消息,如果应用中所有消息都发送到同一个主题时采用该方法非常方便,可以减少代码的重复或满足特定的业务需求;

3.5 获取生产者发送消息后的结果

3.4节介绍的所有send()方法和sendDefault()方法都会返回CompletableFuture<SendResult>类型的对象。

  • CompletableFuture是Java 8中引入的一个类,用于异步编程,它表示一个异步计算的结果,这个特性使得调用者不必等待操作完成就能继续执行其他任务,从而提高了应用程序的响应速度和吞吐量。(CompletableFuture单词含义为未来能够完成的
  • 其中泛型SendResult类封装了kafkaTemplate对象发送消息后的发送结果。

因为调用 kafkaTemplate.send() 方法发送消息时,Kafka需要一些时间来处理该消息(例如:网络延迟、消息序列化、Kafka集群的负载等),如果 send() 方法是同步的,那么发送消息可能会阻塞调用线程,直到消息发送成功或发生错误这会导致应用程序的性能下降,尤其是在高并发场景下。

使用 CompletableFuture类后,kafkaTemplate.send方法发送消息时会立即返回一个表示异步操作结果的未来对象(在消息发送完成后可以从这个对象中获取发送结果),而不是等待操作完成,这样,调用线程可以继续执行其他任务,而不必等待消息发送完成。当消息发送完成时(无论是成功还是失败),CompletableFuture会相应地更新其状态,并允许我们通过回调、阻塞等方式来获取操作结果。

实例:

package com.example.producer;import jakarta.annotation.Resource;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.header.Headers;import org.apache.kafka.common.header.internals.RecordHeaders;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.kafka.support.SendResult;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 @Resource private KafkaTemplate kafkaTemplate; /* * 阻塞等待的方式获取发送结果,如果还没拿到发送结果就会阻塞等待,直到获取到结果 */ public void send6(){ CompletableFuture<SendResult> completableFuture = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), \"key3\", \"hello kafka\"); //在消息发送完成后可以从这个对象中获取发送结果 try { //1.阻塞等待的方式获取发送结果,如果还没拿到发送结果就会在这里阻塞等待,直到获取到结果 SendResult sendResult = completableFuture.get(); if (sendResult.getRecordMetadata() != null){ //如果SendResult对象中的RecordMetadata对象不为空,就表示消息发送到kafka服务器 System.out.println(\"消息发送成功:\" + sendResult.getRecordMetadata().toString()); } System.out.println(\"producerRecord\" + sendResult.getProducerRecord()); } catch (Exception e) { throw new RuntimeException(e); } } /* * 异步方式获取发送结果: * 使用CompletableFuture对象内的thenAccept(), thenApply(), thenRun()等方法来注册回调函数,回调函数会在CompletableFuture对象完成时被执行 */ public void send7(){ CompletableFuture<SendResult> completableFuture = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), \"key3\", \"hello kafka\"); //在消息发送完成后可以从这个对象中获取发送结果 try { //1.使用非阻塞的方式获取发送消息的结果 completableFuture.thenAccept((t) -> { if (t.getRecordMetadata() != null){  //如果SendResult对象中的RecordMetadata对象不为空,就表示消息发送到kafka服务器  System.out.println(\"消息发送成功:\" + t.getRecordMetadata().toString()); } System.out.println(\"producerRecord\" + t.getProducerRecord()); }).exceptionally((t) -> { //如果发送消息过程中出现异常,就会执行这个回调方法 t.printStackTrace(); return null; }); } catch (Exception e) { throw new RuntimeException(e); } }}

3.6 生产者发送对象类型的消息

生产者类:

package com.example.producer;import com.example.entity.User;import jakarta.annotation.Resource;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.header.Headers;import org.apache.kafka.common.header.internals.RecordHeaders;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.kafka.support.SendResult;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 @Resource private KafkaTemplate kafkaTemplate; @Resource private KafkaTemplate kafkaTemplate2; public void send8(){ User user = new User(); user.setId(1); user.setName(\"张三\"); user.setAge(18); //分区传null表示让kafka自己选择将消息发到哪个分区 kafkaTemplate2.sendDefault(null, System.currentTimeMillis(), \"key3\", user); }}

测试类:

package com.example;import com.example.producer.EventProducer;import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestclass Springnboot01KafkaBaseApplicationTests { @Resource private EventProducer eventProducer; @Test void test08() { eventProducer.send8(); }}

此时会出现异常:org.apache.kafka.common.errors.SerializationException: Can\'t convert value of class com.example.entity.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

因为kafka默认使用字符串序列化器StringSerializer来序列化消息的key和value,而此时发送过去的消息是一个对象,所以会报错,这个时候需要配置kafka使用的序列化方式:

spring: application: name: springnboot-01-kafka-base #配置afka相关信息 kafka: #kafka的连接地址 bootstrap-servers: 192.168.184.130:9092 #生产者配置信息 producer: key-serializer: org.springframework.kafka.support.serializer.JsonSerializer #指定消息key和消息value的编码(序列化)方式 value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #消费者配置信息 consumer: auto-offset-reset: earliest template: default-topic: default-topic #当代码中没有指定向哪个主题发送消息时,默认使用这里配置的主题

4. Kafka的核心概念

4.1 Replica副本

Replica:副本kafka实现备份功能保证kafka集群中的某个节点发生故障时使该节点上partition中的数据不丢失,且 Kafka仍然能够继续工作,Kafka提供了副本机制,一个topic主题中的每个分区patition都有1个或多个副本;

Replica副本分为Leader ReplicaFollower Replica

  • Leader:一个分区的多个副本中的“主”副本,生产者发送数据以及消费者消费数据,都是来自leader副本
  • Follower:一个分区的多个副本中的“从”副本,实时从leader副本中同步数据,保持和leader副本数据的同步,leader副本发生故障时,某个follower副本会成为新的leader副本

注意:设置副本的个数不能为0,也不能大于节点个数,否则将不能创建Topic;

4.2 创建主题时指定分区和副本

在直接使用send()方法发送消息时,kafka会自动创建topic,这种情况下创建的topic默认只有一个分区,分区只有1个副本,也就是有它自己本身的副本(主副本),没有额外的副本备份;

我们可以在项目中新建一个配置类专门用来初始化topic:

package com.example.config;import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class KafkaConfig { @Bean public NewTopic newTopic(){ //在配置类中配置一个Bean,使用NewTopic创建一个主题,后面项目启动后会加载配置类,然后就会创建这个Bean去创建一个topic //这个构造方法的每个参数为:主题的名称、分区数、每个分区的副本数 return new NewTopic(\"heTopic\", 5, (short) 1); }}

如果要修改分区数,只需修改配置值重启项目即可,修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小,如果主题已经存在并不会重新创建。

4.3 生产者发送消息时的分区策略

生产者发生消息时的分区策略其实就是:控制生产者发送消息时,将消息发送到主题的哪个分区中

生产者发送消息到topic时,Kafka将依据不同的策略将数据分配到不同的分区中:

1、默认分配策略:BuiltInPartitioner 

        有指定消息的key:Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;()

        没有指定消息的key:是使用随机数 % numPartitions

2、轮询分配策略:RoundRobinPartitioner实现类 (实现的接口:Partitioner)

  • 轮流往每个分区中发消息:如何配置:
    • package com.example.config;import org.apache.kafka.clients.admin.NewTopic;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.RoundRobinPartitioner;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;import java.util.Map;@Configurationpublic class KafkaConfig { @Value(\"${spring.kafka.bootstrap-servers}\") private String bootstrapServers; @Value(\"${spring.kafka.producer.value-serializer}\") private String valueSerializer; @Value(\"${spring.kafka.producer.key-serializer}\") private String keySerializer; /** * 创建并返回一个包含Kafka生产者相关配置的Map集合 * 随后会根据这个配置来创建生产者工厂 */ public Map producerConfigs(){ Map props = new HashMap(); //指定kafka的连接地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //指定key的序列化器 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); //指定值的序列化器 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); //指定分区策略为轮询的方式 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class); return props; } /** * 创建并返回一个生产者工厂,使用前面定义的producerConfigs()方法提供的配置 * 这个工厂随后会被注入到 KafkaTemplate类的构造方法中,模板通过工厂获取实际的KafkaProducer(Kafka生产者实例) */ public ProducerFactory producerFactory(){ return new DefaultKafkaProducerFactory(producerConfigs()); } /** * KafkaTemplate:用于覆盖默认的KafkaTemplate对象,使用前面创建的生产者工厂构建KafkaTemplate对象 */ @Bean public KafkaTemplate kafkaTemplate(){ return new KafkaTemplate(producerFactory()); } /** * 创建一个主题 */ @Bean public NewTopic newTopic(){ //在配置类中配置一个Bean,使用NewTopic创建一个主题,后面项目启动后会加载配置类,然后就会创建这个Bean去创建一个topic //这个构造方法的每个参数为:主题的名称、分区数、每个分区的副本数 return new NewTopic(\"heTopic\", 9, (short) 1); }}

3、自定义分配策略

定义一个类实现kafka提供的Partitioner接口:

package com.example.config;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPatitioner implements Partitioner { @Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { //在这个方法里面写分区策略的代码即可,其他两个方法可以不用管 return 0; } @Override public void close() { } @Override public void configure(Map map) { }}

之后在配置类中配置一下这个类即可:

package com.example.config;import org.apache.kafka.clients.admin.NewTopic;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.RoundRobinPartitioner;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;import java.util.Map;@Configurationpublic class KafkaConfig { @Value(\"${spring.kafka.bootstrap-servers}\") private String bootstrapServers; @Value(\"${spring.kafka.producer.value-serializer}\") private String valueSerializer; @Value(\"${spring.kafka.producer.key-serializer}\") private String keySerializer; /** * 创建并返回一个包含Kafka生产者相关配置的Map集合 */ public Map producerConfigs(){ Map props = new HashMap(); //指定kafka的连接地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //指定key的序列化器 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); //指定值的序列化器 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); //指定分区策略为轮询的方式 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPatitioner.class); return props; } /** * 创建并返回一个生产者工厂,使用前面定义的producerConfigs()方法提供的配置 */ public ProducerFactory producerFactory(){ return new DefaultKafkaProducerFactory(producerConfigs()); } /** * KafkaTemplate:用于覆盖默认的KafkaTemplate对象,使用前面创建的producerFactory()构建 */ @Bean public KafkaTemplate kafkaTemplate(){ return new KafkaTemplate(producerFactory()); } /** * 创建一个主题 */ @Bean public NewTopic newTopic(){ //在配置类中配置一个Bean,使用NewTopic创建一个主题,后面项目启动后会加载配置类,然后就会创建这个Bean去创建一个topic //这个构造方法的每个参数为:主题的名称、分区数、每个分区的副本数 return new NewTopic(\"heTopic\", 9, (short) 1); }}

4.4 生产者发送消息的流程

首先,生产者发消息时,首先会经过拦截器(可以有0个到多个拦截器,默认是没有拦截器的),然后会走序列化器,分别对消息的key和value进行序列化,默认使用字符串序列化器,然后走分区器(消息会发到哪个分区),最后才会把消息发到topic中。

4.4.1 拦截生产者发送的消息

1、自定义生产者拦截器,拦截生产者发送的消息,实现ProducerInterceptor接口,这个接口的泛型分别表示消息的key和value是什么类型:

package com.example.config;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CustomerProducerInterceptor implements ProducerInterceptor { /** * 发送消息时会先调用此方法,对消息进行拦截,可以对消息进行一些处理,比如记录日志等 * @param producerRecord 拦截到的完整的消息对象 * @return 返回原本的完整消息对象,继续传给下面的方法 */ @Override public ProducerRecord onSend(ProducerRecord producerRecord) { System.out.println(\"拦截消息:\" + producerRecord.toString()); return producerRecord; } /** * 生产者发送消息给kafka服务器,服务器会返回一个响应,表示服务器是否收到这个消息 * @param recordMetadata 如果服务器收到消息,就可以从RecordMetadata对象中拿到偏移量等信息 * @param e 如果没有收到消息,RecordMetadata对象就会为null,会出现异常信息 */ @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if (recordMetadata != null){ System.out.println(\"服务器收到消息:\" + recordMetadata.offset()); }else { System.out.println(\"消息发送失败:\" + e.getMessage()); } } @Override public void close() { } @Override public void configure(Map map) { }}

2、消息拦截器定义好后,还需要在生产者配置中配置一下拦截器让其生效:

package com.example.config;import org.apache.kafka.clients.admin.NewTopic;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.RoundRobinPartitioner;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;import java.util.Map;@Configurationpublic class KafkaConfig { @Value(\"${spring.kafka.bootstrap-servers}\") private String bootstrapServers; @Value(\"${spring.kafka.producer.value-serializer}\") private String valueSerializer; @Value(\"${spring.kafka.producer.key-serializer}\") private String keySerializer; /** * 创建并返回一个包含Kafka生产者相关配置的Map集合 */ public Map producerConfigs(){ Map props = new HashMap(); //指定kafka的连接地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //指定key的序列化器 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); //指定值的序列化器 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); //指定分区策略为轮询的方式 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class); //添加自定义的拦截器 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomerProducerInterceptor.class.getName()); return props; } /** * 创建并返回一个生产者工厂,使用前面定义的producerConfigs()方法提供的配置 */ public ProducerFactory producerFactory(){ return new DefaultKafkaProducerFactory(producerConfigs()); } /** * KafkaTemplate:用于覆盖默认的KafkaTemplate对象,使用前面创建的producerFactory()构建 */ @Bean public KafkaTemplate kafkaTemplate(){ return new KafkaTemplate(producerFactory()); } /** * 创建一个主题 */ @Bean public NewTopic newTopic(){ //在配置类中配置一个Bean,使用NewTopic创建一个主题,后面项目启动后会加载配置类,然后就会创建这个Bean去创建一个topic //这个构造方法的每个参数为:主题的名称、分区数、每个分区的副本数 return new NewTopic(\"heTopic\", 9, (short) 1); }}

之后使用生产者发送消息:
 

 public void send8(){ User user = new User(); user.setId(1); user.setName(\"张三\"); user.setAge(18); //分区传null表示让kafka自己选择将消息发到哪个分区 kafkaTemplate2.send(\"heTopic\", user); } @Test void test08() { eventProducer.send8(); }

控制台输出:

拦截消息:ProducerRecord(topic=heTopic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=com.example.entity.User@3688baab, timestamp=null)2025-08-05T00:52:57.036+08:00 INFO 20980 --- [springnboot-01-kafka-base] [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: Some(5L6g3nShT-eMCtK--X86sw)2025-08-05T00:52:57.036+08:00 INFO 20980 --- [springnboot-01-kafka-base] [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 2 with epoch 0服务器收到消息:2

5. 消费者消费消息详解

5.1 获取生产者发送的消息

package com.example.springboot02kafkabase.consumer;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { @KafkaListener(topics = \"helloTopic\",groupId = \"helloGroup\") public void onEvent(@Payload String event, @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic, //@Header(value = KafkaHeaders.RECEIVED_KEY) String key, @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition){ System.out.println(\"读取到事件:\" + event + \",topic:\" + topic + \",partition:\" + partition); }}

代码讲解:

  • @KafkaListener: Spring Kafka提供的注解,标注在方法上,用于声明Kafka的消费者

  • @Payload: 标识此形参是用于获取消息体内容的(即消息的实际内容),并注入到方法形参中,而不是消息头或其他元数据。

  • @Header注解:标识此形参是用于从消息头中获取指定值的,并注入到方法的形参中

  • KafkaHeaders类: 包含Kafka消息头常量的类

    • 常用常量

      常量 说明 示例值 RECEIVED_TOPIC 消息来源的主题 \"test-topic\" RECEIVED_PARTITION 消息来源的分区ID 2 RECEIVED_KEY 消息的键 \"user-123\" RECEIVED_TIMESTAMP 消息的时间戳 1625097600000 OFFSET 消息的偏移量 15 GROUP_ID 消费者组ID \"my-group\"

使用ConsumerRecord来接收消息:

package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { @KafkaListener(topics = \"helloTopic\",groupId = \"helloGroup\") public void onEvent(ConsumerRecord consumerRecord){ System.out.println(\"consumerRecord:\" + consumerRecord.toString()); System.out.println(\"key:\" + consumerRecord.key()); System.out.println(\"value:\" + consumerRecord.value()); System.out.println(\"topic:\" + consumerRecord.topic()); }}

生产者发送消息后的控制台:

ConsumerRecord讲解:

  • 作用: 此对象表示消费者接收到的单条完整消息记录,用于消费者接收消息通过这个对象可以获取到最完整的消息数据,一次性访问消息的所有数据。

  • 泛型参数:

    • 第一个 String: 消息键(key)的类型

    • 第二个 String: 消息值(value)的类型

  • 主要方法

    方法 返回类型 说明 key() String 获取消息的键 value() String 获取消息的值(正文内容) topic() String 获取消息所属主题 partition() int 获取消息所在分区ID offset() long 获取消息在分区中的偏移量 timestamp() long 获取消息时间戳 headers() Headers 获取消息头信息

配置消费者监听的主题和指定消费者组的时候可以不写死,而是写在配置文件:

spring: application: name: springnboot-02-kafka-base #配置afka相关信息 kafka: #kafka的连接地址 bootstrap-servers: 192.168.184.130:9092 #生产者配置信息 #producer: #消费者配置信息 #consumer:#自定义配置kafka: topic: name: helloTopic consumer: group: helloGroup
package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { @KafkaListener(topics = \"${kafka.topic.name}\",groupId = \"${kafka.consumer.group}\") public void onEvent(ConsumerRecord consumerRecord){ System.out.println(\"consumerRecord:\" + consumerRecord.toString()); System.out.println(\"key:\" + consumerRecord.key()); System.out.println(\"value:\" + consumerRecord.value()); System.out.println(\"topic:\" + consumerRecord.topic()); }}

5.2 开启消费者手动确认消息模式

1、首先在SpringBoot配置文件中开启消费者手动确认模式:

spring: application: name: springnboot-02-kafka-base #配置afka相关信息 kafka: #kafka的连接地址 bootstrap-servers: 192.168.184.130:9092 #生产者配置信息 #producer: #消费者配置信息 #consumer: #配置消息监听器 listener: ack-mode: manual #开启消息监听的手动确认模式#自定义配置kafka: topic: name: helloTopic consumer: group: helloGroup

2、然后在消费者方法形参上添加Acknowledgment对象,最后在代码中使用Acknowledgment对象调用其acknowledge()方法手动确认消息:

package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { @KafkaListener(topics = \"${kafka.topic.name}\",groupId = \"${kafka.consumer.group}\") public void onEvent(@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition, @Payload ConsumerRecord consumerRecord, Acknowledgment ack){ System.out.println(\"消费者接收到的单条消息记录:\" + consumerRecord.toString()); ack.acknowledge(); //手动确认消息,告诉kafka服务器,该消息已经确认收到,默认情况下kafka是自动确认的 }}

注意:

  • 如果消费者不确认收到消息,kafka服务器会认为消息没有收到,相当于消息没有被消费,会造成重复消费消息;
  • 原理就是消费者确认消息后,kafka会将offset偏移量更新,如果不确认收到消息偏移量就没有更新,所以会重复消费消息。

实际的使用实例:

默认情况下,Kafka消费者消费消息后,会自动发送确认信息给Kafka服务器,表示消息已经被成功消费。但在某些场景下,我们希望在消息处理成功后(也就是业务处理成功)再发送确认收到消息,以便Kafka能够重新发送该消息。

下面代码中,如果业务处理中发送异常,消费者就不会确认收到消息,消费者还可以再次接收到刚刚处理失败的那条消息。

package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { @KafkaListener(topics = \"${kafka.topic.name}\",groupId = \"${kafka.consumer.group}\") public void onEvent(@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition, @Payload ConsumerRecord consumerRecord, Acknowledgment ack){ try{ //-----------业务代码开始------------- System.out.println(\"消费者接收到的单条消息记录:\" + consumerRecord.toString()); //-----------业务代码结束------------- //业务处理完成后,消费者告诉kafka服务器收到消息 ack.acknowledge(); }catch (Exception e){ e.printStackTrace(); } }}

5.3 消费者接收消息的时候指定topic、partition、offset进行消费

application.yml

spring: application: name: springnboot-02-kafka-base #配置afka相关信息 kafka: #kafka的连接地址 bootstrap-servers: 192.168.184.130:9092 #生产者配置信息 #producer: #消费者配置信息 consumer: auto-offset-reset: earliest #配置消息监听器 listener: ack-mode: manual #开启消息监听的手动确认模式#自定义配置kafka: topic: name: helloTopic consumer: group: helloGroup

消费者:

package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.PartitionOffset;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.kafka.support.Acknowledgment;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { @KafkaListener(groupId = \"${kafka.consumer.group}\", topicPartitions = {@TopicPartition( topic = \"${kafka.topic.name}\", //指定监听的topic partitions = {\"0\",\"1\",\"2\"}, //指定监听的分区,设置从哪个偏移量开始消费  //3分区和4分区的从3和4偏移量后面的数据开始消费 partitionOffsets = {@PartitionOffset(partition = \"3\",initialOffset = \"3\"),@PartitionOffset(partition = \"4\",initialOffset = \"4\")} )}) public void onEvent(@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition, @Payload ConsumerRecord consumerRecord, Acknowledgment ack){ try{ //-----------业务代码开始------------- System.out.println(\"消费者接收到的单条消息记录:\" + consumerRecord.toString()); //-----------业务代码结束------------- //业务处理完成后,消费者告诉kafka服务器收到消息 ack.acknowledge(); }catch (Exception e){ e.printStackTrace(); } }}

kafka配置类:
 

package com.example.springboot02kafkabase.config;import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class KafkaConfig { @Bean public NewTopic newTopic(){ return new NewTopic(\"helloTopic\",5, (short) 1); }}

生产者:

package com.example.springboot02kafkabase.producer;import jakarta.annotation.Resource;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 @Resource private KafkaTemplate kafkaTemplate; public void send(){ for (int i = 0; i < 25; i++) { //指定消息的key,让消息可以发送到不同的分区中 kafkaTemplate.send(\"helloTopic\",\"k\" + i,\"hello kafka\"); } }}

测试类:

package com.example.springboot02kafkabase;import com.example.springboot02kafkabase.producer.EventProducer;import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestclass Springboot02KafkaBaseApplicationTests { @Resource private EventProducer eventProducer; @Test public void test01(){ eventProducer.send(); }}

运行单元测试后的控制台:

5.4 开启消费者批量消费消息

1、配置application.yml开启批量消费:

spring: application: name: springnboot-02-kafka-base #配置afka相关信息 kafka: #kafka的连接地址 bootstrap-servers: 192.168.184.130:9092 #生产者配置信息 #producer: #消费者配置信息 consumer: auto-offset-reset: earliest max-poll-records: 20 #每次最多批量消费的消息数量 #配置消息监听器 listener: type: batch #配置为batch,表示批量消费消息#自定义配置kafka: topic: name: helloTopic consumer: group: helloGroup

2、消费者接收消息时用List集合来接收

package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.PartitionOffset;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.kafka.support.Acknowledgment;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import java.util.List;@Componentpublic class EventConsumer { @KafkaListener(topics = {\"batchTopic\"},groupId = \"batchGroup\") public void onEvent(List<ConsumerRecord> consumerRecordList){ try{ //-----------业务代码开始------------- System.out.println(\"批量消费消息,消息总数=\" + consumerRecordList.size() + \",consumerRecordList=\" + consumerRecordList); //-----------业务代码结束------------- }catch (Exception e){ e.printStackTrace(); } }}

其余代码:
生产者

package com.example.springboot02kafkabase.producer;import jakarta.annotation.Resource;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 @Resource private KafkaTemplate kafkaTemplate; public void send(){ for (int i = 0; i < 125; i++) { //指定消息的key,让消息可以发送到不同的分区中 kafkaTemplate.send(\"batchTopic\",\"k\" + i,\"hello kafka\"); } }}

测试类:

package com.example.springboot02kafkabase;import com.example.springboot02kafkabase.producer.EventProducer;import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestclass Springboot02KafkaBaseApplicationTests { @Resource private EventProducer eventProducer; @Test public void test01(){ eventProducer.send(); }}

运行单元测试后,可以看到服务的控制台,消费者一次接收20条消息到List集合中:

5.5 消费消息时对消息进行拦截

在消费者消费消息之前,通过配置拦截器可以对消息进行拦截,在消息被实际处理之前对其进行一些操作,例如记录日志、修改消息内容或执行一些安全检查等。

1、自定义一个类实现kafka的ConsumerInterceptor拦截器接口:

package com.example.springboot02kafkabase.config.interceptor;import org.apache.kafka.clients.consumer.ConsumerInterceptor;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;import java.util.Map;public class CustomConsumerInterceptor implements ConsumerInterceptor { /** * 在消费消息之前执行该方法 * @param consumerRecords * @return */ @Override public ConsumerRecords onConsume(ConsumerRecords consumerRecords) { System.out.println(\"onConsume方法执行\" + consumerRecords); return consumerRecords; } /** * 在消息拿到之后,提交offset之前执行该方法 * @param map */ @Override public void onCommit(Map map) { System.out.println(\"onCommit方法执行\" + map); } @Override public void close() { } @Override public void configure(Map map) { }}

2、在Kafka消费者的ConsumerFactory配置中注册这个拦截器:

package com.example.springboot02kafkabase.config;import com.example.springboot02kafkabase.interceptor.CustomConsumerInterceptor;import org.apache.kafka.clients.admin.NewTopic;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.RoundRobinPartitioner;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;import java.util.Map;@Configurationpublic class KafkaConfig { @Value(\"${spring.kafka.bootstrap-servers}\") private String bootstrapServers; @Value(\"${spring.kafka.consumer.value-deserializer}\") private String valueSerializer; @Value(\"${spring.kafka.consumer.key-deserializer}\") private String keySerializer; /** * 创建并返回一个包含Kafka消费者相关配置的Map集合 */ public Map consumerConfigs(){ Map props = new HashMap(); //指定kafka的连接地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //指定key的序列化器 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keySerializer); //指定值的序列化器 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueSerializer); //添加自定义的消费者拦截器 props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()); return props; } /** * 创建并返回一个消费者工厂,使用前面定义的consumerConfigs()方法提供的配置 */ @Bean public ConsumerFactory myConsumerFactory(){ return new DefaultKafkaConsumerFactory(consumerConfigs()); } /** * 创建并返回一个Kafka监听器工厂,使用前面定义的消费者工厂 * @param myConsumerFactory * @return */ @Bean public KafkaListenerContainerFactory myKafkaListenerContainerFactory(ConsumerFactory myConsumerFactory){ ConcurrentKafkaListenerContainerFactory listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory(); listenerContainerFactory.setConsumerFactory(myConsumerFactory); return listenerContainerFactory; }}

3、编写消费者:

package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { @KafkaListener(topics = {\"batchTopic\"},groupId = \"batchGroup\",containerFactory = \"myKafkaListenerContainerFactory\") public void onEvent(ConsumerRecord consumerRecord){ try{ //-----------业务代码开始------------- System.out.println(\"消费消息,consumerRecord=\" + consumerRecord); //-----------业务代码结束------------- }catch (Exception e){ e.printStackTrace(); } }}

生产者发送消息:
 

package com.example.springboot02kafkabase.producer;import jakarta.annotation.Resource;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 @Resource private KafkaTemplate kafkaTemplate; public void send(){ //指定消息的key,让消息可以发送到不同的分区中 kafkaTemplate.send(\"batchTopic\",\"hello kafka\"); }}

application.yml

spring: application: name: springnboot-02-kafka-base #配置afka相关信息 kafka: #kafka的连接地址 bootstrap-servers: 192.168.184.131:9092 #生产者配置信息 #producer: #消费者配置信息 consumer: auto-offset-reset: earliest max-poll-records: 20 #消费者每次最多批量消费的消息数量 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #配置消息监听器 listener: type: batch #配置为batch,表示批量消费消息#自定义配置kafka: topic: name: helloTopic consumer: group: helloGroup

运行测试类:

package com.example.springboot02kafkabase;import com.example.springboot02kafkabase.producer.EventProducer;import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestclass Springboot02KafkaBaseApplicationTests { @Resource private EventProducer eventProducer; @Test public void test01(){ eventProducer.send(); }}

查看控制台可以发送消费者拦截器成功拦截到消息并处理:

5.6 消息转发

消息转发就是应用A的消费者从TopicA接收到消息经过处理后,再转发到TopicB,再由应用B的消费者监听接收该消息,即一个应用处理完成后将该消息转发至其他应用处理,这在实际开发中,是可能存在这样的需求的。

消费者:

package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.messaging.handler.annotation.SendTo;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { @KafkaListener(topics = {\"topicA\"},groupId = \"aGroup\") @SendTo(value = \"topicB\") //指定要转发到的主题 public String onEventA(ConsumerRecord consumerRecord){ System.out.println(\"消费消息A,consumerRecord=\" + consumerRecord); return consumerRecord.value() + \"---forward message\"; //需要将消息返回 } @KafkaListener(topics = {\"topicB\"},groupId = \"bGroup\") public void onEventB(ConsumerRecord consumerRecord){ System.out.println(\"消费消息B,consumerRecord=\" + consumerRecord); }}

生产者:

package com.example.springboot02kafkabase.producer;import jakarta.annotation.Resource;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 @Resource private KafkaTemplate kafkaTemplate; public void send(){ //向TopicA发送消息hello kafka kafkaTemplate.send(\"topicA\",\"hello kafka\"); }}

application.yml:

spring: application: name: springnboot-02-kafka-base #配置afka相关信息 kafka: #kafka的连接地址 bootstrap-servers: 192.168.184.131:9092 #生产者配置信息 #producer: #消费者配置信息 consumer: auto-offset-reset: earliest max-poll-records: 20 #消费者每次最多批量消费的消息数量 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer#自定义配置kafka: topic: name: helloTopic consumer: group: helloGroup

5.7 消费者消费消息时的分区策略

Kafka消费者消费消息的分区策略:指定消费者去消费消息时,消费topic中的哪些分区(指定topic中的哪些分区由哪些消费者来消费)

Kafka有多种分区分配策略,默认的分区分配策略是RangeAssignor,除了RangeAssignor策略外,Kafka还有其他分区分配策略:

  • RoundRobinAssignor
  • StickyAssignor
  • CooperativeStickyAssignor

这些策略各有特点,可以根据实际的应用场景和需求来选择适合的分区分配策略。

5.7.1 RangeAssignor分区分配策略

Kafka消费者默认的消费消息分区策略是:RangeAssignor(范围分配策略),假设如下:

  • 一个主题myTopic有10个分区;(p0 - p9
  • 一个消费者组内有3个消费者:consumer1、consumer2、consumer3;

RangeAssignor消费分区策略原理:

1、先计算每个消费者应得的分区数:主题中分区的总数(10)/  消费者组中消费者数量(3)= 3 ... 余1

  • 每个消费者理论上应该得到3个分区,但由于有余数1,所以第1个消费者会多得到一个分区
  • consumer1(作为第一个消费者)将得到 3 + 1 = 4 个分区
  • consumer2 consumer3 将各得到 3 个分区

2、再按照分区编号顺序为消费者分配分区:

  • consumer1 将分配得到分区 0、1、2、3;
  • consumer2 将分配得到分区 4、5、6;
  • consumer3 将分配得到分区 7、8、9;

总结:RangeAssignor策略是根据消费者组内的消费者数量和主题的分区数量,来均匀地为每个消费者分配分区。

RangeAssignor分区分配策略的例子:

KafkaConfig:

package com.example.springboot02kafkabase.config;import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class KafkaConfig { @Bean public NewTopic newTopic(){ return new NewTopic(\"myTopic\", 10, (short) 1); }}

消费者:

package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { /** * concurrency属性:为同一个 @KafkaListener 创建多个消费者实例(线程),并并行消费同一个Topic中的分区的消息 * @param consumerRecord */ @KafkaListener(topics = {\"myTopic\"},groupId = \"myGroup\",concurrency = \"3\") public void onEventB(ConsumerRecord consumerRecord){ System.out.println(Thread.currentThread().getName() + \"---消费消息,consumerRecord=\" + consumerRecord); }}

生产者:

package com.example.springboot02kafkabase.producer;import jakarta.annotation.Resource;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 @Resource private KafkaTemplate kafkaTemplate; public void send(){ //向TopicA发送消息hello kafka for (int i = 0; i < 100; i++) { kafkaTemplate.send(\"myTopic\",\"k\" + i,\"hello kafka\"); } }}

测试类:

package com.example.springboot02kafkabase;import com.example.springboot02kafkabase.producer.EventProducer;import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestclass Springboot02KafkaBaseApplicationTests { @Resource private EventProducer eventProducer; @Test public void test01(){ eventProducer.send(); }}

最后在控制台输出中可以发现每个消费者都按默认的分区策略在分配到的分区中消费消息:

5.7.2 RoundRobinAssignor分区分配策略

继续以前面的例子数据,采用RoundRobinAssignor轮询分区分配策略进行测试

轮询的消息消费分区分配策略就是:每个消费者轮流的按顺序消费每个分区

1、在配置类中去指定分区分配策略:

package com.example.springboot02kafkabase.config;import org.apache.kafka.clients.admin.NewTopic;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.RoundRobinAssignor;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;import java.util.Map;@Configurationpublic class KafkaConfig { @Value(\"${spring.kafka.bootstrap-servers}\") private String bootstrapServers; @Value(\"${spring.kafka.consumer.value-deserializer}\") private String valueSerializer; @Value(\"${spring.kafka.consumer.key-deserializer}\") private String keySerializer; @Value(\"${spring.kafka.consumer.auto-offset-reset}\") private String autoOffsetReset; @Bean public NewTopic newTopic(){ return new NewTopic(\"myTopic\", 10, (short) 1); } /** * 创建并返回一个包含Kafka消费者相关配置的Map集合 */ public Map consumerConfigs(){ Map props = new HashMap(); //指定kafka的连接地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //指定key的序列化器 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keySerializer); //指定值的序列化器 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueSerializer); //指定消费者偏移量策略 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); //指定消息消费分区器,这里指定为轮询的消息消费分区器 props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName()); return props; } /** * 创建并返回一个消费者工厂,使用前面定义的consumerConfigs()方法提供的配置 */ @Bean public ConsumerFactory myConsumerFactory(){ return new DefaultKafkaConsumerFactory(consumerConfigs()); } /** * 创建并返回一个Kafka监听器工厂,使用前面定义的消费者工厂 * @param myConsumerFactory * @return */ @Bean public KafkaListenerContainerFactory myKafkaListenerContainerFactory(ConsumerFactory myConsumerFactory){ ConcurrentKafkaListenerContainerFactory listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory(); listenerContainerFactory.setConsumerFactory(myConsumerFactory); return listenerContainerFactory; }}

消费者:

package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { /** * concurrency属性:为同一个 @KafkaListener 创建多个消费者实例(线程),并并行消费同一个Topic中的分区的消息 * @param consumerRecord */ @KafkaListener(topics = {\"myTopic\"},groupId = \"myGroup\",concurrency = \"3\",containerFactory = \"myKafkaListenerContainerFactory\") public void onEventB(ConsumerRecord consumerRecord){ System.out.println(Thread.currentThread().getName() + \"---消费消息,consumerRecord=\" + consumerRecord); }}

生产者:

package com.example.springboot02kafkabase.producer;import jakarta.annotation.Resource;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 @Resource private KafkaTemplate kafkaTemplate; public void send(){ //向TopicA发送消息hello kafka for (int i = 0; i < 100; i++) { kafkaTemplate.send(\"myTopic\",\"k\" + i,\"hello kafka\"); } }}

application:

spring: application: name: springnboot-02-kafka-base #配置afka相关信息 kafka: #kafka的连接地址 bootstrap-servers: 192.168.184.131:9092 #生产者配置信息 #producer: #消费者配置信息 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: earliest

测试类:

package com.example.springboot02kafkabase;import com.example.springboot02kafkabase.producer.EventProducer;import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestclass Springboot02KafkaBaseApplicationTests { @Resource private EventProducer eventProducer; @Test public void test01(){ eventProducer.send(); }}

5.7.3 StickyAssignor消费分区分配策略

尽可能保持消费者与分区之间的分配关系不变,即使消费者组中消费者成员发生变化,减少不必要的分区重分配;

尽量保持现有的分区分配不变,对新加入的消费者或离开的消费者进行分区调整。这样,大多数消费者可以继续消费它们之前消费的分区,只有少数消费者需要处理额外的分区;所以叫“粘性”分配;

5.7.4 CooperativeStickyAssignor消费分区策略

与 StickyAssignor 类似,但增加了对协作式重新平衡的支持,即消费者可以在它离开消费者组之前通知协调器,以便协调器可以预先计划分区迁移,而不是在消费者突然离开时立即进行分区重分配;

6. Kafka事件(消息、数据)的存储

  1. kafka中的所有事件(消息、数据)都存储在/tmp/kafka-logs目录中,可以通过kafka的server.properties配置文件中的配置项log.dirs=/tmp/kafka-logs配置事件存储的目录。
  2. Kafka主题的所有事件(消息、数据)都是以日志文件的方式来保存,某个主题的某个分区可以看成就是一个目录,专门存储该主题该分区的数据
  3. Kafka一般都是海量的消息数据,为了避免日志文件过大,日志文件被存放在多个日志目录下,日志目录的命名规则为:-
  4. 比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在 kafka 的数据目录/tmp/kafka-log下就有 3 个目录,firstTopic-0、firstTopic-1、firstTopic-2
    1. 00000000000000000000.index  消息索引文件
    2. 00000000000000000000.log  消息数据文件,也就是发送的实际消息内容存储在这个文件中
    3. 00000000000000000000.timeindex  消息的时间戳索引文件
    4. 00000000000000000006.snapshot  快照文件,生产者发生故障或重启时能够恢复并继续之前的操作
    5. leader-epoch-checkpoint  记录每个分区当前领导者的epoch以及领导者开始写入消息时的起始偏移量
    6. partition.metadata  存储关于特定分区的元数据(metadata)信息

每次消费者消费一个消息并且提交以后,会保存当前消费到的最近的一个offset,在kafka中,内置了一个名叫__consumer_offsets的topic, 消费者提交的offset信都息会写入到该topic中,__consumer_offsets保存了每个consumer group某一时刻提交的offset信息,__consumer_offsets默认有50个分区;

consumer_group在消费消息后提交的偏移量保存在__consumer_offsets的哪个分区中的计算公式:Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ;

7. Offset详解

offset就是消息的唯一标识,或者说是消息的位置

7.1 生产者Offset

生产者发送一条消息到Kafka broker的某个topic下的某个partition中。

生产者offset:生产者每发送一条消息,Kafka内部都会为其分配一个唯一offset(从0开始顺序增长),该offset是该消息在partition中的位置。

代码演示:

消费者:

package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { @KafkaListener(topics = {\"offsetTopic\"},groupId = \"offsetGroup\") public void onEventB(ConsumerRecord consumerRecord){ System.out.println(Thread.currentThread().getName() + \"---消费消息,consumerRecord=\" + consumerRecord); }}

生产者:

package com.example.springboot02kafkabase.producer;import jakarta.annotation.Resource;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 @Resource private KafkaTemplate kafkaTemplate; public void send(){ //向TopicA发送消息hello kafka for (int i = 0; i < 2; i++) { kafkaTemplate.send(\"offsetTopic\",\"k\" + i,\"hello kafka\"); } }}

发送消息后通过kafka客户端工具查看,因为我执行了三次生产者代码。所以一共发了六条消息:

  • 通过第一张图可以看到,offsetTopic中的0号分区的开始偏移量是0,结束偏移量是6,现在偏移量的位置是在6(也就是生产者的偏移量现在是最后一条消息的位置的下一个位置,下次放消息就放到6号位置)。

7.2 消费者Offset

消费者offset:记录消费者在某个分区中将要消费哪个位置上的消息。

每个消费者组(Consumer Group)中的每个消费者都会独立维护自己所消费分区的offset,当消费者从某个partition读取消息时,它会记录当前读取到的offset,这样,即使消费者崩溃或重启,它也可以从上次读取的offset位置继续消费,而不会重复消费或遗漏消息;(注意:消费者的offset需要消费消息并提交后才记录offset)。

每个消费者组启动开始监听消息的时候,默认是从消息的最新位置(也就是生产者offset)开始监听消息,即把最新位置作为消费者的offset

  • 启动消费者开始监听的时候,如果分区中还没有生产者发送过消息,则最新的消费者偏移量就是0,从最新的位置开始消费消息     (情况:如果之后出现宕机就算先用生产者发送消息,消费者还是从0开始消费消息,因为最开始启动的时候已经记录消费者的偏移量是0了)
  • 启动消费者开始监听的时候,如果分区中已经有生产者发送过消息,则最新的消费者偏移量就是生产者的offset,在消费者开始监听之前发送的消息默认是消费不了的

消费者消费消息后,如果不提交确认(ack),则消费者offset不更新,提交了才更新

查看消费者组的详情命令行命令:./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group osGroup --describe

结论:消费者从什么位置开始消费,就看消费者的offset是多少,消费者offset是多少,它启动后,可以通过上面的命令查看;

8. Kafka集群

8.0 Kafka的集群机制

1、Kafka是天然支持集群的,哪怕是一个节点实际上也是集群模式

2、Kafka集群依赖于zookeeper进行协调,并且在早期的Kafka版本中很多数据都是存放在Zookeeper中的

3、Kafka节点只要注册到同一个Zookeeper上就代表它们是同一个集群的

4、Kafka通过brokerId(kafka节点的id)来区分集群中的不同节点

8.1 Kafka集群搭建

一种是基于Zookeeper的方式搭建集群

一种是基于Kraft的方式搭建集群

8.1.1 kafka集群搭建(基于Zookeeper方式)

1、kafka是一个压缩包,直接解压即可使用,所以我们就解压三个kafka(在三台机器上分别安装三个kafka,或在一台机器上安装三个kafka,用不同的目录、不同的端口)

2、配置kafka的这个配置文件:server.properties 

  • (1)三台kafka分别配置为:
    • broker.id=1、broker.id=2、broker.id=3
    • 该配置项是每个broker的唯一id,取值在0~255之间;
  • (2)三台分别配置listener=PAINTEXT:IP:PORT
    • listeners=PLAINTEXT://0.0.0.0:9091
    • listeners=PLAINTEXT://0.0.0.0:9092
    • listeners=PLAINTEXT://0.0.0.0:9093
  • (3)三台分别配置advertised.listeners=PAINTEXT:IP:PORT
    • advertised.listeners=PLAINTEXT://192.168.11.128:9091
    • advertised.listeners=PLAINTEXT://192.168.11.128:9092
    • advertised.listeners=PLAINTEXT://192.168.11.128:9093
  • (3)配置消息存放的日志目录
    • log.dirs=/tmp/kafka-logs-9091
    • log.dirs=/tmp/kafka-logs-9092
    • log.dirs=/tmp/kafka-logs-9093
    • 这是极为重要的配置项,kafka所有数据就是写入这个目录下的磁盘文件中
  • (4)配置kafka的server.properties文件中的zookeeper连接地址
    • zookeeper.connect=localhost:2181
    • 如果zookeeper是集群,则:​​​zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

3、kafka集群启动:

  1. 先启动Zookeeper,切换到zookeeper家目录/bin目录下执行命令./zkServer.sh start
  2. 启动三个Kafka,切换到bin目录:./kafka-server-start.sh ../config/server.properties
  3. 查看topic详情:./kafka-topics.sh --bootstrap-server 127.0.0.1:9091 --describe --topic clusterTopic

4、测试:

  • 使用idea的zookeeper客户端插件连接zookeeper:

  • 使用idea的kafka客户端插件连接三台kafka:

  • 在SpringBoot中连接kafka集群

先编写application.yml

spring: application: name: springnboot-02-kafka-base #配置afka相关信息 kafka: #kafka的连接地址 bootstrap-servers: 192.168.184.132:9091,192.168.184.132:9092,192.168.184.132:9093 #生产者配置信息 #producer: #消费者配置信息 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

KafkaConfig:

package com.example.springboot02kafkabase.config;import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class KafkaConfig { @Bean public NewTopic newTopic(){ //这里可以将主题中分区的副本数设置成3,因为现在有三个kafka节点了 //分区的副本数不能为0,也不能大于节点的个数 return new NewTopic(\"clusterTopic\",3,(short)3); }}

最后启动SpringBoot应用,可以在zookeeper客户端中看到对应的主题已经创建出来:

向kafka集群中发送消息,并消费消息:

消费者:

package com.example.springboot02kafkabase.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class EventConsumer { @KafkaListener(topics = {\"clusterTopic\"},groupId = \"clusterGroup\") public void onEventB(ConsumerRecord consumerRecord){ System.out.println(Thread.currentThread().getName() + \"---消费消息,consumerRecord=\" + consumerRecord); }}

生产者:

package com.example.springboot02kafkabase.producer;import jakarta.annotation.Resource;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class EventProducer { //引入spring-kakfa依赖后,SpringBoot就会自动装配KakfaTemplate对象到Ioc容器中 @Resource private KafkaTemplate kafkaTemplate; public void send(){ //向TopicA发送消息hello kafka for (int i = 0; i < 2; i++) { kafkaTemplate.send(\"clusterTopic\",\"k\" + i,\"hello kafka\"); } }}

测试类:

package com.example.springboot02kafkabase;import com.example.springboot02kafkabase.producer.EventProducer;import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestclass Springboot02KafkaBaseApplicationTests { @Resource private EventProducer eventProducer; @Test public void test01(){ eventProducer.send(); }}

运行test01方法发送消息,最终可以在控制台看到消息被消费:

8.1.2 Kafka集群搭建(基于KRaft方式)

服务器规划:

将三个kafka服务器都安装在同一台机器上,也可以将三个kafka服务器安装在不同的机器上:

  • 如果搭建在三台机器上,则端口号可以搞成一样的;如果搭建在同一台机器上端口号不能一样,否则会造成端口冲突
ip=192.168.11.129:9091 roles=broker,controller node.id=1ip=192.168.11.129:9092 roles=broker,controller node.id=2ip=192.168.11.129:9093 roles=broker,controller node.id=3

搭建步骤:

1、准备三个KafkaKafka是一个压缩包,直接解压即可使用,所以我们就解压出三个Kafka即可

2、配置kafka集群的server.properties配置文件:kafka家目录/config/kraft/server.properties

(1)三台分别找到下面的配置项并如下配置,用于指定kafka服务器的id:

  • broker.id=1
  • broker.id=2
  • broker.id=3

2)三台分别找到下面的配置项并如下配置节点的角色:

  • process.roles=broker,controller

3)三台分别配置参与投票的节点

  • controller.quorum.voters=1@192.168.11.129:9081,2@192.168.11.129:9082,3@192.168.11.129:9083

4)三台配置各自监听本机的ip和端口

  • listeners=PLAINTEXT://0.0.0.0:9091,CONTROLLER://0.0.0.0:9081
  • listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9082
  • listeners=PLAINTEXT://0.0.0.0:9093,CONTROLLER://0.0.0.0:9083

5)三台配置对外开放访问的ip和端口

  • advertised.listeners=PLAINTEXT://192.168.11.129:9091
  • advertised.listeners=PLAINTEXT://192.168.11.129:9092
  • advertised.listeners=PLAINTEXT://192.168.11.129:9093

(6)三台分别配置日志目录

  • log.dirs=/tmp/kraft-combined-logs-9091
  • log.dirs=/tmp/kraft-combined-logs-9092
  • log.dirs=/tmp/kraft-combined-logs-9093

3、kafka集群启动

  1. 生成Cluster UUID(集群UUID),在kafka家目录/bin下执行命令: ./kafka-storage.sh random-uuid
  2. 格式化日志目录,在kafka家目录/bin下执行命令(三台kafka都需要执行一下):./kafka-storage.sh format -t 集群UUID -c ../config/kraft/server.properties
  3. 启动Kafka,在kafka家目录/bin下执行命令:./kafka-server-start.sh ../config/kraft/server.properties &
  4. 关闭Kafka,在kafka家目录/bin下执行命令:./kafka-server-stop.sh ../config/kraft/server.properties

4、测试:和使用zookeeper搭建kafka集群的时候一样

8.1.3 Kafka集群架构分析

上图中:

三台kafka组成一个kafka集群,基于一个zookeeper或Kraft运行

  • Topic A在三台kafka上都有,然后这个主题有两个分区,每个分区有三个副本(一个主副本和两个从副本),每个副本都在不同的kafka节点上
  • Topic B在三台kafka上都有,然后这个主题有一个分区,每个分区有三个副本(一个主副本和两个从副本),每个副本都在不同的kafka节点上
  • Topic C在三台kafka上都有,然后这个主题有一个分区,每个分区有三个副本(一个主副本和两个从副本),每个副本都在不同的kafka节点上
  • 在kafka集群中,最终体现形式就是每个主题的每个分区的副本在不同的kafka节点上,从同一个分区的主副本和从副本不会在同一个kafka服务器上。
  • 分区的主副本放在哪个kafka broker中是由kafka内部机制决定的

下面分析下我们用程序创建出来的clusterTopic:

  • 箭头从左往右看:
    • 第一个箭头表示clusterTopic主题
    • 第二个箭头表示clusterTopic主题一共有多少个分区副本
    • 第三个箭头表示clusterTopic主题有多少个分区
    • 第四个箭头表示clusterTopic主题中每个分区的id
    • 第五个箭头表示此分区的主副本在哪台kafka节点上(kafka的broker id)
    • 第六个箭头表示此分区有几个副本

9. Kafka的一些重要概念

kafka服务器 broker

主题 topic

事件  Event message、消息、数据)

生产者 producer

消费者 consumer

消费组 consumer group

分区 partition

偏移量offset(分为生产者偏移量,消费者偏移量)

Replica副本:分为 Leader Replica 和 Follower Replica

ISR副本:在同步中的副本 (In-Sync Replicas)

LEO:日志末端偏移量 (Log End Offset)

HW:高水位值 (High Water mark)

9.1 ISR副本

ISR副本:在同步中的副本 (In-Sync Replicas),包含了Leader副本和所有与Leader副本保持同步的Follower副本

写请求首先由 Leader 副本处理,之后 Follower 副本会从 Leader 上拉取写入的消息,这个过程会有一定的延迟,导致 Follower 副本中保存的消息略少于 Leader 副本,但是只要没有超出阈值都可以容忍,但是如果一个 Follower 副本出现异常,比如宕机、网络断开等原因长时间没有同步到消息,那这个时候,Leader就会把它踢出去,Kafka 通过ISR副本集合来维护一个“可用且消息量与Leader相差不多的副本集合,它是整个副本集合的一个子集”

在Kafka中,一个副本要成为ISR(In-Sync Replicas)副本,需要满足一定条件:

1、Leader副本本身就是一个ISR副本;

2、Follower副本最后一条消息的offset与Leader副本的最后一条消息的offset之间的差值不能超过指定的阈值,超过阈值则该Follower副本将ISR列表剔除

  • replica.lag.time.max.ms:默认是30秒;如果该Follower在此时间间隔内一直没有追上过Leader副本的所有消息,则该Follower副本就会被剔除ISR列表
  • replica.lag.max.messages:落后了多少条消息该Follower副本就会被剔除ISR列表该配置参数现在新版本的Kafka已经过时了

9.2 LEO

日志末端偏移量 (Log End Offset)记录该副本消息日志(log)中下一条消息的偏移量注意是下一条消息,也就是说,如果LEO=10,那么表示该副本保存了偏移量值是[0, 9]的10条消息;

9.3 HW

(High Watermark),即高水位值,它代表一个偏移量offset信息,表示从副本复制主副本消息的复制进度,也就是从副本复制主副本中的消息已经复制到哪个位置了。即在HW之前的所有消息都已经被成功写入副本中并且可以在所有的副本中找到,因此,消费者可以安全地消费这些已成功复制的消息。

对于同一个副本而言,小于等于HW值的所有消息都被认为是“已备份”的(replicated),消费者只能拉取到这个offset之前的消息,确保了数据的可靠性;

9.4 ISR、HW、LEO的关系

10. kafka进阶知识

10.1 如何保证kafka的高可用性?

Kafka中一个最基本的架构认识:搭建kafka集群后,一般kafka集群由多个kafka broker 组成,每个 kafka broker 是一个节点;你创建一个 topic,这个 topic 可以划分为多个partition,并且每个分区一般都有多个副本(主副本和从副本),根据Kafka的副本放置策略,kafka会均匀的将一个partition的所有replica分布在不同的节点上,每个 partition 就放一部分数据。

这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据

Kafka的副本放置策略:

  1. 首要目标:将分区的每个副本分配到不同的Kafka Broker上。

  2. 次要目标:确保所有分区的Leader副本均匀分布 across 所有Broker,避免某个Broker成为所有Leader的热点。

  3. 再次要目标:确保所有副本(包括Follower)在集群中均匀分布。

kafka实现高可用性就是通过副本(replica)机制来实现的,前面我们说过,生产者发送数据和消费者消费数据,都是去和 leader 打交道的,而 follower 只是去从 leader 中同步数据,这个时候,如果某个 broker 宕机了,没事儿,宕机的broker上面的 partition 在其他机器上都有副本,所以数据也不会丢失。如果这个宕机的 broker 上面有某个 partition 的 leader,那么这个时候会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可,这其实也就是高可用了

  • 写数据的时候,生产者就写 leader,然后 leader 将数据落地写到本地磁盘,接着其他 follower 自己主动从 leader 中 pull 数据。一旦所有 follower 同步好数据,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)
  • 消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。

10.2 如何保证消费消息的幂等性?

如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?,既然是消费消息,那肯定要考虑会不会重复消费?能不能避免重复消费?或者重复消费了也别造成系统异常,这个是 MQ 领域的基本问题

RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。挑一个 Kafka 来举个例子,说说怎么重复消费吧。

在 Kafka 中有个 offset 的概念,就是每个消息写进去,kafka都会给它一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,少数消息会再次消费一次。

举个栗子:

有这么个场景。数据 1/2/3 依次进入 Kafka,Kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152/153/154。消费者从 Kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费到了 offset=153 的这条数据,刚准备去提交 offset 的时候,此时消费者进程被重启了。那么此时消费过的数据 1/2 的 offset 并没有提交,Kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会找 Kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来,由于之前的 offset 消费者没有提交成功,那么数据 1/2 还会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费消息

其实复消费消息不可怕,可怕的是没考虑到重复消费消息之后,如何保证MQ的消费是幂等性,在实际应用中需要结合具体的业务来看,这里有几个思路:

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

幂等性(Idempotence) 是指:无论相同的操作执行一次还是多次,对系统状态造成的影响是相同的。

在MQ消费的语境下,即:无论同一条消息被消费一次、两次还是多次(可能由于网络重发、消费者重启、offset未及时提交等原因导致),最终的业务结果都应该与只消费一次相同

消息队列本身通常提供“至少一次(At-least-once)”的投递语义,这意味着它可能会重投消息。因此,幂等性必须由消费者来保证

10.3 如何保证消息的可靠性传输?

如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?

这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条。不能多,就是前面说的重复消费和幂等性的问题。不能少,就是说这数据别搞丢了。

如果说你这个是用 MQ 来传递非常核心的消息,比如说计费、扣费的一些消息,那必须确保这个 MQ 传递过程中绝对不会把计费消息给弄丢

数据的丢失问题,可能出现在生产者、MQ、消费者中,这里用kafka来分析:

1、消费端弄丢了数据

唯一可能导致消费者弄丢数据的情况,就是,消费者消费到了某个消息,然后消费者自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实它才刚准备处理这个消息,它还没处理,它自己就挂了,此时这条消息就丢咯

大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢,但是此时还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果消费者自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

2、Kafka 弄丢了数据

这块比较常见的一个场景,就是 Kafka集群中 某个 broker 宕机,然后重新选举 partition 的 leader的时候,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。

生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。

所以此时一般是要求起码设置如下 4 个参数:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。
  • 在 producer 端设置 acks=all :这个是要求生产者发送的每条数据,必须是写入了所有 replica 之后,才认为是写成功了。
  • 在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。

3、生产者会不会弄丢数据?

如果按照上述的思路设置了 acks=all ,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

10.4 如何解决消息队列的延时以及过期失效问题?

如何解决消息队列的延时以及过期失效问题?

消息队列满了以后该怎么处理?

有几百万消息持续积压几小时,说说怎么解决?

其实本质针对的场景,都是说,可能你的消费端出了问题,不消费了;或者消费的速度极其慢。接着就坑爹了,可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办?或者是这整个就积压了几个小时,你这个时候怎么办?或者是你积压的时间太长了,导致比如 RabbitMQ 设置了消息过期时间后就没了怎么办?

所以就这事儿,其实线上挺常见的,一般不出,一出就是大 case。一般常见于,举个例子,消费端每次消费之后要写 mysql,结果 mysql 挂了,消费端 hang 那儿了,不动了;或者是消费端出了个什么岔子,导致消费速度极其慢。

未完待续。。。。。。。