diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index dac8abe..69bafb3 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -39,6 +39,26 @@ static void on_mqtt_connected() { LOG_INFO("[MQTT] Connected to broker: " + g_app_config.mqtt.server_ip); + // --- ★关键!MQTT 重连后立即执行全局复位 --- + g_streaming = false; // 清除推流状态 + { + std::lock_guard lock(g_dispatch_id_mutex); + g_dispatch_id.clear(); // 清除旧任务 ID + } + + // 停掉所有流,清理 RTSP 状态 + for (const auto &cam : g_app_config.cameras) + { + if (RTSPManager::is_streaming(cam.name)) + { + RTSPManager::unmount_camera(cam); + LOG_WARN("[MQTT] Reconnected -> force unmount camera: " + cam.name); + } + } + // ------------------------------------------------------- + + send_heartbeat(); + const auto &topics = g_app_config.mqtt.topics; mqtt_client->subscribe(topics.video_down); mqtt_client->subscribe(topics.substream_down); @@ -90,11 +110,14 @@ static void on_mqtt_message_received(const std::string &topic, const std::string if (!cam.enabled) continue; - if (!RTSPManager::is_streaming(cam.name)) + if (RTSPManager::is_streaming(cam.name)) { - RTSPManager::mount_camera(cam); - LOG_INFO("[MQTT] Started streaming: " + cam.name); + RTSPManager::unmount_camera(cam); + LOG_WARN("[MQTT] Start-before-unmount: " + cam.name); } + + RTSPManager::mount_camera(cam); + LOG_INFO("[MQTT] Started streaming: " + cam.name); } } else if (status == 1) @@ -160,6 +183,14 @@ static void on_mqtt_message_received(const std::string &topic, const std::string LOG_WARN("[MQTT] Reset command received, errorCode=" + errCode + ", des=" + des); + // --- ★★ 新增:重置全局 streaming 状态与 dispatchId --- + g_streaming = false; + { + std::lock_guard lock(g_dispatch_id_mutex); + g_dispatch_id.clear(); + } + // ------------------------------------------------------ + // 停止所有流,相当于复位 for (const auto &cam : g_app_config.cameras) {