#include "mqtt_client_wrapper.hpp" std::shared_ptr mqtt_client; std::atomic mqtt_restart_required{false}; static uint16_t broadcast_sequence = 0; // MQTT 回调定义 static void on_mqtt_connected() { LOG_INFO("[MQTT] Connected to broker."); mqtt_client->subscribe(g_mqtt_config.topics.downlink); } static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker."); } static void on_mqtt_message_received(const std::string &topic, const std::string &message) { LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size())); try { } catch (const std::exception &e) { LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what()); } } void mqtt_client_thread_func() { while (true) { const auto &cfg = g_mqtt_config; mqtt_client = std::make_unique(cfg); mqtt_client->setConnectCallback(on_mqtt_connected); mqtt_client->setDisconnectCallback(on_mqtt_disconnected); mqtt_client->setMessageCallback(on_mqtt_message_received); mqtt_client->connect(); // 主线程监听重启信号 while (!mqtt_restart_required) { std::this_thread::sleep_for(std::chrono::seconds(1)); } // 需要重启 LOG_INFO("[MQTT] Restarting client..."); mqtt_client->disconnect(); // 可加锁 mqtt_client.reset(); mqtt_restart_required = false; } }