MQTT协议介绍&Linux中配置Mosquitto
目录
MQTT协议介绍
1. 核心思想:发布/订阅 (Publish/Subscribe) 模式
2. MQTT 的核心概念
3. 典型应用场景
配置 Mosquitto
安装 Mosquitto MQTT 代理
mosquitto.conf文件的编辑
创建密码文件&添加用户
重启 Mosquitto 服务
测试 MQTT 连接
a. 打开一个终端窗口订阅主题
b. 打开另一个终端窗口发布消息
STM32连接MQTT服务器
代码实现
代码解释
AT+MQTTUSERCFG:配置 MQTT 用户参数
AT+MQTTCONN:连接 MQTT 服务器
关于服务器端的显示
MQTT的订阅和发布
代码实现
订阅主题
发布主题
持续接收来自MQTT中发布的信息
实现思路:
代码实现
MQTT协议介绍
MQTT 是一种轻量级的、发布/订阅模式的消息传输协议。
1. 核心思想:发布/订阅 (Publish/Subscribe) 模式
这是 MQTT 最重要的特点。它与传统的客户端/服务器模式不同:
-
传统客户端/服务器: 客户端直接向服务器请求数据,服务器响应。是一对一的请求-响应模式。
-
发布/订阅模式:
-
消息发布者 (Publisher): 发布消息到特定的“主题”(Topic)。
-
消息订阅者 (Subscriber): 订阅自己感兴趣的“主题”。
-
消息代理/服务器 (Broker): 作为消息的中心枢纽。发布者将消息发送给 Broker,Broker 再将消息转发给所有订阅了该主题的订阅者。
-
优点: 发布者和订阅者之间解耦,彼此不知道对方的存在,只需知道 Broker 和共同的主题。这大大提高了系统的灵活性和可伸缩性。
-
2. MQTT 的核心概念
-
主题 (Topic): 类似文件路径的字符串,用于分类消息。订阅者通过订阅特定主题来接收消息。例如:
home/livingroom/temperature
。 -
客户端 (Client): 任何连接到 MQTT Broker 的设备或应用程序。
-
服务器/代理 (Broker): 负责接收、过滤和路由消息的中心节点。
-
服务质量 (QoS): 定义了消息传递的可靠性级别:
-
QoS 0 (At most once): 最多一次,消息可能丢失,不重传。
-
QoS 1 (At least once): 至少一次,消息至少送达一次,可能重复。有重传机制。
-
QoS 2 (Exactly once): 恰好一次,消息只送达一次,不丢失不重复。最可靠,但开销最大。
-
-
连接 (Connect/Disconnect): 客户端与 Broker 建立或断开 TCP 连接。
-
发布 (Publish): 客户端向某个主题发送消息。
-
订阅 (Subscribe): 客户端向 Broker 注册,表示对某个主题的消息感兴趣。
-
取消订阅 (Unsubscribe): 客户端通知 Broker 不再接收某个主题的消息。
3. 典型应用场景
-
物联网 (IoT): 传感器数据采集、设备远程控制、智能家居、车联网。
-
移动应用: 推送通知、聊天应用。
-
工业控制: 机器数据监控、SCADA 系统。
-
智能穿戴设备: 低功耗、间歇性连接。
配置 Mosquitto
安装 Mosquitto MQTT 代理
sudo apt updatesudo apt install mosquitto mosquitto-clients
mosquitto 是 MQTT 代理服务器本身。
mosquitto-clients 包含一些客户端工具,如 mosquitto_pub(发布消息)和 mosquitto_sub(订阅消息),方便你测试。
安装完成后,Mosquitto 服务通常会自动启动。你可以检查其状态:
sudo systemctl status mosquitto
如果它正在运行,你会看到“active (running)”的输出
Mosquitto 的主配置文件通常位于/etc/mosquitto/mosquitto.conf。在进行任何修改之前,建议备份原始配置文件:
sudo cp /etc/mosquitto/mosquitto.conf /etc/mosquitto/mosquitto.conf.bak
现在,你可以编辑配置文件:mosquitto.conf
vim /etc/mosquitto/mosquitto.conf
mosquitto.conf文件的编辑
禁用匿名访问并启用密码认证 (推荐)
默认情况下,Mosquitto 可能允许匿名连接。这意味着任何客户端都可以连接到代理并发布/订阅消息,而无需提供用户名和密码。
出于安全考虑,强烈建议禁用匿名访问,并要求所有客户端进行身份验证。
确保 allow_anonymous 设置为 false
allow_anonymous false
指定密码文件:
你需要告诉 Mosquitto 使用哪个文件来存储用户凭据。
password_file /etc/mosquitto/passwd
你可以将 passwd 替换为其他文件名,但通常使用 passwd。
配置监听端口
listener 1883
配置好的 mosquitto.conf文件中的内容如下:
创建密码文件&添加用户
创建密码文件: 使用 mosquitto_passwd
工具来创建并管理密码文件。第一次创建时需要加 -c 参数,后续添加用户时不需要。
创建密码文件并添加第一个用户 (例如 myuser
):
sudo mosquitto_passwd -c /etc/mosquitto/passwd myuser
系统会提示你输入并确认密码。
这个密码是为 MQTT 客户端用户 myuser 设置的密码,将来客户端连接 Mosquitto 代理时,需要使用这个密码进行认证。这个密码会加密后存储在 /etc/mosquitto/passwd 文件中。
注意: 你可能会看到一个警告,提示文件权限不安全。这是正常的,因为你刚刚创建了一个新文件。请继续下一步来修复它。
添加更多用户 (例如 anotheruser):
sudo mosquitto_passwd /etc/mosquitto/passwd anotheruser
(注意:这次没有 -c 参数,否则会覆盖现有文件。)
同样的,添加新用户时,mosquitto_passwd 工具会提示我们输入并确认新用户(anotheruser)的密码。
重启 Mosquitto 服务
在修改了mosquitto.conf文件后,你需要重启 Mosquitto 服务以使更改生效:
sudo systemctl restart mosquitto
检查服务状态确保它已成功启动:
sudo systemctl status mosquitto
测试 MQTT 连接
安装 mosquitto-clients 后,你可以使用 mosquitto_pub 和 mosquitto_sub 来测试你的 MQTT 代理。
a. 打开一个终端窗口订阅主题
mosquitto_sub -h localhost -t \"topicA\" -u \"myuser\" -P \"mypassword\"
-h localhost: 连接到本地 Mosquitto 代理。
-t \"topicA\": 订阅名为 \"topicA\" 的主题。
-u \"myuser\": 指定用户名。
-P \"mypassword\": 指定该用户对应的密码。
b. 打开另一个终端窗口发布消息
mosquitto_pub -h localhost -t \"topicA\" -m \"Hello, MQTT!\" -u \"myuser\" -P \"mypassword\"
-m \"Hello, MQTT!\": 发布消息 \"Hello, MQTT!\"。
如果你在订阅窗口看到了 \"Hello, MQTT!\" 消息,那么你的 Mosquitto 代理就已经配置成功并正常工作了!
STM32连接MQTT服务器
代码实现
在上一篇文章中,我们已经实现了通过ESP8266连接WiFi这个任务,接下来我们在上一篇文章的基础上来进一步实现连接MQTT服务器。
ESP8266详解-CSDN博客
由于前文ESP8266.c中定义的函数的良好铺垫,这里加上连接MQTT连接的逻辑只需要修改我们的main函数。其他部分的代码和之前的都一样!!!
前面的代码和上一篇文章中写过的代码相同// 3. 连接 Wi-Fi printf(\"Connecting to WiFi...\\r\\n\"); char wifi_cmd[100]; // 注意:使用真实 Wi-Fi 名称和密码,确保双引号正确转义 sprintf(wifi_cmd, \"AT+CWJAP=\\\"%s\\\",\\\"%s\\\"\", \"cyz_WiFi\", \"12345678\"); if (ESP8266_SendATCommand(wifi_cmd, 15000, \"OK\") == 1) { // 连接 Wi-Fi 可能需要较长时间 printf(\"WiFi connected successfully!\\r\\n\"); // 获取并打印 IP 地址 (可选) ESP8266_SendATCommand(\"AT+CIFSR\", 2000, \"OK\"); HAL_Delay(500);// --- MQTT 连接配置开始 ---// 4. Wi-Fi连接成功后,配置连接 MQTT 服务器的参数 (AT+MQTTUSERCFG)// 参数: linkID, type, \"clientID\", \"username\", \"password\", cert_id, key_id// linkID: 通常为0 for单连接; type: 0 for TCP; cert_id/key_id: for SSL/TLS, here 0 for non-SSLprintf(\"Configuring MQTT user parameters...\\r\\n\");char mqtt_user_cfg_cmd[200];sprintf(mqtt_user_cfg_cmd, \"AT+MQTTUSERCFG=0,1,\\\"%s\\\",\\\"%s\\\",\\\"%s\\\",0,0,\\\" \\\"\", \"ESP8266_Client_01\", \"myuser\", \"123456\");if (ESP8266_SendATCommand(mqtt_user_cfg_cmd, 2000, \"OK\") == 1) {printf(\"MQTT user parameters configured.\\r\\n\");} else {printf(\"Failed to configure MQTT user parameters.\\r\\n\");continue;}HAL_Delay(500);// 5. 连接 MQTT 服务器 (AT+MQTTCONN)// 参数: linkID, \"host\", port, keepalive_interval, clean_session// keepalive_interval: in seconds; clean_session: 0 (persist), 1 (clean)printf(\"Connecting to MQTT broker...\\r\\n\");char mqtt_conn_cmd[150];sprintf(mqtt_conn_cmd, \"AT+MQTTCONN=0,\\\"%s\\\",%d,1\", // Keep-alive 10秒,clean session\"124.222.153.243\", 1883);if (ESP8266_SendATCommand(mqtt_conn_cmd, 5000, \"OK\") == 1) { // 响应可能是 \"+MQTTCONN:0,OK\"printf(\"Connected to MQTT broker!!!!!!!!!\\r\\n\");} else {printf(\"Failed to connect to MQTT broker. Retrying...\\r\\n\");// 连接失败可能需要等待更长时间或重置模块HAL_Delay(5000);}// --- MQTT 连接配置结束 --- } else { printf(\"WiFi connection failed. Retrying...\\r\\n\"); } HAL_Delay(5000); // 等待一段时间再重新尝试,避免频繁连接失败 /* USER CODE BEGIN 3 */ } /* USER CODE END 3 */}
代码解释
为了连接上MQTT服务器(mosquitto),这里只需要多发送两条AT指令就行:
- AT+MQTTUSERCFG:配置 MQTT 用户参数
- AT+MQTTCONN:连接 MQTT 服务器
AT+MQTTUSERCFG:配置 MQTT 用户参数
这条指令用于设置 MQTT 连接所需的客户端标识、认证信息以及其他与用户相关的配置。
指令格式:
AT+MQTTUSERCFG=,,,,,,,,,,
看到这里参数很多,不要慌,因为很多都设置成固定值就行,或者模仿我这里的设置就可以。
char mqtt_user_cfg_cmd[200];sprintf(mqtt_user_cfg_cmd, \"AT+MQTTUSERCFG=0,1,\\\"%s\\\",\\\"%s\\\",\\\"%s\\\",0,0,\\\" \\\"\", \"ESP8266_Client_01\", \"myuser\", \"123456\");if (ESP8266_SendATCommand(mqtt_user_cfg_cmd, 2000, \"OK\") == 1)
下面简单介绍一下这里的参数,有个大致了解就行。
- <LinkID>: 0 表示 MQTT 客户端的连接 ID,通常从 0 开始。许多模组支持多个 MQTT 连接,每个连接有独立的 LinkID。
- : 1 表示连接类型。0 通常表示 TCP 连接,1 通常表示 SSL/TLS 加密连接
- : \"ESP8266_Client_01\"
- MQTT 客户端 ID。每个连接到 MQTT 代理的客户端都必须有一个唯一的 ID。
- : \"myuser\"
- 用于连接 MQTT 代理的用户名。如果代理需要认证,则必须提供。
- : \"123456\"
- 用于连接 MQTT 代理的密码。如果代理需要认证,则必须提供。
- : 0
- 证书 ID,用于 SSL/TLS 连接。0 表示不使用客户端证书。
- : 0
- 密钥 ID,用于 SSL/TLS 连接。0 表示不使用客户端密钥。
- : \"\"
- Last Will and Testament (LWT) 主题。当客户端非正常断开时,代理会将遗嘱消息发布到此主题。示例中是空字符串 \"\",表示未设置遗嘱主题。
- : \"\"
- Last Will and Testament (LWT) 消息。当客户端非正常断开时,发布到遗嘱主题的消息内容。示例中是空字符串 \"\",表示未设置遗嘱消息
AT+MQTTCONN:连接 MQTT 服务器
这条指令用于实际建立与 MQTT 代理的连接。
指令格式:
AT+MQTTCONN=,,,,
char mqtt_conn_cmd[150];sprintf(mqtt_conn_cmd, \"AT+MQTTCONN=0,\\\"%s\\\",%d,60\", \"124.222.153.243\", 1883);if (ESP8266_SendATCommand(mqtt_conn_cmd, 5000, \"OK\") == 1)
指令字段解释
:
0
与
AT+MQTTUSERCFG
中的LinkID
对应,指定要连接的 MQTT 客户端实例。
:
\"124.222.153.243\"
MQTT 代理的 IP 地址或域名。
:
1883
MQTT 代理监听的端口号。
:
1
MQTT 心跳间隔(单位:秒)。这告诉 MQTT 代理和客户端,如果在这个时间内没有任何数据传输,客户端需要发送一个
PINGREQ
包来维持连接。你的代码中写的是1
,这意味着心跳间隔是 1 秒。请注意,1 秒的心跳间隔非常短,可能会导致频繁的心跳包,增加网络流量和模组功耗。通常建议设置为 10-60 秒。
关于服务器端的显示
主要像补充一下这里猜的踩的坑,这里连接成功之后发送这里的两条AT指令的话都会失败,并不像前三条指令一样即使连接成功反复发送也没有事情(是不是这样呢问一下gemini)。
所以这里串口助手中,只有第一波会显示对应MQTT连接成功,后面几次都是卡在第四条,这里要注意以下(问一下gemini我这里说的真实性,我也是推测)
lsof -i | grep mosquitto 的显示
当你使用 lsof -i | grep mosquitto 命令时,它会显示当前系统上由 mosquitto 进程打开的所有网络连接。
这里有一点很好玩的东西,当你将STM32断电之后,接下来的1-3min,你还能看到设备连接状态,难道说是我们的配置有问题或者代码有问题吗?其实不是的。
断电初期: 在设备断电后的短时间内,MQTT 代理可能还没有通过 Keep Alive 机制检测到连接丢失。此时,代理仍然认为该连接是“ESTABLISHED”(已建立)的,因此 lsof 命令可能会继续显示这个连接。
Keep Alive 超时后: 一旦代理检测到 Keep Alive 超时并清理了连接,那么 lsof -i | grep mosquitto 命令的输出中,该设备的连接条目就会消失。这个时间取决于你配置的 Keep Alive 值,通常是几十秒到几分钟。
MQTT的订阅和发布
代码实现
// 6. 订阅主题 (AT+MQTTSUB) - 订阅消息// 参数: linkID, \"topic\", qosprintf(\"Subscribing to MQTT topic: %s\\r\\n\", \"from_mosquitto\");char mqtt_sub_cmd[100];sprintf(mqtt_sub_cmd, \"AT+MQTTSUB=0,\\\"%s\\\",1\", \"from_mosquitto\"); // QoS 1if (ESP8266_SendATCommand(mqtt_sub_cmd, 2000, \"OK\") == 1) {printf(\"Subscribed to topic: %s\\r\\n\", \"from_mosquitto\");} else {printf(\"Failed to subscribe to topic.\\r\\n\");}HAL_Delay(1000);// 7. 发布消息示例 (AT+MQTTPUB) - 发布一条测试消息// 参数: linkID, \"topic\", \"data\", qos, retainprintf(\"Publishing test message to topic: %s\\r\\n\", \"from_stm32\");char mqtt_pub_data[] = \"Hello from stm32!!!\";char mqtt_pub_cmd[200];sprintf(mqtt_pub_cmd, \"AT+MQTTPUB=0,\\\"%s\\\",\\\"%s\\\",1,0\", // QoS 1, 不保留\"from_stm32\", mqtt_pub_data);if (ESP8266_SendATCommand(mqtt_pub_cmd, 2000, \"OK\") == 1) {printf(\"Published message successfully!\\r\\n\");} else {printf(\"Failed to publish message.\\r\\n\");}HAL_Delay(2000); // 间隔一段时间再次发布//下面的代码和之前的一样} else {printf(\"Failed to connect to MQTT broker. Retrying...\\r\\n\");// 连接失败可能需要等待更长时间或重置模块HAL_Delay(5000);}// --- MQTT 连接配置结束 ---
订阅主题
sprintf(mqtt_sub_cmd, \"AT+MQTTSUB=0,\\\"%s\\\",1\", \"from_mosquitto\");
这行代码将 MQTT 订阅命令格式化到 mqtt_sub_cmd 字符串中。
- AT+MQTTSUB: 这是 ESP8266 模块用于订阅 MQTT 主题的 AT 命令。
- 0: 表示 linkID,通常是 MQTT 连接的唯一标识符。在这里,0 很可能指的是默认或第一个 MQTT 连接。
- \"from_mosquitto\": 这是要订阅的 MQTT 主题名称。当有消息发布到这个主题时,ESP8266 会接收到。
- 1: 表示 QoS (Quality of Service) 等级。QoS 等级定义了消息传输的可靠性。1 代表 至少一次 (At Least Once),即消息至少会被送达一次,但可能会重复。
if (ESP8266_SendATCommand(mqtt_sub_cmd, 2000, \"OK\") == 1)
这行代码调用一个名为 ESP8266_SendATCommand 的函数,用于将构建好的 AT 命令发送给 ESP8266 模块。
- mqtt_sub_cmd: 要发送的 AT 命令字符串。
- 2000: 等待 ESP8266 响应的超时时间(单位:毫秒)。
- \"OK\": 期望从 ESP8266 接收到的成功响应字符串。如果收到 \"OK\",说明命令执行成功。
- 如果函数返回 1,表示命令发送成功并收到了期望的响应。
发布主题
sprintf(mqtt_pub_cmd, \"AT+MQTTPUB=0,\\\"%s\\\",\\\"%s\\\",1,0\", \"from_stm32\", mqtt_pub_data);
将 MQTT 发布命令格式化到 mqtt_pub_cmd 字符串中。AT+MQTTPUB: 这是 ESP8266 模块用于发布 MQTT 消息的 AT 命令。
- 0: linkID,同订阅命令中的含义。
- \"from_stm32\": 这是消息要发布到的 MQTT 主题名称。
- \"%s\" (第二个 %s): 用 mqtt_pub_data 的内容替换,即要发布的消息主体。
- 1: QoS 等级,这里也是 至少一次 (At Least Once)。
- 0: 保留 (Retain) 标志。0 表示不保留消息,即 MQTT 代理不会存储这条消息以供新的订阅者在订阅时立即接收。如果设置为 1,则代理会保留最新的消息,当有新的订阅者订阅该主题时,会立即收到这条保留的消息。
if (ESP8266_SendATCommand(mqtt_pub_cmd, 2000, \"OK\") == 1)
- printf(\"Published message successfully!\\r\\n\");: 如果发布成功,则打印成功消息。
- else { printf(\"Failed to publish message.\\r\\n\"); }: 如果发布失败,则打印失败消息。
- HAL_Delay(2000);: 延时 2000 毫秒(2 秒),用于控制发布消息的间隔。
持续接收来自MQTT中发布的信息
实现思路:
只需要在 main 函数的无限循环 while(1) 中,加入一个简单的子循环,用来不断检测并输出MQTT 消息,注意这里暂时不用过多考虑这样代码会使得其他代码无法执行。
这里主要是为了验证我们的配置是否正确,用实际效果验证一下。后面真实做了项目的话,将各部分模块代码集成在一起的时候就要考虑了,并且我们是基于freertos的,实现保证不同模块都可以执行会轻松一些。
代码实现
main.c中加入的逻辑
//8. 一旦 MQTT 连接成功并完成订阅/发布测试,进入此循环// 持续监听并打印来自 ESP8266 的原始 MQTT 消息while (1) {// 调用 CheckAndPrintMqttRawMessage 尝试查找并打印缓冲区中的原始 MQTT 消息CheckAndPrintMqttRawMessage();// 重要的:添加一个短暂的延时,避免 CPU 100% 占用,// 同时允许其他系统任务(如 HAL_Tick、其他外设中断)正常运行HAL_Delay(100); // 例如,每次循环等待 10ms// 你可能需要一个机制来检查 MQTT 连接是否仍然有效// 如果连接断开,则跳出此循环,重新尝试 Wi-Fi 和 MQTT 连接// (这里为了简化,暂时省略断开检测)}
esp8266.h中加入的声明
/** * @brief 检查并输出从 ESP8266 接收到的完整 MQTT 订阅消息 * 此函数应在主循环中周期性调用 * @param None * @retval 1表示成功找到并输出了一个MQTT消息,0表示未发现 */uint8_t CheckAndPrintMqttRawMessage(void);
esp8266.c中的加入的函数 uint8_t CheckAndPrintMqttRawMessage(void)
uint8_t CheckAndPrintMqttRawMessage(void) { // 检查缓冲区是否有数据 if (Rx_Head == Rx_Tail) { return 0; } // 临时缓冲区,用于从环形缓冲区中拷贝数据进行查找 char temp_line_buffer[RX_BUFFER_SIZE]; uint16_t len_available; uint16_t i; // 计算当前环形缓冲区中的数据长度 if (Rx_Head >= Rx_Tail) { len_available = Rx_Head - Rx_Tail; } else { len_available = RX_BUFFER_SIZE - Rx_Tail + Rx_Head; } // 如果数据长度不足以包含 \"\\r\\n\" 结束符,则不处理 if (len_available < 2) { return 0; } // 将环形缓冲区的数据拷贝到临时缓冲区 // 注意: 确保拷贝的长度不会超过 temp_line_buffer 的容量 uint16_t copy_len = (len_available < RX_BUFFER_SIZE) ? len_available : (RX_BUFFER_SIZE - 1); for (i = 0; i < copy_len; i++) { temp_line_buffer[i] = Rx_Buffer[(Rx_Tail + i) % RX_BUFFER_SIZE]; } temp_line_buffer[copy_len] = \'\\0\'; // 确保字符串以空字符结尾 // 查找行结束标志:\"\\r\\n\" char* line_end_ptr = strstr(temp_line_buffer, \"\\r\\n\"); if (line_end_ptr != NULL) { // 找到完整的消息行,打印它 *line_end_ptr = \'\\0\'; // 临时截断字符串,只打印到 \\r\\n 前 printf(\"RECEIVED LINE: %s\\r\\n\", temp_line_buffer); // 恢复截断的字符,以便正确计算偏移量 *line_end_ptr = \'\\r\'; // 移动 Rx_Tail,移除已处理的消息行 uint16_t processed_len = (line_end_ptr - temp_line_buffer) + 2; // +2 是为了跳过 \\r\\n Rx_Tail = (Rx_Tail + processed_len) % RX_BUFFER_SIZE; return 1; // 成功处理一个消息行 } return 0; // 未找到完整的消息行}