diff --git a/include/mqtt_client.hpp b/include/mqtt_client.hpp index b2a05ba..2a8f3c0 100644 --- a/include/mqtt_client.hpp +++ b/include/mqtt_client.hpp @@ -34,7 +34,6 @@ public: private: void initializeClient(); - void startReconnect(); // mqtt::callback 实现 void connection_lost(const std::string &cause) override; @@ -45,7 +44,6 @@ private: MQTTConfig config_; std::shared_ptr client_; std::atomic connected_; - std::atomic reconnect_active_; std::mutex mutex_; ConnectCallback on_connect_; diff --git a/src/mqtt_client.cpp b/src/mqtt_client.cpp index 81979c3..d8f3337 100644 --- a/src/mqtt_client.cpp +++ b/src/mqtt_client.cpp @@ -4,7 +4,7 @@ #include MQTTClient::MQTTClient(const MQTTConfig &config) - : config_(config), connected_(false), reconnect_active_(false) + : config_(config), connected_(false) { initializeClient(); } @@ -37,18 +37,15 @@ void MQTTClient::connect() .password(config_.password) .finalize(); - auto tok = client_->connect(connOpts); - tok->wait(); // 阻塞直到连接成功或失败,不用 wait_for(2s) - - LOG_INFO("[MQTT] Connected to broker: " + config_.server_ip); + client_->connect(connOpts)->wait(); // 阻塞等待连接 connected_ = true; if (on_connect_) on_connect_(); } catch (const mqtt::exception &e) { + connected_ = false; LOG_ERROR("[MQTT] Connect failed: " + std::string(e.what())); - // startReconnect(); } } @@ -61,14 +58,15 @@ void MQTTClient::disconnect() try { client_->disconnect()->wait(); - connected_ = false; - if (on_disconnect_) - on_disconnect_(); } catch (const mqtt::exception &e) { LOG_ERROR("[MQTT] Disconnect failed: " + std::string(e.what())); } + + connected_ = false; + if (on_disconnect_) + on_disconnect_(); } void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos) @@ -88,8 +86,9 @@ void MQTTClient::publish(const std::string &topic, const std::string &payload, i catch (const mqtt::exception &e) { LOG_ERROR("[MQTT] Publish failed: " + std::string(e.what())); - if (!connected_) - startReconnect(); + connected_ = false; + if (on_disconnect_) + on_disconnect_(); } } @@ -106,75 +105,21 @@ void MQTTClient::subscribe(const std::string &topic, int qos) catch (const mqtt::exception &e) { LOG_ERROR("[MQTT] Subscribe failed: " + std::string(e.what())); - if (!connected_) - startReconnect(); + connected_ = false; + if (on_disconnect_) + on_disconnect_(); } } void MQTTClient::switchServer(const MQTTConfig &newConfig) { std::lock_guard lock(mutex_); - if (connected_) - { - try - { - client_->disconnect()->wait(); - } - catch (...) - { - } - connected_ = false; - } + disconnect(); config_ = newConfig; initializeClient(); connect(); } -void MQTTClient::startReconnect() -{ - if (reconnect_active_) - return; - reconnect_active_ = true; - - std::thread([this] - { - while (!connected_ && reconnect_active_) { - std::this_thread::sleep_for(std::chrono::seconds(2)); - try { - connect(); - } catch (...) {} - } - reconnect_active_ = false; }) - .detach(); -} - -void MQTTClient::connection_lost(const std::string &cause) -{ - std::lock_guard lock(mutex_); - connected_ = false; - LOG_WARN("[MQTT] Connection lost: " + cause); - if (on_disconnect_) - on_disconnect_(); - // startReconnect(); -} - -void MQTTClient::connected(const std::string &cause) -{ - std::lock_guard lock(mutex_); - connected_ = true; - LOG_INFO("[MQTT] Reconnected: " + cause); - if (on_connect_) - on_connect_(); -} - -void MQTTClient::message_arrived(mqtt::const_message_ptr msg) -{ - if (on_message_) - { - on_message_(msg->get_topic(), msg->get_payload_str()); - } -} - bool MQTTClient::isConnected() const { return connected_; @@ -183,3 +128,25 @@ bool MQTTClient::isConnected() const void MQTTClient::setConnectCallback(ConnectCallback cb) { on_connect_ = cb; } void MQTTClient::setDisconnectCallback(DisconnectCallback cb) { on_disconnect_ = cb; } void MQTTClient::setMessageCallback(MessageCallback cb) { on_message_ = cb; } + +void MQTTClient::connection_lost(const std::string &cause) +{ + connected_ = false; + LOG_WARN("[MQTT] Connection lost: " + cause); + if (on_disconnect_) + on_disconnect_(); +} + +void MQTTClient::connected(const std::string &cause) +{ + connected_ = true; + LOG_INFO("[MQTT] Reconnected: " + cause); + if (on_connect_) + on_connect_(); +} + +void MQTTClient::message_arrived(mqtt::const_message_ptr msg) +{ + if (on_message_) + on_message_(msg->get_topic(), msg->get_payload_str()); +} diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index 12892a3..9bbf776 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -1,12 +1,13 @@ // mqtt_client_wrapper.cpp #include "mqtt_client_wrapper.hpp" +#include +#include +#include std::shared_ptr mqtt_client; std::atomic mqtt_restart_required{false}; - extern std::atomic g_running; - -std::atomic g_streaming{false}; // 当前是否有摄像头在拉流 +std::atomic g_streaming{false}; std::string g_dispatch_id; std::mutex g_dispatch_id_mutex; @@ -16,12 +17,13 @@ static void send_heartbeat() return; nlohmann::json hb_data; - hb_data["time"] = Logger::get_current_time_utc8(); // UTC+8 - - // 状态: 2=没有拉流, 0=正在拉流 + hb_data["time"] = Logger::get_current_time_utc8(); hb_data["status"] = g_streaming ? 0 : 2; - hb_data["dispatchId"] = g_streaming ? g_dispatch_id : ""; + { + std::lock_guard lock(g_dispatch_id_mutex); + hb["dispatchId"] = g_streaming ? g_dispatch_id : ""; + } nlohmann::json msg; msg["data"] = hb_data; @@ -29,7 +31,6 @@ static void send_heartbeat() msg["type"] = 0; mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, msg.dump()); - // LOG_INFO("[MQTT] Sent heartbeat: " + msg.dump()); } // MQTT 回调定义 @@ -209,9 +210,20 @@ void mqtt_client_thread_func() mqtt_client->setDisconnectCallback(on_mqtt_disconnected); mqtt_client->setMessageCallback(on_mqtt_message_received); - mqtt_client->connect(); + while (g_running && !mqtt_client->isConnected()) + { + try + { + mqtt_client->connect(); + } + catch (...) + { + } + if (!mqtt_client->isConnected()) + std::this_thread::sleep_for(std::chrono::seconds(2)); + } - while (!mqtt_restart_required && g_running) + while (g_running && mqtt_client->isConnected()) { try { @@ -219,10 +231,9 @@ void mqtt_client_thread_func() } catch (const std::exception &e) { - LOG_ERROR(std::string("[MQTT] Heartbeat error: ") + e.what()); + LOG_ERROR("[MQTT] Heartbeat error: " + std::string(e.what())); } - - std::this_thread::sleep_for(heartbeat_interval); // 固定周期 + std::this_thread::sleep_for(heartbeat_interval); } // 需要重启或退出