From ee06511a8471179937bdae5cf95951ed668f52d5 Mon Sep 17 00:00:00 2001 From: cxh Date: Thu, 22 Jan 2026 10:19:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=A8=B3=E5=AE=9A=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/rtmp_manager.cpp | 194 +++++++++++++++++++++---------------------- 1 file changed, 97 insertions(+), 97 deletions(-) diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index afa94df..38ca804 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -1,4 +1,10 @@ -// rtmp_manager.cpp +// rtmp_manager.cpp (reliable integrated version) +// - record:启动时由配置决定(静态开关) +// - live:MQTT 动态开关(运行时可变) +// - 关键增强:PLAYING 成功后再“放行 valve”(避免 cold-start mux/sink 饿死) +// - 关键增强:rtmpsink 强制 IPv4(protocol=0),减少偶发握手卡住 +// - 关键增强:更严谨的 cleanup + 保存/释放 valve 引用,避免泄漏/悬挂引用 + #include "rtmp_manager.hpp" #include @@ -12,6 +18,7 @@ #include std::atomic RTMPManager::g_live_enabled{false}; +// record 是“启动时决定一次”的静态开关:由 config.json 决定 bool RTMPManager::g_record_enabled = true; // ======================================================= @@ -24,7 +31,7 @@ static bool device_exists(const std::string& path) } // 获取指定网卡 IPv4 -std::string get_ip_address(const std::string& ifname) +static std::string get_ip_address(const std::string& ifname) { struct ifaddrs* ifaddr = nullptr; if (getifaddrs(&ifaddr) != 0) return ""; @@ -45,6 +52,16 @@ std::string get_ip_address(const std::string& ifname) return ip; } +// 安全释放 GstObject* +static void safe_unref(GstObject*& obj) +{ + if (obj) + { + gst_object_unref(obj); + obj = nullptr; + } +} + // ======================================================= // 静态成员 // ======================================================= @@ -70,10 +87,12 @@ GstElement* RTMPManager::create_pipeline(const Camera& cam) const std::string stream = cam.name + "_main"; const std::string app = "camera"; + // 维持你当前 SRS 的 vhost query 方式,避免改动联动太多 const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream + "?vhost=live"; - const std::string record_rtmp = "rtmp://127.0.0.1:2935/" + app + "/" + stream + "?vhost=record"; + // 关键:rtmpsink protocol=0 强制 IPv4(减少握手卡住/IPv6 干扰) + // 关键:valve 默认 drop=true,真正放行在 PLAYING 成功后再做(避免 cold-start 饿死) std::string pipeline_str = "v4l2src device=" + cam.device + " io-mode=dmabuf " "! video/x-raw,format=NV12,width=1920,height=1080,framerate=" + @@ -90,22 +109,22 @@ GstElement* RTMPManager::create_pipeline(const Camera& cam) "! h264parse config-interval=1 " "! tee name=t " - // ===== record:可开关 ===== + // ===== record branch ===== "t. ! queue max-size-buffers=8 leaky=downstream " - "! valve name=record_valve drop=true " // ★★ 新增 + "! valve name=record_valve drop=true " "! queue max-size-buffers=8 leaky=downstream " "! flvmux name=rec_mux streamable=true " "! rtmpsink name=rec_sink location=\"" + record_rtmp + - "\" sync=false async=false " + "\" sync=false async=false protocol=0 " - // ===== live:可随意开关 ===== + // ===== live branch ===== "t. ! queue max-size-buffers=8 leaky=downstream " "! valve name=live_valve drop=true " "! queue max-size-buffers=8 leaky=downstream " "! flvmux name=live_mux streamable=true " "! rtmpsink name=live_sink location=\"" + - live_rtmp + "\" sync=false async=false "; + live_rtmp + "\" sync=false async=false protocol=0 "; GError* err = nullptr; GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &err); @@ -126,107 +145,97 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { const std::string key = make_key(cam.name); + auto mark_error = [&](const std::string& err) + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = err; + }; + while (ctx->thread_running) { - // 1. 设备存在性 + // 1) 设备存在性 if (!device_exists(cam.device)) { - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "Device not found: " + cam.device; - } + mark_error("Device not found: " + cam.device); LOG_WARN("[RTMP] " + key + " - device not found"); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } - // 2. 创建 pipeline + // 2) 创建 pipeline GstElement* pipeline = create_pipeline(cam); if (!pipeline) { - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "Pipeline creation failed"; - } + mark_error("Pipeline creation failed"); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } - // 2.1 获取 live_valve(用于 MQTT 控制) + // 2.1) 获取 valve(保存到 ctx,供 MQTT 动态控制 live,record 启动静态决定) GstElement* live_valve = gst_bin_get_by_name(GST_BIN(pipeline), "live_valve"); - if (!live_valve) - { - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "live_valve not found"; - } - LOG_ERROR("[RTMP] " + key + " - live_valve not found"); - gst_object_unref(pipeline); - std::this_thread::sleep_for(std::chrono::seconds(3)); - continue; - } - - // 保存到 ctx - { - std::lock_guard lk(ctx->status_mutex); - if (ctx->live_valve) gst_object_unref(ctx->live_valve); - ctx->live_valve = live_valve; // ⚠️ live_valve 引用交给 ctx 管理 - } - - // 2.x 获取 record_valve GstElement* record_valve = gst_bin_get_by_name(GST_BIN(pipeline), "record_valve"); - if (!record_valve) + + if (!live_valve || !record_valve) { - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "record_valve not found"; - } - LOG_ERROR("[RTMP] " + key + " - record_valve not found"); + if (!live_valve) LOG_ERROR("[RTMP] " + key + " - live_valve not found"); + if (!record_valve) LOG_ERROR("[RTMP] " + key + " - record_valve not found"); + mark_error("valve not found"); + + if (live_valve) gst_object_unref(live_valve); + if (record_valve) gst_object_unref(record_valve); gst_object_unref(pipeline); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } - // 保存到 ctx { std::lock_guard lk(ctx->status_mutex); + // 替换旧引用(避免泄漏) + if (ctx->live_valve) gst_object_unref(ctx->live_valve); if (ctx->record_valve) gst_object_unref(ctx->record_valve); - ctx->record_valve = record_valve; + + ctx->live_valve = live_valve; // 引用交给 ctx 管理 + ctx->record_valve = record_valve; // 引用交给 ctx 管理 } - g_object_set(G_OBJECT(ctx->record_valve), "drop", g_record_enabled ? FALSE : TRUE, nullptr); - - g_object_set(G_OBJECT(ctx->live_valve), "drop", g_live_enabled.load() ? FALSE : TRUE, nullptr); + // 2.2) 启动前:统一先 drop=true(不放行) + // 真正放行在 PLAYING 确认后进行(关键稳定性增强) + g_object_set(G_OBJECT(live_valve), "drop", TRUE, nullptr); + g_object_set(G_OBJECT(record_valve), "drop", TRUE, nullptr); GstBus* bus = gst_element_get_bus(pipeline); LOG_INFO("[RTMP] Starting stream: " + key); gst_element_set_state(pipeline, GST_STATE_PLAYING); - // 3. 等待进入 PLAYING + // 3) 等待进入 PLAYING GstState state = GST_STATE_NULL; - if (gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS && - state == GST_STATE_PLAYING) + auto ret = gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND); + if (!(ret == GST_STATE_CHANGE_SUCCESS && state == GST_STATE_PLAYING)) { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = true; - ctx->status.last_error.clear(); - LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); - } - else - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "Failed to reach PLAYING"; + mark_error("Failed to reach PLAYING"); LOG_ERROR("[RTMP] " + key + " failed to PLAY"); goto cleanup; } - // 4. 正常运行:只监听 ERROR / EOS + // 4) PLAYING 成功后,稍微延迟再按最终策略放行 valve(关键稳定性增强) + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + // record:启动时决定一次(静态开关) + g_object_set(G_OBJECT(record_valve), "drop", g_record_enabled ? FALSE : TRUE, nullptr); + + // live:运行期由 MQTT 控制(动态开关),此处按当前全局状态设置初值 + g_object_set(G_OBJECT(live_valve), "drop", g_live_enabled.load() ? FALSE : TRUE, nullptr); + + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = true; + ctx->status.last_error.clear(); + } + LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); + + // 5) 正常运行:监听 ERROR / EOS while (ctx->thread_running) { GstMessage* msg = gst_bus_timed_pop_filtered( @@ -234,7 +243,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) if (!msg) { - // 心跳:pipeline 没死就认为在跑 + // 心跳:能跑就认为 running std::lock_guard lk(ctx->status_mutex); ctx->status.running = true; continue; @@ -245,13 +254,9 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) GError* err = nullptr; gst_message_parse_error(msg, &err, nullptr); - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = err ? err->message : "Unknown GStreamer error"; - } - + mark_error(err ? err->message : "Unknown GStreamer error"); LOG_ERROR("[RTMP] " + key + " ERROR: " + (err ? err->message : "unknown")); + if (err) g_error_free(err); gst_message_unref(msg); break; @@ -259,11 +264,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) { - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "EOS"; - } + mark_error("EOS"); LOG_WARN("[RTMP] " + key + " EOS"); gst_message_unref(msg); break; @@ -273,22 +274,22 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) } cleanup: - { - std::lock_guard lk(ctx->status_mutex); - - if (ctx->live_valve) + // 6) 清理:先释放 ctx 保存的 valve 引用(避免下一轮使用旧引用) { - gst_object_unref(ctx->live_valve); - ctx->live_valve = nullptr; + std::lock_guard lk(ctx->status_mutex); + if (ctx->live_valve) + { + gst_object_unref(ctx->live_valve); + ctx->live_valve = nullptr; + } + if (ctx->record_valve) + { + gst_object_unref(ctx->record_valve); + ctx->record_valve = nullptr; + } } - if (ctx->record_valve) - { - gst_object_unref(ctx->record_valve); - ctx->record_valve = nullptr; - } - } - + // 7) pipeline 清理 gst_element_set_state(pipeline, GST_STATE_NULL); if (bus) gst_object_unref(bus); gst_object_unref(pipeline); @@ -357,7 +358,6 @@ void RTMPManager::stop_all() gst_object_unref(kv.second->live_valve); kv.second->live_valve = nullptr; } - if (kv.second->record_valve) { gst_object_unref(kv.second->record_valve); @@ -434,6 +434,9 @@ std::vector RTMPManager::get_all_channels_status() return result; } +// ======================================================= +// live:MQTT 动态开关 +// ======================================================= void RTMPManager::set_live_enabled_all(bool enable) { std::lock_guard lock(streams_mutex); @@ -444,7 +447,6 @@ void RTMPManager::set_live_enabled_all(bool enable) { auto* ctx = kv.second.get(); std::lock_guard lk(ctx->status_mutex); - if (!ctx->live_valve) continue; // enable=true → drop=false(放行 live) @@ -462,10 +464,8 @@ void RTMPManager::set_live_enabled(const std::string& cam_name, bool enable) auto* ctx = it->second.get(); std::lock_guard lk(ctx->status_mutex); - if (!ctx->live_valve) return; g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr); - LOG_INFO("[RTMP] Live " + std::string(enable ? "ENABLED" : "DISABLED") + " for " + cam_name); }