From b6fd6034086b3af9c19d38cb36acc6cdb9057070 Mon Sep 17 00:00:00 2001 From: cxh Date: Fri, 9 Jan 2026 09:59:46 +0800 Subject: [PATCH] 1 --- src/rtmp_manager.cpp | 333 +++++++++---------------------------------- 1 file changed, 67 insertions(+), 266 deletions(-) diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index bb3a858..19b7c0b 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -3,12 +3,10 @@ #include #include -#include #include #include #include -#include #include // ========== 工具函数 ========== @@ -18,35 +16,24 @@ static bool device_exists(const std::string& path) return (stat(path.c_str(), &st) == 0); } -// 动态获取指定网卡 IPv4 地址 std::string get_ip_address(const std::string& ifname) { - struct ifaddrs *ifaddr, *ifa; + struct ifaddrs* ifaddr = nullptr; + if (getifaddrs(&ifaddr) == -1) return ""; + char ip[INET_ADDRSTRLEN] = {0}; - - if (getifaddrs(&ifaddr) == -1) + for (auto* ifa = ifaddr; ifa; ifa = ifa->ifa_next) { - perror("getifaddrs"); - return ""; - } - - for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) - { - if (ifa->ifa_addr == nullptr) continue; - + if (!ifa->ifa_addr) continue; if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name) { - void* addr = &((struct sockaddr_in*)ifa->ifa_addr)->sin_addr; - if (inet_ntop(AF_INET, addr, ip, sizeof(ip))) - { - freeifaddrs(ifaddr); - return ip; - } + auto* addr = &((struct sockaddr_in*)ifa->ifa_addr)->sin_addr; + inet_ntop(AF_INET, addr, ip, sizeof(ip)); + break; } } - freeifaddrs(ifaddr); - return ""; + return ip; } // ========== 静态成员 ========== @@ -62,102 +49,40 @@ void RTMPManager::init() std::string RTMPManager::make_key(const std::string& name) { return name + "_main"; } -// ========== 创建推流管线 ========== -// GstElement* RTMPManager::create_pipeline(const Camera& cam) -// { -// const int width = cam.width; -// const int height = cam.height; -// const int fps = cam.fps; -// const int bitrate = cam.bitrate; - -// // MediaMTX 中的 stream key -// const std::string stream_name = cam.name; - -// // RTMP 推送到 MediaMTX -// // mediamtx.yml 中 paths 会自动创建 -// const std::string rtmp_url = "rtmp://127.0.0.1:1935/" + stream_name; - -// /* -// * Pipeline 说明: -// * v4l2src -> mpph264enc -> h264parse -> flvmux -> rtmpsink -// * -// * - 不使用 tee(降低死锁概率) -// * - 不引入音频(MediaMTX 不强制要求) -// * - 纯视频、纯 RTMP、纯 TCP -// */ -// std::string pipeline_str = "v4l2src name=src device=" + cam.device + -// " ! video/x-raw,format=NV12,width=" + std::to_string(width) + -// ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + -// "/1 " -// " ! mpph264enc bps=" + -// std::to_string(bitrate) + " gop=" + std::to_string(fps) + -// " rc-mode=cbr " -// " ! h264parse name=parse " -// " ! flvmux streamable=true " -// " ! rtmpsink location=\"" + -// rtmp_url + -// "\" " -// " sync=false async=false"; - -// GError* error = nullptr; -// GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); -// if (error) -// { -// LOG_ERROR(std::string("[RTMP] Pipeline creation failed: ") + error->message); -// g_error_free(error); -// return nullptr; -// } - -// return pipeline; -// } - +// ========== 创建 RTMP Pipeline ========== GstElement* RTMPManager::create_pipeline(const Camera& cam) { - const int fps = 30; - const int bitrate = cam.bitrate; - const int gop = 15; // ⭐ 从 4 改为 15(非常重要) + const int gop = 15; - const std::string stream_name = cam.name; - const std::string rtmp_url = "rtmp://127.0.0.1:1935/" + stream_name; + const std::string rtmp_url = "rtmp://127.0.0.1:1935/" + cam.name; std::string pipeline_str = "v4l2src device=" + cam.device + " io-mode=dmabuf " - "! video/x-raw,format=NV12," - "width=1280,height=960," - "framerate=30/1 " - + "! video/x-raw,format=NV12,width=1280,height=960,framerate=30/1 " "! videocrop top=120 bottom=120 " - "! videoscale " - "! video/x-raw," - "width=" + + "! video/x-raw,width=" + std::to_string(cam.width) + ",height=" + std::to_string(cam.height) + " " - // ⭐ 缓冲拉开,避免瞬时饿死 "! queue max-size-buffers=8 max-size-time=0 leaky=downstream " - "! mpph264enc " - "rc-mode=cbr " + "! mpph264enc rc-mode=cbr " "bps=" + - std::to_string(bitrate) + + std::to_string(cam.bitrate) + " " "gop=" + std::to_string(gop) + " " - "header-mode=each-idr " - "profile=main " + "header-mode=each-idr profile=main " - "! h264parse config-interval=1 " + "! h264parse name=parse config-interval=1 " "! video/x-h264,stream-format=avc,alignment=au " "! flvmux streamable=true " - "! rtmpsink location=\"" + - rtmp_url + - "\" " - "sync=false async=false"; + rtmp_url + "\" sync=false async=false"; GError* error = nullptr; GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); @@ -167,7 +92,6 @@ GstElement* RTMPManager::create_pipeline(const Camera& cam) g_error_free(error); return nullptr; } - return pipeline; } @@ -176,208 +100,137 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { const std::string key = make_key(cam.name); - constexpr int64_t START_TIMEOUT_MS = 5000; // 启动阶段无帧 - constexpr int64_t NO_FRAME_TIMEOUT_MS = 10000; // 运行阶段突然无帧 ⇒ pipeline 卡死重启 + constexpr int64_t START_TIMEOUT_MS = 12000; // ⭐ 启动宽限期 + constexpr int64_t NO_FRAME_TIMEOUT_MS = 10000; while (ctx->thread_running) { - // 1) 检查设备节点 - struct stat st{}; - if (stat(cam.device.c_str(), &st) != 0) + if (!device_exists(cam.device)) { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "Device not found: " + cam.device; - LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); + LOG_WARN("[RTMP] " + key + " - device not found"); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } - // 2) 创建 pipeline GstElement* pipeline = create_pipeline(cam); if (!pipeline) { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "Pipeline creation failed"; std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } + gst_element_set_name(pipeline, key.c_str()); GstBus* bus = gst_element_get_bus(pipeline); - // 3) 帧探测:记录 last_frame_ms ctx->last_frame_ms.store(0, std::memory_order_relaxed); + + // ⭐ 帧探测挂在 h264parse + if (GstElement* parse = gst_bin_get_by_name(GST_BIN(pipeline), "parse")) { - GstElement* src = gst_bin_get_by_name(GST_BIN(pipeline), "src"); - if (src) + if (GstPad* pad = gst_element_get_static_pad(parse, "src")) { - GstPad* pad = gst_element_get_static_pad(src, "src"); - if (pad) - { - gst_pad_add_probe( - pad, GST_PAD_PROBE_TYPE_BUFFER, - [](GstPad*, GstPadProbeInfo*, gpointer data) -> GstPadProbeReturn - { - auto ts = static_cast*>(data); - auto now = std::chrono::steady_clock::now().time_since_epoch(); - auto ms = std::chrono::duration_cast(now).count(); - ts->store(ms, std::memory_order_relaxed); - return GST_PAD_PROBE_OK; - }, - &(ctx->last_frame_ms), nullptr); - gst_object_unref(pad); - } - gst_object_unref(src); + gst_pad_add_probe( + pad, GST_PAD_PROBE_TYPE_BUFFER, + [](GstPad*, GstPadProbeInfo*, gpointer data) -> GstPadProbeReturn + { + auto* ts = static_cast*>(data); + auto now = std::chrono::steady_clock::now().time_since_epoch(); + ts->store(std::chrono::duration_cast(now).count(), + std::memory_order_relaxed); + return GST_PAD_PROBE_OK; + }, + &ctx->last_frame_ms, nullptr); + gst_object_unref(pad); } + gst_object_unref(parse); } - // 4) 启动 pipeline LOG_INFO("[RTMP] Starting stream: " + key); gst_element_set_state(pipeline, GST_STATE_PLAYING); - GstState state = GST_STATE_NULL, pending = GST_STATE_NULL; - if (gst_element_get_state(pipeline, &state, &pending, 5 * GST_SECOND) != GST_STATE_CHANGE_SUCCESS || + GstState state; + if (gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND) != GST_STATE_CHANGE_SUCCESS || state != GST_STATE_PLAYING) { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "Failed to enter PLAYING"; - LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); - + LOG_WARN("[RTMP] " + key + " failed to enter PLAYING"); gst_element_set_state(pipeline, GST_STATE_NULL); - if (bus) gst_object_unref(bus); + gst_object_unref(bus); gst_object_unref(pipeline); - - std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); + std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } - // pipeline 成功启动 - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = true; - ctx->status.last_error.clear(); - } LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); auto launch_tp = std::chrono::steady_clock::now(); - + bool entered_running = false; bool need_restart = false; - // 5) 主循环:检测停帧或 error while (ctx->thread_running) { auto now = std::chrono::steady_clock::now(); int64_t now_ms = std::chrono::duration_cast(now.time_since_epoch()).count(); int64_t last_ms = ctx->last_frame_ms.load(std::memory_order_relaxed); - if (last_ms == 0) + if (!entered_running) { - auto startup_ms = std::chrono::duration_cast(now - launch_tp).count(); - if (startup_ms > START_TIMEOUT_MS) + if (last_ms != 0) { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "No frames detected at startup"; - LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); + entered_running = true; + LOG_INFO("[RTMP] " + key + " entered RUNNING state"); + } + else if (std::chrono::duration_cast(now - launch_tp).count() > + START_TIMEOUT_MS) + { + LOG_ERROR("[RTMP] " + key + " - no frames during startup"); need_restart = true; break; } } else { - int64_t idle_ms = now_ms - last_ms; - if (idle_ms > NO_FRAME_TIMEOUT_MS) + if (now_ms - last_ms > NO_FRAME_TIMEOUT_MS) { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "Frame stalled for " + std::to_string(idle_ms) + " ms"; - LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); + LOG_ERROR("[RTMP] " + key + " - frame stalled"); need_restart = true; break; } - else - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = true; - ctx->status.last_error.clear(); - } } - // 错误消息检测 - GstMessage* msg = gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND, - (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); - - if (!msg) continue; - - if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) + if (GstMessage* msg = gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND, + (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS))) { - GError* err = nullptr; - gst_message_parse_error(msg, &err, nullptr); - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = err ? err->message : "GStreamer error"; - LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); - if (err) g_error_free(err); need_restart = true; + gst_message_unref(msg); + break; } - else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "EOS (End of stream)"; - LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); - need_restart = true; - } - - gst_message_unref(msg); - if (need_restart) break; } - // 清理 & 重建 gst_element_set_state(pipeline, GST_STATE_NULL); - if (bus) gst_object_unref(bus); + gst_object_unref(bus); gst_object_unref(pipeline); - if (ctx->thread_running) + if (ctx->thread_running && need_restart) { LOG_WARN("[RTMP] Restarting " + key + " in 3s..."); - std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); + std::this_thread::sleep_for(std::chrono::seconds(3)); } } - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - } - LOG_INFO("[RTMP] Stream thread exited for " + key); } -// ========== 启停与状态 ========== +// ========== 启停 ========== void RTMPManager::start_all() { - LOG_INFO("[RTMP] Starting enabled record streams..."); std::lock_guard lock(streams_mutex); int delay_ms = 0; for (const auto& cam : g_app_config.cameras) { - // 跳过未启用摄像头 - if (!cam.enabled) - { - LOG_INFO("[RTMP] Skip disabled camera: " + cam.name); - continue; - } + if (!cam.enabled) continue; auto key = make_key(cam.name); - if (streams.find(key) != streams.end()) - { - LOG_INFO("[RTMP] Stream already running: " + key); - continue; - } - auto ctx = std::make_unique(); ctx->thread_running.store(true); @@ -402,61 +255,9 @@ void RTMPManager::stop_all() streams.clear(); } -bool RTMPManager::is_streaming(const std::string& cam_name) -{ - std::lock_guard lock(streams_mutex); - auto it = streams.find(make_key(cam_name)); - if (it == streams.end()) return false; - - auto& ctx = *(it->second); - std::lock_guard lk(ctx.status_mutex); - return ctx.status.running; -} - std::string RTMPManager::get_stream_url(const std::string& cam_name) { std::string ip = get_ip_address("enP2p33s0"); if (ip.empty()) ip = "127.0.0.1"; return "rtsp://" + ip + ":18554/" + cam_name; } -// ========== 汇总状态 ========== -std::vector RTMPManager::get_all_channels_status() -{ - std::vector result; - std::lock_guard lock(streams_mutex); - - for (size_t i = 0; i < g_app_config.cameras.size(); ++i) - { - const auto& cam = g_app_config.cameras[i]; - auto key = make_key(cam.name); - - ChannelInfo ch; - ch.loc = static_cast(i); - ch.url.clear(); - ch.running = false; - ch.reason = "Not started"; - - auto it = streams.find(key); - if (it != streams.end()) - { - auto& ctx = *(it->second); - - std::lock_guard lk(ctx.status_mutex); - auto& status = it->second->status; - - ch.running = status.running; - if (status.running) - { - ch.url = get_stream_url(cam.name); - ch.reason.clear(); - } - else - { - ch.reason = status.last_error.empty() ? "Unknown error" : status.last_error; - } - } - - result.push_back(ch); - } - return result; -}