> 技术文档 > MQTT协议介绍&Linux中配置Mosquitto

MQTT协议介绍&Linux中配置Mosquitto

目录

MQTT协议介绍

1. 核心思想:发布/订阅 (Publish/Subscribe) 模式

2. MQTT 的核心概念

3. 典型应用场景

配置 Mosquitto

安装 Mosquitto MQTT 代理

mosquitto.conf文件的编辑

创建密码文件&添加用户

重启 Mosquitto 服务

测试 MQTT 连接

a. 打开一个终端窗口订阅主题

b. 打开另一个终端窗口发布消息

STM32连接MQTT服务器

代码实现

代码解释

AT+MQTTUSERCFG:配置 MQTT 用户参数

AT+MQTTCONN:连接 MQTT 服务器

关于服务器端的显示

MQTT的订阅和发布

代码实现

订阅主题

发布主题

持续接收来自MQTT中发布的信息

实现思路:

代码实现


MQTT协议介绍

MQTT 是一种轻量级的、发布/订阅模式的消息传输协议。

1. 核心思想:发布/订阅 (Publish/Subscribe) 模式

这是 MQTT 最重要的特点。它与传统的客户端/服务器模式不同:

  • 传统客户端/服务器: 客户端直接向服务器请求数据,服务器响应。是一对一的请求-响应模式。

  • 发布/订阅模式:

    • 消息发布者 (Publisher): 发布消息到特定的“主题”(Topic)。

    • 消息订阅者 (Subscriber): 订阅自己感兴趣的“主题”。

    • 消息代理/服务器 (Broker): 作为消息的中心枢纽。发布者将消息发送给 Broker,Broker 再将消息转发给所有订阅了该主题的订阅者。

    • 优点: 发布者和订阅者之间解耦,彼此不知道对方的存在,只需知道 Broker 和共同的主题。这大大提高了系统的灵活性和可伸缩性。

2. MQTT 的核心概念

  • 主题 (Topic): 类似文件路径的字符串,用于分类消息。订阅者通过订阅特定主题来接收消息。例如:home/livingroom/temperature

  • 客户端 (Client): 任何连接到 MQTT Broker 的设备或应用程序。

  • 服务器/代理 (Broker): 负责接收、过滤和路由消息的中心节点。

  • 服务质量 (QoS): 定义了消息传递的可靠性级别:

    • QoS 0 (At most once): 最多一次,消息可能丢失,不重传。

    • QoS 1 (At least once): 至少一次,消息至少送达一次,可能重复。有重传机制。

    • QoS 2 (Exactly once): 恰好一次,消息只送达一次,不丢失不重复。最可靠,但开销最大。

  • 连接 (Connect/Disconnect): 客户端与 Broker 建立或断开 TCP 连接。

  • 发布 (Publish): 客户端向某个主题发送消息。

  • 订阅 (Subscribe): 客户端向 Broker 注册,表示对某个主题的消息感兴趣。

  • 取消订阅 (Unsubscribe): 客户端通知 Broker 不再接收某个主题的消息。

3. 典型应用场景

  • 物联网 (IoT): 传感器数据采集、设备远程控制、智能家居、车联网。

  • 移动应用: 推送通知、聊天应用。

  • 工业控制: 机器数据监控、SCADA 系统。

  • 智能穿戴设备: 低功耗、间歇性连接。

配置 Mosquitto

安装 Mosquitto MQTT 代理

sudo apt updatesudo apt install mosquitto mosquitto-clients

mosquitto 是 MQTT 代理服务器本身。

mosquitto-clients 包含一些客户端工具,如 mosquitto_pub(发布消息)和 mosquitto_sub(订阅消息),方便你测试。

安装完成后,Mosquitto 服务通常会自动启动。你可以检查其状态:

sudo systemctl status mosquitto

如果它正在运行,你会看到“active (running)”的输出

Mosquitto 的主配置文件通常位于/etc/mosquitto/mosquitto.conf。在进行任何修改之前,建议备份原始配置文件:

sudo cp /etc/mosquitto/mosquitto.conf /etc/mosquitto/mosquitto.conf.bak

现在,你可以编辑配置文件:mosquitto.conf

vim /etc/mosquitto/mosquitto.conf

mosquitto.conf文件的编辑

禁用匿名访问并启用密码认证 (推荐)

默认情况下,Mosquitto 可能允许匿名连接。这意味着任何客户端都可以连接到代理并发布/订阅消息,而无需提供用户名和密码。

出于安全考虑,强烈建议禁用匿名访问,并要求所有客户端进行身份验证。

确保 allow_anonymous 设置为 false

allow_anonymous false

指定密码文件:

你需要告诉 Mosquitto 使用哪个文件来存储用户凭据。

password_file /etc/mosquitto/passwd

你可以将 passwd 替换为其他文件名,但通常使用 passwd。

配置监听端口

listener 1883

配置好的 mosquitto.conf文件中的内容如下:

创建密码文件&添加用户

创建密码文件 使用 mosquitto_passwd 工具来创建并管理密码文件。第一次创建时需要加 -c 参数,后续添加用户时不需要。

创建密码文件并添加第一个用户 (例如 myuser):

sudo mosquitto_passwd -c /etc/mosquitto/passwd myuser

系统会提示你输入并确认密码。

这个密码是为 MQTT 客户端用户 myuser 设置的密码,将来客户端连接 Mosquitto 代理时,需要使用这个密码进行认证。这个密码会加密后存储在 /etc/mosquitto/passwd 文件中

注意: 你可能会看到一个警告,提示文件权限不安全。这是正常的,因为你刚刚创建了一个新文件。请继续下一步来修复它。

添加更多用户 (例如 anotheruser)

sudo mosquitto_passwd /etc/mosquitto/passwd anotheruser

(注意:这次没有 -c 参数,否则会覆盖现有文件。)

同样的,添加新用户时,mosquitto_passwd 工具会提示我们输入并确认新用户(anotheruser)的密码

重启 Mosquitto 服务

修改了mosquitto.conf文件后,你需要重启 Mosquitto 服务以使更改生效

sudo systemctl restart mosquitto

检查服务状态确保它已成功启动:

sudo systemctl status mosquitto

测试 MQTT 连接

安装 mosquitto-clients 后,你可以使用 mosquitto_pub 和 mosquitto_sub 来测试你的 MQTT 代理。

a. 打开一个终端窗口订阅主题

mosquitto_sub -h localhost -t \"topicA\" -u \"myuser\" -P \"mypassword\"

-h localhost: 连接到本地 Mosquitto 代理。

-t \"topicA\": 订阅名为 \"topicA\" 的主题。

-u \"myuser\": 指定用户名。

-P \"mypassword\": 指定该用户对应的密码。

b. 打开另一个终端窗口发布消息

mosquitto_pub -h localhost -t \"topicA\" -m \"Hello, MQTT!\" -u \"myuser\" -P \"mypassword\"

-m \"Hello, MQTT!\": 发布消息 \"Hello, MQTT!\"。

如果你在订阅窗口看到了 \"Hello, MQTT!\" 消息,那么你的 Mosquitto 代理就已经配置成功并正常工作了!

STM32连接MQTT服务器

代码实现

在上一篇文章中,我们已经实现了通过ESP8266连接WiFi这个任务,接下来我们在上一篇文章的基础上来进一步实现连接MQTT服务器。

ESP8266详解-CSDN博客

由于前文ESP8266.c中定义的函数的良好铺垫,这里加上连接MQTT连接的逻辑只需要修改我们的main函数。其他部分的代码和之前的都一样!!!

 前面的代码和上一篇文章中写过的代码相同// 3. 连接 Wi-Fi printf(\"Connecting to WiFi...\\r\\n\"); char wifi_cmd[100]; // 注意:使用真实 Wi-Fi 名称和密码,确保双引号正确转义 sprintf(wifi_cmd, \"AT+CWJAP=\\\"%s\\\",\\\"%s\\\"\", \"cyz_WiFi\", \"12345678\"); if (ESP8266_SendATCommand(wifi_cmd, 15000, \"OK\") == 1) { // 连接 Wi-Fi 可能需要较长时间 printf(\"WiFi connected successfully!\\r\\n\"); // 获取并打印 IP 地址 (可选) ESP8266_SendATCommand(\"AT+CIFSR\", 2000, \"OK\"); HAL_Delay(500);// --- MQTT 连接配置开始 ---// 4. Wi-Fi连接成功后,配置连接 MQTT 服务器的参数 (AT+MQTTUSERCFG)// 参数: linkID, type, \"clientID\", \"username\", \"password\", cert_id, key_id// linkID: 通常为0 for单连接; type: 0 for TCP; cert_id/key_id: for SSL/TLS, here 0 for non-SSLprintf(\"Configuring MQTT user parameters...\\r\\n\");char mqtt_user_cfg_cmd[200];sprintf(mqtt_user_cfg_cmd, \"AT+MQTTUSERCFG=0,1,\\\"%s\\\",\\\"%s\\\",\\\"%s\\\",0,0,\\\" \\\"\",  \"ESP8266_Client_01\", \"myuser\", \"123456\");if (ESP8266_SendATCommand(mqtt_user_cfg_cmd, 2000, \"OK\") == 1) {printf(\"MQTT user parameters configured.\\r\\n\");} else {printf(\"Failed to configure MQTT user parameters.\\r\\n\");continue;}HAL_Delay(500);// 5. 连接 MQTT 服务器 (AT+MQTTCONN)// 参数: linkID, \"host\", port, keepalive_interval, clean_session// keepalive_interval: in seconds; clean_session: 0 (persist), 1 (clean)printf(\"Connecting to MQTT broker...\\r\\n\");char mqtt_conn_cmd[150];sprintf(mqtt_conn_cmd, \"AT+MQTTCONN=0,\\\"%s\\\",%d,1\", // Keep-alive 10秒,clean session\"124.222.153.243\", 1883);if (ESP8266_SendATCommand(mqtt_conn_cmd, 5000, \"OK\") == 1) { // 响应可能是 \"+MQTTCONN:0,OK\"printf(\"Connected to MQTT broker!!!!!!!!!\\r\\n\");} else {printf(\"Failed to connect to MQTT broker. Retrying...\\r\\n\");// 连接失败可能需要等待更长时间或重置模块HAL_Delay(5000);}// --- MQTT 连接配置结束 --- } else { printf(\"WiFi connection failed. Retrying...\\r\\n\"); } HAL_Delay(5000); // 等待一段时间再重新尝试,避免频繁连接失败 /* USER CODE BEGIN 3 */ } /* USER CODE END 3 */}

代码解释

为了连接上MQTT服务器(mosquitto),这里只需要多发送两条AT指令就行:

  1. AT+MQTTUSERCFG:配置 MQTT 用户参数
     
  2. AT+MQTTCONN:连接 MQTT 服务器

AT+MQTTUSERCFG:配置 MQTT 用户参数

这条指令用于设置 MQTT 连接所需的客户端标识、认证信息以及其他与用户相关的配置。

指令格式:

AT+MQTTUSERCFG=,,,,,,,,,,

看到这里参数很多,不要慌,因为很多都设置成固定值就行,或者模仿我这里的设置就可以。

char mqtt_user_cfg_cmd[200];sprintf(mqtt_user_cfg_cmd, \"AT+MQTTUSERCFG=0,1,\\\"%s\\\",\\\"%s\\\",\\\"%s\\\",0,0,\\\" \\\"\", \"ESP8266_Client_01\", \"myuser\", \"123456\");if (ESP8266_SendATCommand(mqtt_user_cfg_cmd, 2000, \"OK\") == 1) 

下面简单介绍一下这里的参数有个大致了解就行。 

  • <LinkID>: 0 表示 MQTT 客户端的连接 ID,通常从 0 开始。许多模组支持多个 MQTT 连接,每个连接有独立的 LinkID。
  • : 1 表示连接类型。0 通常表示 TCP 连接,1 通常表示 SSL/TLS 加密连接
  • : \"ESP8266_Client_01\"
  • MQTT 客户端 ID。每个连接到 MQTT 代理的客户端都必须有一个唯一的 ID。
  • : \"myuser\"
  • 用于连接 MQTT 代理的用户名。如果代理需要认证,则必须提供。
  • : \"123456\"
  • 用于连接 MQTT 代理的密码。如果代理需要认证,则必须提供。
  • : 0
  • 证书 ID,用于 SSL/TLS 连接。0 表示不使用客户端证书。
  • : 0
  • 密钥 ID,用于 SSL/TLS 连接。0 表示不使用客户端密钥。
  • : \"\"
  • Last Will and Testament (LWT) 主题。当客户端非正常断开时,代理会将遗嘱消息发布到此主题。示例中是空字符串 \"\",表示未设置遗嘱主题。
  • : \"\"
  • Last Will and Testament (LWT) 消息。当客户端非正常断开时,发布到遗嘱主题的消息内容。示例中是空字符串 \"\",表示未设置遗嘱消息

AT+MQTTCONN:连接 MQTT 服务器

这条指令用于实际建立与 MQTT 代理的连接。

指令格式:

AT+MQTTCONN=,,,,

char mqtt_conn_cmd[150];sprintf(mqtt_conn_cmd, \"AT+MQTTCONN=0,\\\"%s\\\",%d,60\", \"124.222.153.243\", 1883);if (ESP8266_SendATCommand(mqtt_conn_cmd, 5000, \"OK\") == 1)

指令字段解释

  • : 0

    • AT+MQTTUSERCFG 中的 LinkID 对应,指定要连接的 MQTT 客户端实例。

  • : \"124.222.153.243\"

    • MQTT 代理的 IP 地址或域名。

  • : 1883

    • MQTT 代理监听的端口号。

  • : 1

    • MQTT 心跳间隔(单位:秒)。这告诉 MQTT 代理和客户端,如果在这个时间内没有任何数据传输,客户端需要发送一个 PINGREQ 包来维持连接。你的代码中写的是 1,这意味着心跳间隔是 1 秒。请注意,1 秒的心跳间隔非常短,可能会导致频繁的心跳包,增加网络流量和模组功耗。通常建议设置为 10-60 秒。

关于服务器端的显示

主要像补充一下这里猜的踩的坑,这里连接成功之后发送这里的两条AT指令的话都会失败,并不像前三条指令一样即使连接成功反复发送也没有事情(是不是这样呢问一下gemini)。

所以这里串口助手中,只有第一波会显示对应MQTT连接成功,后面几次都是卡在第四条,这里要注意以下(问一下gemini我这里说的真实性,我也是推测)

lsof -i | grep mosquitto 的显示

当你使用 lsof -i | grep mosquitto 命令时,它会显示当前系统上由 mosquitto 进程打开的所有网络连接。

这里有一点很好玩的东西,当你将STM32断电之后,接下来的1-3min,你还能看到设备连接状态,难道说是我们的配置有问题或者代码有问题吗?其实不是的。

断电初期: 在设备断电后的短时间内,MQTT 代理可能还没有通过 Keep Alive 机制检测到连接丢失。此时,代理仍然认为该连接是“ESTABLISHED”(已建立)的,因此 lsof 命令可能会继续显示这个连接。

Keep Alive 超时后: 一旦代理检测到 Keep Alive 超时并清理了连接,那么 lsof -i | grep mosquitto 命令的输出中,该设备的连接条目就会消失。这个时间取决于你配置的 Keep Alive 值,通常是几十秒到几分钟。

MQTT的订阅和发布

代码实现

// 6. 订阅主题 (AT+MQTTSUB) - 订阅消息// 参数: linkID, \"topic\", qosprintf(\"Subscribing to MQTT topic: %s\\r\\n\", \"from_mosquitto\");char mqtt_sub_cmd[100];sprintf(mqtt_sub_cmd, \"AT+MQTTSUB=0,\\\"%s\\\",1\", \"from_mosquitto\"); // QoS 1if (ESP8266_SendATCommand(mqtt_sub_cmd, 2000, \"OK\") == 1) {printf(\"Subscribed to topic: %s\\r\\n\", \"from_mosquitto\");} else {printf(\"Failed to subscribe to topic.\\r\\n\");}HAL_Delay(1000);// 7. 发布消息示例 (AT+MQTTPUB) - 发布一条测试消息// 参数: linkID, \"topic\", \"data\", qos, retainprintf(\"Publishing test message to topic: %s\\r\\n\", \"from_stm32\");char mqtt_pub_data[] = \"Hello from stm32!!!\";char mqtt_pub_cmd[200];sprintf(mqtt_pub_cmd, \"AT+MQTTPUB=0,\\\"%s\\\",\\\"%s\\\",1,0\", // QoS 1, 不保留\"from_stm32\", mqtt_pub_data);if (ESP8266_SendATCommand(mqtt_pub_cmd, 2000, \"OK\") == 1) {printf(\"Published message successfully!\\r\\n\");} else {printf(\"Failed to publish message.\\r\\n\");}HAL_Delay(2000); // 间隔一段时间再次发布//下面的代码和之前的一样} else {printf(\"Failed to connect to MQTT broker. Retrying...\\r\\n\");// 连接失败可能需要等待更长时间或重置模块HAL_Delay(5000);}// --- MQTT 连接配置结束 ---

订阅主题

sprintf(mqtt_sub_cmd, \"AT+MQTTSUB=0,\\\"%s\\\",1\", \"from_mosquitto\");

这行代码将 MQTT 订阅命令格式化到 mqtt_sub_cmd 字符串中。

  • AT+MQTTSUB: 这是 ESP8266 模块用于订阅 MQTT 主题的 AT 命令。
  • 0: 表示 linkID,通常是 MQTT 连接的唯一标识符。在这里,0 很可能指的是默认或第一个 MQTT 连接。
  • \"from_mosquitto\": 这是要订阅的 MQTT 主题名称。当有消息发布到这个主题时,ESP8266 会接收到。
  • 1: 表示 QoS (Quality of Service) 等级。QoS 等级定义了消息传输的可靠性。1 代表 至少一次 (At Least Once),即消息至少会被送达一次,但可能会重复。
if (ESP8266_SendATCommand(mqtt_sub_cmd, 2000, \"OK\") == 1)

这行代码调用一个名为 ESP8266_SendATCommand 的函数,用于将构建好的 AT 命令发送给 ESP8266 模块。

  • mqtt_sub_cmd: 要发送的 AT 命令字符串。
  • 2000: 等待 ESP8266 响应的超时时间(单位:毫秒)。
  • \"OK\": 期望从 ESP8266 接收到的成功响应字符串。如果收到 \"OK\",说明命令执行成功。
  • 如果函数返回 1,表示命令发送成功并收到了期望的响应。

发布主题

sprintf(mqtt_pub_cmd, \"AT+MQTTPUB=0,\\\"%s\\\",\\\"%s\\\",1,0\", \"from_stm32\", mqtt_pub_data);

将 MQTT 发布命令格式化到 mqtt_pub_cmd 字符串中。AT+MQTTPUB: 这是 ESP8266 模块用于发布 MQTT 消息的 AT 命令。

  • 0: linkID,同订阅命令中的含义。
  • \"from_stm32\": 这是消息要发布到的 MQTT 主题名称。
  • \"%s\" (第二个 %s): 用 mqtt_pub_data 的内容替换,即要发布的消息主体。
  • 1: QoS 等级,这里也是 至少一次 (At Least Once)。
  • 0: 保留 (Retain) 标志。0 表示不保留消息,即 MQTT 代理不会存储这条消息以供新的订阅者在订阅时立即接收。如果设置为 1,则代理会保留最新的消息,当有新的订阅者订阅该主题时,会立即收到这条保留的消息。
if (ESP8266_SendATCommand(mqtt_pub_cmd, 2000, \"OK\") == 1)
  • printf(\"Published message successfully!\\r\\n\");: 如果发布成功,则打印成功消息。
  • else { printf(\"Failed to publish message.\\r\\n\"); }: 如果发布失败,则打印失败消息。
  • HAL_Delay(2000);: 延时 2000 毫秒(2 秒),用于控制发布消息的间隔。

持续接收来自MQTT中发布的信息

实现思路

只需要在 main 函数的无限循环 while(1) 中,加入一个简单的子循环,用来不断检测并输出MQTT 消息,注意这里暂时不用过多考虑这样代码会使得其他代码无法执行。

这里主要是为了验证我们的配置是否正确,用实际效果验证一下。后面真实做了项目的话,将各部分模块代码集成在一起的时候就要考虑了,并且我们是基于freertos的,实现保证不同模块都可以执行会轻松一些。

代码实现

main.c中加入的逻辑

//8. 一旦 MQTT 连接成功并完成订阅/发布测试,进入此循环// 持续监听并打印来自 ESP8266 的原始 MQTT 消息while (1) {// 调用 CheckAndPrintMqttRawMessage 尝试查找并打印缓冲区中的原始 MQTT 消息CheckAndPrintMqttRawMessage();// 重要的:添加一个短暂的延时,避免 CPU 100% 占用,// 同时允许其他系统任务(如 HAL_Tick、其他外设中断)正常运行HAL_Delay(100); // 例如,每次循环等待 10ms// 你可能需要一个机制来检查 MQTT 连接是否仍然有效// 如果连接断开,则跳出此循环,重新尝试 Wi-Fi 和 MQTT 连接// (这里为了简化,暂时省略断开检测)}

esp8266.h中加入的声明

/** * @brief 检查并输出从 ESP8266 接收到的完整 MQTT 订阅消息 * 此函数应在主循环中周期性调用 * @param None * @retval 1表示成功找到并输出了一个MQTT消息,0表示未发现 */uint8_t CheckAndPrintMqttRawMessage(void);

esp8266.c中的加入的函数 uint8_t CheckAndPrintMqttRawMessage(void)

uint8_t CheckAndPrintMqttRawMessage(void) { // 检查缓冲区是否有数据 if (Rx_Head == Rx_Tail) { return 0; } // 临时缓冲区,用于从环形缓冲区中拷贝数据进行查找 char temp_line_buffer[RX_BUFFER_SIZE]; uint16_t len_available; uint16_t i; // 计算当前环形缓冲区中的数据长度 if (Rx_Head >= Rx_Tail) { len_available = Rx_Head - Rx_Tail; } else { len_available = RX_BUFFER_SIZE - Rx_Tail + Rx_Head; } // 如果数据长度不足以包含 \"\\r\\n\" 结束符,则不处理 if (len_available < 2) { return 0; } // 将环形缓冲区的数据拷贝到临时缓冲区 // 注意: 确保拷贝的长度不会超过 temp_line_buffer 的容量 uint16_t copy_len = (len_available < RX_BUFFER_SIZE) ? len_available : (RX_BUFFER_SIZE - 1); for (i = 0; i < copy_len; i++) { temp_line_buffer[i] = Rx_Buffer[(Rx_Tail + i) % RX_BUFFER_SIZE]; } temp_line_buffer[copy_len] = \'\\0\'; // 确保字符串以空字符结尾 // 查找行结束标志:\"\\r\\n\" char* line_end_ptr = strstr(temp_line_buffer, \"\\r\\n\"); if (line_end_ptr != NULL) { // 找到完整的消息行,打印它 *line_end_ptr = \'\\0\'; // 临时截断字符串,只打印到 \\r\\n 前 printf(\"RECEIVED LINE: %s\\r\\n\", temp_line_buffer);  // 恢复截断的字符,以便正确计算偏移量 *line_end_ptr = \'\\r\'; // 移动 Rx_Tail,移除已处理的消息行 uint16_t processed_len = (line_end_ptr - temp_line_buffer) + 2; // +2 是为了跳过 \\r\\n Rx_Tail = (Rx_Tail + processed_len) % RX_BUFFER_SIZE; return 1; // 成功处理一个消息行 } return 0; // 未找到完整的消息行}