> 技术文档 > Java集成MQTT和Kafka实现稳定、可靠、高性能的物联网消息处理系统_kafka mqtt

Java集成MQTT和Kafka实现稳定、可靠、高性能的物联网消息处理系统_kafka mqtt


Java集成MQTT和Kafka实现高可用方案

1. 概述

在物联网(IoT)和分布式系统中,消息传递的可靠性和高可用性至关重要。本文将详细介绍如何使用Java集成MQTT和Kafka来构建一个高可用的消息处理系统。

MQTT(消息队列遥测传输)是一种轻量级的发布/订阅协议,适用于资源受限的设备和低带宽、高延迟网络。而Kafka是一个分布式流处理平台,提供高吞吐量、可扩展性和持久性。将两者结合,可以创建一个既能处理大量IoT设备连接,又能保证消息可靠传递和处理的系统。

2. 架构设计

我们的高可用架构设计如下:
在这里插入图片描述

主要组件:

  • MQTT集群:使用EMQ X等MQTT代理实现集群
  • Kafka集群:作为中央消息总线和持久化层
  • 桥接组件:将MQTT消息转发到Kafka
  • Java应用服务:处理和分析消息
  • 监控系统:确保整个系统的健康运行

3. Java集成MQTT实现

3.1 Maven依赖

<dependencies>  <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency>  <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.5.15</version> </dependency>  <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> <version>2.7.8</version> </dependency></dependencies>

3.2 MQTT配置类

@Configurationpublic class MqttConfig {  @Value(\"${mqtt.broker.urls}\") private String[] brokerUrls; // 多个MQTT代理地址,用于故障转移 @Value(\"${mqtt.client.id}\") private String clientId; @Value(\"${mqtt.username}\") private String username; @Value(\"${mqtt.password}\") private String password; @Value(\"${mqtt.topics}\") private String[] topics; @Bean public MqttPahoClientFactory mqttClientFactory() {  DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); // 设置多个服务器地址,实现故障转移 options.setServerURIs(brokerUrls); // 设置自动重连 options.setAutomaticReconnect(true); options.setKeepAliveInterval(30); options.setConnectionTimeout(30); // 设置遗嘱消息,当客户端异常断开时发送 options.setWill(\"clients/status\", (clientId + \": disconnected\").getBytes(), 1, true); if (username != null && !username.isEmpty()) {  options.setUserName(username); options.setPassword(password.toCharArray()); } // 设置清除会话,false表示客户端断开连接后,服务器保留其订阅信息 options.setCleanSession(false); factory.setConnectionOptions(options); return factory; } // 出站通道(用于发送消息) @Bean public MessageChannel mqttOutboundChannel() {  return new DirectChannel(); } // 出站消息处理器 @Bean @ServiceActivator(inputChannel = \"mqttOutboundChannel\") public MessageHandler mqttOutbound() {  MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + \"-pub\", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultQos(1); return messageHandler; } // 入站通道(用于接收消息) @Bean public MessageChannel mqttInboundChannel() {  return new DirectChannel(); } // 入站消息适配器 @Bean public MessageProducer inbound() {  MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( clientId + \"-sub\",  mqttClientFactory(),  topics); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInboundChannel()); return adapter; }}

3.3 MQTT服务类

@Service@Slf4jpublic class MqttService {  private final MessageChannel mqttOutboundChannel; @Autowired public MqttService(MessageChannel mqttOutboundChannel) {  this.mqttOutboundChannel = mqttOutboundChannel; } // 发布消息到MQTT主题 public void publish(String topic, String payload) {  log.info(\"Publishing message to topic {}: {}\", topic, payload); Message<String> message = MessageBuilder .withPayload(payload) .setHeader(MqttHeaders.TOPIC, topic) .setHeader(MqttHeaders.QOS, 1) .setHeader(MqttHeaders.RETAINED, false) .build();  mqttOutboundChannel.send(message); } // 处理接收到的MQTT消息 @ServiceActivator(inputChannel = \"mqttInboundChannel\") public void handleMessage(Message<?> message) {  String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); String payload = message.getPayload().toString(); log.info(\"Received message from topic {}: {}\", topic, payload); // 这里可以添加消息处理逻辑,或者转发到Kafka }}

4. Java集成Kafka实现

4.1 Maven依赖

<dependencies>  <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.3.2</version> </dependency>  <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.9.5</version> </dependency></dependencies>

4.2 Kafka配置类

@Configurationpublic class KafkaConfig {  @Value(\"${kafka.bootstrap.servers}\") private String bootstrapServers; @Value(\"${kafka.consumer.group.id}\") private String consumerGroupId; // Kafka生产者配置 @Bean public ProducerFactory<String, String> producerFactory() {  Map<String, Object> configProps = new HashMap<>(); // 设置Kafka集群地址 configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 高可用配置 // acks=all表示所有副本都确认后才认为消息发送成功 configProps.put(ProducerConfig.ACKS_CONFIG, \"all\"); // 重试次数 configProps.put(ProducerConfig.RETRIES_CONFIG, 10); // 启用幂等性,确保消息不会重复发送 configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 批处理大小 configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 批处理延迟 configProps.put(ProducerConfig.LINGER_MS_CONFIG, 20); // 缓冲区大小 configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() {  return new KafkaTemplate<>(producerFactory()); } // Kafka消费者配置 @Bean public ConsumerFactory<String, String> consumerFactory() {  Map<String, Object> configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 高可用配置 // 自动提交偏移量 configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 从最早的消息开始消费 configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, \"earliest\"); // 最大拉取记录数 configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 心跳间隔 configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); // 会话超时 configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 最大拉取间隔 configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 设置并发消费者数量 factory.setConcurrency(3); // 批量消费 factory.setBatchListener(true); // 手动提交偏移量 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; }}

4.3 Kafka服务类