> 技术文档 > springboot使用kafka_springboot kafka

springboot使用kafka_springboot kafka


启动kafka

确保本地已安装并启动 Kafka 服务(或连接远程 Kafka 集群 ),比如通过 Kafka 官网下载解压后,启动 Zookeeper(老版本 Kafka 依赖,新版本用 KRaft 可不依赖 )和 Kafka 服务:

# 启动 Zookeeper(若用 KRaft 模式可跳过)

bin/zookeeper-server-start.sh config/zookeeper.properties

# 启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties

版本:

Kafka 从2.8.0版本开始引入了 KIP-500,提供了无 Zookeeper 的早期访问功能1。不过,此时的实现并不完全,不建议在生产环境中使用。

3.0版本开始真正全面摒弃 Zookeeper,使用新的元数据管理方式 Kraft,提高了 Kafka 的可扩展性、可用性和性能4。

4.0版本是第一个完全无需 Apache Zookeeper 运行的重大版本,将不再支持以 ZK 模式运行或从 ZK 模式迁移。

项目引依赖

  org.apache.kafka kafka-clients 3.6.0  

创建 Producer 类(编写生产者代码)

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerDemo { public static void main(String[] args) { // 1. 配置 Kafka 连接、序列化等参数 Properties props = new Properties(); props.put(\"bootstrap.servers\", \"localhost:9092\"); // Kafka 集群地址 props.put(\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\"); // 键的序列化器 props.put(\"value.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\"); // 值的序列化器 // 2. 创建 Producer 实例 Producer producer = new KafkaProducer(props); // 3. 构造消息(指定主题、键、值) String topic = \"test_topic\"; // 要发送到的主题,需提前在 Kafka 创建或允许自动创建 String key = \"key1\"; String value = \"Hello, Kafka from IDEA!\"; ProducerRecord record = new ProducerRecord(topic, key, value); // 4. 发送消息(异步发送 + 回调处理结果) producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) {  System.err.println(\"消息发送失败:\" + exception.getMessage()); } else {  System.out.printf(\"消息发送成功!主题:%s,分区:%d,偏移量:%d%n\", metadata.topic(), metadata.partition(), metadata.offset()); } } }); // 5. 关闭 Producer(实际生产环境可能在程序结束时或合适时机关闭) producer.close(); }}
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties; public class KafkaProducerExample { private final static String TOPIC = \"mytopic\"; private final static String BOOTSTRAP_SERVERS = \"localhost:9092\"; public static void main(String[] args) { Properties props = new Properties(); props.put(\"bootstrap.servers\", BOOTSTRAP_SERVERS); props.put(\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\"); props.put(\"value.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\"); KafkaProducer producer = new KafkaProducer(props); try { for (int i = 0; i < 10; i++) { String message = \"Message \" + i; producer.send(new ProducerRecord(TOPIC, message)); } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } }}

创建 Consumer 类(编写消费者代码)

import org.apache.kafka.clients.consumer.*;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class KafkaConsumerDemo { public static void main(String[] args) { // 1. 配置 Kafka 连接、反序列化、消费者组等参数 Properties props = new Properties(); props.put(\"bootstrap.servers\", \"localhost:9092\"); // Kafka 集群地址 props.put(\"group.id\", \"test_group\"); // 消费者组 ID,同一组内消费者协调消费 props.put(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); // 键的反序列化器 props.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); // 值的反序列化器 props.put(\"auto.offset.reset\", \"earliest\"); // 没有已提交偏移量时,从最早消息开始消费 // 2. 创建 Consumer 实例 KafkaConsumer consumer = new KafkaConsumer(props); // 3. 订阅主题 String topic = \"test_topic\"; consumer.subscribe(Collections.singletonList(topic)); // 4. 循环拉取消息(长轮询) try { while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) {  System.out.printf(\"收到消息:主题=%s,分区=%d,偏移量=%d,键=%s,值=%s%n\", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } // 手动提交偏移量(也可配置自动提交,生产环境建议手动更可靠) consumer.commitSync(); } } catch (Exception e) { e.printStackTrace(); } finally { // 5. 关闭 Consumer consumer.close(); } }}
import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Collections;import java.util.Properties; public class KafkaConsumerExample { private final static String TOPIC = \"mytopic\"; private final static String BOOTSTRAP_SERVERS = \"localhost:9092\"; private final static String GROUP_ID = \"mygroup\"; public static void main(String[] args) { Properties props = new Properties(); props.put(\"bootstrap.servers\", BOOTSTRAP_SERVERS); props.put(\"group.id\", GROUP_ID); props.put(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); props.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(TOPIC)); try { while (true) { ConsumerRecords records = consumer.poll(100); // 处理接收到的消息 records.forEach(record -> {  System.out.println(\"Received message: \" + record.value()); }); } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } }}

必须要素:

  1. 必要配置
    • bootstrap.servers:Kafka 集群地址。
    • group.id:消费者组 ID(相同组内的消费者会负载均衡消费)。
    • key.deserializer 和 value.deserializer:消息键和值的反序列化器。
    • auto.offset.reset:消费位置重置策略(如 earliest 从最早消息开始消费)。
  2. 订阅主题:通过 consumer.subscribe() 订阅目标主题。
  3. 消息消费:通过 consumer.poll() 轮询拉取消息,并处理 ConsumerRecords
  4. 偏移量管理:自动提交(enable.auto.commit=true)或手动提交(consumer.commitSync())消费偏移量。
  5. 资源管理:使用后调用 consumer.close() 关闭连接。

与 Kafka 的对比

Kafka的Producer和Consumer需要手动管理连接和资源的关闭,因此在使用完毕后需要调用close方法来关闭Producer(或Consumer)。

总结来说,可以使用KafkaProducer的send方法来替代RabbitTemplate的convertAndSend方法在Kafka中发送消息。

Spring AMQP 是 Spring 框架提供的一个用于简化 AMQP(Advanced Message Queuing Protocol) 消息中间件开发的模块。它基于 AMQP 协议,提供了一套高层抽象和模板类,帮助开发者更便捷地实现消息发送和接收,支持多种 AMQP 消息中间件(如 RabbitMQ、Apache Qpid 等)。

维度 Spring AMQP(RabbitMQ) Spring Kafka 协议 AMQP(高级消息队列协议) Kafka 自研协议 消息模型 支持多种交换器类型(Direct、Topic 等) 基于主题(Topic)和分区(Partition) 顺序性 单队列内保证顺序 分区内保证顺序,多分区需按 Key 路由 吞吐量 中等(万级 TPS) 高(十万级 TPS) 适用场景 企业集成、任务调度、事务性消息 大数据、日志收集、实时流处理