From 4fb4b07b02c18145261e23199cf5ef2ee6c15cf7 Mon Sep 17 00:00:00 2001 From: cxh Date: Thu, 3 Jul 2025 09:50:59 +0800 Subject: [PATCH] Auto commit at 2025-07-03 09:50:58 --- .../include/mqtt_report/mqtt_client.hpp | 142 ++++++++++++++++-- src/mqtt_report/src/mqtt_report.cpp | 50 +++++- 2 files changed, 171 insertions(+), 21 deletions(-) diff --git a/src/mqtt_report/include/mqtt_report/mqtt_client.hpp b/src/mqtt_report/include/mqtt_report/mqtt_client.hpp index 858847a..7f83005 100644 --- a/src/mqtt_report/include/mqtt_report/mqtt_client.hpp +++ b/src/mqtt_report/include/mqtt_report/mqtt_client.hpp @@ -3,13 +3,20 @@ #include "mqtt/async_client.h" #include #include +#include +#include +#include +#include +#include class MQTTClientWrapper { public: MQTTClientWrapper(const std::string &server_address, const std::string &client_id, const std::string &username = "", const std::string &password = "") - : client_(server_address, client_id), connOpts_() + : client_(server_address, client_id), connOpts_(), + is_connected_(false), stop_reconnect_(false), + initial_reconnect_delay_ms_(1000), max_reconnect_delay_ms_(60000) { connOpts_.set_clean_start(true); if (!username.empty()) @@ -17,30 +24,38 @@ public: connOpts_.set_user_name(username); connOpts_.set_password(password); } + + // 设置连接丢失回调 + client_.set_connection_lost_handler([this](const std::string &cause) + { + std::cerr << "MQTT connection lost: " << cause << std::endl; + is_connected_ = false; + start_reconnect(); }); + } + + ~MQTTClientWrapper() + { + stop_reconnect(); + disconnect(); } bool connect() { - try - { - mqtt::token_ptr conntok = client_.connect(connOpts_); - conntok->wait(); // 阻塞直到连接完成 - std::cout << "MQTT connected successfully" << std::endl; - return true; - } - catch (const mqtt::exception &e) - { - std::cerr << "MQTT connection failed: " << e.what() << std::endl; - return false; - } + std::lock_guard lock(conn_mutex_); + return connect_with_retry(); } void disconnect() { + stop_reconnect(); try { - client_.disconnect()->wait(); - std::cout << "MQTT disconnected." << std::endl; + if (client_.is_connected()) + { + client_.disconnect()->wait(); + std::cout << "MQTT disconnected." << std::endl; + } + is_connected_ = false; } catch (const mqtt::exception &e) { @@ -50,6 +65,12 @@ public: void publish(const std::string &topic, const std::string &payload, int qos = 1, bool retained = false) { + if (!is_connected()) + { + std::cerr << "MQTT not connected, cannot publish." << std::endl; + return; + } + try { auto msg = mqtt::make_message(topic, payload); @@ -63,7 +84,98 @@ public: } } + bool is_connected() const + { + return is_connected_.load(); + } + + void set_reconnect_policy(int initial_delay_ms = 1000, int max_delay_ms = 60000) + { + initial_reconnect_delay_ms_ = initial_delay_ms; + max_reconnect_delay_ms_ = max_delay_ms; + } + +private: + bool connect_once() + { + try + { + mqtt::token_ptr conntok = client_.connect(connOpts_); + conntok->wait(); + std::cout << "MQTT connected successfully" << std::endl; + is_connected_ = true; + return true; + } + catch (const mqtt::exception &e) + { + std::cerr << "MQTT connection attempt failed: " << e.what() << std::endl; + return false; + } + } + + bool connect_with_retry() + { + int delay = initial_reconnect_delay_ms_; + + while (!stop_reconnect_) + { + if (connect_once()) + { + return true; + } + + // 指数退避策略,但有最大延迟限制 + delay = std::min(delay * 2, max_reconnect_delay_ms_); + + // 等待期间检查停止标志 + for (int i = 0; i < delay / 100; i += 100) + { + if (stop_reconnect_) + break; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + return false; + } + + void start_reconnect() + { + std::lock_guard lock(reconnect_mutex_); + if (reconnect_thread_.joinable()) + { + return; // 重连已在进行中 + } + + stop_reconnect_ = false; + reconnect_thread_ = std::thread([this]() + { + std::cout << "Starting persistent MQTT reconnection..." << std::endl; + + { + std::lock_guard lock(conn_mutex_); + connect_with_retry(); + } + + std::cout << "Reconnection thread exiting." << std::endl; }); + } + + void stop_reconnect() + { + stop_reconnect_ = true; + if (reconnect_thread_.joinable()) + { + reconnect_thread_.join(); + } + } + private: mqtt::async_client client_; mqtt::connect_options connOpts_; + std::thread reconnect_thread_; + std::mutex conn_mutex_; + std::mutex reconnect_mutex_; + std::atomic is_connected_; + std::atomic stop_reconnect_; + int initial_reconnect_delay_ms_; + int max_reconnect_delay_ms_; }; diff --git a/src/mqtt_report/src/mqtt_report.cpp b/src/mqtt_report/src/mqtt_report.cpp index 765fafd..39dd236 100644 --- a/src/mqtt_report/src/mqtt_report.cpp +++ b/src/mqtt_report/src/mqtt_report.cpp @@ -5,6 +5,7 @@ #include "mqtt_report/report_struct.h" #include "mqtt_report/fault_codes.h" #include +#include Config config; // 清扫车配置文件 @@ -221,18 +222,23 @@ class CanDataSubscriber : public rclcpp::Node public: CanDataSubscriber(const Config &config) : Node("can_data_subscriber"), - mqtt_client_(config.mqtt_ip + ":" + std::to_string(config.mqtt_port), generate_mqtt_client_id(), "zxwl", "zxwl1234@"), + mqtt_client_(config.mqtt_ip + ":" + std::to_string(config.mqtt_port), + generate_mqtt_client_id(), "zxwl", "zxwl1234@"), info_topic_(config.info_topic), - fault_topic_(config.fault_topic) + fault_topic_(config.fault_topic), + config_(config) // 保存配置 { - if (!mqtt_client_.connect()) - { - RCLCPP_ERROR(this->get_logger(), "Failed to connect to MQTT server"); - } + // 设置MQTT重连策略(可选) + mqtt_client_.set_reconnect_policy(1000, 60000); // 初始1秒,最大60秒 + // 启动MQTT连接(非阻塞,后台自动重连) + mqtt_client_.connect(); + + // 创建订阅 subscription_ = this->create_subscription( "can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1)); + // 创建定时器 info_timer_ = this->create_wall_timer( std::chrono::milliseconds(100), std::bind(&CanDataSubscriber::info_timer_callback, this)); @@ -240,17 +246,36 @@ public: fault_timer_ = this->create_wall_timer( std::chrono::milliseconds(500), std::bind(&CanDataSubscriber::fault_timer_callback, this)); + + // 添加MQTT连接状态监控 + connection_monitor_ = this->create_wall_timer( + std::chrono::seconds(5), + [this]() + { + if (!mqtt_client_.is_connected()) + { + RCLCPP_WARN(this->get_logger(), "MQTT disconnected, attempting to reconnect..."); + mqtt_client_.connect(); // 触发重连 + } + }); } private: void topic_callback(const mc::msg::CanFrame::SharedPtr msg) { // RCLCPP_INFO(this->get_logger(), "Received CAN frame ID: %u, DLC: %u", msg->id, msg->dlc); + std::lock_guard lock(data_mutex_); Msg_Handler(msg); } void info_timer_callback() { + // 确保MQTT连接后再发送 + if (!mqtt_client_.is_connected()) + { + return; + } + info_report.timestamp = getCurrentTimestampMs(); std::string json_msg = pack_general_info_to_json(info_report); mqtt_client_.publish(info_topic_, json_msg); @@ -258,6 +283,12 @@ private: void fault_timer_callback() { + // 确保MQTT连接后再发送 + if (!mqtt_client_.is_connected()) + { + return; + } + for (int code : vehicle_error_code.getAllErrorCodes()) { std::string fault_json = generateFaultJson(code, config.vid, getCurrentTimestampMs()); @@ -271,6 +302,13 @@ private: std::string fault_topic_; rclcpp::TimerBase::SharedPtr info_timer_; rclcpp::TimerBase::SharedPtr fault_timer_; + + rclcpp::TimerBase::SharedPtr connection_monitor_; // MQTT连接监控定时器 + Config config_; // 保存配置 + + // 用于线程安全的数据存储 + std::mutex data_mutex_; + mc::msg::CanFrame last_can_frame_; }; int main(int argc, char **argv)