From ba8d3ece68524995d730dc8bc8711cb413b9cefb Mon Sep 17 00:00:00 2001 From: cxh Date: Fri, 17 Oct 2025 08:45:21 +0800 Subject: [PATCH] 1 --- src/rtmp_manager.cpp | 132 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 104 insertions(+), 28 deletions(-) diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 1c30d43..6b07379 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -96,15 +96,21 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) std::string stream_name = cam.name + stream_type_suffix(type); + // 说明: + // 1) 维持你当前的视频链路(NV12 -> mpph264enc -> h264parse -> flvmux)。 + // 2) 加一个静音音轨:audiotestsrc wave=silence -> voaacenc -> aacparse -> mux. + // 3) 给 rtmpsink 命名 name=rtmp_sink,供后续在 bus loop 中识别并判断“真正连上”时机。 std::string pipeline_str = "v4l2src device=" + cam.device + " ! video/x-raw,width=" + std::to_string(width) + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + "/1 " "! videoconvert ! video/x-raw,format=NV12 " "! mpph264enc bps=" + std::to_string(bitrate) + " gop=" + std::to_string(fps) + - " ! h264parse ! flvmux name=mux streamable=true " + " " + "! h264parse " + "! flvmux name=mux streamable=true " "audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux. " - "mux. ! rtmpsink location=\"rtmp://127.0.0.1/live/" + + "mux. ! rtmpsink name=rtmp_sink location=\"rtmp://127.0.0.1/live/" + stream_name + " live=1\" sync=false"; LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str); @@ -274,8 +280,10 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) GstBus *bus = gst_element_get_bus(pipeline); gst_element_set_state(pipeline, GST_STATE_PLAYING); - bool first_frame = false; - const auto start_time = std::chrono::steady_clock::now(); + bool started_reported = false; // 是否已经上报“启动成功” + auto t0 = std::chrono::steady_clock::now(); + const int first_frame_timeout_s = 5; // 首帧/连通性保护上限(与原逻辑一致) + const int connect_guard_ms = 2000; // 连接保护窗口:给 rtmpsink 一点时间去连接 while (true) { @@ -288,43 +296,100 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) break; } - // 首帧超时保护 - if (!first_frame && - std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count() > 5) + // 超时保护:既没报错也没确认连接成功,超过上限则判失败 + if (!started_reported) { - status.running = false; - status.last_result = StreamResult::TIMEOUT; - status.last_error = "No frames received within timeout"; - try_set_start(status); - break; + auto elapsed_s = + std::chrono::duration_cast(std::chrono::steady_clock::now() - t0).count(); + if (elapsed_s > first_frame_timeout_s) + { + status.running = false; + status.last_result = StreamResult::TIMEOUT; + status.last_error = "No frames/connection established within timeout"; + try_set_start(status); + break; + } } - GstMessage *msg = gst_bus_timed_pop_filtered( - bus, 200 * GST_MSECOND, (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED)); + GstMessage *msg = + gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND, + (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | + GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_STREAM_STATUS)); - if (!msg) continue; + if (!msg) + { + // 保底:过了 connect_guard_ms 仍然没有 ERROR, + // 并且 rtmpsink 进入 PLAYING,就认为真的起来了。 + if (!started_reported) + { + auto elapsed_ms = + std::chrono::duration_cast(std::chrono::steady_clock::now() - t0) + .count(); + if (elapsed_ms >= connect_guard_ms) + { + GstElement *sink = gst_bin_get_by_name(GST_BIN(pipeline), "rtmp_sink"); + if (sink) + { + GstState cur, pending; + if (gst_element_get_state(sink, &cur, &pending, 0) == GST_STATE_CHANGE_SUCCESS && + cur == GST_STATE_PLAYING) + { + status.running = true; + ctx->status.running = true; + status.last_result = StreamResult::OK; + status.last_error.clear(); + try_set_start(status); + started_reported = true; + LOG_INFO("[RTMP] Guard-start OK for '" + cam.name + "'"); + } + gst_object_unref(sink); + } + } + } + continue; + } switch (GST_MESSAGE_TYPE(msg)) { - case GST_MESSAGE_STATE_CHANGED: + case GST_MESSAGE_STREAM_STATUS: { - GstState old_state, new_state; - gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); - if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && !first_frame) + // 优先判断 rtmpsink 的 STREAM_STATUS(CREATE/START)来认定“真正连上” + GstStreamStatusType type_ss; + GstElement *owner = nullptr; + gst_message_parse_stream_status(msg, &type_ss, &owner); + if (!started_reported && owner) { - first_frame = true; - status.running = true; - ctx->status.running = true; - status.last_result = StreamResult::OK; - try_set_start(status); + const gchar *oname = GST_OBJECT_NAME(owner); + if (oname && std::strcmp(oname, "rtmp_sink") == 0) + { + if (type_ss == GST_STREAM_STATUS_TYPE_CREATE || type_ss == GST_STREAM_STATUS_TYPE_ENTER || + type_ss == GST_STREAM_STATUS_TYPE_START) + { + status.running = true; + ctx->status.running = true; + status.last_result = StreamResult::OK; + status.last_error.clear(); + try_set_start(status); + started_reported = true; + LOG_INFO(std::string("[RTMP] Connected (STREAM_STATUS) for '") + cam.name + "'"); + } + } } break; } + + case GST_MESSAGE_STATE_CHANGED: + { + // 不再用 pipeline 进入 PLAYING 判成功(容易误判) + // 这里仅用于日志或后续扩展 + break; + } + case GST_MESSAGE_ERROR: { GError *err = nullptr; - gchar *debug = nullptr; - gst_message_parse_error(msg, &err, &debug); + gchar *dbg = nullptr; + gst_message_parse_error(msg, &err, &dbg); status.running = false; status.last_result = StreamResult::CONNECTION_FAIL; @@ -332,14 +397,19 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) LOG_ERROR("[RTMP] Stream error from '" + cam.name + "': " + status.last_error); - // ✅ 立即停止 pipeline,避免阻塞 + // 立刻标记为停止,避免线程继续循环误判 + ctx->status.running = false; + ctx->running.store(false); + gst_element_set_state(pipeline, GST_STATE_NULL); if (err) g_error_free(err); - if (debug) g_free(debug); + if (dbg) g_free(dbg); + try_set_start(status); break; } + case GST_MESSAGE_EOS: { status.running = false; @@ -348,12 +418,18 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) try_set_start(status); break; } + + default: + break; } gst_message_unref(msg); + // 发生错误或 EOS 就退出循环 if (status.last_result == StreamResult::CONNECTION_FAIL || status.last_result == StreamResult::EOS_RECEIVED) + { break; + } } gst_element_set_state(pipeline, GST_STATE_NULL);