> 技术文档 > Spring Boot 集成 Eclipse Mosquitto_springboot连接mosquitto

Spring Boot 集成 Eclipse Mosquitto_springboot连接mosquitto


文章目录

    • 添加 MQTT 客户端依赖
    • 配置 MQTT 连接参数
    • 实现 MQTT 客户端(发布 + 订阅)
      • MQTT 客户端配置类
      • 发布和订阅工具类
      • 测试 MQTT 功能

添加 MQTT 客户端依赖

在 Spring Boot 项目的 pom.xml 中添加 Eclipse Paho MQTT 客户端依赖(主流的 MQTT Java 客户端):

<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version></dependency>

配置 MQTT 连接参数

application.yml(或 application.properties)中配置 Mosquitto 连接信息:

mqtt: # 是否启用 enable: true # Mosquitto 服务地址(非加密端口),若启用 TLS 加密,使用 ssl://localhost:8883 broker: tcp://localhost:1883 # 客户端唯一标识(建议加随机数避免冲突) client-id: springboot-mqtt-client # 认证用户名(Mosquitto 启用认证时必填) username: user1 # 认证密码 password: 123456 # 默认 QoS 等级(0/1/2) defalut-qos: 1 # 心跳间隔(秒) keep-alive: 60

实现 MQTT 客户端(发布 + 订阅)

MQTT 客户端配置类

  • 配置实体类

    import lombok.Data;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.cloud.context.config.annotation.RefreshScope;import org.springframework.stereotype.Component;@Data@Component@RefreshScope@ConfigurationProperties(prefix = \"mqtt\")@ConditionalOnProperty(name = \"mqtt.enable\", havingValue = \"true\")public class MqttProperties { /** * 是否启用 */ private boolean enable; /** * Mosquitto 服务地址(非加密端口)。若启用 TLS 加密,使用 ssl://localhost:8883 */ private String broker; /** * 客户端唯一标识(建议加随机数避免冲突) */ private String clientId; /** * 认证用户名(Mosquitto 启用认证时必填) */ private String username; /** * 认证密码 */ private String password; /** * 默认 QoS 等级(0/1/2),非关键数据用 QoS 0,重要状态用 QoS 1,核心控制指令用 QoS 2 */ private int defaultQos; /** * 心跳间隔(秒) */ private int keepAlive = 60;}
  • 配置类

    import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.*;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.util.StringUtils;@Slf4j@Configuration@ConditionalOnBean(MqttProperties.class)public class MqttConfig { private final MqttProperties mqttProp; public MqttConfig(MqttProperties mqttProp) { this.mqttProp = mqttProp; } /** * 创建 MQTT 客户端实例 */ @Bean public MqttClient mqttClient() throws MqttException { // 客户端 ID 建议添加随机数,避免重复连接 String clientIdWithRandom = mqttProp.getClientId() + \"_\" + System.currentTimeMillis(); MqttClient client = new MqttClient(mqttProp.getBroker(), clientIdWithRandom, new MemoryPersistence()); // 配置连接参数 MqttConnectOptions options = new MqttConnectOptions(); if (StringUtils.hasText((mqttProp.getUsername()))) options.setUserName(mqttProp.getUsername()); if (StringUtils.hasText((mqttProp.getPassword()))) options.setPassword(mqttProp.getPassword().toCharArray()); options.setKeepAliveInterval(mqttProp.getKeepAlive()); // 自动重连 options.setAutomaticReconnect(true); // 不清除会话(保留订阅关系和未确认消息 options.setCleanSession(false); // 连接回调(处理连接状态) client.setCallback(new MqttCallback() { /** * 连接断开时触发,可在此实现重连逻辑 */ @Override public void connectionLost(Throwable cause) { log.error(\"MQTT 连接断开,原因:{}\", cause.getMessage()); } /** * 收到订阅的消息时触发,用于处理业务逻辑(如存储数据到数据库) */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // 接收消息回调(订阅的主题有消息时触发) String content = new String(message.getPayload()); log.debug(\"收到消息 - 主题:{},内容:{}\", topic, content); // TODO 业务逻辑 } /** * 消息发布完成后触发,可用于确认消息已送达 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { // 消息发布完成回调 try {  log.debug(\"消息发布成功,主题:{}\", token.getTopics()[0]); } catch (Exception e) {  log.error(\"\", e); } } }); // 连接到 Mosquitto client.connect(options); log.info(\"MQTT 连接成功:{}\", mqttProp.getClientId()); return client; }}

发布和订阅工具类

  • 消息发布工具类

    import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;import org.springframework.stereotype.Component;/** * MQTT 消息发布工具类 */@Slf4j@Component@ConditionalOnBean(MqttProperties.class)public class MqttPublisher { private final MqttClient mqttClient; private final MqttProperties mqttProp; public MqttPublisher(MqttClient mqttClient, MqttProperties mqttProp) { this.mqttClient = mqttClient; this.mqttProp = mqttProp; } /** * 发布消息到指定主题 * @param topic 主题 * @param content 消息内容 */ public void publish(String topic, String content) throws MqttException { publish(topic, content, mqttProp.getDefaultQos()); } /** * 发布消息到指定主题(自定义QoS) * @param topic 主题 * @param content 消息内容 * @param qos QoS等级 */ public void publish(String topic, String content, int qos) throws MqttException { if (!mqttClient.isConnected()) { mqttClient.reconnect(); // 若断开连接,尝试重连 } log.debug(\"发布消息,主题:{},内容:{}, QoS等级:{}\", topic, content, qos); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); mqttClient.publish(topic, message); }}
  • 消息订阅工具类

    import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttException;import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;import org.springframework.stereotype.Component;/** * MQTT 消息订阅工具类 */@Slf4j@Component@ConditionalOnBean(MqttProperties.class)public class MqttSubscriber { private final MqttClient mqttClient; private final MqttProperties mqttProp; public MqttSubscriber(MqttClient mqttClient, MqttProperties mqttProp) { this.mqttClient = mqttClient; this.mqttProp = mqttProp; } /** * 订阅指定主题 * @param topic 主题(支持通配符,如 sensor/+) */ public void subscribe(String topic) throws MqttException { subscribe(topic, mqttProp.getDefaultQos()); } /** * 订阅指定主题(自定义QoS) * @param topic 主题 * @param qos QoS等级 */ public void subscribe(String topic, int qos) throws MqttException { if (!mqttClient.isConnected()) { mqttClient.reconnect(); } mqttClient.subscribe(topic, qos); log.info(\"已订阅主题:{},QoS等级:{}\", topic, qos); } /** * 取消订阅主题 * @param topic 主题 */ public void unsubscribe(String topic) throws MqttException { mqttClient.unsubscribe(topic); log.info(\"已取消订阅主题:{}\", topic); }}

测试 MQTT 功能

  • 创建一个测试控制器,验证消息发布和订阅:

    import com.blackcrow.test.mqtt.config.MqttPublisher;import com.blackcrow.test.mqtt.config.MqttSubscriber;import org.eclipse.paho.client.mqttv3.MqttException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class MqttTestController { @Autowired private MqttPublisher mqttPublisher; @Autowired private MqttSubscriber mqttSubscriber; @Value(\"${mqtt.default-topic:topic/temp}\") private String defaultTopic; /** * 订阅主题 */ @GetMapping(\"/subscribe\") public String subscribe(@RequestParam(required = false) String topic) { try { String targetTopic = topic != null ? topic : defaultTopic; mqttSubscriber.subscribe(targetTopic); return \"订阅成功:\" + targetTopic; } catch (MqttException e) { return \"订阅失败:\" + e.getMessage(); } } /** * 发布消息 */ @GetMapping(\"/publish\") public String publish(@RequestParam(required = false) String topic, @RequestParam String message) { try { String targetTopic = topic != null ? topic : defaultTopic; mqttPublisher.publish(targetTopic, message); return \"发布成功:主题=\" + targetTopic + \",消息=\" + message; } catch (MqttException e) { return \"发布失败:\" + e.getMessage(); } }}