diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 835e282..cce417f 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -51,6 +51,9 @@ class RTMPManager StreamStatus status; std::mutex status_mutex; + + // 新增:记录最后收到帧的时间(毫秒) + std::atomic last_frame_ms{0}; }; static void stream_loop(Camera cam, StreamContext* ctx); diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 9ba0ccf..7cb0d34 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -116,9 +116,12 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { const std::string key = make_key(cam.name); + constexpr int64_t START_TIMEOUT_MS = 5000; // 启动阶段无帧 + constexpr int64_t NO_FRAME_TIMEOUT_MS = 10000; // 运行阶段突然无帧 ⇒ pipeline 卡死重启 + while (ctx->thread_running) { - // 1. 检查设备节点是否存在 + // 1) 检查设备节点 struct stat st{}; if (stat(cam.device.c_str(), &st) != 0) { @@ -130,7 +133,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) continue; } - // 2. 创建 GStreamer 管线 + // 2) 创建 pipeline GstElement* pipeline = create_pipeline(cam); if (!pipeline) { @@ -143,8 +146,8 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) gst_element_set_name(pipeline, key.c_str()); GstBus* bus = gst_element_get_bus(pipeline); - // 3. 在 v4l2src 的 src pad 上挂探测 probe - bool got_frame = false; + // 3) 帧探测:记录 last_frame_ms + ctx->last_frame_ms.store(0, std::memory_order_relaxed); { GstElement* src = gst_bin_get_by_name(GST_BIN(pipeline), "src"); if (src) @@ -154,76 +157,69 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { gst_pad_add_probe( pad, GST_PAD_PROBE_TYPE_BUFFER, - [](GstPad*, GstPadProbeInfo*, gpointer user_data) -> GstPadProbeReturn + [](GstPad*, GstPadProbeInfo*, gpointer data) -> GstPadProbeReturn { - *static_cast(user_data) = true; + auto ts = static_cast*>(data); + auto now = std::chrono::steady_clock::now().time_since_epoch(); + auto ms = std::chrono::duration_cast(now).count(); + ts->store(ms, std::memory_order_relaxed); return GST_PAD_PROBE_OK; }, - &got_frame, nullptr); + &(ctx->last_frame_ms), nullptr); gst_object_unref(pad); } - else - { - LOG_WARN("[RTMP] " + key + " - src has no 'src' pad?"); - } gst_object_unref(src); } - else - { - LOG_WARN("[RTMP] " + key + " - cannot find element 'src' for pad-probe"); - } } - // 4. 启动播放 + // 4) 启动 pipeline LOG_INFO("[RTMP] Starting stream: " + key); gst_element_set_state(pipeline, GST_STATE_PLAYING); - // 等待进入 PLAYING 状态(最长 5s) GstState state = GST_STATE_NULL, pending = GST_STATE_NULL; - bool confirmed_running = false; - if (gst_element_get_state(pipeline, &state, &pending, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS && - state == GST_STATE_PLAYING) - { - std::lock_guard lk(ctx->status_mutex); - confirmed_running = true; - ctx->status.running = true; - ctx->status.last_error.clear(); - LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); - } - else + if (gst_element_get_state(pipeline, &state, &pending, 5 * GST_SECOND) != GST_STATE_CHANGE_SUCCESS || + state != GST_STATE_PLAYING) { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; - ctx->status.last_error = "Pipeline failed to confirm PLAYING"; + ctx->status.last_error = "Failed to enter PLAYING"; LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); - } - if (!confirmed_running) - { gst_element_set_state(pipeline, GST_STATE_NULL); if (bus) gst_object_unref(bus); gst_object_unref(pipeline); + std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); continue; } - // 5. 运行阶段:监测帧和错误 - const auto start_t = std::chrono::steady_clock::now(); + // pipeline 成功启动 + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = true; + ctx->status.last_error.clear(); + } + LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); + + auto launch_tp = std::chrono::steady_clock::now(); + bool need_restart = false; + // 5) 主循环:检测停帧或 error while (ctx->thread_running) { - // 检查是否收到帧(前 5s 内) - if (!got_frame) + auto now = std::chrono::steady_clock::now(); + int64_t now_ms = std::chrono::duration_cast(now.time_since_epoch()).count(); + int64_t last_ms = ctx->last_frame_ms.load(std::memory_order_relaxed); + + if (last_ms == 0) { - auto elapsed = - std::chrono::duration_cast(std::chrono::steady_clock::now() - start_t) - .count(); - if (elapsed > 5) + auto startup_ms = std::chrono::duration_cast(now - launch_tp).count(); + if (startup_ms > START_TIMEOUT_MS) { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; - ctx->status.last_error = "No frames detected (no video signal)"; + ctx->status.last_error = "No frames detected at startup"; LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); need_restart = true; break; @@ -231,53 +227,59 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) } else { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = true; - ctx->status.last_error.clear(); + int64_t idle_ms = now_ms - last_ms; + if (idle_ms > NO_FRAME_TIMEOUT_MS) + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "Frame stalled for " + std::to_string(idle_ms) + " ms"; + LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); + need_restart = true; + break; + } + else + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = true; + ctx->status.last_error.clear(); + } } - // 等待错误或 EOS 消息 + // 错误消息检测 GstMessage* msg = gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND, (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); if (!msg) continue; - switch (GST_MESSAGE_TYPE(msg)) + if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) { - case GST_MESSAGE_ERROR: - { - GError* err = nullptr; - gst_message_parse_error(msg, &err, nullptr); - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = err ? err->message : "GStreamer error"; - LOG_ERROR("[RTMP] " + key + " stream error: " + ctx->status.last_error); - if (err) g_error_free(err); - need_restart = true; - break; - } - case GST_MESSAGE_EOS: - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "End of stream (EOS)"; - LOG_WARN("[RTMP] " + key + " reached EOS"); - need_restart = true; - break; - } - default: break; + GError* err = nullptr; + gst_message_parse_error(msg, &err, nullptr); + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = err ? err->message : "GStreamer error"; + LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); + if (err) g_error_free(err); + need_restart = true; + } + else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "EOS (End of stream)"; + LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); + need_restart = true; } - gst_message_unref(msg); + gst_message_unref(msg); if (need_restart) break; } - // 6. 收尾清理 + // 清理 & 重建 gst_element_set_state(pipeline, GST_STATE_NULL); if (bus) gst_object_unref(bus); gst_object_unref(pipeline); - // 7. 若仍在运行状态,准备重启 if (ctx->thread_running) { LOG_WARN("[RTMP] Restarting " + key + " in 3s..."); @@ -289,6 +291,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; } + LOG_INFO("[RTMP] Stream thread exited for " + key); } @@ -301,7 +304,7 @@ void RTMPManager::start_all() int delay_ms = 0; for (const auto& cam : g_app_config.cameras) { - // 🔴 核心修正:跳过未启用摄像头 + // 跳过未启用摄像头 if (!cam.enabled) { LOG_INFO("[RTMP] Skip disabled camera: " + cam.name);