diff --git a/include/mqtt_client.hpp b/include/mqtt_client.hpp index 2a8f3c0..69ad3c4 100644 --- a/include/mqtt_client.hpp +++ b/include/mqtt_client.hpp @@ -26,6 +26,7 @@ public: void connect(); void disconnect(); + void force_disconnect(); void publish(const std::string &topic, const std::string &payload, int qos = -1); void subscribe(const std::string &topic, int qos = -1); void switchServer(const MQTTConfig &newConfig); diff --git a/src/main.cpp b/src/main.cpp index c1df2e0..02ba502 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -9,6 +9,7 @@ std::atomic g_running(true); +// main.cpp - 修改signalHandler void signalHandler(int signum) { static bool already_called = false; @@ -18,16 +19,21 @@ void signalHandler(int signum) LOG_INFO("[MAIN] Received signal " + std::to_string(signum) + ", shutting down..."); g_running = false; - RTSPManager::stop(); // 停止 RTSP loop - // 设置超时,防止无限等待 - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + // 停止RTSP循环 + RTSPManager::stop(); - // 强制退出如果仍然卡住 - if (signum == SIGINT) + // 记录停止时间 + auto start_time = std::chrono::steady_clock::now(); + + // 等待一段时间让线程退出 + while (std::chrono::steady_clock::now() - start_time < std::chrono::seconds(3)) { - exit(1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } + + LOG_INFO("[MAIN] Force exiting after waiting for threads"); + exit(1); } int main() diff --git a/src/mqtt_client.cpp b/src/mqtt_client.cpp index c6a8812..a1154b6 100644 --- a/src/mqtt_client.cpp +++ b/src/mqtt_client.cpp @@ -3,6 +3,8 @@ #include #include +extern std::atomic g_running; + MQTTClient::MQTTClient(const MQTTConfig &config) : config_(config), connected_(false) { @@ -24,7 +26,7 @@ void MQTTClient::initializeClient() void MQTTClient::connect() { std::lock_guard lock(mutex_); - if (connected_) + if (connected_ || !g_running) // 添加g_running检查 return; try @@ -37,7 +39,18 @@ void MQTTClient::connect() .password(config_.password) .finalize(); - client_->connect(connOpts)->wait(); // 阻塞等待连接 + // 使用wait_for而不是wait,设置超时时间 + auto connect_future = client_->connect(connOpts); + + // 等待最多2秒钟,期间检查退出标志 + for (int i = 0; i < 20 && g_running; i++) + { + if (connect_future->wait_for(std::chrono::milliseconds(100)) == std::future_status::ready) + { + connected_ = true; + break; + } + } } catch (const mqtt::exception &e) { @@ -54,7 +67,17 @@ void MQTTClient::disconnect() try { - client_->disconnect()->wait(); + // 使用wait_for而不是wait,设置超时时间 + auto disconnect_future = client_->disconnect(); + + // 等待最多1秒钟,而不是无限等待 + if (disconnect_future->wait_for(std::chrono::seconds(1)) == std::future_status::timeout) + { + LOG_WARN("[MQTT] Disconnect timed out, forcing disconnection"); + // 强制断开连接 + client_->stop_consuming(); + client_.reset(); + } } catch (const mqtt::exception &e) { @@ -66,6 +89,28 @@ void MQTTClient::disconnect() on_disconnect_(); } +void MQTTClient::force_disconnect() +{ + std::lock_guard lock(mutex_); + if (!connected_) + return; + + try + { + // 直接停止客户端,不等待正常断开 + client_->stop_consuming(); + client_.reset(); + } + catch (const mqtt::exception &e) + { + LOG_ERROR("[MQTT] Force disconnect failed: " + std::string(e.what())); + } + + connected_ = false; + if (on_disconnect_) + on_disconnect_(); +} + void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos) { if (qos == -1) diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index 85ed282..b7db638 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -247,13 +247,35 @@ void mqtt_client_thread_func() } } - // 需要重启或退出 + // 清理资源 if (mqtt_client) { - mqtt_client->disconnect(); + // 只有在运行标志仍然为true时才尝试正常断开 + if (g_running) + { + mqtt_client->disconnect(); + } + else + { + // 如果正在退出,直接重置客户端,不等待断开完成 + mqtt_client->force_disconnect(); + } mqtt_client.reset(); } + mqtt_restart_required = false; + + // 如果正在退出,跳出循环 + if (!g_running) + { + break; + } + + // 短暂等待后再尝试重连 + for (int i = 0; i < 5 && g_running; i++) + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } } LOG_INFO("[MQTT] Client thread exiting."); }