> 技术文档 > SpringBoot3.x入门到精通系列:4.2 整合 Kafka 详解

SpringBoot3.x入门到精通系列:4.2 整合 Kafka 详解


SpringBoot 3.x 整合 Kafka 详解

🎯 Kafka简介

Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流应用程序。它具有高吞吐量、低延迟、可扩展性和容错性等特点。

核心概念

  • Producer: 生产者,发送消息到Kafka集群
  • Consumer: 消费者,从Kafka集群读取消息
  • Topic: 主题,消息的分类,类似于消息队列
  • Partition: 分区,Topic的物理分割,提高并行处理能力
  • Broker: 代理,Kafka集群中的服务器节点
  • Consumer Group: 消费者组,由多个消费者组成,共同消费一个Topic;同一分区在同一时刻只会分配给组内的一个消费者
  • Offset: 偏移量,消息在分区中的位置标识

核心特性

  • 高吞吐量: 支持每秒数百万条消息
  • 低延迟: 毫秒级的消息传递延迟
  • 持久化: 消息持久化存储到磁盘
  • 分布式: 支持集群部署和水平扩展
  • 容错性: 支持数据复制和故障恢复

🚀 快速开始

1. 添加依赖

<dependencies>
    <!-- Spring MVC / embedded web server for the REST endpoints -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring for Apache Kafka (KafkaTemplate, @KafkaListener) -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Jackson, used for JSON (de)serialization of message payloads -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>

    <!-- Bean Validation support for the request DTOs -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>

    <!-- Test-scoped dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. Kafka配置

spring:
  # Kafka settings
  kafka:
    # Kafka broker address
    bootstrap-servers: localhost:9092

    # Producer settings
    producer:
      # Number of retries
      retries: 3
      # Batch size in bytes
      batch-size: 16384
      # Producer buffer size in bytes
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Acknowledgement mode
      acks: all
      # Compression type
      compression-type: gzip
      # Send timeouts, passed through to the Kafka client as raw properties
      properties:
        delivery.timeout.ms: 120000
        request.timeout.ms: 30000

    # Consumer settings
    consumer:
      # Consumer group id
      group-id: demo-group
      # Offsets are not auto-committed; listeners acknowledge manually
      enable-auto-commit: false
      # Auto-commit interval (has no effect while enable-auto-commit is false)
      auto-commit-interval: 1000
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Start from the earliest record when no committed offset exists
      auto-offset-reset: earliest
      # Maximum number of records returned by one poll
      max-poll-records: 500
      # Maximum time the broker waits before answering a fetch request
      fetch-max-wait: 500
      # JSON deserialization settings
      properties:
        spring.json.trusted.packages: "com.example.demo.dto"
        spring.json.type.mapping: "userEvent:com.example.demo.dto.UserEventDto,orderEvent:com.example.demo.dto.OrderEventDto"

    # Listener container settings
    # NOTE(review): these appear to apply only to Spring Boot's auto-configured
    # container factory; an application-defined "kafkaListenerContainerFactory"
    # bean would replace it - confirm which factory is actually in use.
    listener:
      # Acknowledgement mode
      ack-mode: manual_immediate
      # Number of concurrent consumers
      concurrency: 3
      # Poll timeout in milliseconds
      poll-timeout: 3000
      # Listener type (single-record or batch delivery); this is not an error handler
      type: batch

# Logging settings
logging:
  level:
    org.springframework.kafka: DEBUG
    org.apache.kafka: DEBUG

🔧 Kafka配置类

package com.example.demo.config;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Programmatic Kafka configuration: one JSON producer, a general-purpose consumer
 * factory and a consumer factory dedicated to {@link UserEventDto} payloads.
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /** Package whose classes the JSON deserializer is allowed to instantiate. */
    private static final String TRUSTED_DTO_PACKAGE = "com.example.demo.dto";

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    /**
     * Producer factory: String keys, JSON values, acks=all, gzip compression.
     *
     * @return the producer factory backing {@link #kafkaTemplate()}
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * KafkaTemplate built on {@link #producerFactory()}.
     *
     * @return the template used to publish messages
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * General-purpose consumer factory using the configured group id and the
     * {@code userEvent}/{@code orderEvent} JSON type mappings.
     *
     * @return consumer factory for arbitrary JSON payloads
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = baseConsumerProps(groupId);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // JSON deserialization: map type tokens to the DTO classes
        props.put(JsonDeserializer.TYPE_MAPPINGS,
                "userEvent:com.example.demo.dto.UserEventDto,orderEvent:com.example.demo.dto.OrderEventDto");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Default listener container factory (bean name {@code kafkaListenerContainerFactory}):
     * concurrency 3, MANUAL_IMMEDIATE acknowledgement, {@link DefaultErrorHandler}.
     *
     * @return the container factory used by listeners that do not name one
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent consumers
        factory.setConcurrency(3);
        // Listeners must acknowledge explicitly
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Error handler applied to exceptions thrown by listeners
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }

    /**
     * Consumer factory for the {@code user-event-group}; values default to
     * {@link UserEventDto} when deserialized.
     *
     * @return consumer factory for user events
     */
    @Bean
    public ConsumerFactory<String, UserEventDto> userEventConsumerFactory() {
        Map<String, Object> props = baseConsumerProps("user-event-group");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserEventDto.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory for user events: concurrency 2,
     * MANUAL_IMMEDIATE acknowledgement.
     *
     * @return the container factory referenced by the user-event listener
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserEventDto> userEventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserEventDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userEventConsumerFactory());
        factory.setConcurrency(2);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    /**
     * Builds the consumer properties shared by both consumer factories.
     * Auto-commit is switched off explicitly because both container factories
     * use manual acknowledgement.
     *
     * @param consumerGroup the consumer group id to use
     * @return a fresh, mutable property map
     */
    private Map<String, Object> baseConsumerProps(String consumerGroup) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, TRUSTED_DTO_PACKAGE);
        return props;
    }
}

📊 消息DTO类

1. 用户事件DTO

package com.example.demo.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import java.time.LocalDateTime;

/**
 * Message payload describing a user-related event.
 *
 * <p>{@code timestamp} is initialised to the creation time by every constructor
 * and can be overwritten through its setter.
 */
public class UserEventDto {

    @NotBlank(message = "事件ID不能为空")
    private String eventId;

    /** Event type: CREATE, UPDATE or DELETE. */
    @NotBlank(message = "事件类型不能为空")
    private String eventType;

    @NotNull(message = "用户ID不能为空")
    private Long userId;

    private String username;

    private String email;

    private String operation;

    private String operatorId;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    /** Additional free-form data attached to the event. */
    private Object data;

    /** Creates an empty event stamped with the current local time. */
    public UserEventDto() {
        this.timestamp = LocalDateTime.now();
    }

    /**
     * Creates an event stamped with the current local time.
     *
     * @param eventId   event identifier
     * @param eventType event type (CREATE, UPDATE, DELETE)
     * @param userId    id of the affected user
     * @param operation operation label
     */
    public UserEventDto(String eventId, String eventType, Long userId, String operation) {
        this();
        this.eventId = eventId;
        this.eventType = eventType;
        this.userId = userId;
        this.operation = operation;
    }

    public String getEventId() {
        return eventId;
    }

    public void setEventId(String eventId) {
        this.eventId = eventId;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public String getOperation() {
        return operation;
    }

    public void setOperation(String operation) {
        this.operation = operation;
    }

    public String getOperatorId() {
        return operatorId;
    }

    public void setOperatorId(String operatorId) {
        this.operatorId = operatorId;
    }

    public LocalDateTime getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(LocalDateTime timestamp) {
        this.timestamp = timestamp;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    /** Summarises the event; email, operatorId and data are not included. */
    @Override
    public String toString() {
        return "UserEventDto{"
                + "eventId='" + eventId + '\''
                + ", eventType='" + eventType + '\''
                + ", userId=" + userId
                + ", username='" + username + '\''
                + ", operation='" + operation + '\''
                + ", timestamp=" + timestamp
                + '}';
    }
}

2. 订单事件DTO

package com.example.demo.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;

/**
 * Message payload describing an order-related event.
 *
 * <p>{@code timestamp} is initialised to the creation time by every constructor
 * and can be overwritten through its setter.
 */
public class OrderEventDto {

    @NotBlank(message = "事件ID不能为空")
    private String eventId;

    /** Event type: CREATED, PAID, SHIPPED, DELIVERED or CANCELLED. */
    @NotBlank(message = "事件类型不能为空")
    private String eventType;

    @NotNull(message = "订单ID不能为空")
    private Long orderId;

    @NotNull(message = "用户ID不能为空")
    private Long userId;

    private String orderNo;

    private BigDecimal totalAmount;

    private String status;

    private List<OrderItem> items;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    /** A single line item of an order. */
    public static class OrderItem {

        private Long productId;

        private String productName;

        private Integer quantity;

        private BigDecimal price;

        /** Creates an empty item. */
        public OrderItem() {
        }

        /**
         * Creates a fully populated item.
         *
         * @param productId   product identifier
         * @param productName product display name
         * @param quantity    ordered quantity
         * @param price       item price
         */
        public OrderItem(Long productId, String productName, Integer quantity, BigDecimal price) {
            this.productId = productId;
            this.productName = productName;
            this.quantity = quantity;
            this.price = price;
        }

        public Long getProductId() {
            return productId;
        }

        public void setProductId(Long productId) {
            this.productId = productId;
        }

        public String getProductName() {
            return productName;
        }

        public void setProductName(String productName) {
            this.productName = productName;
        }

        public Integer getQuantity() {
            return quantity;
        }

        public void setQuantity(Integer quantity) {
            this.quantity = quantity;
        }

        public BigDecimal getPrice() {
            return price;
        }

        public void setPrice(BigDecimal price) {
            this.price = price;
        }
    }

    /** Creates an empty event stamped with the current local time. */
    public OrderEventDto() {
        this.timestamp = LocalDateTime.now();
    }

    /**
     * Creates an event stamped with the current local time.
     *
     * @param eventId   event identifier
     * @param eventType event type (CREATED, PAID, SHIPPED, DELIVERED, CANCELLED)
     * @param orderId   id of the affected order
     * @param userId    id of the user owning the order
     */
    public OrderEventDto(String eventId, String eventType, Long orderId, Long userId) {
        this();
        this.eventId = eventId;
        this.eventType = eventType;
        this.orderId = orderId;
        this.userId = userId;
    }

    public String getEventId() {
        return eventId;
    }

    public void setEventId(String eventId) {
        this.eventId = eventId;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    public BigDecimal getTotalAmount() {
        return totalAmount;
    }

    public void setTotalAmount(BigDecimal totalAmount) {
        this.totalAmount = totalAmount;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public List<OrderItem> getItems() {
        return items;
    }

    public void setItems(List<OrderItem> items) {
        this.items = items;
    }

    public LocalDateTime getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(LocalDateTime timestamp) {
        this.timestamp = timestamp;
    }

    /** Summarises the event; the item list is not included. */
    @Override
    public String toString() {
        return "OrderEventDto{"
                + "eventId='" + eventId + '\''
                + ", eventType='" + eventType + '\''
                + ", orderId=" + orderId
                + ", userId=" + userId
                + ", orderNo='" + orderNo + '\''
                + ", totalAmount=" + totalAmount
                + ", status='" + status + '\''
                + ", timestamp=" + timestamp
                + '}';
    }
}

📤 消息生产者

package com.example.demo.service;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
 * Publishes user events, order events, notifications and ad-hoc messages to Kafka.
 *
 * <p>All sends are asynchronous: the outcome is only printed from the completion
 * callback, and no method here reports a failure to its caller.
 */
@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Topic name constants
    public static final String USER_EVENTS_TOPIC = "user-events";
    public static final String ORDER_EVENTS_TOPIC = "order-events";
    public static final String NOTIFICATION_TOPIC = "notifications";

    /**
     * Sends a user event, keyed by the user id.
     *
     * @param userEvent event to publish; a null event or null user id is caught
     *                  by the surrounding try block and printed, not thrown
     */
    public void sendUserEvent(UserEventDto userEvent) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(USER_EVENTS_TOPIC, userEvent.getUserId().toString(), userEvent);

            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("用户事件发送成功: " + userEvent.getEventId()
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("用户事件发送失败: " + userEvent.getEventId() + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("发送用户事件异常: " + e.getMessage());
        }
    }

    /**
     * Sends an order event, keyed by the order id.
     *
     * @param orderEvent event to publish; a null event or null order id is caught
     *                   by the surrounding try block and printed, not thrown
     */
    public void sendOrderEvent(OrderEventDto orderEvent) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(ORDER_EVENTS_TOPIC, orderEvent.getOrderId().toString(), orderEvent);

            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("订单事件发送成功: " + orderEvent.getEventId()
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("订单事件发送失败: " + orderEvent.getEventId() + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("发送订单事件异常: " + e.getMessage());
        }
    }

    /**
     * Sends a notification text under a freshly generated random UUID key.
     *
     * @param message notification text
     */
    public void sendNotification(String message) {
        try {
            String messageId = UUID.randomUUID().toString();
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(NOTIFICATION_TOPIC, messageId, message);

            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("通知消息发送成功: " + messageId
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("通知消息发送失败: " + messageId + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("发送通知消息异常: " + e.getMessage());
        }
    }

    /**
     * Sends a message to an explicit partition of a topic.
     *
     * @param topic     target topic
     * @param partition target partition number
     * @param key       record key
     * @param message   record value
     */
    public void sendMessageToPartition(String topic, int partition, String key, Object message) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(topic, partition, key, message);

            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("消息发送到分区成功: partition=" + partition
                            + " offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("消息发送到分区失败: " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("发送分区消息异常: " + e.getMessage());
        }
    }

    /**
     * Sends every message under its own random UUID key, then flushes the template.
     * A null list is treated as empty. Per-record failures are printed from the
     * send callback rather than being dropped.
     *
     * @param topic    target topic
     * @param messages values to send; may be null or empty
     */
    public void sendBatchMessages(String topic, List<Object> messages) {
        if (messages == null || messages.isEmpty()) {
            // Nothing to send, so there is nothing to flush either.
            System.out.println("批量发送 " + 0 + " 条消息完成");
            return;
        }

        for (Object message : messages) {
            String key = UUID.randomUUID().toString();
            kafkaTemplate.send(topic, key, message).whenComplete((result, ex) -> {
                if (ex != null) {
                    System.err.println("批量消息发送失败: key=" + key + " " + ex.getMessage());
                }
            });
        }

        // Flush the buffer so the messages are sent immediately
        kafkaTemplate.flush();

        System.out.println("批量发送 " + messages.size() + " 条消息完成");
    }
}

📥 消息消费者

package com.example.demo.service;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Kafka listeners for user events, order events, notifications, batch messages
 * and a combined multi-topic listener.
 *
 * <p>Every listener acknowledges manually after successful processing.
 * NOTE(review): on failure the exception is only printed - the record is neither
 * acknowledged nor rethrown, so the container's error handler never sees it.
 * Confirm this best-effort behaviour is intended.
 */
@Service
public class KafkaConsumerService {

    /**
     * Consumes user events through the dedicated user-event container factory.
     *
     * @param userEvent      deserialized payload
     * @param topic          topic the record was received from
     * @param partition      partition the record was received from
     * @param offset         record offset
     * @param acknowledgment handle used to acknowledge the record manually
     */
    @KafkaListener(topics = "user-events", groupId = "user-event-group",
            containerFactory = "userEventKafkaListenerContainerFactory")
    public void consumeUserEvent(
            @Payload UserEventDto userEvent,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment acknowledgment) {
        try {
            System.out.println("接收到用户事件: " + userEvent);
            System.out.println("Topic: " + topic + ", Partition: " + partition + ", Offset: " + offset);

            // Business logic for the user event
            processUserEvent(userEvent);

            // Acknowledge manually
            acknowledgment.acknowledge();

            System.out.println("用户事件处理完成: " + userEvent.getEventId());
        } catch (Exception e) {
            System.err.println("处理用户事件失败: " + e.getMessage());
            // Retry logic or forwarding to a dead-letter topic could be added here
        }
    }

    /**
     * Consumes order events as raw consumer records.
     *
     * @param record         received record
     * @param acknowledgment handle used to acknowledge the record manually
     */
    @KafkaListener(topics = "order-events", groupId = "order-event-group")
    public void consumeOrderEvent(
            ConsumerRecord<String, OrderEventDto> record,
            Acknowledgment acknowledgment) {
        try {
            OrderEventDto orderEvent = record.value();
            System.out.println("接收到订单事件: " + orderEvent);
            System.out.println("Key: " + record.key() + ", Partition: " + record.partition()
                    + ", Offset: " + record.offset());

            // Business logic for the order event
            processOrderEvent(orderEvent);

            // Acknowledge manually
            acknowledgment.acknowledge();

            System.out.println("订单事件处理完成: " + orderEvent.getEventId());
        } catch (Exception e) {
            System.err.println("处理订单事件失败: " + e.getMessage());
        }
    }

    /**
     * Consumes notification messages.
     *
     * <p>The key header uses {@code KafkaHeaders.RECEIVED_KEY}, the spring-kafka 3.x
     * name of the former {@code RECEIVED_MESSAGE_KEY}. It is optional because a
     * Kafka record may carry no key.
     *
     * @param message        notification text
     * @param key            record key, or null when the record has none
     * @param acknowledgment handle used to acknowledge the record manually
     */
    @KafkaListener(topics = "notifications", groupId = "notification-group")
    public void consumeNotification(
            @Payload String message,
            @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) String key,
            Acknowledgment acknowledgment) {
        try {
            System.out.println("接收到通知消息: " + message);
            System.out.println("Message Key: " + key);

            // Business logic for the notification
            processNotification(message);

            // Acknowledge manually
            acknowledgment.acknowledge();

            System.out.println("通知消息处理完成");
        } catch (Exception e) {
            System.err.println("处理通知消息失败: " + e.getMessage());
        }
    }

    /**
     * Consumes messages in batches.
     *
     * <p>{@code batch = "true"} switches this listener to batch delivery, which the
     * {@code List<ConsumerRecord>} parameter requires.
     *
     * @param records        records returned by one poll
     * @param acknowledgment handle used to acknowledge the whole batch
     */
    @KafkaListener(topics = "batch-topic", groupId = "batch-group", batch = "true")
    public void consumeBatchMessages(
            List<ConsumerRecord<String, Object>> records,
            Acknowledgment acknowledgment) {
        try {
            System.out.println("接收到批量消息,数量: " + records.size());

            for (ConsumerRecord<String, Object> record : records) {
                System.out.println("处理消息: Key=" + record.key() + ", Value=" + record.value()
                        + ", Partition=" + record.partition() + ", Offset=" + record.offset());

                // Handle a single record
                processBatchMessage(record.value());
            }

            // Acknowledge the whole batch at once
            acknowledgment.acknowledge();

            System.out.println("批量消息处理完成");
        } catch (Exception e) {
            System.err.println("处理批量消息失败: " + e.getMessage());
        }
    }

    /**
     * Consumes both user and order events in a separate consumer group and
     * dispatches by topic name. A value of an unexpected type is acknowledged
     * without being processed.
     *
     * @param record         received record
     * @param acknowledgment handle used to acknowledge the record manually
     */
    @KafkaListener(topics = {"user-events", "order-events"}, groupId = "multi-topic-group")
    public void consumeMultiTopicEvents(
            ConsumerRecord<String, Object> record,
            Acknowledgment acknowledgment) {
        try {
            String topic = record.topic();
            Object value = record.value();

            System.out.println("接收到多Topic消息: Topic=" + topic + ", Value=" + value);

            // Dispatch according to the source topic
            switch (topic) {
                case "user-events":
                    if (value instanceof UserEventDto) {
                        processUserEvent((UserEventDto) value);
                    }
                    break;
                case "order-events":
                    if (value instanceof OrderEventDto) {
                        processOrderEvent((OrderEventDto) value);
                    }
                    break;
                default:
                    System.out.println("未知Topic: " + topic);
            }

            acknowledgment.acknowledge();
        } catch (Exception e) {
            System.err.println("处理多Topic消息失败: " + e.getMessage());
        }
    }

    // Business processing methods

    /** Handles a user event according to its event type. */
    private void processUserEvent(UserEventDto userEvent) {
        switch (userEvent.getEventType()) {
            case "CREATE":
                System.out.println("处理用户创建事件: " + userEvent.getUserId());
                // e.g. send a welcome mail, initialise user data
                break;
            case "UPDATE":
                System.out.println("处理用户更新事件: " + userEvent.getUserId());
                // e.g. synchronise user data to other systems
                break;
            case "DELETE":
                System.out.println("处理用户删除事件: " + userEvent.getUserId());
                // e.g. clean up user-related data
                break;
            default:
                System.out.println("未知用户事件类型: " + userEvent.getEventType());
        }
    }

    /** Handles an order event according to its event type. */
    private void processOrderEvent(OrderEventDto orderEvent) {
        switch (orderEvent.getEventType()) {
            case "CREATED":
                System.out.println("处理订单创建事件: " + orderEvent.getOrderId());
                // e.g. reserve stock, send a confirmation mail
                break;
            case "PAID":
                System.out.println("处理订单支付事件: " + orderEvent.getOrderId());
                // e.g. update the order status, prepare shipment
                break;
            case "SHIPPED":
                System.out.println("处理订单发货事件: " + orderEvent.getOrderId());
                // e.g. send tracking information, update the status
                break;
            case "DELIVERED":
                System.out.println("处理订单送达事件: " + orderEvent.getOrderId());
                // e.g. confirm receipt, remind the user to review
                break;
            case "CANCELLED":
                System.out.println("处理订单取消事件: " + orderEvent.getOrderId());
                // e.g. refund, roll back stock
                break;
            default:
                System.out.println("未知订单事件类型: " + orderEvent.getEventType());
        }
    }

    /** Handles a notification message. */
    private void processNotification(String message) {
        System.out.println("处理通知消息: " + message);
        // e.g. send mail, SMS or push notifications
    }

    /** Handles one element of a batch. */
    private void processBatchMessage(Object message) {
        System.out.println("处理批量消息项: " + message);
        // Batch processing logic
    }
}

🎮 Controller层

package com.example.demo.controller;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import com.example.demo.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import jakarta.validation.Valid;
import java.math.BigDecimal;
import java.util.*;

/**
 * REST endpoints under {@code /api/kafka} that hand messages to
 * {@link KafkaProducerService}.
 *
 * <p>A "success" response means the message was handed to the producer service;
 * the service sends asynchronously, so delivery is not confirmed here.
 * Missing or malformed fields in the Map-based endpoints produce HTTP 400
 * through {@link #handleInvalidRequest(IllegalArgumentException)}.
 */
@RestController
@RequestMapping("/api/kafka")
@CrossOrigin(origins = "*")
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Publishes a validated user event.
     *
     * @param userEvent request body
     * @return status, message and the event id
     */
    @PostMapping("/user-events")
    public ResponseEntity<Map<String, String>> sendUserEvent(@RequestBody @Valid UserEventDto userEvent) {
        kafkaProducerService.sendUserEvent(userEvent);

        Map<String, String> response = success("用户事件发送成功");
        response.put("eventId", userEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Publishes a validated order event.
     *
     * @param orderEvent request body
     * @return status, message and the event id
     */
    @PostMapping("/order-events")
    public ResponseEntity<Map<String, String>> sendOrderEvent(@RequestBody @Valid OrderEventDto orderEvent) {
        kafkaProducerService.sendOrderEvent(orderEvent);

        Map<String, String> response = success("订单事件发送成功");
        response.put("eventId", orderEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Publishes a notification.
     *
     * @param request JSON object with a required {@code message} string
     * @return status and message
     */
    @PostMapping("/notifications")
    public ResponseEntity<Map<String, String>> sendNotification(@RequestBody Map<String, String> request) {
        String message = request.get("message");
        if (message == null) {
            throw new IllegalArgumentException("缺少必填字段: message");
        }
        kafkaProducerService.sendNotification(message);

        return ResponseEntity.ok(success("通知消息发送成功"));
    }

    /**
     * Builds a user event from loose fields and publishes it.
     *
     * @param request JSON object with required {@code eventType} and {@code userId},
     *                optional {@code username} and {@code email}
     * @return status, message and the generated event id
     */
    @PostMapping("/quick/user-event")
    public ResponseEntity<Map<String, String>> quickUserEvent(@RequestBody Map<String, Object> request) {
        String eventType = requireString(request, "eventType");
        Long userId = requireLong(request, "userId");
        String username = optionalString(request, "username");
        String email = optionalString(request, "email");

        UserEventDto userEvent = new UserEventDto(
                UUID.randomUUID().toString(),
                eventType,
                userId,
                "API_OPERATION"
        );
        userEvent.setUsername(username);
        userEvent.setEmail(email);
        userEvent.setOperatorId("system");

        kafkaProducerService.sendUserEvent(userEvent);

        Map<String, String> response = success("用户事件创建并发送成功");
        response.put("eventId", userEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Builds an order event from loose fields and publishes it.
     *
     * @param request JSON object with required {@code eventType}, {@code orderId},
     *                {@code userId} and {@code totalAmount}, optional {@code orderNo}
     * @return status, message and the generated event id
     */
    @PostMapping("/quick/order-event")
    public ResponseEntity<Map<String, String>> quickOrderEvent(@RequestBody Map<String, Object> request) {
        String eventType = requireString(request, "eventType");
        Long orderId = requireLong(request, "orderId");
        Long userId = requireLong(request, "userId");
        String orderNo = optionalString(request, "orderNo");
        // A malformed number raises NumberFormatException, a subclass of
        // IllegalArgumentException, so it is also mapped to HTTP 400.
        BigDecimal totalAmount = new BigDecimal(requireValue(request, "totalAmount").toString());

        OrderEventDto orderEvent = new OrderEventDto(
                UUID.randomUUID().toString(),
                eventType,
                orderId,
                userId
        );
        orderEvent.setOrderNo(orderNo);
        orderEvent.setTotalAmount(totalAmount);
        // Locale.ROOT keeps the lower-casing independent of the server's default locale.
        orderEvent.setStatus(eventType.toLowerCase(Locale.ROOT));

        kafkaProducerService.sendOrderEvent(orderEvent);

        Map<String, String> response = success("订单事件创建并发送成功");
        response.put("eventId", orderEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Publishes a list of messages to one topic.
     *
     * @param request JSON object with required {@code topic} string and
     *                {@code messages} array
     * @return status, message and the number of messages handed over
     */
    @PostMapping("/batch")
    public ResponseEntity<Map<String, String>> sendBatchMessages(@RequestBody Map<String, Object> request) {
        String topic = requireString(request, "topic");
        Object rawMessages = requireValue(request, "messages");
        if (!(rawMessages instanceof List<?>)) {
            throw new IllegalArgumentException("字段类型不正确: messages");
        }
        List<Object> messageObjects = new ArrayList<>((List<?>) rawMessages);

        kafkaProducerService.sendBatchMessages(topic, messageObjects);

        Map<String, String> response = success("批量消息发送成功");
        response.put("count", String.valueOf(messageObjects.size()));
        return ResponseEntity.ok(response);
    }

    /**
     * Publishes one message to an explicit partition.
     *
     * @param request JSON object with required {@code topic}, non-negative integer
     *                {@code partition} and {@code message}, optional {@code key}
     * @return status, message and the partition number
     */
    @PostMapping("/partition")
    public ResponseEntity<Map<String, String>> sendToPartition(@RequestBody Map<String, Object> request) {
        String topic = requireString(request, "topic");
        Object rawPartition = requireValue(request, "partition");
        if (!(rawPartition instanceof Integer) || (Integer) rawPartition < 0) {
            throw new IllegalArgumentException("字段类型不正确: partition");
        }
        Integer partition = (Integer) rawPartition;
        String key = optionalString(request, "key");
        Object message = requireValue(request, "message");

        kafkaProducerService.sendMessageToPartition(topic, partition, key, message);

        Map<String, String> response = success("消息发送到指定分区成功");
        response.put("partition", partition.toString());
        return ResponseEntity.ok(response);
    }

    /**
     * Maps request-validation failures raised in this controller to HTTP 400.
     *
     * @param ex the validation failure
     * @return an error body carrying the exception message
     */
    @ExceptionHandler(IllegalArgumentException.class)
    public ResponseEntity<Map<String, String>> handleInvalidRequest(IllegalArgumentException ex) {
        Map<String, String> response = new HashMap<>();
        response.put("status", "error");
        response.put("message", ex.getMessage());
        return ResponseEntity.badRequest().body(response);
    }

    /** Creates a mutable success body with the given message. */
    private static Map<String, String> success(String message) {
        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", message);
        return response;
    }

    /** Returns the non-null value of {@code field} or rejects the request. */
    private static Object requireValue(Map<String, Object> request, String field) {
        Object value = request.get(field);
        if (value == null) {
            throw new IllegalArgumentException("缺少必填字段: " + field);
        }
        return value;
    }

    /** Returns {@code field} as a String; rejects a missing or non-string value. */
    private static String requireString(Map<String, Object> request, String field) {
        Object value = requireValue(request, field);
        if (!(value instanceof String)) {
            throw new IllegalArgumentException("字段类型不正确: " + field);
        }
        return (String) value;
    }

    /** Returns {@code field} as a String or null when absent; rejects non-string values. */
    private static String optionalString(Map<String, Object> request, String field) {
        Object value = request.get(field);
        if (value == null) {
            return null;
        }
        if (!(value instanceof String)) {
            throw new IllegalArgumentException("字段类型不正确: " + field);
        }
        return (String) value;
    }

    /** Parses {@code field} as a Long; rejects a missing or non-integer value. */
    private static Long requireLong(Map<String, Object> request, String field) {
        Object value = requireValue(request, field);
        try {
            return Long.valueOf(value.toString());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("字段类型不正确: " + field, e);
        }
    }
}

📊 最佳实践

1. 消息设计

  • 设计合理的消息格式
  • 使用版本化的消息结构
  • 包含必要的元数据信息
  • 考虑消息的向后兼容性

2. 性能优化

  • 合理设置批量大小
  • 使用压缩减少网络传输
  • 优化序列化方式
  • 合理设置分区数量

3. 可靠性保证

  • 启用生产者确认机制
  • 实现消费者幂等性
  • 处理重复消息
  • 实现死信队列机制

4. 监控与运维

  • 监控消息积压情况
  • 跟踪消费者延迟
  • 监控集群健康状态
  • 实现告警机制

本文关键词: Kafka, 消息队列, 流处理, 分布式系统, 事件驱动, 微服务通信

文具用品