This commit is contained in:
cxh 2025-10-17 08:45:21 +08:00
parent e0a659750a
commit ba8d3ece68

View File

@ -96,15 +96,21 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
std::string stream_name = cam.name + stream_type_suffix(type);
// 说明:
// 1) 维持你当前的视频链路NV12 -> mpph264enc -> h264parse -> flvmux
// 2) 加一个静音音轨audiotestsrc wave=silence -> voaacenc -> aacparse -> mux.
// 3) 给 rtmpsink 命名 name=rtmp_sink供后续在 bus loop 中识别并判断“真正连上”时机。
std::string pipeline_str = "v4l2src device=" + cam.device + " ! video/x-raw,width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) +
"/1 "
"! videoconvert ! video/x-raw,format=NV12 "
"! mpph264enc bps=" +
std::to_string(bitrate) + " gop=" + std::to_string(fps) +
" ! h264parse ! flvmux name=mux streamable=true "
" "
"! h264parse "
"! flvmux name=mux streamable=true "
"audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux. "
"mux. ! rtmpsink location=\"rtmp://127.0.0.1/live/" +
"mux. ! rtmpsink name=rtmp_sink location=\"rtmp://127.0.0.1/live/" +
stream_name + " live=1\" sync=false";
LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str);
@ -274,8 +280,10 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
GstBus *bus = gst_element_get_bus(pipeline);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
bool first_frame = false;
const auto start_time = std::chrono::steady_clock::now();
bool started_reported = false; // 是否已经上报“启动成功”
auto t0 = std::chrono::steady_clock::now();
const int first_frame_timeout_s = 5; // 首帧/连通性保护上限(与原逻辑一致)
const int connect_guard_ms = 2000; // 连接保护窗口:给 rtmpsink 一点时间去连接
while (true)
{
@ -288,43 +296,100 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
break;
}
// 首帧超时保护
if (!first_frame &&
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_time).count() > 5)
// 超时保护:既没报错也没确认连接成功,超过上限则判失败
if (!started_reported)
{
status.running = false;
status.last_result = StreamResult::TIMEOUT;
status.last_error = "No frames received within timeout";
try_set_start(status);
break;
auto elapsed_s =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - t0).count();
if (elapsed_s > first_frame_timeout_s)
{
status.running = false;
status.last_result = StreamResult::TIMEOUT;
status.last_error = "No frames/connection established within timeout";
try_set_start(status);
break;
}
}
GstMessage *msg = gst_bus_timed_pop_filtered(
bus, 200 * GST_MSECOND, (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED));
GstMessage *msg =
gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND,
(GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS |
GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_STREAM_STATUS));
if (!msg) continue;
if (!msg)
{
// 保底:过了 connect_guard_ms 仍然没有 ERROR
// 并且 rtmpsink 进入 PLAYING就认为真的起来了。
if (!started_reported)
{
auto elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t0)
.count();
if (elapsed_ms >= connect_guard_ms)
{
GstElement *sink = gst_bin_get_by_name(GST_BIN(pipeline), "rtmp_sink");
if (sink)
{
GstState cur, pending;
if (gst_element_get_state(sink, &cur, &pending, 0) == GST_STATE_CHANGE_SUCCESS &&
cur == GST_STATE_PLAYING)
{
status.running = true;
ctx->status.running = true;
status.last_result = StreamResult::OK;
status.last_error.clear();
try_set_start(status);
started_reported = true;
LOG_INFO("[RTMP] Guard-start OK for '" + cam.name + "'");
}
gst_object_unref(sink);
}
}
}
continue;
}
switch (GST_MESSAGE_TYPE(msg))
{
case GST_MESSAGE_STATE_CHANGED:
case GST_MESSAGE_STREAM_STATUS:
{
GstState old_state, new_state;
gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr);
if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && !first_frame)
// 优先判断 rtmpsink 的 STREAM_STATUSCREATE/START来认定“真正连上”
GstStreamStatusType type_ss;
GstElement *owner = nullptr;
gst_message_parse_stream_status(msg, &type_ss, &owner);
if (!started_reported && owner)
{
first_frame = true;
status.running = true;
ctx->status.running = true;
status.last_result = StreamResult::OK;
try_set_start(status);
const gchar *oname = GST_OBJECT_NAME(owner);
if (oname && std::strcmp(oname, "rtmp_sink") == 0)
{
if (type_ss == GST_STREAM_STATUS_TYPE_CREATE || type_ss == GST_STREAM_STATUS_TYPE_ENTER ||
type_ss == GST_STREAM_STATUS_TYPE_START)
{
status.running = true;
ctx->status.running = true;
status.last_result = StreamResult::OK;
status.last_error.clear();
try_set_start(status);
started_reported = true;
LOG_INFO(std::string("[RTMP] Connected (STREAM_STATUS) for '") + cam.name + "'");
}
}
}
break;
}
case GST_MESSAGE_STATE_CHANGED:
{
// 不再用 pipeline 进入 PLAYING 判成功(容易误判)
// 这里仅用于日志或后续扩展
break;
}
case GST_MESSAGE_ERROR:
{
GError *err = nullptr;
gchar *debug = nullptr;
gst_message_parse_error(msg, &err, &debug);
gchar *dbg = nullptr;
gst_message_parse_error(msg, &err, &dbg);
status.running = false;
status.last_result = StreamResult::CONNECTION_FAIL;
@ -332,14 +397,19 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
LOG_ERROR("[RTMP] Stream error from '" + cam.name + "': " + status.last_error);
// ✅ 立即停止 pipeline避免阻塞
// 立刻标记为停止,避免线程继续循环误判
ctx->status.running = false;
ctx->running.store(false);
gst_element_set_state(pipeline, GST_STATE_NULL);
if (err) g_error_free(err);
if (debug) g_free(debug);
if (dbg) g_free(dbg);
try_set_start(status);
break;
}
case GST_MESSAGE_EOS:
{
status.running = false;
@ -348,12 +418,18 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
try_set_start(status);
break;
}
default:
break;
}
gst_message_unref(msg);
// 发生错误或 EOS 就退出循环
if (status.last_result == StreamResult::CONNECTION_FAIL || status.last_result == StreamResult::EOS_RECEIVED)
{
break;
}
}
gst_element_set_state(pipeline, GST_STATE_NULL);