From 05815c81f93ece1ee56a3f3f7d0e4b24584483ed Mon Sep 17 00:00:00 2001 From: cxh Date: Wed, 10 Sep 2025 13:33:29 +0800 Subject: [PATCH] first commit --- src/mqtt_client.cpp | 46 ++++++++++--------------------------- src/mqtt_client_wrapper.cpp | 36 ++++++----------------------- 2 files changed, 19 insertions(+), 63 deletions(-) diff --git a/src/mqtt_client.cpp b/src/mqtt_client.cpp index bdc39a1..00d1b7e 100644 --- a/src/mqtt_client.cpp +++ b/src/mqtt_client.cpp @@ -14,7 +14,7 @@ MQTTClient::MQTTClient(const MQTTConfig &config) MQTTClient::~MQTTClient() { - disconnect(); + force_disconnect(); // 析构时直接释放资源,不阻塞 } void MQTTClient::initializeClient() @@ -40,29 +40,8 @@ void MQTTClient::connect() .password(config_.password) .finalize(); - auto connect_token = client_->connect(connOpts); // shared_ptr - - for (int i = 0; i < 20 && g_running; ++i) // 最多 2 秒 - { - if (connect_token->is_complete()) // 非阻塞检查 - { - connected_ = true; - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - - // 如果循环结束仍未完成,可以手动断开 - if (!connected_) - { - try - { - client_->disconnect()->wait(); - } - catch (...) - { - } - } + // 异步连接,不阻塞 + auto tok = client_->connect(connOpts); // 返回 token } catch (const mqtt::exception &e) { @@ -79,22 +58,21 @@ void MQTTClient::disconnect() try { - auto disc_token = client_->disconnect(); // 返回 std::shared_ptr + auto disc_tok = client_->disconnect(); // 异步返回 token - // 等待最多1秒钟,期间检查 g_running - int wait_loops = 10; // 10*100ms = 1秒 - for (int i = 0; i < wait_loops && g_running; ++i) + // 等待短时间,检查 g_running + for (int i = 0; i < 10 && g_running; ++i) { - if (disc_token->is_complete()) + if (disc_tok->is_complete()) break; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - if (!disc_token->is_complete()) + if (!disc_tok->is_complete()) { LOG_WARN("[MQTT] Disconnect timed out, forcing disconnection"); - client_->stop_consuming(); // 强制停止消费消息 - client_.reset(); // 释放 client 对象 + client_->stop_consuming(); + client_.reset(); } } catch (const mqtt::exception &e) @@ -136,7 +114,7 @@ void MQTTClient::publish(const std::string &topic, const std::string &payload, i try { - client_->publish(topic, payload.data(), payload.size(), qos, false)->wait_for(std::chrono::milliseconds(500)); + client_->publish(topic, payload.data(), payload.size(), qos, false); // 异步,不阻塞 // 如果 topic 不包含 "heartbeat",才打印日志 if (topic.find("heartbeat") == std::string::npos) { @@ -159,7 +137,7 @@ void MQTTClient::subscribe(const std::string &topic, int qos) try { - client_->subscribe(topic, qos)->wait(); + client_->subscribe(topic, qos); // 异步,不阻塞 LOG_INFO("[MQTT] Subscribed to topic: " + topic); } catch (const mqtt::exception &e) diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index f0f14a9..dac8abe 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -211,36 +211,21 @@ void mqtt_client_thread_func() mqtt_client->setDisconnectCallback(on_mqtt_disconnected); mqtt_client->setMessageCallback(on_mqtt_message_received); + // 等待连接 while (g_running && !mqtt_client->isConnected()) { - try - { - mqtt_client->connect(); - } - catch (...) - { - } - - // 拆分等待,及时响应退出 + mqtt_client->connect(); for (int i = 0; i < 10 && g_running && !mqtt_client->isConnected(); i++) std::this_thread::sleep_for(std::chrono::milliseconds(50)); - if (!g_running && mqtt_client->isConnected() == false) - { + if (!g_running && !mqtt_client->isConnected()) mqtt_client->force_disconnect(); - break; - } } + // 主循环:心跳 while (g_running && mqtt_client->isConnected()) { - try - { - send_heartbeat(); - } - catch (...) - { - } + send_heartbeat(); auto sleep_time = heartbeat_interval; while (sleep_time.count() > 0 && g_running && mqtt_client->isConnected()) @@ -251,12 +236,10 @@ void mqtt_client_thread_func() } if (!g_running && mqtt_client->isConnected()) - { mqtt_client->force_disconnect(); - } } - // 清理客户端 + // 清理 if (mqtt_client) { if (g_running) @@ -268,17 +251,12 @@ void mqtt_client_thread_func() mqtt_restart_required = false; - // 如果正在退出,跳出循环 if (!g_running) - { break; - } - // 短暂等待后再尝试重连 + // 短暂等待再重连 for (int i = 0; i < 5 && g_running; i++) - { std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } } LOG_INFO("[MQTT] Client thread exiting."); }