diff --git a/src/mqtt_report/include/mqtt_report/mqtt_client.hpp b/src/mqtt_report/include/mqtt_report/mqtt_client.hpp index 3cf0269..c897a8a 100644 --- a/src/mqtt_report/include/mqtt_report/mqtt_client.hpp +++ b/src/mqtt_report/include/mqtt_report/mqtt_client.hpp @@ -3,179 +3,168 @@ #include "paho_mqtt_3c/MQTTClient.h" #include #include -#include -#include #include +#include +#include +#include class MQTTClientWrapper { public: - MQTTClientWrapper(const std::string &server_address, const std::string &client_id, - const std::string &username = "", const std::string &password = "", - int reconnect_interval_ms = 5000) // 重连间隔(毫秒) - : server_address_(server_address), client_id_(client_id), - username_(username), password_(password), client_(nullptr), - is_connected_(false), reconnect_interval_ms_(reconnect_interval_ms) + MQTTClientWrapper(const std::string &server_uri, + const std::string &client_id, + const std::string &username = "", + const std::string &password = "", + int reconnect_interval_ms = 3000) + : server_uri_(server_uri), + client_id_(client_id), + username_(username), + password_(password), + reconnect_interval_ms_(reconnect_interval_ms), + connected_(false), + stop_flag_(false) { - // 初始化客户端 - int rc = MQTTClient_create(&client_, server_address.c_str(), client_id.c_str(), - MQTTCLIENT_PERSISTENCE_NONE, nullptr); + int rc = MQTTClient_create(&client_, + server_uri_.c_str(), + client_id_.c_str(), + MQTTCLIENT_PERSISTENCE_NONE, + nullptr); + if (rc != MQTTCLIENT_SUCCESS) { - std::cerr << "Failed to create MQTT client, return code " << rc << std::endl; + std::cerr << "MQTTClient_create failed! rc=" << rc << std::endl; client_ = nullptr; + return; } + + MQTTClient_setCallbacks(client_, this, conn_lost_cb, nullptr, nullptr); + + reconnect_thread_ = std::thread([this]() { reconnectLoop(); }); } ~MQTTClientWrapper() { + stop_flag_ = true; + + if (reconnect_thread_.joinable()) + reconnect_thread_.join(); + std::lock_guard lock(mtx_); if (client_) { - disconnect(); + if (connected_) + MQTTClient_disconnect(client_, 2000); + MQTTClient_destroy(&client_); - client_ = nullptr; } } - bool connect(int max_retries = 5) + // 手动连接(可选) + bool connect() + { + std::lock_guard lock(mtx_); + return doConnect(); + } + + // 发布(非阻塞,无 waitForCompletion) + bool publish(const std::string &topic, + const std::string &payload, + int qos = 0, + bool retained = false) { std::lock_guard lock(mtx_); if (!client_) return false; - MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; - conn_opts.keepAliveInterval = 20; - conn_opts.cleansession = 1; - conn_opts.connectTimeout = 5; - - if (!username_.empty()) - { - conn_opts.username = username_.c_str(); - conn_opts.password = password_.c_str(); - } - - int rc; - for (int retry_count = 0; retry_count < max_retries; ++retry_count) - { - std::cout << "Connecting to MQTT (" << retry_count + 1 << "/" << max_retries << ")..." << std::endl; - rc = MQTTClient_connect(client_, &conn_opts); - - if (rc == MQTTCLIENT_SUCCESS) - { - is_connected_ = true; - std::cout << "MQTT connected successfully" << std::endl; - return true; - } - - std::cerr << "Connect failed rc=" << rc << ", wait " << reconnect_interval_ms_ << " ms" << std::endl; - std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_ms_)); - } - - is_connected_ = false; - return false; - } - - void disconnect() - { - std::lock_guard lock(mtx_); - if (client_ && is_connected_) - { - int rc = MQTTClient_disconnect(client_, 10000); - if (rc != MQTTCLIENT_SUCCESS) - { - std::cerr << "Failed to disconnect, return code " << rc << std::endl; - } - else - { - std::cout << "MQTT disconnected." << std::endl; - } - is_connected_ = false; - } - } - - // 发布消息,自动处理重连 - bool publish(const std::string &topic, const std::string &payload, int qos = 0, bool retained = false) - { - std::lock_guard lock(mtx_); - if (!client_) - { - std::cerr << "MQTT client not initialized" << std::endl; + if (!connected_) return false; - } - // 检查连接状态,断开则尝试重连 - if (!is_connected_ || !check_actual_connection()) - { - std::cerr << "MQTT connection lost, trying to reconnect..." << std::endl; - if (!connect()) // 使用默认重试次数重连 - { - std::cerr << "Reconnection failed, cannot publish message" << std::endl; - return false; - } - } + MQTTClient_message msg = MQTTClient_message_initializer; + msg.payload = (void *)payload.c_str(); + msg.payloadlen = payload.size(); + msg.qos = qos; + msg.retained = retained ? 1 : 0; - MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; + int rc = MQTTClient_publishMessage(client_, topic.c_str(), &msg, &token); - pubmsg.payload = const_cast(payload.c_str()); - pubmsg.payloadlen = payload.length(); - pubmsg.qos = qos; - pubmsg.retained = retained ? 1 : 0; - - int rc = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, &token); if (rc != MQTTCLIENT_SUCCESS) { - std::cerr << "Failed to publish message, return code " << rc << std::endl; - is_connected_ = false; // 发布失败标记为断开 + connected_ = false; return false; } - // 等待发布完成 - rc = MQTTClient_waitForCompletion(client_, token, 10000L); // 超时10秒 - if (rc != MQTTCLIENT_SUCCESS) - { - std::cerr << "Failed to wait for completion, return code " << rc << std::endl; - // is_connected_ = false; // 确认失败标记为断开 - return false; - } - - // 可选:取消注释以打印发布信息 - // std::cout << "Published to " << topic << std::endl; + // 非阻塞,不等待 completion return true; } - // 检查实际连接状态(通过MQTTClient API) - bool is_connected() - { - std::lock_guard lock(mtx_); - return check_actual_connection(); - } - - // 设置重连间隔(毫秒) - void set_reconnect_interval(int interval_ms) - { - std::lock_guard lock(mtx_); - reconnect_interval_ms_ = interval_ms; - } + bool isConnected() const { return connected_; } private: - // 内部检查连接状态的实现 - bool check_actual_connection() + bool doConnect() { if (!client_) return false; - int state = MQTTClient_isConnected(client_); - is_connected_ = (state == 1); - return is_connected_; + + MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer; + opts.keepAliveInterval = 20; + opts.cleansession = 1; + opts.connectTimeout = 5; + + if (!username_.empty()) + { + opts.username = username_.c_str(); + opts.password = password_.c_str(); + } + + int rc = MQTTClient_connect(client_, &opts); + if (rc == MQTTCLIENT_SUCCESS) + { + std::cout << "[MQTT] Connected." << std::endl; + connected_ = true; + return true; + } + + std::cerr << "[MQTT] Connect failed rc=" << rc << std::endl; + connected_ = false; + return false; } - std::string server_address_; + static void conn_lost_cb(void *context, char *cause) + { + auto *self = static_cast(context); + std::cerr << "[MQTT] Connection lost: " << (cause ? cause : "unknown") << std::endl; + self->connected_ = false; + } + + // 自动重连线程 + void reconnectLoop() + { + while (!stop_flag_) + { + if (!connected_) + { + std::lock_guard lock(mtx_); + doConnect(); + } + + std::this_thread::sleep_for( + std::chrono::milliseconds(reconnect_interval_ms_)); + } + } + +private: + MQTTClient client_; + std::string server_uri_; std::string client_id_; std::string username_; std::string password_; - MQTTClient client_; - bool is_connected_; // 连接状态标记 - int reconnect_interval_ms_; // 重连间隔(毫秒) - std::mutex mtx_; // 线程安全锁 -}; \ No newline at end of file + + int reconnect_interval_ms_; + + std::atomic connected_; + std::atomic stop_flag_; + + std::mutex mtx_; + std::thread reconnect_thread_; +}; diff --git a/src/mqtt_report/src/mqtt_report.cpp b/src/mqtt_report/src/mqtt_report.cpp index c2ae575..f5189e0 100644 --- a/src/mqtt_report/src/mqtt_report.cpp +++ b/src/mqtt_report/src/mqtt_report.cpp @@ -273,12 +273,6 @@ public: info_topic_(config.info_topic), fault_topic_(config.fault_topic) { - // 初始连接(带重试) - if (!mqtt_client_.connect(5)) // 最多重试5次 - { - RCLCPP_WARN(this->get_logger(), "Initial MQTT connection failed, will retry in background"); - } - subscription_ = this->create_subscription( "can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1)); @@ -302,8 +296,6 @@ public: info_timer_->cancel(); fault_timer_->cancel(); connection_check_timer_->cancel(); - // 断开MQTT连接 - mqtt_client_.disconnect(); } private: @@ -344,12 +336,11 @@ private: } // 定期检查连接状态,主动重连 - void check_connection_callback() + void check_connection_callback() { - if (!mqtt_client_.is_connected()) + if (!mqtt_client_.isConnected()) { - RCLCPP_WARN(this->get_logger(), "MQTT connection check failed, attempting to reconnect"); - mqtt_client_.connect(3); // 检查时重试3次 + RCLCPP_WARN(this->get_logger(), "MQTT connection lost."); } }