From be514b6d83b493f37dd06d69a2203ad8273d69f1 Mon Sep 17 00:00:00 2001 From: lyq Date: Sat, 25 Oct 2025 12:25:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0mqtt=E6=96=AD=E7=BA=BF?= =?UTF-8?q?=E9=87=8D=E8=BF=9E=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../include/mqtt_report/mqtt_client.hpp | 120 +++++++++++--- src/mqtt_report/src/mqtt_report.cpp | 86 +++++++--- src/pub_gps/src/pub_gps_node.cpp | 148 ++++++++++++++---- 3 files changed, 285 insertions(+), 69 deletions(-) diff --git a/src/mqtt_report/include/mqtt_report/mqtt_client.hpp b/src/mqtt_report/include/mqtt_report/mqtt_client.hpp index e8cbc6b..1d4d00d 100644 --- a/src/mqtt_report/include/mqtt_report/mqtt_client.hpp +++ b/src/mqtt_report/include/mqtt_report/mqtt_client.hpp @@ -3,18 +3,23 @@ #include "paho_mqtt_3c/MQTTClient.h" #include #include +#include +#include +#include class MQTTClientWrapper { public: MQTTClientWrapper(const std::string &server_address, const std::string &client_id, - const std::string &username = "", const std::string &password = "") - : server_address_(server_address), client_id_(client_id), - username_(username), password_(password), client_(nullptr) + const std::string &username = "", const std::string &password = "", + int reconnect_interval_ms = 5000) // 重连间隔(毫秒) + : server_address_(server_address), client_id_(client_id), + username_(username), password_(password), client_(nullptr), + is_connected_(false), reconnect_interval_ms_(reconnect_interval_ms) { // 初始化客户端 int rc = MQTTClient_create(&client_, server_address.c_str(), client_id.c_str(), - MQTTCLIENT_PERSISTENCE_NONE, nullptr); + MQTTCLIENT_PERSISTENCE_NONE, nullptr); if (rc != MQTTCLIENT_SUCCESS) { std::cerr << "Failed to create MQTT client, return code " << rc << std::endl; @@ -24,6 +29,7 @@ public: ~MQTTClientWrapper() { + std::lock_guard lock(mtx_); if (client_) { disconnect(); @@ -32,8 +38,10 @@ public: } } - bool connect() + // 连接服务器,支持最大重试次数 + bool connect(int max_retries = 5) { + std::lock_guard lock(mtx_); if (!client_) { std::cerr << "MQTT client not initialized" << std::endl; @@ -44,27 +52,48 @@ public: conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; - if (!username_.empty()) { conn_opts.username = username_.c_str(); conn_opts.password = password_.c_str(); } - int rc = MQTTClient_connect(client_, &conn_opts); - if (rc != MQTTCLIENT_SUCCESS) + int rc; + int retry_count = 0; + while (retry_count < max_retries) { - std::cerr << "Failed to connect, return code " << rc << std::endl; - return false; + rc = MQTTClient_connect(client_, &conn_opts); + if (rc == MQTTCLIENT_SUCCESS) + { + is_connected_ = true; + std::cout << "MQTT connected successfully" << std::endl; + return true; + } + + retry_count++; + std::cerr << "Failed to connect (attempt " << retry_count << "/" << max_retries + << "), return code " << rc << std::endl; + + if (retry_count < max_retries) + { + // 等待重连间隔前,检查是否需要退出程序 + if (!rclcpp::ok()) + { + std::cerr << "Program exiting, stop reconnection" << std::endl; + return false; + } + std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_ms_)); + } } - std::cout << "MQTT connected successfully" << std::endl; - return true; + is_connected_ = false; + return false; } void disconnect() { - if (client_) + std::lock_guard lock(mtx_); + if (client_ && is_connected_) { int rc = MQTTClient_disconnect(client_, 10000); if (rc != MQTTCLIENT_SUCCESS) @@ -75,21 +104,35 @@ public: { std::cout << "MQTT disconnected." << std::endl; } + is_connected_ = false; } } - void publish(const std::string &topic, const std::string &payload, int qos = 0, bool retained = false) + // 发布消息,自动处理重连 + bool publish(const std::string &topic, const std::string &payload, int qos = 0, bool retained = false) { + std::lock_guard lock(mtx_); if (!client_) { std::cerr << "MQTT client not initialized" << std::endl; - return; + return false; + } + + // 检查连接状态,断开则尝试重连 + if (!is_connected_ || !check_actual_connection()) + { + std::cerr << "MQTT connection lost, trying to reconnect..." << std::endl; + if (!connect()) // 使用默认重试次数重连 + { + std::cerr << "Reconnection failed, cannot publish message" << std::endl; + return false; + } } MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; - pubmsg.payload = const_cast(payload.c_str()); + pubmsg.payload = const_cast(payload.c_str()); pubmsg.payloadlen = payload.length(); pubmsg.qos = qos; pubmsg.retained = retained ? 1 : 0; @@ -98,26 +141,55 @@ public: if (rc != MQTTCLIENT_SUCCESS) { std::cerr << "Failed to publish message, return code " << rc << std::endl; - return; + is_connected_ = false; // 发布失败标记为断开 + return false; } // 等待发布完成 - rc = MQTTClient_waitForCompletion(client_, token, 10000L); + rc = MQTTClient_waitForCompletion(client_, token, 1000L); // 超时1秒 if (rc != MQTTCLIENT_SUCCESS) { std::cerr << "Failed to wait for completion, return code " << rc << std::endl; + is_connected_ = false; // 确认失败标记为断开 + return false; } - else - { - // 可选:取消注释以打印发布信息 - // std::cout << "Published to " << topic << std::endl; - } + + // 可选:取消注释以打印发布信息 + // std::cout << "Published to " << topic << std::endl; + return true; + } + + // 检查实际连接状态(通过MQTTClient API) + bool is_connected() + { + std::lock_guard lock(mtx_); + return check_actual_connection(); + } + + // 设置重连间隔(毫秒) + void set_reconnect_interval(int interval_ms) + { + std::lock_guard lock(mtx_); + reconnect_interval_ms_ = interval_ms; } private: + // 内部检查连接状态的实现 + bool check_actual_connection() + { + if (!client_) + return false; + int state = MQTTClient_isConnected(client_); + is_connected_ = (state == 1); + return is_connected_; + } + std::string server_address_; std::string client_id_; std::string username_; std::string password_; MQTTClient client_; -}; + bool is_connected_; // 连接状态标记 + int reconnect_interval_ms_; // 重连间隔(毫秒) + std::mutex mtx_; // 线程安全锁 +}; \ No newline at end of file diff --git a/src/mqtt_report/src/mqtt_report.cpp b/src/mqtt_report/src/mqtt_report.cpp index ecbd8a6..ac80348 100644 --- a/src/mqtt_report/src/mqtt_report.cpp +++ b/src/mqtt_report/src/mqtt_report.cpp @@ -1,18 +1,19 @@ #include #include "sweeper_interfaces/msg/can_frame.hpp" -#include "mqtt_report/mqtt_client.hpp" +#include "mqtt_report/mqtt_client.hpp" // 包含上面修改后的MQTTClientWrapper #include "mqtt_report/get_config.h" #include "mqtt_report/report_struct.h" #include "mqtt_report/fault_codes.h" #include #include #include +#include namespace sweeperMsg = sweeper_interfaces::msg; -Config config; // 清扫车配置文件 - -GeneralMsg info_report; // 常规消息上报 +Config config; // 清扫车配置文件 +GeneralMsg info_report; // 常规消息上报 +int mqtt_reconnect_interval = 5000; // 重连间隔(ms) // 解析can报文,做消息上报 void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg) @@ -25,7 +26,6 @@ void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg) info_report.power = msg->data[4] * 0.5f; info_report.chargeStatus = (msg->data[3] == 0x61); std::cout << "[0x1821E5F1] power : " << info_report.power << std::endl; - // std::cout << "[0x1821E5F1] chargeStatus : " << info_report.chargeStatus << std::endl; break; } @@ -81,7 +81,7 @@ void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg) if (is_fault && faultMap.count(error_code)) // 故障存在且在映射表中 { vehicle_error_code.addErrorCode(error_code); - updateFaultLevel(error_code, 1); // 协议未定义故障等级,默认1级(可根据需求调整) + updateFaultLevel(error_code, 1); // 协议未定义故障等级,默认1级 } else // 故障不存在 { @@ -97,17 +97,16 @@ void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg) { // 解析控制器温度(data[2],偏移量40℃,1℃/bit) int raw_ctrl_temp = static_cast(msg->data[2]); - info_report.controllerTemp = raw_ctrl_temp - 40; // 实际温度=原始值-40,量程:-40~215℃ + info_report.controllerTemp = raw_ctrl_temp - 40; // 实际温度=原始值-40 // 解析电机温度(data[3],偏移量30℃,1℃/bit) int raw_motor_temp = static_cast(msg->data[3]); - info_report.motorTemp = raw_motor_temp - 30; // 实际温度=原始值-30,量程:-30~225℃ + info_report.motorTemp = raw_motor_temp - 30; // 实际温度=原始值-30 // 解析控制器状态(data[4],BIT1-BIT0:命令状态;BIT3-BIT2:反馈状态) uint8_t ctrl_status = msg->data[4]; // 反馈状态(BIT3-BIT2):0=空挡,1=前进,2=后退,3=保留 int feedback_gear = (ctrl_status >> 2) & 0x03; - // 档位优先级:命令状态优先(空挡时以反馈状态为准,可根据需求调整) info_report.gear = feedback_gear; std::cout << "[0x0CF11F00] controllerTemp : " << info_report.controllerTemp << std::endl; @@ -254,13 +253,17 @@ class CanDataSubscriber : public rclcpp::Node public: CanDataSubscriber(const Config &config) : Node("can_data_subscriber"), - mqtt_client_(config.mqtt_ip + ":" + std::to_string(config.mqtt_port), generate_mqtt_client_id(), "zxwl", "zxwl1234@"), + mqtt_client_(config.mqtt_ip + ":" + std::to_string(config.mqtt_port), + generate_mqtt_client_id(), + "zxwl", "zxwl1234@", + mqtt_reconnect_interval), // 传入重连间隔 info_topic_(config.info_topic), fault_topic_(config.fault_topic) { - if (!mqtt_client_.connect()) + // 初始连接(带重试) + if (!mqtt_client_.connect(5)) // 最多重试5次 { - RCLCPP_ERROR(this->get_logger(), "Failed to connect to MQTT server"); + RCLCPP_WARN(this->get_logger(), "Initial MQTT connection failed, will retry in background"); } subscription_ = this->create_subscription( @@ -273,30 +276,67 @@ public: fault_timer_ = this->create_wall_timer( std::chrono::milliseconds(500), std::bind(&CanDataSubscriber::fault_timer_callback, this)); + + // 添加定期连接检查定时器(每10秒) + connection_check_timer_ = this->create_wall_timer( + std::chrono::seconds(10), + std::bind(&CanDataSubscriber::check_connection_callback, this)); + } + + ~CanDataSubscriber() + { + // 停止所有定时器,避免回调继续执行 + info_timer_->cancel(); + fault_timer_->cancel(); + connection_check_timer_->cancel(); + // 断开MQTT连接 + mqtt_client_.disconnect(); } private: void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg) { - // RCLCPP_INFO(this->get_logger(), "Received CAN frame ID: %u, DLC: %u", msg->id, msg->dlc); Msg_Handler(msg); } void info_timer_callback() { + if (!rclcpp::ok()) + { // 优先检查退出信号 + return; + } info_report.timestamp = getCurrentTimestampMs(); std::string json_msg = pack_general_info_to_json(info_report); - mqtt_client_.publish(info_topic_, json_msg); - // std::cout << "topic : " << info_topic_ << std::endl; - // std::cout << "json_msg : " << json_msg << std::endl; + if (!mqtt_client_.publish(info_topic_, json_msg, 1)) + { + RCLCPP_WARN(this->get_logger(), "Failed to publish info message to MQTT"); + } } void fault_timer_callback() { + if (!rclcpp::ok()) + { // 优先检查退出信号 + return; + } + for (int code : vehicle_error_code.getAllErrorCodes()) { std::string fault_json = generateFaultJson(code, config.vid, getCurrentTimestampMs()); - mqtt_client_.publish(fault_topic_, fault_json); + if (!mqtt_client_.publish(fault_topic_, fault_json, 1)) // QoS=1 + { + RCLCPP_WARN(this->get_logger(), "Failed to publish fault code %d to MQTT", code); + } + } + } + + // 定期检查连接状态,主动重连 + void check_connection_callback() + { + if (!mqtt_client_.is_connected()) + { + RCLCPP_WARN(this->get_logger(), "MQTT connection check failed, attempting to reconnect"); + mqtt_client_.connect(3); // 检查时重试3次 } } @@ -306,6 +346,7 @@ private: std::string fault_topic_; rclcpp::TimerBase::SharedPtr info_timer_; rclcpp::TimerBase::SharedPtr fault_timer_; + rclcpp::TimerBase::SharedPtr connection_check_timer_; // 新增:连接检查定时器 }; int main(int argc, char **argv) @@ -318,14 +359,23 @@ int main(int argc, char **argv) return 1; } + // 确保配置中有重连间隔(默认5000ms) + if (mqtt_reconnect_interval <= 0) + { + mqtt_reconnect_interval = 5000; + } + auto node = std::make_shared(config); RCLCPP_INFO(node->get_logger(), "info_topic : %s", config.info_topic.c_str()); RCLCPP_INFO(node->get_logger(), "fault_topic : %s", config.fault_topic.c_str()); + RCLCPP_INFO(node->get_logger(), "vid : %s", config.vid.c_str()); + RCLCPP_INFO(node->get_logger(), "mqtt_ip : %s", config.mqtt_ip.c_str()); + RCLCPP_INFO(node->get_logger(), "MQTT reconnection interval: %d ms", mqtt_reconnect_interval); RCLCPP_INFO(node->get_logger(), "MqttReport node started."); rclcpp::spin(node); rclcpp::shutdown(); return 0; -} +} \ No newline at end of file diff --git a/src/pub_gps/src/pub_gps_node.cpp b/src/pub_gps/src/pub_gps_node.cpp index 2e6d707..093cdd2 100644 --- a/src/pub_gps/src/pub_gps_node.cpp +++ b/src/pub_gps/src/pub_gps_node.cpp @@ -25,22 +25,68 @@ struct MqttConf std::string pub_gps_topic; int qos = 1; long timeout_ms = 10000L; + int reconnect_interval = 5000; // 重连间隔(ms) } g_conf; GPS gps_mes; +// 增加连接状态标志,确保线程安全 +static bool is_connected = false; +static pthread_mutex_t connect_mutex = PTHREAD_MUTEX_INITIALIZER; + +// 检查连接状态 +bool check_connection(MQTTClient client) +{ + pthread_mutex_lock(&connect_mutex); + int state = MQTTClient_isConnected(client); + is_connected = (state == 1); + pthread_mutex_unlock(&connect_mutex); + return is_connected; +} + +// 设置连接状态 +void set_connected(bool connected) +{ + pthread_mutex_lock(&connect_mutex); + is_connected = connected; + pthread_mutex_unlock(&connect_mutex); +} + +// 重连函数 +bool reconnect_client(MQTTClient *client) +{ + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + conn_opts.username = g_conf.user.c_str(); + conn_opts.password = g_conf.password.c_str(); + + int rc; + // 尝试重连,最多尝试5次 + for (int i = 0; i < 5; ++i) + { + std::cerr << "[MQTT] Reconnecting... (attempt " << i + 1 << "/5)" << std::endl; + rc = MQTTClient_connect(*client, &conn_opts); + if (rc == MQTTCLIENT_SUCCESS) + { + set_connected(true); + std::cout << "[MQTT] Reconnected successfully" << std::endl; + return true; + } + std::cerr << "[MQTT] Reconnect failed, rc=" << rc << ". Retrying in " + << g_conf.reconnect_interval << "ms..." << std::endl; + usleep(g_conf.reconnect_interval * 1000); + } + return false; +} void *mqtt_pub_gps(void *arg) { (void)arg; - MQTTClient client; + MQTTClient client = nullptr; MQTTClient_deliveryToken token_d_m; - - MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; - MQTTClient_message pubmsg = MQTTClient_message_initializer; - int rc; - /* 创建客户端 */ + // 初始化客户端 if ((rc = MQTTClient_create(&client, g_conf.address.c_str(), g_conf.client_id.c_str(), @@ -51,19 +97,15 @@ void *mqtt_pub_gps(void *arg) return nullptr; } - /* 连接配置 */ - conn_opts.keepAliveInterval = 20; - conn_opts.cleansession = 1; - conn_opts.username = g_conf.user.c_str(); - conn_opts.password = g_conf.password.c_str(); - - if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) + // 初始连接 + if (!reconnect_client(&client)) { - std::cerr << "Failed to connect, rc=" << rc << std::endl; + std::cerr << "[MQTT] Failed to establish initial connection" << std::endl; MQTTClient_destroy(&client); return nullptr; } + MQTTClient_message pubmsg = MQTTClient_message_initializer; pubmsg.qos = g_conf.qos; pubmsg.retained = 0; @@ -73,6 +115,41 @@ void *mqtt_pub_gps(void *arg) while (true) { + // 检查连接状态,断开则尝试重连 + if (!check_connection(client)) + { + std::cerr << "[MQTT] Connection lost. Attempting to reconnect..." << std::endl; + + // 先断开旧连接 + MQTTClient_disconnect(client, 1000); + + // 重连失败则销毁客户端重新创建 + if (!reconnect_client(&client)) + { + std::cerr << "[MQTT] All reconnection attempts failed. Recreating client..." << std::endl; + MQTTClient_destroy(&client); + + // 重新创建客户端 + if ((rc = MQTTClient_create(&client, + g_conf.address.c_str(), + g_conf.client_id.c_str(), + MQTTCLIENT_PERSISTENCE_NONE, + nullptr)) != MQTTCLIENT_SUCCESS) + { + std::cerr << "Failed to recreate client, rc=" << rc << std::endl; + usleep(g_conf.reconnect_interval * 1000); + continue; + } + + // 再次尝试连接 + if (!reconnect_client(&client)) + { + usleep(g_conf.reconnect_interval * 1000); + continue; + } + } + } + root_st.clear(); root_st["lng"] = [&] @@ -92,27 +169,33 @@ void *mqtt_pub_gps(void *arg) pubmsg.payload = msg_st; pubmsg.payloadlen = static_cast(json_string.size()); - int rc = MQTTClient_publishMessage(client, + // 发布消息 + if (check_connection(client)) + { + rc = MQTTClient_publishMessage(client, g_conf.pub_gps_topic.c_str(), &pubmsg, &token_d_m); - if (rc == MQTTCLIENT_SUCCESS) - { - std::cout << "[MQTT] Published to topic: " << g_conf.pub_gps_topic - << "\n msg: " << json_string << std::endl; - MQTTClient_waitForCompletion(client, token_d_m, g_conf.timeout_ms); + if (rc == MQTTCLIENT_SUCCESS) + { + std::cout << "[MQTT] Published to topic: " << g_conf.pub_gps_topic + << "\n msg: " << json_string << std::endl; + MQTTClient_waitForCompletion(client, token_d_m, g_conf.timeout_ms); + } + else + { + std::cerr << "[MQTT] publish failed, rc=" << rc << std::endl; + set_connected(false); // 发布失败标记为断开连接 + } } - else - { - std::cerr << "[MQTT] publish failed, rc=" << rc << std::endl; - } - MQTTClient_waitForCompletion(client, token_d_m, g_conf.timeout_ms); usleep(200000); // 200 ms } + // 清理资源 MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); + pthread_mutex_destroy(&connect_mutex); return nullptr; } @@ -171,7 +254,12 @@ bool load_config(const std::string &path) g_conf.user = m["mqtt_user"].asString(); g_conf.password = m["mqtt_password"].asString(); g_conf.pub_gps_topic = m["pub_gps_topic"].asString(); - /* 其它字段(QOS 等)如有需求也可读出来 */ + + // 从配置读取重连间隔,默认5000ms + if (m.isMember("reconnect_interval")) + { + g_conf.reconnect_interval = m["reconnect_interval"].asInt(); + } return true; } @@ -181,11 +269,17 @@ int main(int argc, char **argv) if (!load_config("config.json")) return -1; + // 初始化互斥锁 + pthread_mutex_init(&connect_mutex, nullptr); + pthread_t mqtt_thread; pthread_create(&mqtt_thread, nullptr, mqtt_pub_gps, nullptr); rclcpp::init(argc, argv); rclcpp::spin(std::make_shared("pub_gps_node")); rclcpp::shutdown(); + + // 清理资源 + pthread_mutex_destroy(&connect_mutex); return 0; -} +} \ No newline at end of file