diff --git a/src/mqtt_client.cpp b/src/mqtt_client.cpp index 492e3d0..bdc39a1 100644 --- a/src/mqtt_client.cpp +++ b/src/mqtt_client.cpp @@ -40,17 +40,28 @@ void MQTTClient::connect() .password(config_.password) .finalize(); - // 使用wait_for而不是wait,设置超时时间 - auto connect_future = client_->connect(connOpts); + auto connect_token = client_->connect(connOpts); // shared_ptr - // 等待最多2秒钟,期间检查退出标志 - for (int i = 0; i < 20 && g_running; i++) + for (int i = 0; i < 20 && g_running; ++i) // 最多 2 秒 { - if (connect_future->wait_for(std::chrono::milliseconds(100)) == std::future_status::ready) + if (connect_token->is_complete()) // 非阻塞检查 { connected_ = true; break; } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + // 如果循环结束仍未完成,可以手动断开 + if (!connected_) + { + try + { + client_->disconnect()->wait(); + } + catch (...) + { + } } } catch (const mqtt::exception &e) @@ -68,16 +79,22 @@ void MQTTClient::disconnect() try { - // 使用wait_for而不是wait,设置超时时间 - auto disconnect_future = client_->disconnect(); + auto disc_token = client_->disconnect(); // 返回 std::shared_ptr - // 等待最多1秒钟,而不是无限等待 - if (disconnect_future->wait_for(std::chrono::seconds(1)) == std::future_status::timeout) + // 等待最多1秒钟,期间检查 g_running + int wait_loops = 10; // 10*100ms = 1秒 + for (int i = 0; i < wait_loops && g_running; ++i) + { + if (disc_token->is_complete()) + break; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + if (!disc_token->is_complete()) { LOG_WARN("[MQTT] Disconnect timed out, forcing disconnection"); - // 强制断开连接 - client_->stop_consuming(); - client_.reset(); + client_->stop_consuming(); // 强制停止消费消息 + client_.reset(); // 释放 client 对象 } } catch (const mqtt::exception &e)