Kafka 在 java 中的基本使用_java kafka
文章目录
- 1. 使用 kafka 原生客户端
- 2. Kafka 集成 springboot
1. 使用 kafka 原生客户端
现在基本都直接使用 springboot 版本,但了解原生客户端,能更好的理解 springboot 版的 kafka 客户端原理。
步骤1:pom 引入核心依赖:
引入依赖时,尽量选择和 kafka 版本对应的依赖版本。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.13</artifactId> <version>4.0.0</version></dependency>
步骤2:提供者客户端代码:
提供者客户端要做三件事:
- 设置提供者客户端属性(可选属性都被定义在 ProducerConfig 类中)
- 设置要发送的消息
- 发送(有三种发送方式,下面代码中都有)
public class MyProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { // 第一步:设置提供者属性 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, \"192.168.2.28:9092\"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); try (Producer<String, String> producer = new KafkaProducer<>(props)) { // 第二步:设置要发送的消息 ProducerRecord<String, String> record = new ProducerRecord<>(\"testTopic\", \"testKey\", \"testValue\"); // 第三部:发送消息 // send(producer, record); // sendSync(producer, record); sendASync(producer, record); } } /** * 发送方式1:单向推送,不关心服务器的应答 */ private static void send(Producer<String, String> producer, ProducerRecord<String, String> record) { producer.send(record); } /** * 发送方式2:同步推送,得到服务器的应答前会阻塞当前线程 */ private static void sendSync(Producer<String, String> producer, ProducerRecord<String, String> record) throws ExecutionException, InterruptedException { RecordMetadata metadata = producer.send(record).get(); System.out.println(metadata.topic()); System.out.println(metadata.partition()); System.out.println(metadata.offset()); } /** * 发送方式3:异步推送,不需等待服务器应答,当服务器有应答后会触发函数回调 */ private static void sendASync(Producer<String, String> producer, ProducerRecord<String, String> record) { producer.send(record, (metadata, exception) -> { if (exception != null) { throw new RuntimeException(\"向 kafka 推送失败\", exception); } System.out.println(metadata.topic()); System.out.println(metadata.partition()); System.out.println(metadata.offset()); }); }}
步骤3:消费者客户端代码:
消费者客户端要做三件事:
- 设置消费者客户端属性(可选属性都被定义在 ConsumerConfig 类中)
- 设置消费者订阅的主题
- 拉取消息
- 提交 offset(有两种提交方式,下面代码中都有)
public class MyConsumer { public static void main(String[] args) { // 第一步:设置消费者属性 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, \"192.168.2.28:9092\"); props.put(ConsumerConfig.GROUP_ID_CONFIG, \"testGroup\"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) { // 第二步:设置要订阅的主题 consumer.subscribe(Collections.singletonList(\"testTopic\")); while (true) { // 第三步:拉取消息,100 代表最大等待时间,如果时间到了还没有拉取到消息就不阻塞了继续往后执行 ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); } // 第四步:提交 offset // consumer.commitSync(); // 同步提交,表示必须等到 offset 提交完毕,再去消费下⼀批数据 consumer.commitSync(); // 异步提交,表示发送完提交 offset 请求后,就开始消费下⼀批数据了。不⽤等到Broker的确认。 } } }}
2. Kafka 集成 springboot
springboot 版本是最常用的,比原生客户端使用方便。但是道理是一样的,底层也是原生客户端。
pom 引入核心依赖:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.0</version></parent><dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency></dependencies>
yaml 配置文件:
这一步无非就是把原生客户端中的属性配置,写在 yaml 中
spring: kafka: bootstrap-servers: 192.168.2.28:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: testGroup key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
提供者客户端代码:
只需要两步:
- 注入 KafkaTemplate
- 发送
@RestControllerpublic class ProducerController { /** * kafka */ private KafkaTemplate<String, Object> kafkaTemplate; @Autowired public void setKafkaTemplate(KafkaTemplate<String, Object> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @GetMapping(\"/test\") public void send() { // 发送 kafka 消息 kafkaTemplate.send(\"testTopic\", \"testKey\", \"testValue\"); }}
消费者客户端代码:
只需要监听主题就可以
@RestControllerpublic class ConsumerController { // 监听 kafka 消息 @KafkaListener(topics = {\"testTopic\"}) public void test(ConsumerRecord<?, ?> record) { System.out.println(record.value()); }}