> 技术文档 > Kafka_kafka启动

Kafka_kafka启动


1. Kafka启动方式

我下载的是kafka_2.13-3.9.1版本,官网下载

1.1. 自带的zookeeper(也可独立安装)

# 先确认在 kafka 目录下cd /path/to/kafka_2.13-3.9.1# 后台启动zookeepernohup bin/zookeeper-server-start.sh config/zookeeper.properties &# 后台启动Kafkanohup bin/kafka-server-start.sh config/server.properties &

关闭:

# 关闭 Zookeeperbin/zookeeper-server-stop.sh config/zookeeper.properties# 关闭 Kafkabin/kafka-server-stop.sh config/server.properties

1.2. jdk安装

运行Kafka需要安装jdk的,去官网下载一个linux版本的jdk17,解压到/usr/local目录下,然后vim /etc/profile进行 环境变量修改:

Kafka_kafka启动

然后source /etc/profile重新加载。

1.2. KRaft启动

从 Kafka 2.8 开始,Kafka 引入了 KRaft 模式(Kafka Raft Metadata mode),就是 用 Raft 协议自己管理元数据,不再需要 ZooKeeper。

# 生成集群uuid(这个命令会打印出一个随机 UUID,比如76ubojBlQu2J0_esxcZt4g )bin/kafka-storage.sh random-uuid# 初始化存储目录bin/kafka-storage.sh format -t 76ubojBlQu2J0_esxcZt4g -c config/kraft/server.properties# 启动kafkabin/kafka-server-start.sh config/kraft/server.properties

2. 使用Docker启动Kafka

拉取kafka镜像

docker pull apache/kafka:3.9.1

启动kafka容器

docker run -p 9092:9092 apache/kafka:3.9.1# 后台运行docker run -d --name kafka apache/kafka:3.9.1

3. 主题Topic

Topic 用于存储事件(Events)

  • 事件也称为记录或消息,比如支付交易、手机地理位置更新、运输订单、物联网设备或医疗设备的传感器测量数据等。
  • 事件被组织和存储在 Topic 中。
  • 简单来说,Topic 类似于文件系统的文件夹,Events 是该文件夹中的文件。

3.1. 使用kafka-topics.sh脚本创建Topic

这里我是在docker容器中执行了。

# 进入Kafka容器docker exec -it kafka bash## 进入Kafka目录cd /opt/kafka# 执行脚本创建hello主题bin/kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092

Kafka_kafka启动

# 查看主题列表bin/kafka-topics.sh --list --bootstrap-server localhost:9092# 查看主题详细信息bin/kafka-topics.sh --describe --topic hello --bootstrap-server localhost:9092# 删除hello主题bin/kafka-topics.sh --delete --topic hello --bootstrap-server localhost:9092

Kafka_kafka启动
Kafka_kafka启动

3.2. 修改主题

  • 修改主题配置(Configs):比如调整 retention.ms(消息保留时间)、segment.bytes(日志分段大小)、cleanup.policy(清理策略)等配置项,可以用 kafka-configs.sh 命令来修改。
  • 修改分区数量(Partition Count):可以增加分区数,但不能减少。分区数一旦增加,不能再减少。
  • 修改副本因子(Replication Factor):副本因子不能直接通过命令修改,需要通过手动重新分配副本实现,比较复杂,一般用 kafka-reassign-partitions.sh 工具配合 JSON 文件进行副本重分配。
# 修改hello主题的消息保留时间为 1 天bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello --alter --add-config retention.ms=86400000# 给hello主题增加到 3 个分区bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic hello --partitions 3

3.3. 写入Events

kafka 客户端通过网络与 Kafka Brokers(代理/服务节点)进行通信,可以写(或读)主题 Topic 中的事件 Events。

Kafka_kafka启动
Kafka Brokers 一旦收到事件 Events,就会将事件 Event 以持久和容错的方式存储起来,可以永久存储。

使用 kafka-console-producer.sh 脚本写入事件:

# 在容器内bin/kafka-console-producer.sh --topic hello --bootstrap-server localhost:9092

执行命令后,命令行会变成可输入模式。你输入什么就写到 hello 这个 topic:

Kafka_kafka启动

每输入一行,Kafka 就写一条消息。ctrl + c 退出

3.4. 读取Events

使用 kafka-console-consumer.sh 消费者客户端读取之前写入的事件:

bin/kafka-console-consumer.sh --topic hello --from-beginning --bootstrap-server localhost:9092

–from-beginning: 从头开始读(包括历史所有消息)。
–bootstrap-server localhost:9092:指定 Kafka Broker 地址。

你可以打开另外一个窗口执行消费者,然后在生产者那边发消息,消费者接收消息,进而完成通信。

Kafka_kafka启动

4. 外部环境连接Kafka

打开idea,安装插件Kafka。输入虚拟机ip 192.168.116.100:9092 发现连接不上,因为我们使用的是默认启动配置。

kafka镜像使用文档

# 在容器中执行,找到server.properties配置文件cd /etc/kafka/docker

要把这个配置文件复制到虚拟机本地(自己创建一个目录)中

docker cp bf9006c4b67e:/etc/kafka/docker/server.properties /home/fgh/kafka/docker

vim 打开复制好的配置文件,找到 # Socket Server Settings #那一行。

Kafka_kafka启动

修改后:

Kafka_kafka启动

然后通过挂载进行文件映射:
文件映射(Volume Mount):就是把宿主机(你的虚拟机)的某个目录 挂载 到容器的某个目录。
容器里对挂载目录的读写,其实就是对宿主机对应目录的读写。

# 先把刚刚启动的容器删了docker rm kafka# 再执行命令docker run -d --name kafka -p 9092:9092 --volume /home/fgh/kafka/docker:/mnt/shared/config apache/kafka:3.9.1

就可以连接成功了

Kafka_kafka启动

5. 其他连接工具

Offset Explorer

CMAK(基于Zookeeper启动的Kafka才能用)
Kafka_kafka启动

EFAK

Kafka_kafka启动
Kafka_kafka启动
Kafka_kafka启动

Kafka_kafka启动

5. SpringBoot集成Kafka

配依赖

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.2.4</version></dependency>

application.yml

spring: kafka: bootstrap-servers: 192.168.116.100:9092

写个代码测试:

Consumer类

package com.fg.consumer;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class Consumer { @KafkaListener(topics = \"hello-topic\", groupId = \"hello-group\") public void receiveMessage(String message) { System.out.println(\"Received message: \" + message); }}

Producer类

package com.fg.producer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class Producer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { kafkaTemplate.send(\"hello-topic\", message); }}

通过测试类测试:

package com.fg;import com.fg.producer.Producer;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublic class KafkaTest { @Autowired private Producer producer; @Test public void test() { producer.sendMessage(\"Hello Kafka!\"); }}

Kafka_kafka启动

6. Kafka的几个概念

Kafka_kafka启动

Topic:可以理解成一个消息队列的名字,或者是一个分类桶,所有消息都按主题来分类。

Producer:负责发送消息的人。生产者是消息的 发布方,把数据写到某个 Topic 里。

Consumer:负责读消息的人。消费者是消息的 订阅方,从某个 Topic 里读取消息。

Partition:Topic 里的分片。每个Topic 可以分成一个或多个 Partition,当创建 Topic 时,如果不指定数量,默认就是 1 个。

Offset:消息的位移标记。每个分区里的消息都有一个从 0 开始递增的序号,叫做 Offset。消费者消费到哪里,就记到哪里,下次可以从上次的 Offset 继续读。

7. Kafka的使用

7.1. 读取最早的(历史)消息

默认情况下,当启动一个新的消费者组时,它会从每个分区的最新偏移量(即该分区中最后一条消息的下一个位置)开始消费。如果希望从第一条消息开始消费,需要进行消费者配置。

我比较懒,所以统一在 yml 里配置相关。也可自己写代码配置(推荐)。

Kafka_kafka启动

注意: 如果之前已经用相同的消费者组ID消费过该主题,并且Kafka已经保存了该消费者组的偏移量,那么即使设置了earliest 也不会生效,因为Kafka只会在找不到偏移量时使用这个配置。在这种情况,需要手动设置偏移量或者使用一个新的消费者组ID

手动重置偏移量:

./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers>--group <your-consumer-group>--topic <your-topic>--reset-offsets --to-earliest --execute

Kafka_kafka启动

7.2. 发送Message

Kafka_kafka启动

// 发送Message对象消息public void sendMessage2() { Message<String> message = MessageBuilder.withPayload(\"Hello Kafka!\") .setHeader(KafkaHeaders.TOPIC, \"hello-topic\").build(); kafkaTemplate.send(message);}
// 发送ProducerRecord对象消息public void sendMessage3() { // Headers里面可以存放自定义信息(key-value),消费者接收到该消息后,可以拿到里面的信息 RecordHeaders headers = new RecordHeaders(); headers.add(\"username\", \"fg\".getBytes()); ProducerRecord<String, String> record = new ProducerRecord<>( \"hello-topic\", // Topic 名称 0,  // 分区 ID(可选) System.currentTimeMillis(), // 时间戳(可选) \"k1\",  // 消息 Key \"Hello Kafka!\", // 消息 Value(主体内容) headers // 自定义 Headers ); kafkaTemplate.send(record);}
// 发送指定分区的消息public void sendMessage4() { kafkaTemplate.send(\"hello-topic\", 0, System.currentTimeMillis(), \"k2\", \"Hello Kafka!\");}
// 发送默认topic消息public void sendMessage5() { kafkaTemplate.sendDefault(0, System.currentTimeMillis(), \"k3\", \"Hello Kafka!\");}

需要配置:

Kafka_kafka启动

7.3. 接收Message

send()sendDefault()方法都返回CompletableFuture<SendResult>

CompletableFuture 是Java 8中引入的一个类,用于异步编程,它表示一个异步计算的结果,这个特性使得调用者不必等待操作完成就能继续执行其他任务,从而提高了应用程序的响应速度和吞吐量。

因为调用kafkaTemplate.send()方法发送消息时,Kafka可能需要一些时间来处理该消息(例如:网路延迟、消息序列化、Kafka集群的负载等),如果send()是同步的,那么发送消息可能会阻塞调用线程,直到消息发送成功或发生错误,这会导致应用程序性能下降,尤其在高并发场景下。

使用CompletableFuturesend()方法可以立即返回一个表示异步操作结果的未来对象,而不是等待操作完成,这样调用线程可以继续执行其他任务,而不必等待消息发送完成。当消息发送完成时(无论是成功还是失败),CompletableFuture会相应地更新其状态,并允许我们通过回调、阻塞等方式来获取操作结果。

  • 使用CompletableFuture.get()方法同步阻塞等待发送结果
public void sendMessage6() { CompletableFuture<SendResult<String, String>> completableFuture = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), \"k3\", \"Hello Kafka!\"); try { // 阻塞等待 SendResult<String, String> result = completableFuture.get(); System.out.println(\"结果:\" + result); // 结果:SendResult [producerRecord=ProducerRecord(topic=default-topic, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=k3, value=Hello Kafka!, timestamp=1752240587138), recordMetadata=default-topic-0@0] } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); }}
  • 使用thenAccept(), thenApply(), thenRun()等方法来注册回调函数,回调函数将在CompletableFuture完成时被执行
public void sendMessage7() { CompletableFuture<SendResult<String, String>> completableFuture = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), \"k3\", \"Hello Kafka!\"); completableFuture .thenAccept(result -> { System.out.println(\"结果:\" + result); }) .exceptionally(ex -> { System.out.println(\"异常:\" + ex.getMessage()); return null; // 必须返回点啥,哪怕是 null }); System.out.println(\"消息发送已提交,主线程继续执行...\");}
  1. thenAccept(Consumer):拿到异步结果,执行一些依赖结果的后续操作,不返回新结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> \"hello\");future.thenAccept(result -> { System.out.println(\"结果是: \" + result);});
  1. thenApply(Function):拿到异步结果,做一些处理,返回新的结果(可变换)。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> \"hello\");CompletableFuture<String> newFuture = future.thenApply(result -> { return result.toUpperCase(); // 转成大写});newFuture.thenAccept(System.out::println); // 输出 HELLO
  1. thenRun(Runable):前面的异步结果执行完成后,执行一个无参、无返回值的操作。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> \"hello\");future.thenRun(() -> { System.out.println(\"前面的任务执行完了,我就跑!\");});
  1. whenComplete(BiConsumer):无论成功还是失败,都会执行。可以同时拿到结果和异常(一个成功一个 null,或者相反)。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (true) throw new RuntimeException(\"出错啦\"); return \"hello\";});future.whenComplete((result, ex) -> { if (ex != null) { System.out.println(\"发生异常: \" + ex.getMessage()); } else { System.out.println(\"执行结果: \" + result); }});
  1. exceptionally(Function):只在出现异常时执行,用于返回一个默认值,避免整个链挂掉。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (true) throw new RuntimeException(\"炸了\"); return \"hello\";}).exceptionally(ex -> { System.out.println(\"捕获到异常: \" + ex.getMessage()); return \"default\";});future.thenAccept(System.out::println); // 输出 default

7.4. 发送对象消息

在 Kafka 中,消息本质是字节流,要发对象的话需要:
把对象 序列化(一般转成 JSON) → 作为消息的 value 发送出去

配置:

Kafka_kafka启动

用的JSON序列化,所以要加这个依赖才可将Java对象转成JSON。

<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId></dependency>

Kafka_kafka启动

public void sendMessage8() { User user = User.builder().id(1).username(\"fg\").age(23).build(); // 分区可为 null,由 Kafka 分配) CompletableFuture<SendResult<String, Object>> completableFuture = kafkaTemplate2.sendDefault(null, System.currentTimeMillis(), \"k3\", user); // 注册回调 - 成功时执行 completableFuture.thenAccept(result -> { System.out.println(\"发送成功: \" + result); }); // 注册回调 - 异常时执行 completableFuture.exceptionally(ex -> { System.out.println(\"发送失败: \" + ex.getMessage()); return null; }); System.out.println(\"已异步发送对象,主线程继续执行...\");}

7.5. Replica副本

Replica:副本,为实现备份功能,保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 topic 的每个分区都有 1 个或多个副本。

  • Leader Replica:每个分区多个副本中的“主”副本,生产者发送数据以及消费者消费数据,都是来自 leader 副本。
  • Folower Replica:每个分区多个副本中的“从”副本,实时从 leader 副本中同步数据,保持和 leader 副本数据的同步,leader 副本发生故障时,某个 follower 副本会成为新的 leader。

设置副本个数不能为0,也不能大于节点个数,否则将不能创建Topic

Kafka_kafka启动

Kafka 的核心数据单元是分区(partition),每个分区会被复制多份(称为“分区副本”),分布在不同的 Broker 节点上,以实现冗余和高可用。每个分区在任何时刻只有一个“Leader”副本负责处理所有读写请求,而1其他“Follower”副本则持续从 Leader 拉去数据以保持同步。如果 Leader 副本所在的 Broker 发生故障,Kafka 会自动从同步状态良好(在ISR中)的 Follower 副本里选举出一个新的 Leader 继续提供服务,确保数据安全和服务连续性。所谓“主题副本”指的就是一个主题下所有分区副本的总和,而“节点副本”则是指存储在某个特定 Broker 上的所有分区副本的集合。

方式一:通过命令行在创建topic时指定分区和副本

./kafka-topic.sh --create --topic myTopic --partitions 3 --replication-factor 1 --bootstrap-server 192.168.116.100:9092

方式二:通过代码指定分区和副本

package com.fg.config;import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class KafkaConfig { // 创建一个topic,名称为heTopic,分区数为5,副本数为1 @Bean public NewTopic initialTopic() { return new NewTopic(\"heTopic\", 3, (short) 1); } // 如果要修改分区数,只需修改配置重启项目即可,修改分区数并不会导致数据丢失,但是分区数只能增大不能减少 @Bean public NewTopic updateTopic() { return new NewTopic(\"heTopic\", 5, (short) 1); }}

Kafka_kafka启动

7.6. 生产分区策略

生产者写入消息到topic,Kafka 将依据不同的策略将数据分配到不同的分区中。

  • 默认分配策略:BuiltlnPartitioner
    • 有 key:partition = Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions
    • 没有 key:使用 Sticky Partitioner,它不是单纯随机或轮询,而是会在一段时间或一个 batch 内,把所有消息发到同一个分区,直到 batch 满或时间到,再换一个分区。
  • 轮询分配策略:RoundRobinPartition,和默认的 Sticky 不一样,它直接轮询所有可用分区。只要没有 key,消息就按轮询顺序依次分到各分区。
  • 自定义分配策略:实现接口org.apache.kafka.clients.producer.Partitioner,重写 partition 方法。
public void sendMessage9() { // 有key:相同key保证落到同一个分区 kafkaTemplate.send(\"heTopic\", \"key1\", \"Hello, with key!\"); // 无key:Sticky分区,同一批尽量stick到同一个分区 for (int i = 0; i < 3; i++) { kafkaTemplate.send(\"heTopic\", \"Hello, no key!\"); }}

如果是无key,使用轮询策略:

Kafka_kafka启动

写一个自定义策略:

package com.fg.config;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import org.apache.kafka.common.utils.Utils;import java.util.Map;import java.util.concurrent.atomic.AtomicInteger;public class CustomPartitioner implements Partitioner { private final AtomicInteger counter = new AtomicInteger(0); /** * @param topic 消息主题 * @param key 消息的key * @param keyBytes 序列化后的key * @param value 消息的value * @param valueBytes 序列化后的value * @param cluster 集群元数据,包含分区元数据信息(有多少个分区,leader 状态等等) * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { int numPartitions = cluster.partitionCountForTopic(topic); if (keyBytes != null) { // 有key,走默认的hash分配 return (Math.abs(Utils.murmur2(keyBytes))) % numPartitions; } // 没有key,轮询分配 return counter.getAndIncrement() % numPartitions; } @Override public void configure(Map<String, ?> configs) { } @Override public void close() { }}

配置:

Kafka_kafka启动

测试:

public void sendMessage10() { // RoundRobin分区,不同key尽量均匀分配到不同分区 for (int i = 0; i < 10; i++) { try { kafkaTemplate.send(\"heTopic\", \"Hello RoundRobin \" + i).get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }}

Kafka_kafka启动

7.7. 生产消息拦截

生产者发送消息的流程:

Kafka_kafka启动

拦截器:实现ProducerInterceptor接口,对即将发送的 ProducerRecord 做修改或过滤。

package com.fg.config;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/** * 自定义 Kafka 生产者拦截器: * 主要用于在消息发送前后做一些额外处理,比如: * - 给消息做统一标记 * - 统计成功/失败 * - 打印日志 * - 丢弃不合法消息等 */public class CustomProducerInterceptor implements ProducerInterceptor<String, Object> { /** * 发送消息前会调用,可以在这里对消息进行修改或过滤,如果返回null,则消息会被丢弃 * * @param record * @return */ @Override public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) { // 1.获取原始消息内容 Object originalValue = record.value(); // 2.在消息尾部追加标记 String newValue = originalValue + \" | intercepted\"; // 3.返回新的消息 return new ProducerRecord<>( record.topic(), // 保持原 topic record.partition(), // 保持原分区(也可以自定义) record.timestamp(), // 保持原时间戳 record.key(), // 保持原 key newValue,  // 替换后的 value record.headers() // 保持原 header ); } /** * 消息被Broker成功接收或失败后会调用,可以用于统计消息发送成功率、记录日志等 * * @param recordMetadata * @param e */ @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if (e == null) { // 消息发送成功,打印分区信息 System.out.println(\"消息发送成功,分区:\" + recordMetadata.partition()); } else { // 消息发送失败,打印错误信息 System.out.println(\"消息发送失败,错误信息:\" + e.getMessage()); } } /** * Producer 关闭时会调用,用于清理资源 * 如果没有需要释放的资源可以留空 */ @Override public void close() { } /** * 获取生产者的配置,可以在这里做初始化操作 * * @param configs */ @Override public void configure(Map<String, ?> configs) { }}

Kafka_kafka启动

7.8. 接收消息内容

在 Spring Kafka 中,
@Payload用于绑定 Kafka 消息体(就是生产者的 value)。
@Header用于绑定 Kafka 消息头(比如 topic、key、partition、timestamp、自定义 header)。
ConsumerRecord record 接收消息所有内容。

public void sendMessage11() { kafkaTemplate.send( MessageBuilder.withPayload(\"这是消息内容\")  .setHeader(KafkaHeaders.TOPIC, \"heTopic\")  .setHeader(KafkaHeaders.KEY, \"key2\")  .setHeader(\"customHeader\", \"自定义头信息\")  .build() );}
@KafkaListener(topics = \"heTopic\", groupId = \"he-group\")public void receiveMessage( @Payload String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.RECEIVED_KEY) String key, @Header(\"customHeader\") String customHeader, ConsumerRecord<String, String> record) { System.out.println(\"消息体: \" + message); // 消息体: \"这是消息内容 | intercepted\" System.out.println(\"来自 topic: \" + topic); // 来自 topic: heTopic System.out.println(\"分区: \" + partition); // 分区: 1 System.out.println(\"key: \" + key); // key: key2 System.out.println(\"自定义 header: \" + customHeader); // 自定义 header: 自定义头信息 System.out.println(\"ConsumerRecord: \" + record); // ConsumerRecord: ConsumerRecord(topic = heTopic, partition = 1, leaderEpoch = 0, offset = 5, CreateTime = 1752309403290, serialized key size = 4, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = customHeader, value = [-24, -121, -86, -27, -82, -102, -28, -71, -119, -27, -92, -76, -28, -65, -95, -26, -127, -81]), RecordHeader(key = spring_json_header_types, value = [123, 34, 99, 117, 115, 116, 111, 109, 72, 101, 97, 100, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125]), RecordHeader(key = __TypeId__, value = [106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103])], isReadOnly = false), key = key2, value = \"这是消息内容 | intercepted\")}

发送消息对象

配置:

Kafka_kafka启动

public void sendMessage12() { User user = User.builder().id(1).username(\"fg\").age(23).build(); kafkaTemplate2.send( MessageBuilder.withPayload(user)  .setHeader(KafkaHeaders.TOPIC, \"heTopic\")  .build() );}
@KafkaListener(topics = \"heTopic\", groupId = \"user-group\")public void receiveMessage( @Payload User user, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, ConsumerRecord<String, String> record) { System.out.println(\"收到User对象: \" + user + \"来自Topic: \" + topic + \", 分区: \" + partition); // 收到User对象: User(id=1, username=fg, age=23)来自Topic: heTopic, 分区: 1}

Acknowledgment: 是 Spring Kafka 提供的一个接口,用于手动提交 Kafka 消费者的 offset。

应用场景:

  • 对消息要严格保证幂等性或一致性必须业务成功后再提交offset)。
  • 想实现失败重试(不提交offset,下次还会消费到同一条消息)。

默认情况下,如果 ack-mode 是recordbatchtime,Spring Kafka 会自动提交 offset。如果你想自己控制什么时候提交(比如处理完业务再提交,或出现异常不提交),就用 Acknowledgment.acknowledge(),不ack.acknowledge()就相当于不提交,这条消息会在下一轮重新被消费。

设置手动提交:

Kafka_kafka启动

@KafkaListener(topics = \"heTopic\", groupId = \"user-group\")public void receiveMessage( @Payload User user, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, ConsumerRecord<String, String> record, Acknowledgment ack) { try { log.info(\"处理业务逻辑...\"); System.out.println(\"收到User对象: \" + user + \"来自Topic: \" + topic + \", 分区: \" + partition); // 手动提交offset ack.acknowledge(); log.info(\"已手动提交offset\"); } catch (Exception e) { log.error(\"处理失败,不提交offset,稍后会重试\", e); }}

7.9. 指定消费

在 Spring Kafka 里,如果你想精准指定 topic、partition、offset 来消费,可以用@KafkaListener的 topicPartitions 属性,而不是简单写 topics。

可用来在调试或回放数据时,从某个确定的 offset 重放消费。

配置项 说明 topic 要监听的 topic partitions 要监听的分区(从 0 开始) partitionOffsets 指定从哪个分区的哪个 offset 开始 initialOffset 指定初始 offset(如果比当前最小 offset 还小,会从最小的可用 offset 开始)。initialOffset 只在 新的 groupId 或者这个分区之前没有提交 offset 时生效。如果 Kafka 里已经有保存的 offset,还是会从保存的 offset 开始消费。
@KafkaListener( topicPartitions = { @TopicPartition( topic = \"heTopic\", partitions = {\"0\", \"1\", \"2\"}, // 指定分区 partitionOffsets = { @PartitionOffset(partition = \"2\", initialOffset = \"15\") } ) }, groupId = \"he-group\")

7.10. 批量消费

Kafka 消息是批量拉取的,但 Spring Kafka 默认是单条处理即使一次 poll 拉了 N 条,也一条一条执行 @KafkaListener)。

需设置:

Kafka_kafka启动

Kafka_kafka启动

public void sendMessage13() { for (int i = 0; i < 5; i++) { User user = new User(i, \"fg-\" + i, 20 + i); kafkaTemplate2.send(\"heTopic\", user); }}
@KafkaListener(topics = \"heTopic\", groupId = \"user-group\")public void receiveMessage(List<User> users) { users.forEach(u -> { log.info(\"收到消息:{}\", u); });}

7.11. 消费消息拦截

消费拦截器是在 Kafka 消费者端,消息真正被业务代码处理前的“拦截器”或“切面”,可以进行日志记录、消息过滤、消息修改、性能监控、安全校验等。

package com.fg.config;import lombok.extern.slf4j.Slf4j;import org.apache.kafka.clients.consumer.ConsumerInterceptor;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;import java.util.Map;@Slf4jpublic class CustomConsumerInterceptor implements ConsumerInterceptor<String, Object> { /** * 消费消息之前调用,可以在这里对消息进行处理、过滤、修改等操作 * * @param records 本次批量消费的所有消息 * @return 返回要传递给业务代码的消息集合 */ @Override public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> records) { log.info(\"[拦截器] 拦截到批量消息,记录数: {}\", records.count()); for (TopicPartition partition : records.partitions()) { records.records(partition).forEach(record -> { System.out.println(\"[拦截器] 拦截到消息: \" + record.key() + \" -> \" + record.value()); }); } // 不修改直接返回原消息 return records; } /** * 当消费者提交offset时调用 * * @param offsets 提交的offset信息 */ @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { } /** * 关闭拦截器时调用 */ @Override public void close() { } /** * 配置拦截器时调用,传入消费者配置参数 * * @param configs 配置参数 */ @Override public void configure(Map<String, ?> configs) { }}

Kafka_kafka启动

7.12. 消息转发

消息转发就是应用 A 从 TopicA 接收到消息,经过处理后转发到 TopicB,再由应用 B 监听接收该消息,即一个应用处理完后将该消息转发至其他应用处理。

Kafka_kafka启动

public void sendMessage14() { User user = User.builder().id(1).username(\"fg\").age(23).build(); kafkaTemplate2.send(\"sourceTopic\", user);}
public class Consumer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @KafkaListener(topics = \"sourceTopic\", groupId = \"forward-group\") public void listenAndForward(@Payload User user) { log.info(\"收到[原topic]消息:{}\", user); // 模拟业务 user.setUsername(user.getUsername() + \"_processed\"); // 转发到新topic kafkaTemplate.send(\"targetTopic\", user); log.info(\"[转发]消息已转发到 targetTopic:{}\", user); } @KafkaListener(topics = \"targetTopic\", groupId = \"forward-group\") public void listenForward(@Payload User user) { log.info(\"收到转发后的消息:{}\", user); }}

7.13. 消费分区策略

Kafka 的 Topic 是由多个分区(Partition)组成的,分区是 Kafka 并行度和扩展性的核心。

  • 生产者可以把消息写到指定分区或按分区器分配。
  • 消费者以消费者组为单位来消费分区:一个分区在同一时刻只会被一个消费者消费。
  • Kafka Broker 会自动把分区平均分配给组内消费者,叫分区再平衡。

Kafka_kafka启动

策略 类名 说明 RangeAssignor org.apache.kafka.clients.consumer.RangeAssignor 默认策略:按分区范围顺序分配,Kafka 会把分区按照顺序分配给消费者,尽量让每个消费者拿到连续分区。适合单 Topic,分区数较多且消费者数较少,顺序性好。 RoundRobinAssignor org.apache.kafka.clients.consumer.RoundRobinAssignor 轮询分配,尽量让分区均匀分布到所有消费者。 StickyAssignor org.apache.kafka.clients.consumer.StickyAssignor 粘性分配:在尽量均衡分配的同时,最大化保持前后分配不变,减少 Rebalance 的波动。 CooperativeStickyAssignor org.apache.kafka.clients.consumer.CooperativeStickyAssignor Cooperative Sticky:和 Sticky 类似,但支持 渐进式 Rebalance,避免全量重分配,更平滑。 自定义分配器 实现 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 可以按自定义逻辑分配(比如按用户ID、地区、权限等做定制)。

想用什么策略在 yml 配置即可。

我这里写个自定义分配器策略:

package com.fg.config;import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;import org.apache.kafka.common.Cluster;import org.apache.kafka.common.TopicPartition;import java.nio.ByteBuffer;import java.util.*;public class CustomPartitionAssignor implements ConsumerPartitionAssignor { /** * 返回自定义策略名称 * Kafka会根据这个名字找到你的策略 */ @Override public String name() { return \"simple-custom\"; } /** * 可选: 返回用户自定义元数据,通常用于给 assign 传递额外信息 * 这里直接返回 null 就行,没用到 */ @Override public ByteBuffer subscriptionUserData(Set<String> topics) { return null; } /** * 核心方法: 分配逻辑 * cluster: 当前集群元数据信息 * subscriptions: 所有消费者的订阅信息 */ @Override public GroupAssignment assign(Cluster cluster, GroupSubscription subscriptions) { // 结果Map: key是消费者ID, value是分配给它的分区 Map<String, Assignment> assignmentMap = new HashMap<>(); // 所有消费者 List<String> consumers = new ArrayList<>(subscriptions.groupSubscription().keySet()); Collections.sort(consumers); // 所有分区 List<TopicPartition> partitions = new ArrayList<>(); for (Subscription sub : subscriptions.groupSubscription().values()) { for (String topic : sub.topics()) { cluster.partitionsForTopic(topic).forEach(info -> partitions.add(new TopicPartition(topic, info.partition()))); } } // 简单轮询分配 for (int i = 0; i < partitions.size(); i++) { String consumer = consumers.get(i % consumers.size()); assignmentMap.computeIfAbsent(consumer, k -> new Assignment(new ArrayList<>()))  .partitions().add(partitions.get(i)); } System.out.println(\"【分配结果】\" + assignmentMap); return new GroupAssignment(assignmentMap); } /** * 分配完成后执行,可用于打印分配结果、记录日志等 */ @Override public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { System.out.println(\"【onAssignment】\" + metadata.memberId() + \" 分配到: \" + assignment.partitions()); }}

Kafka_kafka启动

7.14. 消息、数据的存储

Kafka 的存储就是以分区为单位,分段文件顺序写,索引文件快速查找,快照 & 元数据保证一致性,保留策略保证高效回收,整个机制天然支持海量消息的高效写读。

Kafka 的所有事件(消息、数据)都存储在/tmp/kafka-logs目录中,可通过log.dirs=/tmp/kafka-logs配置。

Kafka 的所有事件(消息、数据)都是以日志文件的方式来保存。

Kafka 一般都是海量的消息数据,为了避免日志文件过大,日志文件被存放在多个日志目录下,日志目录的命名规则为:-

Kafka_kafka启动

每次消费一个消息并且提交以后,会保存当前消费到的最近一个 offset。这个进度就是 offset:它表示当前分区里“已经消费到哪里了”。

  • 自动提交(enable.auto.commit=true
  • 手动提交(显式调用 commitSync 或 commitAsync)

每次提交时,消费者会把【消费到的最新 offset】写到一个特殊的内置 topic:__consumer_offsets

__consumer_offsets 是 Kafka 内置的系统 Topic,专门用于存储是所有 consumer group 的 offset 元数据,默认有 50 个分区。

offset.metadata.max.retention.ms=604800000 # 默认保留 7 天offsets.topic.partitions=50  # 默认分 50 个分区

分区规则: Kafka 根据 consumer group id 做哈希运算,确定写到 __consumer_offsets 的哪个分区。

offsetPartition = Math.abs(groupId.hashCode()) % offsetsTopicPartitionsCount

分区哈希保证了:

  • 相同的 group id 的 offset 元数据始终写到同一个分区。
  • 多分区提高并行度,多个 consumer group 提交 offset 时不会互相阻塞。

7.15. Offset

生产者Offset

生产者发送一条消息到 Kafka 的 broker 的某个 topic 下某个 partition 中。

Kafka 内部会为每条消息分配一个唯一的 offset,该 offset 就是该消息在 partition 中的位置。

Kafka_kafka启动

消费者Offset

消费者 offset 是消费者需要知道自己已经读取到哪个位置了,接下来从那个位置开始继续读取消息。

每个消费者组(Consumer Group)中的消费者都会独立地维护自己的 offset,当消费者从某个 partition 读取消息时,它会记录当前读取到的 offset,这样即使消费者崩溃或重启,它也可以从上次读取的位置继续读取,而不会重复读取或遗漏消息。(消费者offset需要消费消息并提交(ack)后才记录)

Offset的生命周期
  • 生产者(Producer) 生产消息时,Kafka Broker 会把消息追加到分区末尾,分配下一个 Offset。
  • 消费者(Consumer) 消费消息时,会记录自己消费到哪里了,就是 Offset。
  • Kafka 不会自动删除已消费的消息,而是基于保留策略(时间/大小/日志压缩)来删除。

8. Kafka集群

在 Kafka 集群中,副本的个数大于 0 且小于等于 Broker 数,Producer 把消息发送到某个 topic 的分区(Partition),每个分区都有一个主副本(Leader)和多个从副本(Follower),Producer 只写 Leader,Leader 接收后同步给 Follower 保证数据可靠性;Consumer 只从分区的 Leader 拉取消息读取,Broker 之间通过副本同步实现高可用,当 Leader 挂掉时,Follower 会被选举成新的 Leader,整个过程保证消息不丢失、不重复且高可用。

Kafka_kafka启动

8.1. 基于Zookeeper的集群搭建

我用的镜像版本新的,默认只能用 Kraft 搭建,所以这里就不用docker了。

启动三个Kafka来配置吧。

Kafka_kafka启动
Kafka_kafka启动
Kafka_kafka启动

准备好这几个。

Kafka_kafka启动

然后启动zookeeper,接着启动三个Kafka。连接之后就可以看到三个 brokers 了。

Kafka_kafka启动

然后现在去 SpringBoot 配置连接:

Kafka_kafka启动
Kafka_kafka启动

Kafka_kafka启动

8.2. ISR(In-Sync Replicas)副本

指的是“和 Leader 保持同步的副本集合”。
它包含:当前分区的 Leader 副本和所有跟 Leader 同步进度在容忍范围内的 Follower 副本。

工作流程:

  • 写请求先写到 Leader 副本。
  • Follower 副本从 Leader 拉取数据进行同步(这是“拉”而不是 Leader 主动推送)。
  • 由于网络和拉取的频率限制,Follower 副本的最新数据可能比 Leader 少一点,但 Kafka 允许这个差距存在(可配置)。

踢出机制:

  • 如果某个 Follower 副本因为宕机、网络异常等原因长时间没能跟上 Leader(落后太多),Kafka 会把它从 ISR 中踢出去。

    • replica.lag.time.max.ms (默认 30 秒)
      如果某个 Follower 副本在这段时间内没有成功追上 Leader 的最新消息(即同步延迟超过 30 秒),则该副本会被移出 ISR。
    • replica.lag.max.messages
      表示 Follower 落后 Leader 多少条消息时被剔除 ISR。但该参数在新版 Kafka 中已被废弃,不建议使用。
  • 被踢出去后,它暂时失去选举 Leader 的资格。

  • 当它重新追上 Leader(Catch-up),会被重新加入 ISR。

ISR 就是 Kafka 用来保证高可用和数据一致性的核心机制之一,确保即使单个 Broker 挂了,也能用“与 Leader 保持同步的其他副本”来继续提供服务。

8.3. LEO(Log End Offset)

LEO 是指一个分区日志中,当前已经写入的最后一条消息的下一个可用的 offset,也就是日志的末尾位置。它标记了这个副本中下一条消息将写入的位置。

作用:

  • Leader 副本的 LEO 代表该分区日志的最新写入进度。
  • Follower 副本的 LEO 表示该副本当前同步到的最新消息位置。

与 ISR 关系: Follower 是否属于 ISR,通常会根据它的 LEO 与 Leader 的 LEO 之间的差值来判断。如果差距太大(超出阈值),就会被剔除 ISR。

举例:假设 Leader 的 LEO 是 1000,说明它已经写入了 0~999 条消息。一个 Follower 的 LEO 是 995,说明它同步到了第 995 条消息,还落后 Leader 5 条。如果阈值设置是最大允许落后 10 条消息,那么这个 Follower 仍在 ISR 中。

LEO 就是 Kafka 日志的“尾巴”,用来标识数据的写入和同步进度,是 Leader 和 Follower 协调复制和判断同步状态的重要指标。

8.4. HW(High Watermark)

HW 表示该分区日志中所有 ISR 副本都已同步完成的最大 offset。换句话说,HW 是所有副本中“最落后”副本的 LEO 的最小值。

Kafka_kafka启动

作用:

  • 消费者只能读取不超过 HW 的消息,即只读取所有同步副本都确认写入的消息。
  • 保证消费者不会读取到尚未被所有同步副本确认的数据,避免“脏数据”或消息丢失。
  • 当 Leader 收到写入请求后,会等待消息被大多数 ISR 副本确认(写入),然后更新 HW。

与 LEO 的区别:

  • LEO 表示“日志末尾”位置(最新消息的偏移量),可能包含未被所有副本确认的消息。
  • HW 是“安全可读点”,代表所有同步副本都确认的消息最大偏移量。

举例:假设 Leader 的 LEO 是 1000,表示它已经写入了消息到 offset 999。ISR 副本中最慢的副本 LEO 是 995,那么 HW 就是 995。消费者只能安全读取到 offset 995(含)之前的消息。

8.5. 基于KRaft的集群搭建

KRaft 是 Kafka 从传统依赖 ZooKeeper 管理元数据(Broker 状态、Topic 配置、分区信息、副本状态等)转向内置 Raft 协议的集群元数据管理方式。

在 KRaft 模式下,Kafka 节点既承担 Broker 的数据读写存储角色,又可以作为 Controller 节点来选举、复制和维护集群的元数据,依靠 Raft 协议取代 ZooKeeper 实现一致性。

Kafka_kafka启动

Controller

  • KRaft模式下,Controller 负责整个集群的元数据管理(原本由Zookeeper承担)。
  • 在一个时刻,只会有一个 Controller Leader,它负责处理:
    • 元数据更新(比如 Topic 创建、分区分配、ISR 维护)
    • 与其他 Broker 的协调
    • Raft 日志写入和复制
  • 其他 Controller 节点是 Follower,跟随 Leader 同步 Raft 元数据日志。

Broker

  • Broker 就是实际存储数据、处理生产者写入和消费者拉取的服务节点。
  • 在 KRaft 模式中,一个节点既可以是 Broker,又可以是 Controller(通过 process.roles 配置角色)。

Raft 选举节点(元数据节点)

  • 所有配置为 controller 角色的节点共同组成一个 Controller Quorum(Raft 选举组)。
  • 通过 Raft 协议在 Controller 节点间选举 Leader,保证元数据一致性。
  • 这部分就是代替 ZooKeeper 的核心:Raft 协议的日志复制 + 一致性选举。

Kafka_kafka启动
Kafka_kafka启动
Kafka_kafka启动

上面是一种方式,我现在用docker部署。

先配置 docker-compose.yml

version: \"3\"services: kafka1: image: apache/kafka:3.9.1 container_name: kafka1 hostname: kafka1 ports: - \"9092:9092\" volumes: - kafka1-data:/var/lib/kafka/data environment: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: broker,controller KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.116.100:9092 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_NUM_PARTITIONS: 3 kafka2: image: apache/kafka:3.9.1 container_name: kafka2 hostname: kafka2 ports: - \"9093:9092\" volumes: - kafka2-data:/var/lib/kafka/data environment: KAFKA_NODE_ID: 2 KAFKA_PROCESS_ROLES: broker,controller KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.116.100:9093 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_NUM_PARTITIONS: 3 kafka3: image: apache/kafka:3.9.1 container_name: kafka3 hostname: kafka3 ports: - \"9094:9092\" volumes: - kafka3-data:/var/lib/kafka/data environment: KAFKA_NODE_ID: 3 KAFKA_PROCESS_ROLES: broker,controller KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.116.100:9094 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_NUM_PARTITIONS: 3volumes: kafka1-data: kafka2-data: kafka3-data:

然后再执行命令生成集群UUID,每个节点用同一个UUID进行format。

docker run --rm apache/kafka:3.9.1 bash -c \"/opt/kafka/bin/kafka-storage.sh random-uuid\"
# 第一个节点docker run -it --rm \\ -v kafka1-data:/var/lib/kafka/data \\ apache/kafka:3.9.1 \\ kafka-storage.sh format --cluster-id e4WdbK7oQq-FwMgLSTootQ --config /etc/kafka/kafka.properties --ignore-formatted# 第二个节点docker run -it --rm \\ -v kafka2-data:/var/lib/kafka/data \\ apache/kafka:3.9.1 \\ bash -c \"/opt/kafka/bin/kafka-storage.sh format --cluster-id e4WdbK7oQq-FwMgLSTootQ --config /opt/kafka/config/kraft/server.properties --ignore-formatted\"# 第三个节点docker run -it --rm \\ -v kafka3-data:/var/lib/kafka/data \\ apache/kafka:3.9.1 \\ bash -c \"/opt/kafka/bin/kafka-storage.sh format --cluster-id e4WdbK7oQq-FwMgLSTootQ --config /opt/kafka/config/kraft/server.properties --ignore-formatted\"

最后执行启动

docker-compose up -d

Kafka_kafka启动

Kafka_kafka启动

测试:

Kafka_kafka启动

批发市场信息