springboot使用kafka_springboot kafka
启动kafka
确保本地已安装并启动 Kafka 服务(或连接远程 Kafka 集群 ),比如通过 Kafka 官网下载解压后,启动 Zookeeper(老版本 Kafka 依赖,新版本用 KRaft 可不依赖 )和 Kafka 服务:
# 启动 Zookeeper(若用 KRaft 模式可跳过)
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 Kafka 服务
bin/kafka-server-start.sh config/server.properties
版本:
Kafka 从2.8.0版本开始引入了 KIP-500,提供了无 Zookeeper 的早期访问功能1。不过,此时的实现并不完全,不建议在生产环境中使用。
3.0版本开始真正全面摒弃 Zookeeper,使用新的元数据管理方式 Kraft,提高了 Kafka 的可扩展性、可用性和性能4。
4.0版本是第一个完全无需 Apache Zookeeper 运行的重大版本,将不再支持以 ZK 模式运行或从 ZK 模式迁移。
项目引依赖
org.apache.kafka kafka-clients 3.6.0
创建 Producer 类(编写生产者代码)
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerDemo { public static void main(String[] args) { // 1. 配置 Kafka 连接、序列化等参数 Properties props = new Properties(); props.put(\"bootstrap.servers\", \"localhost:9092\"); // Kafka 集群地址 props.put(\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\"); // 键的序列化器 props.put(\"value.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\"); // 值的序列化器 // 2. 创建 Producer 实例 Producer producer = new KafkaProducer(props); // 3. 构造消息(指定主题、键、值) String topic = \"test_topic\"; // 要发送到的主题,需提前在 Kafka 创建或允许自动创建 String key = \"key1\"; String value = \"Hello, Kafka from IDEA!\"; ProducerRecord record = new ProducerRecord(topic, key, value); // 4. 发送消息(异步发送 + 回调处理结果) producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println(\"消息发送失败:\" + exception.getMessage()); } else { System.out.printf(\"消息发送成功!主题:%s,分区:%d,偏移量:%d%n\", metadata.topic(), metadata.partition(), metadata.offset()); } } }); // 5. 关闭 Producer(实际生产环境可能在程序结束时或合适时机关闭) producer.close(); }}
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties; public class KafkaProducerExample { private final static String TOPIC = \"mytopic\"; private final static String BOOTSTRAP_SERVERS = \"localhost:9092\"; public static void main(String[] args) { Properties props = new Properties(); props.put(\"bootstrap.servers\", BOOTSTRAP_SERVERS); props.put(\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\"); props.put(\"value.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\"); KafkaProducer producer = new KafkaProducer(props); try { for (int i = 0; i < 10; i++) { String message = \"Message \" + i; producer.send(new ProducerRecord(TOPIC, message)); } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } }}
创建 Consumer 类(编写消费者代码)
import org.apache.kafka.clients.consumer.*;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class KafkaConsumerDemo { public static void main(String[] args) { // 1. 配置 Kafka 连接、反序列化、消费者组等参数 Properties props = new Properties(); props.put(\"bootstrap.servers\", \"localhost:9092\"); // Kafka 集群地址 props.put(\"group.id\", \"test_group\"); // 消费者组 ID,同一组内消费者协调消费 props.put(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); // 键的反序列化器 props.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); // 值的反序列化器 props.put(\"auto.offset.reset\", \"earliest\"); // 没有已提交偏移量时,从最早消息开始消费 // 2. 创建 Consumer 实例 KafkaConsumer consumer = new KafkaConsumer(props); // 3. 订阅主题 String topic = \"test_topic\"; consumer.subscribe(Collections.singletonList(topic)); // 4. 循环拉取消息(长轮询) try { while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.printf(\"收到消息:主题=%s,分区=%d,偏移量=%d,键=%s,值=%s%n\", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } // 手动提交偏移量(也可配置自动提交,生产环境建议手动更可靠) consumer.commitSync(); } } catch (Exception e) { e.printStackTrace(); } finally { // 5. 关闭 Consumer consumer.close(); } }}
import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Collections;import java.util.Properties; public class KafkaConsumerExample { private final static String TOPIC = \"mytopic\"; private final static String BOOTSTRAP_SERVERS = \"localhost:9092\"; private final static String GROUP_ID = \"mygroup\"; public static void main(String[] args) { Properties props = new Properties(); props.put(\"bootstrap.servers\", BOOTSTRAP_SERVERS); props.put(\"group.id\", GROUP_ID); props.put(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); props.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(TOPIC)); try { while (true) { ConsumerRecords records = consumer.poll(100); // 处理接收到的消息 records.forEach(record -> { System.out.println(\"Received message: \" + record.value()); }); } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } }}
必须要素:
- 必要配置:
bootstrap.servers
:Kafka 集群地址。group.id
:消费者组 ID(相同组内的消费者会负载均衡消费)。key.deserializer
和value.deserializer
:消息键和值的反序列化器。auto.offset.reset
:消费位置重置策略(如earliest
从最早消息开始消费)。
- 订阅主题:通过
consumer.subscribe()
订阅目标主题。 - 消息消费:通过
consumer.poll()
轮询拉取消息,并处理ConsumerRecords
。 - 偏移量管理:自动提交(
enable.auto.commit=true
)或手动提交(consumer.commitSync()
)消费偏移量。 - 资源管理:使用后调用
consumer.close()
关闭连接。
与 Kafka 的对比
Kafka的Producer和Consumer需要手动管理连接和资源的关闭,因此在使用完毕后需要调用close方法来关闭Producer(或Consumer)。
总结来说,可以使用KafkaProducer的send方法来替代RabbitTemplate的convertAndSend方法在Kafka中发送消息。
Spring AMQP 是 Spring 框架提供的一个用于简化 AMQP(Advanced Message Queuing Protocol) 消息中间件开发的模块。它基于 AMQP 协议,提供了一套高层抽象和模板类,帮助开发者更便捷地实现消息发送和接收,支持多种 AMQP 消息中间件(如 RabbitMQ、Apache Qpid 等)。