Springboot实战:优雅的整合Kafka发送及监听消息示例(附源码下载)
Springboot-cli 开发脚手架系列
Springboot优雅的整合Kafka发送及监听消息示例(附源码下载)
文章目录
- Springboot-cli 开发脚手架系列
- 前言
-
- 1. 环境
- 2. 提供者
- 3. 消费者
- 4. 效果演示
- 6. 源码分享
前言
致力于让开发者快速搭建基础环境并让应用跑起来,提供使用示例供使用者参考,让初学者快速上手。
- 项目源码github地址
- 项目源码国内gitee地址
1. 环境
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.80</version> </dependency>
2. 提供者
yml配置
bootstrap-servers 改为你kafka的实际地址
server: port: 9999spring: kafka: bootstrap-servers: 127.0.0.1:9092 producer: # 每次批量发送消息的数量 batch-size: 16 # 缓存容量 buffer-memory: 33554432 #设置大于0的值,则客户端会将发送失败的记录重新发送 retries: 0 # 指定消息key和消息体的编解码方式 UTF-8 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
- 编写发送者
KafkaProducer.java
@Component@RequiredArgsConstructor@Slf4jpublic class KafkaProducer { private final KafkaTemplate<String, String> kafkaTemplate; /** * 发送json消息 * * @param topic 频道 * @param message 消息 */ public void send(String topic, String message) { kafkaTemplate.send(topic, message); log.info("send success"); }}
- 编写测试接口
ProducerController.java
@RestController@RequiredArgsConstructorpublic class ProducerController { private final KafkaProducer kafkaProducer; /** * 发送消息 */ @RequestMapping("/send") public String send(String topic, String message) { kafkaProducer.send(topic, message); return "success"; } /** * 发送消息 */ @PostMapping("/send") public String sendJson(@RequestBody JSONObject data) { String topic = data.getString("topic"); JSONObject message = data.getJSONObject("message"); kafkaProducer.send(topic, message.toJSONString()); return "success"; }}
3. 消费者
yml配置
bootstrap-servers改为你kafka的实际地址
spring: kafka: bootstrap-servers: 127.0.0.1:9092 consumer: # 默认的消费组ID group-id: javagroup # 是否自动提交offset enable-auto-commit: true # 提交offset延时(接收到消息后多久提交offset) auto-commit-interval: 100 # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: latest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializerkafka: # 订阅的主题 topic: test # 主题消费分组 group: group-test
- kafka消费者实现
KafkaConsumer.java
@Component@RequiredArgsConstructor@Slf4jpublic class KafkaConsumer { /** * 消费者配置,kafka.topic为监听的topic,kafka.group为消费分组,可在yml中修改 */ @KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group}") @Async public void kafkaListener(ConsumerRecord<String, String> consumerRecord) { String value = consumerRecord.value(); if (log.isInfoEnabled()) { log.info("读取到消息:offset {}, value {}", consumerRecord.offset(), value); } if (null == value) { log.error("kafka消费数据为空"); } }}
4. 效果演示
-
整体结构
-
启动提供者和消费者
-
浏览器输入
localhost:9999/send?message=hello&topic=test
6. 源码分享
本项目已收录
- Springboot-cli开发脚手架,集合各种常用框架使用案例,完善的文档,致力于让开发者快速搭建基础环境并让应用跑起来,并提供丰富的使用示例供使用者参考,帮助初学者快速上手。
- 项目源码github地址
- 项目源码国内gitee地址