> 技术文档 > Spring Boot整合Kafka实战指南:从环境搭建到消息处理全解析_springboot整合kafka

Spring Boot整合Kafka实战指南:从环境搭建到消息处理全解析_springboot整合kafka


一、环境准备

  1. 安装 Kafka

    • 下载 Kafka:从 Apache Kafka 官网下载对应版本的 Kafka。

    • 解压并启动 Kafka:

      # 启动 Zookeeper(Kafka 依赖 Zookeeper)bin/zookeeper-server-start.sh config/zookeeper.properties# 启动 Kafkabin/kafka-server-start.sh config/server.properties
  2. 创建主题

    bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  3. 添加依赖 在 Spring Boot 项目的 pom.xml 文件中添加 Kafka 依赖:

     org.springframework.boot spring-boot-starter-kafka

二、基础配置(YML)

application.yml 文件中配置 Kafka 的基本参数:

spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: test-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
  • bootstrap-servers:Kafka 服务的地址。

  • group-id消费者组 ID,同一组的消费者消费不同的分区。

  • auto-offset-reset:当 Kafka 中没有初始偏移量或偏移量超出范围时的处理方式,可选值为 earliest(从最早记录开始读取)、latest(从最新记录开始读取)或 none(抛出异常)。

三、生产者与消费者代码实现

1. 生产者

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); }}
  • KafkaTemplate:Spring 提供的 Kafka 操作类,用于发送消息

2. 消费者

import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class KafkaConsumer { @KafkaListener(topics = \"test-topic\") public void receiveMessage(String message) { System.out.println(\"Received message: \" + message); }}
  • @KafkaListener:指定监听的 Kafka 主题。

四、高级用法

1. 消息分区策略

自定义分区器
import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner { @Override public void configure(Map configs) { // 配置初始化 } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 自定义分区逻辑,例如根据 key 的哈希值取模 return key.hashCode() % cluster.partitionCountForTopic(topic); } @Override public void close() { // 关闭资源 }}

application.yml 中配置自定义分区器:

spring: kafka: producer: partitioner-class: com.example.CustomPartitioner

2. 消息序列化与反序列化

自定义序列化器
import org.apache.kafka.common.serialization.Serializer;public class CustomSerializer implements Serializer { @Override public byte[] serialize(String topic, MyObject data) { // 自定义序列化逻辑,例如使用 JSON 序列化 return data.toJson().getBytes(); }}

application.yml 中配置自定义序列化器:

spring: kafka: producer: value-serializer: com.example.CustomSerializer

3. 消费者并发处理

application.yml 中配置消费者并发数:

spring: kafka: listener: concurrency: 3
  • concurrency:每个消费者线程的并发数,可以根据需求调整。

4. 消费者手动提交偏移量

import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;import org.springframework.stereotype.Component;@Componentpublic class KafkaConsumer { @KafkaListener(topics = \"test-topic\") public void receiveMessage(String message, Acknowledgment acknowledgment) { System.out.println(\"Received message: \" + message); // 处理完消息后手动提交偏移量 acknowledgment.acknowledge(); }}
  • Acknowledgment:手动提交偏移量的接口。

5. 消费者拦截器

自定义拦截器
import org.apache.kafka.clients.consumer.ConsumerInterceptor;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;import java.util.HashMap;import java.util.Map;public class CustomConsumerInterceptor implements ConsumerInterceptor { @Override public ConsumerRecords onConsume(ConsumerRecords records) { // 消费前的逻辑,例如过滤消息 return records; } @Override public void onCommit(Map offsets) { // 提交偏移量前的逻辑 } @Override public void close() { // 关闭资源 } @Override public void configure(Map configs) { // 配置初始化 }}

application.yml 中配置自定义拦截器:

spring: kafka: consumer: interceptor-classes: com.example.CustomConsumerInterceptor

6. 生产者拦截器

自定义拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CustomProducerInterceptor implements ProducerInterceptor { @Override public ProducerRecord onSend(ProducerRecord record) { // 发送前的逻辑,例如修改消息内容 return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 消息发送后的逻辑 } @Override public void close() { // 关闭资源 } @Override public void configure(Map configs) { // 配置初始化 }}

application.yml 中配置自定义拦截器:

spring: kafka: producer: interceptor-classes: com.example.CustomProducerInterceptor

7. 消费者重试机制

application.yml 中配置消费者重试机制:

spring: kafka: listener: retry: initial-interval: 1000ms max-attempts: 3 back-off-multiplier: 2.0 max-interval: 10000ms
  • initial-interval:初始重试间隔。

  • max-attempts:最大重试次数。

  • back-off-multiplier:重试间隔的倍数。

  • max-interval:最大重试间隔。

8. 消费者死信队列

在 Kafka 中创建死信队列主题:

bin/kafka-topics.sh --create --topic dlq-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

application.yml 中配置死信队列:

spring: kafka: listener: dead-letter: topic: dlq-topic

9. 消息过滤

application.yml 中配置消息过滤:

spring: kafka: listener: filter: expression: headers[\'type\'] == \'important\'
  • expression:SpEL 表达式,用于过滤消息。

10. 消费者负载均衡

Spring Boot 默认支持 Kafka 的消费者负载均衡,无需额外配置。只需在同一个消费者组中启动多个