修改mqtt逻辑

This commit is contained in:
cxh 2025-12-29 17:45:57 +08:00
parent 3c982a311e
commit fa054362b9
5 changed files with 73 additions and 29 deletions

View File

@ -1,9 +1,10 @@
cmake_minimum_required(VERSION 3.10) cmake_minimum_required(VERSION 3.21)
# #
set(APP_NAME "video_manager") set(APP_NAME "video_manager")
# #
set(CONFIG_EXAMPLE_FILE "config.example.json")
set(CONFIG_FILE "config.json") set(CONFIG_FILE "config.json")
project(${APP_NAME}) project(${APP_NAME})
@ -53,7 +54,12 @@ target_link_libraries(${APP_NAME}
# #
add_custom_command(TARGET ${APP_NAME} POST_BUILD add_custom_command(TARGET ${APP_NAME} POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different COMMAND ${CMAKE_COMMAND} -E echo "Checking runtime config..."
${CMAKE_SOURCE_DIR}/${CONFIG_FILE} COMMAND ${CMAKE_COMMAND} -E make_directory $<TARGET_FILE_DIR:${APP_NAME}>
COMMAND ${CMAKE_COMMAND} -E
if_not_exists
$<TARGET_FILE_DIR:${APP_NAME}>/${CONFIG_FILE}
${CMAKE_COMMAND} -E copy
${CMAKE_SOURCE_DIR}/${CONFIG_EXAMPLE_FILE}
$<TARGET_FILE_DIR:${APP_NAME}>/${CONFIG_FILE} $<TARGET_FILE_DIR:${APP_NAME}>/${CONFIG_FILE}
) )

View File

@ -40,13 +40,15 @@ struct VehicleMQTTTopics
std::string video_down; std::string video_down;
std::string record_query; std::string record_query;
std::string record_play; std::string record_play;
std::string vehicle_ctrl;
void fill_with_veh_id(const std::string& vehId) void fill_with_veh_id(const std::string& vehId)
{ {
heartbeat_up = "/kun/vehicle/video/status/" + vehId; heartbeat_up = "/zxwl/vehicle/video/status/" + vehId;
video_down = "/kun/vehicle/video/request/" + vehId; video_down = "/zxwl/vehicle/video/request/" + vehId;
record_query = "/kun/vehicle/video/record/query/" + vehId; record_query = "/zxwl/vehicle/video/record/query/" + vehId;
record_play = "/kun/vehicle/video/record/play/" + vehId; record_play = "/zxwl/vehicle/video/record/play/" + vehId;
vehicle_ctrl = "/zxwl/vehicle/ctrl/" + vehId;
} }
}; };

View File

@ -55,24 +55,24 @@ int main()
RTMPManager::init(); RTMPManager::init();
// ---------- 自动推流8 路录像守护) ---------- // ---------- 自动推流8 路录像守护) ----------
LOG_INFO("[MAIN] Starting all record streams..."); // LOG_INFO("[MAIN] Starting all record streams...");
RTMPManager::start_all(); // RTMPManager::start_all();
// 启动 MQTT 线程 // 启动 MQTT 线程
// std::thread mqtt_thread( std::thread mqtt_thread(
// [] []
// { {
// try try
// { {
// LOG_INFO("[MAIN] MQTT thread started."); LOG_INFO("[MAIN] MQTT thread started.");
// mqtt_client_thread_func(); // 在回调里执行推流控制 mqtt_client_thread_func(); // 在回调里执行推流控制
// } }
// catch (const std::exception& e) catch (const std::exception& e)
// { {
// LOG_ERROR(std::string("[MAIN] MQTT thread crashed: ") + e.what()); LOG_ERROR(std::string("[MAIN] MQTT thread crashed: ") + e.what());
// } }
// LOG_INFO("[MAIN] MQTT thread exiting..."); LOG_INFO("[MAIN] MQTT thread exiting...");
// }); });
// 主循环,仅等待退出信号 // 主循环,仅等待退出信号
while (g_running.load(std::memory_order_relaxed)) std::this_thread::sleep_for(std::chrono::milliseconds(200)); while (g_running.load(std::memory_order_relaxed)) std::this_thread::sleep_for(std::chrono::milliseconds(200));
@ -85,11 +85,11 @@ int main()
RTMPManager::stop_all(); RTMPManager::stop_all();
// if (mqtt_thread.joinable()) if (mqtt_thread.joinable())
// { {
// mqtt_thread.join(); mqtt_thread.join();
// LOG_INFO("[MAIN] MQTT thread joined."); LOG_INFO("[MAIN] MQTT thread joined.");
// } }
LOG_INFO("[MAIN] ===== Vehicle Video Service Exited Cleanly ====="); LOG_INFO("[MAIN] ===== Vehicle Video Service Exited Cleanly =====");
return 0; return 0;

View File

@ -72,6 +72,7 @@ static void on_mqtt_connected()
mqtt_client->subscribe(g_app_config.mqtt.topics.video_down); mqtt_client->subscribe(g_app_config.mqtt.topics.video_down);
mqtt_client->subscribe(g_app_config.mqtt.topics.record_query); mqtt_client->subscribe(g_app_config.mqtt.topics.record_query);
mqtt_client->subscribe(g_app_config.mqtt.topics.record_play); mqtt_client->subscribe(g_app_config.mqtt.topics.record_play);
mqtt_client->subscribe(g_app_config.mqtt.topics.vehicle_ctrl);
} }
static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); } static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); }
@ -246,6 +247,37 @@ static void handle_record_play_request(const nlohmann::json& req)
mqtt_client->publish(g_app_config.mqtt.topics.record_play, resp.dump(), 1); mqtt_client->publish(g_app_config.mqtt.topics.record_play, resp.dump(), 1);
} }
static void handle_vehicle_ctrl_request(const nlohmann::json& req)
{
std::string cmd = req.value("command", "");
int value = req.value("value", 0);
// value 固定为 1不是 1 直接忽略
if (value != 1)
{
LOG_WARN("[vehicle_ctrl] ignore command=" + cmd + " value=" + std::to_string(value));
return;
}
if (cmd == "startCtrl")
{
LOG_INFO("[vehicle_ctrl] startCtrl → start all RTMP");
RTMPManager::start_all();
g_streaming.store(true);
return;
}
if (cmd == "stopCtrl")
{
LOG_INFO("[vehicle_ctrl] stopCtrl → stop all RTMP");
RTMPManager::stop_all();
g_streaming.store(false);
return;
}
LOG_WARN("[vehicle_ctrl] unknown command: " + cmd);
}
static void on_mqtt_message_received(const std::string& topic, const std::string& message) static void on_mqtt_message_received(const std::string& topic, const std::string& message)
{ {
try try
@ -274,6 +306,10 @@ static void on_mqtt_message_received(const std::string& topic, const std::string
{ {
handle_record_play_request(j); handle_record_play_request(j);
} }
else if (topic == g_app_config.mqtt.topics.vehicle_ctrl)
{
handle_vehicle_ctrl_request(j);
}
else else
{ {
LOG_WARN("[MQTT] Unknown topic: " + topic); LOG_WARN("[MQTT] Unknown topic: " + topic);