From c70d14eadbf844f9d3f92284afa7750d6b9e2e63 Mon Sep 17 00:00:00 2001 From: cxh Date: Fri, 14 Nov 2025 15:13:05 +0800 Subject: [PATCH] 1 --- src/mqtt_client_wrapper.cpp | 100 ++++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 39 deletions(-) diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index 4ddc2d8..767b4d4 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -73,57 +73,79 @@ static void on_mqtt_connected() static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); } +static void handle_video_down_request(const nlohmann::json& req) +{ + if (!req.contains("data") || !req["data"].is_object()) + { + LOG_WARN("[video_down] Missing data"); + return; + } + + int sw = req["data"].value("switch", 0); + if (sw != 1) + { + LOG_INFO("[video_down] switch != 1, ignore"); + return; + } + + // 取得当前所有 RTMP 播放地址 + auto channels_info = RTMPManager::get_all_channels_status(); + + // 构造响应 + nlohmann::json resp; + resp["type"] = "response"; + resp["seqNo"] = req.value("seqNo", "0"); + resp["data"] = nlohmann::json::array(); + + int loc = 0; + for (const auto& ch : channels_info) + { + resp["data"].push_back({{"loc", loc}, {"url", ch.url}}); + loc++; + } + + // 发布回复 + mqtt_client->publish(g_app_config.mqtt.topics.video_down, resp.dump(-1), 1); + + LOG_INFO("[video_down] response sent"); +} + +static void handle_record_query_request(const nlohmann::json& req) {} + +static void handle_record_play_request(const nlohmann::json& req) {} + static void on_mqtt_message_received(const std::string& topic, const std::string& message) { try { auto j = nlohmann::json::parse(message); - // -------- 判断字段 type 是否为 "request" -------- - if (j.contains("type") && j["type"].is_string() && j["type"] == "request") + // 必须是 request + if (!j.contains("type") || j["type"] != "request") { - LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size())); - LOG_INFO("[MQTT] Message content: " + j.dump(-1)); + LOG_INFO("[MQTT] Ignored non-request message"); + return; + } - // -------- 判断 data.switch 是否为 1 -------- - if (j.contains("data") && j["data"].is_object()) - { - const auto& d = j["data"]; + LOG_INFO("[MQTT] Received request on [" + topic + "]"); + LOG_INFO("[MQTT] Payload: " + j.dump(-1)); - if (d.contains("switch") && d["switch"].is_number_integer()) - { - int sw = d["switch"]; - - if (sw == 1) - { - // ---- 获取 RTMP 状态 ---- - auto channels_info = RTMPManager::get_all_channels_status(); - - // ---- 构造 response ---- - nlohmann::json resp; - resp["type"] = "response"; - resp["seqNo"] = j.value("seqNo", "0"); - - resp["data"] = nlohmann::json::array(); - - int loc = 0; - - for (const auto& ch : channels_info) - { - resp["data"].push_back({{"loc", loc}, {"url", ch.url}}); - loc++; - } - - // ---- 发布 MQTT 响应 ---- - std::string out = resp.dump(-1); - mqtt_client->publish(g_app_config.mqtt.topics.video_down, out, 1); - } - } - } + // 根据 topic 分发 + if (topic == g_app_config.mqtt.topics.video_down) + { + handle_video_down_request(j); + } + else if (topic == g_app_config.mqtt.topics.record_query) + { + handle_record_query_request(j); + } + else if (topic == g_app_config.mqtt.topics.record_play) + { + handle_record_play_request(j); } else { - LOG_INFO("[MQTT] Message is NOT a request."); + LOG_WARN("[MQTT] Unknown topic: " + topic); } } catch (const std::exception& e)