// mqtt_client_wrapper.cpp #include "mqtt_client_wrapper.hpp" std::shared_ptr mqtt_client; std::atomic mqtt_restart_required{false}; extern std::atomic g_running; // MQTT 回调定义 static void on_mqtt_connected() { LOG_INFO("[MQTT] Connected to broker: " + g_app_config.mqtt.server_ip); const auto &topics = g_app_config.mqtt.topics; mqtt_client->subscribe(topics.video_down); mqtt_client->subscribe(topics.substream_down); mqtt_client->subscribe(topics.reset_down); } static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); } static void on_mqtt_message_received(const std::string &topic, const std::string &message) { LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size())); try { if (topic == g_app_config.mqtt.topics.video_down) { LOG_INFO("[MQTT] Step 1: parsing json"); auto j = nlohmann::json::parse(message); LOG_INFO("[MQTT] Step 2: checking data field"); if (!j.contains("data")) { LOG_WARN("[MQTT] missing data"); return; } if (!j["data"].contains("status")) { LOG_WARN("[MQTT] missing status"); return; } int status = j["data"]["status"].get(); LOG_INFO("[MQTT] Step 3: got status=" + std::to_string(status)); // 处理 video_down // auto j = nlohmann::json::parse(message); // if (!j.contains("data") || !j["data"].contains("status")) // { // LOG_WARN("[MQTT] video_down JSON missing data.status"); // return; // } // int status = j["data"]["status"].get(); if (status == 1) { // 启动推流:挂载本地配置中 enabled 的摄像头 for (const auto &cam : g_app_config.cameras) { if (!cam.enabled) continue; if (!RTSPManager::is_streaming(cam.name)) { RTSPManager::mount_camera(cam); LOG_INFO("[MQTT] Started streaming: " + cam.name); } } } else if (status == 0) { // 停止推流:卸载本地配置中 enabled 的摄像头 for (const auto &cam : g_app_config.cameras) { if (!cam.enabled) continue; if (RTSPManager::is_streaming(cam.name)) { RTSPManager::unmount_camera(cam); LOG_INFO("[MQTT] Stopped streaming: " + cam.name); } } } else { LOG_WARN("[MQTT] video_down: unknown status value " + std::to_string(status)); } } else if (topic == g_app_config.mqtt.topics.substream_down) { // 处理 substream_down LOG_INFO("[MQTT] substream_down message received (not implemented yet)."); } else if (topic == g_app_config.mqtt.topics.reset_down) { // 处理 reset_down LOG_INFO("[MQTT] reset_down message received (not implemented yet)."); } else { LOG_WARN("[MQTT] Unknown topic: " + topic); } } catch (const std::exception &e) { LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what()); } } void mqtt_client_thread_func() { const auto &cfg = g_app_config.mqtt; while (g_running) { mqtt_client = std::make_unique(cfg); mqtt_client->setConnectCallback(on_mqtt_connected); mqtt_client->setDisconnectCallback(on_mqtt_disconnected); mqtt_client->setMessageCallback(on_mqtt_message_received); mqtt_client->connect(); while (!mqtt_restart_required && g_running) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } // 需要重启或退出 mqtt_client->disconnect(); mqtt_client.reset(); mqtt_restart_required = false; } LOG_INFO("[MQTT] Client thread exiting."); }