From ba07a6fe15f521a9ac1dd2938f60dce53b03c0da Mon Sep 17 00:00:00 2001 From: cxh Date: Fri, 14 Nov 2025 14:56:02 +0800 Subject: [PATCH] 1 --- src/mqtt_client.cpp | 86 ++++++++++++++++++--------------------------- 1 file changed, 34 insertions(+), 52 deletions(-) diff --git a/src/mqtt_client.cpp b/src/mqtt_client.cpp index 00d1b7e..9b6f477 100644 --- a/src/mqtt_client.cpp +++ b/src/mqtt_client.cpp @@ -1,20 +1,17 @@ #include "mqtt_client.hpp" -#include -#include + #include #include +#include +#include extern std::atomic g_running; -MQTTClient::MQTTClient(const MQTTConfig &config) - : config_(config), connected_(false) -{ - initializeClient(); -} +MQTTClient::MQTTClient(const MQTTConfig& config) : config_(config), connected_(false) { initializeClient(); } MQTTClient::~MQTTClient() { - force_disconnect(); // 析构时直接释放资源,不阻塞 + force_disconnect(); // 析构时直接释放资源,不阻塞 } void MQTTClient::initializeClient() @@ -27,7 +24,7 @@ void MQTTClient::initializeClient() void MQTTClient::connect() { std::lock_guard lock(mutex_); - if (connected_ || !g_running) // 添加g_running检查 + if (connected_ || !g_running) // 添加g_running检查 return; try @@ -41,9 +38,9 @@ void MQTTClient::connect() .finalize(); // 异步连接,不阻塞 - auto tok = client_->connect(connOpts); // 返回 token + auto tok = client_->connect(connOpts); // 返回 token } - catch (const mqtt::exception &e) + catch (const mqtt::exception& e) { connected_ = false; LOG_ERROR("[MQTT] Connect failed: " + std::string(e.what())); @@ -53,18 +50,16 @@ void MQTTClient::connect() void MQTTClient::disconnect() { std::lock_guard lock(mutex_); - if (!connected_) - return; + if (!connected_) return; try { - auto disc_tok = client_->disconnect(); // 异步返回 token + auto disc_tok = client_->disconnect(); // 异步返回 token // 等待短时间,检查 g_running for (int i = 0; i < 10 && g_running; ++i) { - if (disc_tok->is_complete()) - break; + if (disc_tok->is_complete()) break; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } @@ -75,21 +70,19 @@ void MQTTClient::disconnect() client_.reset(); } } - catch (const mqtt::exception &e) + catch (const mqtt::exception& e) { LOG_ERROR("[MQTT] Disconnect failed: " + std::string(e.what())); } connected_ = false; - if (on_disconnect_) - on_disconnect_(); + if (on_disconnect_) on_disconnect_(); } void MQTTClient::force_disconnect() { std::lock_guard lock(mutex_); - if (!connected_) - return; + if (!connected_) return; try { @@ -97,59 +90,54 @@ void MQTTClient::force_disconnect() client_->stop_consuming(); client_.reset(); } - catch (const mqtt::exception &e) + catch (const mqtt::exception& e) { LOG_ERROR("[MQTT] Force disconnect failed: " + std::string(e.what())); } connected_ = false; - if (on_disconnect_) - on_disconnect_(); + if (on_disconnect_) on_disconnect_(); } -void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos) +void MQTTClient::publish(const std::string& topic, const std::string& payload, int qos) { - if (qos == -1) - qos = config_.qos; + if (qos == -1) qos = config_.qos; try { - client_->publish(topic, payload.data(), payload.size(), qos, false); // 异步,不阻塞 + client_->publish(topic, payload.data(), payload.size(), qos, false); // 异步,不阻塞 // 如果 topic 不包含 "heartbeat",才打印日志 - if (topic.find("heartbeat") == std::string::npos) + if (topic.find("status") == std::string::npos) { LOG_INFO("[MQTT] Published message to topic: " + topic); } } - catch (const mqtt::exception &e) + catch (const mqtt::exception& e) { LOG_ERROR("[MQTT] Publish failed: " + std::string(e.what())); connected_ = false; - if (on_disconnect_) - on_disconnect_(); + if (on_disconnect_) on_disconnect_(); } } -void MQTTClient::subscribe(const std::string &topic, int qos) +void MQTTClient::subscribe(const std::string& topic, int qos) { - if (qos == -1) - qos = config_.qos; + if (qos == -1) qos = config_.qos; try { - client_->subscribe(topic, qos); // 异步,不阻塞 + client_->subscribe(topic, qos); // 异步,不阻塞 LOG_INFO("[MQTT] Subscribed to topic: " + topic); } - catch (const mqtt::exception &e) + catch (const mqtt::exception& e) { LOG_ERROR("[MQTT] Subscribe failed: " + std::string(e.what())); connected_ = false; - if (on_disconnect_) - on_disconnect_(); + if (on_disconnect_) on_disconnect_(); } } -void MQTTClient::switchServer(const MQTTConfig &newConfig) +void MQTTClient::switchServer(const MQTTConfig& newConfig) { std::lock_guard lock(mutex_); disconnect(); @@ -158,33 +146,27 @@ void MQTTClient::switchServer(const MQTTConfig &newConfig) connect(); } -bool MQTTClient::isConnected() const -{ - return connected_; -} +bool MQTTClient::isConnected() const { return connected_; } void MQTTClient::setConnectCallback(ConnectCallback cb) { on_connect_ = cb; } void MQTTClient::setDisconnectCallback(DisconnectCallback cb) { on_disconnect_ = cb; } void MQTTClient::setMessageCallback(MessageCallback cb) { on_message_ = cb; } -void MQTTClient::connection_lost(const std::string &cause) +void MQTTClient::connection_lost(const std::string& cause) { connected_ = false; LOG_WARN("[MQTT] Connection lost: " + cause); - if (on_disconnect_) - on_disconnect_(); + if (on_disconnect_) on_disconnect_(); } -void MQTTClient::connected(const std::string &cause) +void MQTTClient::connected(const std::string& cause) { connected_ = true; LOG_INFO("[MQTT] Reconnected: " + cause); - if (on_connect_) - on_connect_(); + if (on_connect_) on_connect_(); } void MQTTClient::message_arrived(mqtt::const_message_ptr msg) { - if (on_message_) - on_message_(msg->get_topic(), msg->get_payload_str()); + if (on_message_) on_message_(msg->get_topic(), msg->get_payload_str()); }