增加稳定性
This commit is contained in:
parent
222e816d5e
commit
ee06511a84
@ -1,4 +1,10 @@
|
||||
// rtmp_manager.cpp
|
||||
// rtmp_manager.cpp (reliable integrated version)
|
||||
// - record:启动时由配置决定(静态开关)
|
||||
// - live:MQTT 动态开关(运行时可变)
|
||||
// - 关键增强:PLAYING 成功后再“放行 valve”(避免 cold-start mux/sink 饿死)
|
||||
// - 关键增强:rtmpsink 强制 IPv4(protocol=0),减少偶发握手卡住
|
||||
// - 关键增强:更严谨的 cleanup + 保存/释放 valve 引用,避免泄漏/悬挂引用
|
||||
|
||||
#include "rtmp_manager.hpp"
|
||||
|
||||
#include <arpa/inet.h>
|
||||
@ -12,6 +18,7 @@
|
||||
#include <thread>
|
||||
|
||||
std::atomic<bool> RTMPManager::g_live_enabled{false};
|
||||
// record 是“启动时决定一次”的静态开关:由 config.json 决定
|
||||
bool RTMPManager::g_record_enabled = true;
|
||||
|
||||
// =======================================================
|
||||
@ -24,7 +31,7 @@ static bool device_exists(const std::string& path)
|
||||
}
|
||||
|
||||
// 获取指定网卡 IPv4
|
||||
std::string get_ip_address(const std::string& ifname)
|
||||
static std::string get_ip_address(const std::string& ifname)
|
||||
{
|
||||
struct ifaddrs* ifaddr = nullptr;
|
||||
if (getifaddrs(&ifaddr) != 0) return "";
|
||||
@ -45,6 +52,16 @@ std::string get_ip_address(const std::string& ifname)
|
||||
return ip;
|
||||
}
|
||||
|
||||
// 安全释放 GstObject*
|
||||
static void safe_unref(GstObject*& obj)
|
||||
{
|
||||
if (obj)
|
||||
{
|
||||
gst_object_unref(obj);
|
||||
obj = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// =======================================================
|
||||
// 静态成员
|
||||
// =======================================================
|
||||
@ -70,10 +87,12 @@ GstElement* RTMPManager::create_pipeline(const Camera& cam)
|
||||
const std::string stream = cam.name + "_main";
|
||||
const std::string app = "camera";
|
||||
|
||||
// 维持你当前 SRS 的 vhost query 方式,避免改动联动太多
|
||||
const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream + "?vhost=live";
|
||||
|
||||
const std::string record_rtmp = "rtmp://127.0.0.1:2935/" + app + "/" + stream + "?vhost=record";
|
||||
|
||||
// 关键:rtmpsink protocol=0 强制 IPv4(减少握手卡住/IPv6 干扰)
|
||||
// 关键:valve 默认 drop=true,真正放行在 PLAYING 成功后再做(避免 cold-start 饿死)
|
||||
std::string pipeline_str = "v4l2src device=" + cam.device +
|
||||
" io-mode=dmabuf "
|
||||
"! video/x-raw,format=NV12,width=1920,height=1080,framerate=" +
|
||||
@ -90,22 +109,22 @@ GstElement* RTMPManager::create_pipeline(const Camera& cam)
|
||||
"! h264parse config-interval=1 "
|
||||
"! tee name=t "
|
||||
|
||||
// ===== record:可开关 =====
|
||||
// ===== record branch =====
|
||||
"t. ! queue max-size-buffers=8 leaky=downstream "
|
||||
"! valve name=record_valve drop=true " // ★★ 新增
|
||||
"! valve name=record_valve drop=true "
|
||||
"! queue max-size-buffers=8 leaky=downstream "
|
||||
"! flvmux name=rec_mux streamable=true "
|
||||
"! rtmpsink name=rec_sink location=\"" +
|
||||
record_rtmp +
|
||||
"\" sync=false async=false "
|
||||
"\" sync=false async=false protocol=0 "
|
||||
|
||||
// ===== live:可随意开关 =====
|
||||
// ===== live branch =====
|
||||
"t. ! queue max-size-buffers=8 leaky=downstream "
|
||||
"! valve name=live_valve drop=true "
|
||||
"! queue max-size-buffers=8 leaky=downstream "
|
||||
"! flvmux name=live_mux streamable=true "
|
||||
"! rtmpsink name=live_sink location=\"" +
|
||||
live_rtmp + "\" sync=false async=false ";
|
||||
live_rtmp + "\" sync=false async=false protocol=0 ";
|
||||
|
||||
GError* err = nullptr;
|
||||
GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &err);
|
||||
@ -126,107 +145,97 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx)
|
||||
{
|
||||
const std::string key = make_key(cam.name);
|
||||
|
||||
auto mark_error = [&](const std::string& err)
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
ctx->status.running = false;
|
||||
ctx->status.last_error = err;
|
||||
};
|
||||
|
||||
while (ctx->thread_running)
|
||||
{
|
||||
// 1. 设备存在性
|
||||
// 1) 设备存在性
|
||||
if (!device_exists(cam.device))
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
ctx->status.running = false;
|
||||
ctx->status.last_error = "Device not found: " + cam.device;
|
||||
}
|
||||
mark_error("Device not found: " + cam.device);
|
||||
LOG_WARN("[RTMP] " + key + " - device not found");
|
||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
||||
continue;
|
||||
}
|
||||
|
||||
// 2. 创建 pipeline
|
||||
// 2) 创建 pipeline
|
||||
GstElement* pipeline = create_pipeline(cam);
|
||||
if (!pipeline)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
ctx->status.running = false;
|
||||
ctx->status.last_error = "Pipeline creation failed";
|
||||
}
|
||||
mark_error("Pipeline creation failed");
|
||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
||||
continue;
|
||||
}
|
||||
|
||||
// 2.1 获取 live_valve(用于 MQTT 控制)
|
||||
// 2.1) 获取 valve(保存到 ctx,供 MQTT 动态控制 live,record 启动静态决定)
|
||||
GstElement* live_valve = gst_bin_get_by_name(GST_BIN(pipeline), "live_valve");
|
||||
if (!live_valve)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
ctx->status.running = false;
|
||||
ctx->status.last_error = "live_valve not found";
|
||||
}
|
||||
LOG_ERROR("[RTMP] " + key + " - live_valve not found");
|
||||
gst_object_unref(pipeline);
|
||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
||||
continue;
|
||||
}
|
||||
|
||||
// 保存到 ctx
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
if (ctx->live_valve) gst_object_unref(ctx->live_valve);
|
||||
ctx->live_valve = live_valve; // ⚠️ live_valve 引用交给 ctx 管理
|
||||
}
|
||||
|
||||
// 2.x 获取 record_valve
|
||||
GstElement* record_valve = gst_bin_get_by_name(GST_BIN(pipeline), "record_valve");
|
||||
if (!record_valve)
|
||||
|
||||
if (!live_valve || !record_valve)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
ctx->status.running = false;
|
||||
ctx->status.last_error = "record_valve not found";
|
||||
}
|
||||
LOG_ERROR("[RTMP] " + key + " - record_valve not found");
|
||||
if (!live_valve) LOG_ERROR("[RTMP] " + key + " - live_valve not found");
|
||||
if (!record_valve) LOG_ERROR("[RTMP] " + key + " - record_valve not found");
|
||||
mark_error("valve not found");
|
||||
|
||||
if (live_valve) gst_object_unref(live_valve);
|
||||
if (record_valve) gst_object_unref(record_valve);
|
||||
gst_object_unref(pipeline);
|
||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
||||
continue;
|
||||
}
|
||||
|
||||
// 保存到 ctx
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
// 替换旧引用(避免泄漏)
|
||||
if (ctx->live_valve) gst_object_unref(ctx->live_valve);
|
||||
if (ctx->record_valve) gst_object_unref(ctx->record_valve);
|
||||
ctx->record_valve = record_valve;
|
||||
|
||||
ctx->live_valve = live_valve; // 引用交给 ctx 管理
|
||||
ctx->record_valve = record_valve; // 引用交给 ctx 管理
|
||||
}
|
||||
|
||||
g_object_set(G_OBJECT(ctx->record_valve), "drop", g_record_enabled ? FALSE : TRUE, nullptr);
|
||||
|
||||
g_object_set(G_OBJECT(ctx->live_valve), "drop", g_live_enabled.load() ? FALSE : TRUE, nullptr);
|
||||
// 2.2) 启动前:统一先 drop=true(不放行)
|
||||
// 真正放行在 PLAYING 确认后进行(关键稳定性增强)
|
||||
g_object_set(G_OBJECT(live_valve), "drop", TRUE, nullptr);
|
||||
g_object_set(G_OBJECT(record_valve), "drop", TRUE, nullptr);
|
||||
|
||||
GstBus* bus = gst_element_get_bus(pipeline);
|
||||
|
||||
LOG_INFO("[RTMP] Starting stream: " + key);
|
||||
gst_element_set_state(pipeline, GST_STATE_PLAYING);
|
||||
|
||||
// 3. 等待进入 PLAYING
|
||||
// 3) 等待进入 PLAYING
|
||||
GstState state = GST_STATE_NULL;
|
||||
if (gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS &&
|
||||
state == GST_STATE_PLAYING)
|
||||
auto ret = gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND);
|
||||
if (!(ret == GST_STATE_CHANGE_SUCCESS && state == GST_STATE_PLAYING))
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
ctx->status.running = true;
|
||||
ctx->status.last_error.clear();
|
||||
LOG_INFO("[RTMP] " + key + " confirmed PLAYING");
|
||||
}
|
||||
else
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
ctx->status.running = false;
|
||||
ctx->status.last_error = "Failed to reach PLAYING";
|
||||
mark_error("Failed to reach PLAYING");
|
||||
LOG_ERROR("[RTMP] " + key + " failed to PLAY");
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
// 4. 正常运行:只监听 ERROR / EOS
|
||||
// 4) PLAYING 成功后,稍微延迟再按最终策略放行 valve(关键稳定性增强)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(150));
|
||||
|
||||
// record:启动时决定一次(静态开关)
|
||||
g_object_set(G_OBJECT(record_valve), "drop", g_record_enabled ? FALSE : TRUE, nullptr);
|
||||
|
||||
// live:运行期由 MQTT 控制(动态开关),此处按当前全局状态设置初值
|
||||
g_object_set(G_OBJECT(live_valve), "drop", g_live_enabled.load() ? FALSE : TRUE, nullptr);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
ctx->status.running = true;
|
||||
ctx->status.last_error.clear();
|
||||
}
|
||||
LOG_INFO("[RTMP] " + key + " confirmed PLAYING");
|
||||
|
||||
// 5) 正常运行:监听 ERROR / EOS
|
||||
while (ctx->thread_running)
|
||||
{
|
||||
GstMessage* msg = gst_bus_timed_pop_filtered(
|
||||
@ -234,7 +243,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx)
|
||||
|
||||
if (!msg)
|
||||
{
|
||||
// 心跳:pipeline 没死就认为在跑
|
||||
// 心跳:能跑就认为 running
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
ctx->status.running = true;
|
||||
continue;
|
||||
@ -245,13 +254,9 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx)
|
||||
GError* err = nullptr;
|
||||
gst_message_parse_error(msg, &err, nullptr);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
ctx->status.running = false;
|
||||
ctx->status.last_error = err ? err->message : "Unknown GStreamer error";
|
||||
}
|
||||
|
||||
mark_error(err ? err->message : "Unknown GStreamer error");
|
||||
LOG_ERROR("[RTMP] " + key + " ERROR: " + (err ? err->message : "unknown"));
|
||||
|
||||
if (err) g_error_free(err);
|
||||
gst_message_unref(msg);
|
||||
break;
|
||||
@ -259,11 +264,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx)
|
||||
|
||||
if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
ctx->status.running = false;
|
||||
ctx->status.last_error = "EOS";
|
||||
}
|
||||
mark_error("EOS");
|
||||
LOG_WARN("[RTMP] " + key + " EOS");
|
||||
gst_message_unref(msg);
|
||||
break;
|
||||
@ -273,22 +274,22 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx)
|
||||
}
|
||||
|
||||
cleanup:
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
|
||||
if (ctx->live_valve)
|
||||
// 6) 清理:先释放 ctx 保存的 valve 引用(避免下一轮使用旧引用)
|
||||
{
|
||||
gst_object_unref(ctx->live_valve);
|
||||
ctx->live_valve = nullptr;
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
if (ctx->live_valve)
|
||||
{
|
||||
gst_object_unref(ctx->live_valve);
|
||||
ctx->live_valve = nullptr;
|
||||
}
|
||||
if (ctx->record_valve)
|
||||
{
|
||||
gst_object_unref(ctx->record_valve);
|
||||
ctx->record_valve = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
if (ctx->record_valve)
|
||||
{
|
||||
gst_object_unref(ctx->record_valve);
|
||||
ctx->record_valve = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// 7) pipeline 清理
|
||||
gst_element_set_state(pipeline, GST_STATE_NULL);
|
||||
if (bus) gst_object_unref(bus);
|
||||
gst_object_unref(pipeline);
|
||||
@ -357,7 +358,6 @@ void RTMPManager::stop_all()
|
||||
gst_object_unref(kv.second->live_valve);
|
||||
kv.second->live_valve = nullptr;
|
||||
}
|
||||
|
||||
if (kv.second->record_valve)
|
||||
{
|
||||
gst_object_unref(kv.second->record_valve);
|
||||
@ -434,6 +434,9 @@ std::vector<RTMPManager::ChannelInfo> RTMPManager::get_all_channels_status()
|
||||
return result;
|
||||
}
|
||||
|
||||
// =======================================================
|
||||
// live:MQTT 动态开关
|
||||
// =======================================================
|
||||
void RTMPManager::set_live_enabled_all(bool enable)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
||||
@ -444,7 +447,6 @@ void RTMPManager::set_live_enabled_all(bool enable)
|
||||
{
|
||||
auto* ctx = kv.second.get();
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
|
||||
if (!ctx->live_valve) continue;
|
||||
|
||||
// enable=true → drop=false(放行 live)
|
||||
@ -462,10 +464,8 @@ void RTMPManager::set_live_enabled(const std::string& cam_name, bool enable)
|
||||
|
||||
auto* ctx = it->second.get();
|
||||
std::lock_guard<std::mutex> lk(ctx->status_mutex);
|
||||
|
||||
if (!ctx->live_valve) return;
|
||||
|
||||
g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr);
|
||||
|
||||
LOG_INFO("[RTMP] Live " + std::string(enable ? "ENABLED" : "DISABLED") + " for " + cam_name);
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user