first commit
This commit is contained in:
parent
61e267dece
commit
05815c81f9
@ -14,7 +14,7 @@ MQTTClient::MQTTClient(const MQTTConfig &config)
|
||||
|
||||
MQTTClient::~MQTTClient()
|
||||
{
|
||||
disconnect();
|
||||
force_disconnect(); // 析构时直接释放资源,不阻塞
|
||||
}
|
||||
|
||||
void MQTTClient::initializeClient()
|
||||
@ -40,29 +40,8 @@ void MQTTClient::connect()
|
||||
.password(config_.password)
|
||||
.finalize();
|
||||
|
||||
auto connect_token = client_->connect(connOpts); // shared_ptr<mqtt::token>
|
||||
|
||||
for (int i = 0; i < 20 && g_running; ++i) // 最多 2 秒
|
||||
{
|
||||
if (connect_token->is_complete()) // 非阻塞检查
|
||||
{
|
||||
connected_ = true;
|
||||
break;
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
// 如果循环结束仍未完成,可以手动断开
|
||||
if (!connected_)
|
||||
{
|
||||
try
|
||||
{
|
||||
client_->disconnect()->wait();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
}
|
||||
// 异步连接,不阻塞
|
||||
auto tok = client_->connect(connOpts); // 返回 token
|
||||
}
|
||||
catch (const mqtt::exception &e)
|
||||
{
|
||||
@ -79,22 +58,21 @@ void MQTTClient::disconnect()
|
||||
|
||||
try
|
||||
{
|
||||
auto disc_token = client_->disconnect(); // 返回 std::shared_ptr<mqtt::token>
|
||||
auto disc_tok = client_->disconnect(); // 异步返回 token
|
||||
|
||||
// 等待最多1秒钟,期间检查 g_running
|
||||
int wait_loops = 10; // 10*100ms = 1秒
|
||||
for (int i = 0; i < wait_loops && g_running; ++i)
|
||||
// 等待短时间,检查 g_running
|
||||
for (int i = 0; i < 10 && g_running; ++i)
|
||||
{
|
||||
if (disc_token->is_complete())
|
||||
if (disc_tok->is_complete())
|
||||
break;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
if (!disc_token->is_complete())
|
||||
if (!disc_tok->is_complete())
|
||||
{
|
||||
LOG_WARN("[MQTT] Disconnect timed out, forcing disconnection");
|
||||
client_->stop_consuming(); // 强制停止消费消息
|
||||
client_.reset(); // 释放 client 对象
|
||||
client_->stop_consuming();
|
||||
client_.reset();
|
||||
}
|
||||
}
|
||||
catch (const mqtt::exception &e)
|
||||
@ -136,7 +114,7 @@ void MQTTClient::publish(const std::string &topic, const std::string &payload, i
|
||||
|
||||
try
|
||||
{
|
||||
client_->publish(topic, payload.data(), payload.size(), qos, false)->wait_for(std::chrono::milliseconds(500));
|
||||
client_->publish(topic, payload.data(), payload.size(), qos, false); // 异步,不阻塞
|
||||
// 如果 topic 不包含 "heartbeat",才打印日志
|
||||
if (topic.find("heartbeat") == std::string::npos)
|
||||
{
|
||||
@ -159,7 +137,7 @@ void MQTTClient::subscribe(const std::string &topic, int qos)
|
||||
|
||||
try
|
||||
{
|
||||
client_->subscribe(topic, qos)->wait();
|
||||
client_->subscribe(topic, qos); // 异步,不阻塞
|
||||
LOG_INFO("[MQTT] Subscribed to topic: " + topic);
|
||||
}
|
||||
catch (const mqtt::exception &e)
|
||||
|
||||
@ -211,36 +211,21 @@ void mqtt_client_thread_func()
|
||||
mqtt_client->setDisconnectCallback(on_mqtt_disconnected);
|
||||
mqtt_client->setMessageCallback(on_mqtt_message_received);
|
||||
|
||||
// 等待连接
|
||||
while (g_running && !mqtt_client->isConnected())
|
||||
{
|
||||
try
|
||||
{
|
||||
mqtt_client->connect();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
|
||||
// 拆分等待,及时响应退出
|
||||
for (int i = 0; i < 10 && g_running && !mqtt_client->isConnected(); i++)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
|
||||
if (!g_running && mqtt_client->isConnected() == false)
|
||||
{
|
||||
if (!g_running && !mqtt_client->isConnected())
|
||||
mqtt_client->force_disconnect();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// 主循环:心跳
|
||||
while (g_running && mqtt_client->isConnected())
|
||||
{
|
||||
try
|
||||
{
|
||||
send_heartbeat();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
|
||||
auto sleep_time = heartbeat_interval;
|
||||
while (sleep_time.count() > 0 && g_running && mqtt_client->isConnected())
|
||||
@ -251,12 +236,10 @@ void mqtt_client_thread_func()
|
||||
}
|
||||
|
||||
if (!g_running && mqtt_client->isConnected())
|
||||
{
|
||||
mqtt_client->force_disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
// 清理客户端
|
||||
// 清理
|
||||
if (mqtt_client)
|
||||
{
|
||||
if (g_running)
|
||||
@ -268,17 +251,12 @@ void mqtt_client_thread_func()
|
||||
|
||||
mqtt_restart_required = false;
|
||||
|
||||
// 如果正在退出,跳出循环
|
||||
if (!g_running)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
// 短暂等待后再尝试重连
|
||||
// 短暂等待再重连
|
||||
for (int i = 0; i < 5 && g_running; i++)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
}
|
||||
}
|
||||
LOG_INFO("[MQTT] Client thread exiting.");
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user