// mqtt_client_wrapper.cpp #include "mqtt_client_wrapper.hpp" #include #include #include #include std::shared_ptr mqtt_client; extern std::atomic g_running; static void send_heartbeat() { if (!mqtt_client || !mqtt_client->isConnected()) return; nlohmann::json hb_data; hb_data["time"] = Logger::get_current_time_utc8(); hb_data["status"] = 0; // 固定上报正常 hb_data["dispatchId"] = ""; // 已无任务状态 nlohmann::json msg; msg["data"] = hb_data; msg["isEnc"] = 0; msg["type"] = 0; mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, msg.dump()); } // MQTT 连接成功 static void on_mqtt_connected() { LOG_INFO("[MQTT] Connected to broker: " + g_app_config.mqtt.server_ip); send_heartbeat(); const auto &topics = g_app_config.mqtt.topics; mqtt_client->subscribe(topics.video_down); mqtt_client->subscribe(topics.substream_down); mqtt_client->subscribe(topics.reset_down); } // MQTT 连接断开 static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); } // 处理消息 static void on_mqtt_message_received(const std::string &topic, const std::string &message) { LOG_INFO("[MQTT] Received message on topic [" + topic + "]"); LOG_INFO("[MQTT] Content: " + message); try { // ------- video_down 应答 ------- if (topic == g_app_config.mqtt.topics.video_down) { auto j = nlohmann::json::parse(message); auto seqNo = j["data"].value("seqNo", ""); LOG_INFO("[MQTT] video_down received, RTSP always running (no action)."); nlohmann::json reply_data; reply_data["time"] = Logger::get_current_time_utc8(); reply_data["result"] = 0; reply_data["seqNo"] = seqNo; nlohmann::json reply; reply["data"] = reply_data; reply["isEnc"] = 0; reply["type"] = 0; mqtt_client->publish(g_app_config.mqtt.topics.video_down_ack, reply.dump()); return; } // ------- substream_down 应答 ------- if (topic == g_app_config.mqtt.topics.substream_down) { LOG_INFO("[MQTT] substream_down received (ignored)."); return; } // ------- reset/down 应答 ------- if (topic == g_app_config.mqtt.topics.reset_down) { auto j = nlohmann::json::parse(message); auto seqNo = j["data"].value("seqNo", ""); LOG_WARN("[MQTT] reset/down received (ignored)."); nlohmann::json reply_data; reply_data["time"] = Logger::get_current_time_utc8(); reply_data["result"] = 0; reply_data["seqNo"] = seqNo; nlohmann::json reply; reply["data"] = reply_data; reply["isEnc"] = 0; reply["type"] = 0; mqtt_client->publish(g_app_config.mqtt.topics.reset_down_ack, reply.dump()); return; } LOG_WARN("[MQTT] Unknown topic: " + topic); } catch (const std::exception &e) { LOG_ERROR(std::string("[MQTT] JSON processing failed: ") + e.what()); } } void mqtt_client_thread_func() { const auto &cfg = g_app_config.mqtt; auto heartbeat_interval = std::chrono::milliseconds((int)(cfg.keep_alive * 0.9)); while (g_running) { mqtt_client = std::make_unique(cfg); mqtt_client->setConnectCallback(on_mqtt_connected); mqtt_client->setDisconnectCallback(on_mqtt_disconnected); mqtt_client->setMessageCallback(on_mqtt_message_received); // 等待连接 while (g_running && !mqtt_client->isConnected()) { mqtt_client->connect(); for (int i = 0; i < 10 && g_running && !mqtt_client->isConnected(); i++) std::this_thread::sleep_for(std::chrono::milliseconds(50)); } // 心跳循环 while (g_running && mqtt_client->isConnected()) { send_heartbeat(); auto remaining = heartbeat_interval; while (remaining.count() > 0 && g_running && mqtt_client->isConnected()) { auto step = std::min(remaining, std::chrono::milliseconds(50)); std::this_thread::sleep_for(step); remaining -= step; } } if (mqtt_client) { mqtt_client->force_disconnect(); mqtt_client.reset(); } // 重连间隔 for (int i = 0; i < 5 && g_running; i++) std::this_thread::sleep_for(std::chrono::milliseconds(200)); } LOG_INFO("[MQTT] Client thread exiting."); }