From fa054362b9a6235b61d51b8ed42effcf021b7d0b Mon Sep 17 00:00:00 2001 From: cxh Date: Mon, 29 Dec 2025 17:45:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9mqtt=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 14 +++++++--- config.json => config.example.json | 0 include/app_config.hpp | 10 ++++--- src/main.cpp | 42 +++++++++++++++--------------- src/mqtt_client_wrapper.cpp | 36 +++++++++++++++++++++++++ 5 files changed, 73 insertions(+), 29 deletions(-) rename config.json => config.example.json (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index e9b8549..d6b036f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,9 +1,10 @@ -cmake_minimum_required(VERSION 3.10) +cmake_minimum_required(VERSION 3.21) # 可配置的项目名称 set(APP_NAME "video_manager") # 可配置的配置文件名 +set(CONFIG_EXAMPLE_FILE "config.example.json") set(CONFIG_FILE "config.json") project(${APP_NAME}) @@ -53,7 +54,12 @@ target_link_libraries(${APP_NAME} # 拷贝配置文件 add_custom_command(TARGET ${APP_NAME} POST_BUILD - COMMAND ${CMAKE_COMMAND} -E copy_if_different - ${CMAKE_SOURCE_DIR}/${CONFIG_FILE} - $/${CONFIG_FILE} + COMMAND ${CMAKE_COMMAND} -E echo "Checking runtime config..." + COMMAND ${CMAKE_COMMAND} -E make_directory $ + COMMAND ${CMAKE_COMMAND} -E + if_not_exists + $/${CONFIG_FILE} + ${CMAKE_COMMAND} -E copy + ${CMAKE_SOURCE_DIR}/${CONFIG_EXAMPLE_FILE} + $/${CONFIG_FILE} ) diff --git a/config.json b/config.example.json similarity index 100% rename from config.json rename to config.example.json diff --git a/include/app_config.hpp b/include/app_config.hpp index 92af729..f914431 100644 --- a/include/app_config.hpp +++ b/include/app_config.hpp @@ -40,13 +40,15 @@ struct VehicleMQTTTopics std::string video_down; std::string record_query; std::string record_play; + std::string vehicle_ctrl; void fill_with_veh_id(const std::string& vehId) { - heartbeat_up = "/kun/vehicle/video/status/" + vehId; - video_down = "/kun/vehicle/video/request/" + vehId; - record_query = "/kun/vehicle/video/record/query/" + vehId; - record_play = "/kun/vehicle/video/record/play/" + vehId; + heartbeat_up = "/zxwl/vehicle/video/status/" + vehId; + video_down = "/zxwl/vehicle/video/request/" + vehId; + record_query = "/zxwl/vehicle/video/record/query/" + vehId; + record_play = "/zxwl/vehicle/video/record/play/" + vehId; + vehicle_ctrl = "/zxwl/vehicle/ctrl/" + vehId; } }; diff --git a/src/main.cpp b/src/main.cpp index b7df1a0..c24dda9 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -55,24 +55,24 @@ int main() RTMPManager::init(); // ---------- 自动推流(8 路录像守护) ---------- - LOG_INFO("[MAIN] Starting all record streams..."); - RTMPManager::start_all(); + // LOG_INFO("[MAIN] Starting all record streams..."); + // RTMPManager::start_all(); // 启动 MQTT 线程 - // std::thread mqtt_thread( - // [] - // { - // try - // { - // LOG_INFO("[MAIN] MQTT thread started."); - // mqtt_client_thread_func(); // 在回调里执行推流控制 - // } - // catch (const std::exception& e) - // { - // LOG_ERROR(std::string("[MAIN] MQTT thread crashed: ") + e.what()); - // } - // LOG_INFO("[MAIN] MQTT thread exiting..."); - // }); + std::thread mqtt_thread( + [] + { + try + { + LOG_INFO("[MAIN] MQTT thread started."); + mqtt_client_thread_func(); // 在回调里执行推流控制 + } + catch (const std::exception& e) + { + LOG_ERROR(std::string("[MAIN] MQTT thread crashed: ") + e.what()); + } + LOG_INFO("[MAIN] MQTT thread exiting..."); + }); // 主循环,仅等待退出信号 while (g_running.load(std::memory_order_relaxed)) std::this_thread::sleep_for(std::chrono::milliseconds(200)); @@ -85,11 +85,11 @@ int main() RTMPManager::stop_all(); - // if (mqtt_thread.joinable()) - // { - // mqtt_thread.join(); - // LOG_INFO("[MAIN] MQTT thread joined."); - // } + if (mqtt_thread.joinable()) + { + mqtt_thread.join(); + LOG_INFO("[MAIN] MQTT thread joined."); + } LOG_INFO("[MAIN] ===== Vehicle Video Service Exited Cleanly ====="); return 0; diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index b66f89b..f659018 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -72,6 +72,7 @@ static void on_mqtt_connected() mqtt_client->subscribe(g_app_config.mqtt.topics.video_down); mqtt_client->subscribe(g_app_config.mqtt.topics.record_query); mqtt_client->subscribe(g_app_config.mqtt.topics.record_play); + mqtt_client->subscribe(g_app_config.mqtt.topics.vehicle_ctrl); } static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); } @@ -246,6 +247,37 @@ static void handle_record_play_request(const nlohmann::json& req) mqtt_client->publish(g_app_config.mqtt.topics.record_play, resp.dump(), 1); } +static void handle_vehicle_ctrl_request(const nlohmann::json& req) +{ + std::string cmd = req.value("command", ""); + int value = req.value("value", 0); + + // value 固定为 1,不是 1 直接忽略 + if (value != 1) + { + LOG_WARN("[vehicle_ctrl] ignore command=" + cmd + " value=" + std::to_string(value)); + return; + } + + if (cmd == "startCtrl") + { + LOG_INFO("[vehicle_ctrl] startCtrl → start all RTMP"); + RTMPManager::start_all(); + g_streaming.store(true); + return; + } + + if (cmd == "stopCtrl") + { + LOG_INFO("[vehicle_ctrl] stopCtrl → stop all RTMP"); + RTMPManager::stop_all(); + g_streaming.store(false); + return; + } + + LOG_WARN("[vehicle_ctrl] unknown command: " + cmd); +} + static void on_mqtt_message_received(const std::string& topic, const std::string& message) { try @@ -274,6 +306,10 @@ static void on_mqtt_message_received(const std::string& topic, const std::string { handle_record_play_request(j); } + else if (topic == g_app_config.mqtt.topics.vehicle_ctrl) + { + handle_vehicle_ctrl_request(j); + } else { LOG_WARN("[MQTT] Unknown topic: " + topic);