diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index ca780dc..de51de1 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -57,79 +57,63 @@ static void on_mqtt_message_received(const std::string &topic, const std::string { auto j = nlohmann::json::parse(message); - if (topic == g_app_config.mqtt.topics.video_down) + if (topic == g_app_config.mqtt.topics.video_down && j["type"] == "request") { - if (topic.find("/kun/vehicle/video/push/") != std::string::npos && j["type"] == "request") + std::string seqNo = j.value("seqNo", ""); + auto data = j["data"]; + + int switch_val = data.value("switch", 0); + int streamType = data.value("streamType", 0); // 0主 1子 + StreamType type = (streamType == 0) ? StreamType::MAIN : StreamType::SUB; + auto channels = data.value("channels", std::vector{}); + + nlohmann::json resp_data = nlohmann::json::array(); + + for (int ch : channels) { - std::string seqNo = j.value("seqNo", ""); - auto data = j["data"]; + if (ch < 0 || ch >= static_cast(g_app_config.cameras.size())) + continue; - int switch_val = data.value("switch", 0); - int streamType = data.value("streamType", 0); // 0主 1子 - StreamType type = (streamType == 0) ? StreamType::MAIN : StreamType::SUB; - auto channels = data.value("channels", std::vector{}); + Camera &cam = g_app_config.cameras[ch]; + bool op_result = false; - nlohmann::json resp_data = nlohmann::json::array(); - - for (int ch : channels) + if (switch_val == 0) { - // 用数组索引找到摄像头 - if (ch < 0 || ch >= static_cast(g_app_config.cameras.size())) + if (!RTMPManager::is_streaming(cam.name)) { - LOG_WARN("[MQTT] Invalid channel: " + std::to_string(ch)); - continue; + RTMPManager::start_camera(cam, type); + op_result = true; } - - Camera &cam = g_app_config.cameras[ch]; - - bool op_result = false; - - if (switch_val == 0) + } + else + { + if (RTMPManager::is_streaming(cam.name)) { - // 启动流 - if (!RTMPManager::is_streaming(cam.name, type)) - { - RTMPManager::start_camera(cam, type); - op_result = true; - } + RTMPManager::stop_camera(cam.name, type); + op_result = true; } - else - { - // 停止流 - if (RTMPManager::is_streaming(cam.name, type)) - { - RTMPManager::stop_camera(cam.name, type); - op_result = true; - } - } - - nlohmann::json ch_resp; - ch_resp["loc"] = ch; - ch_resp["url"] = op_result ? RTMPManager::get_stream_url(cam.name, type) : ""; - ch_resp["result"] = op_result ? 0 : 1; - ch_resp["reason"] = op_result ? "" : "already in requested state"; - - resp_data.push_back(ch_resp); } - // 构造应答 - nlohmann::json reply; - reply["type"] = "response"; - reply["seqNo"] = seqNo; - reply["data"] = resp_data; + nlohmann::json ch_resp; + ch_resp["loc"] = ch; + ch_resp["url"] = op_result ? RTMPManager::get_stream_url(cam.name, type) : ""; + ch_resp["result"] = op_result ? 0 : 1; + ch_resp["reason"] = op_result ? "" : "already in requested state"; - mqtt_client->publish(topic, reply.dump()); - LOG_INFO("[MQTT] Replied to video push request: " + reply.dump()); - } - else - { - LOG_WARN("[MQTT] Unknown topic or non-request type: " + topic); + resp_data.push_back(ch_resp); } + + nlohmann::json reply; + reply["type"] = "response"; + reply["seqNo"] = seqNo; + reply["data"] = resp_data; + + mqtt_client->publish(topic, reply.dump()); } - catch (const std::exception &e) - { - LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what()); - } + } + catch (const std::exception &e) + { + LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what()); } }