From 45440a71dcedc28c39d3511f6c6a673a64489853 Mon Sep 17 00:00:00 2001 From: cxh Date: Fri, 9 Jan 2026 11:28:47 +0800 Subject: [PATCH] 1 --- src/rtmp_manager.cpp | 378 ++++++++++++++++++++++++++++++++----------- 1 file changed, 286 insertions(+), 92 deletions(-) diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 568903d..bb3a858 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -3,10 +3,12 @@ #include #include +#include #include #include #include +#include #include // ========== 工具函数 ========== @@ -16,24 +18,35 @@ static bool device_exists(const std::string& path) return (stat(path.c_str(), &st) == 0); } +// 动态获取指定网卡 IPv4 地址 std::string get_ip_address(const std::string& ifname) { - struct ifaddrs* ifaddr = nullptr; - if (getifaddrs(&ifaddr) == -1) return ""; - + struct ifaddrs *ifaddr, *ifa; char ip[INET_ADDRSTRLEN] = {0}; - for (auto* ifa = ifaddr; ifa; ifa = ifa->ifa_next) + + if (getifaddrs(&ifaddr) == -1) { - if (!ifa->ifa_addr) continue; + perror("getifaddrs"); + return ""; + } + + for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) + { + if (ifa->ifa_addr == nullptr) continue; + if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name) { - auto* addr = &((struct sockaddr_in*)ifa->ifa_addr)->sin_addr; - inet_ntop(AF_INET, addr, ip, sizeof(ip)); - break; + void* addr = &((struct sockaddr_in*)ifa->ifa_addr)->sin_addr; + if (inet_ntop(AF_INET, addr, ip, sizeof(ip))) + { + freeifaddrs(ifaddr); + return ip; + } } } + freeifaddrs(ifaddr); - return ip; + return ""; } // ========== 静态成员 ========== @@ -49,46 +62,102 @@ void RTMPManager::init() std::string RTMPManager::make_key(const std::string& name) { return name + "_main"; } -// ========== 创建 RTMP Pipeline ========== +// ========== 创建推流管线 ========== +// GstElement* RTMPManager::create_pipeline(const Camera& cam) +// { +// const int width = cam.width; +// const int height = cam.height; +// const int fps = cam.fps; +// const int bitrate = cam.bitrate; + +// // MediaMTX 中的 stream key +// const std::string stream_name = cam.name; + +// // RTMP 推送到 MediaMTX +// // mediamtx.yml 中 paths 会自动创建 +// const std::string rtmp_url = "rtmp://127.0.0.1:1935/" + stream_name; + +// /* +// * Pipeline 说明: +// * v4l2src -> mpph264enc -> h264parse -> flvmux -> rtmpsink +// * +// * - 不使用 tee(降低死锁概率) +// * - 不引入音频(MediaMTX 不强制要求) +// * - 纯视频、纯 RTMP、纯 TCP +// */ +// std::string pipeline_str = "v4l2src name=src device=" + cam.device + +// " ! video/x-raw,format=NV12,width=" + std::to_string(width) + +// ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + +// "/1 " +// " ! mpph264enc bps=" + +// std::to_string(bitrate) + " gop=" + std::to_string(fps) + +// " rc-mode=cbr " +// " ! h264parse name=parse " +// " ! flvmux streamable=true " +// " ! rtmpsink location=\"" + +// rtmp_url + +// "\" " +// " sync=false async=false"; + +// GError* error = nullptr; +// GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); +// if (error) +// { +// LOG_ERROR(std::string("[RTMP] Pipeline creation failed: ") + error->message); +// g_error_free(error); +// return nullptr; +// } + +// return pipeline; +// } + GstElement* RTMPManager::create_pipeline(const Camera& cam) { - const int fps = cam.fps > 0 ? cam.fps : 30; - const int bitrate = cam.bitrate > 0 ? cam.bitrate : 3500000; + const int fps = 30; + const int bitrate = cam.bitrate; + const int gop = 15; // ⭐ 从 4 改为 15(非常重要) - const std::string rtmp_url = "rtmp://127.0.0.1:1935/live/" + cam.name; + const std::string stream_name = cam.name; + const std::string rtmp_url = "rtmp://127.0.0.1:1935/" + stream_name; - std::string pipeline_str = "v4l2src name=src device=" + cam.device + + std::string pipeline_str = "v4l2src device=" + cam.device + " io-mode=dmabuf " - "! video/x-raw,format=NV12,width=1280,height=960,framerate=" + - std::to_string(fps) + - "/1 " - "! videoscale method=nearest-neighbour " + + "! video/x-raw,format=NV12," + "width=1280,height=960," + "framerate=30/1 " + "! videocrop top=120 bottom=120 " - "! video/x-raw,width=" + + + "! videoscale " + "! video/x-raw," + "width=" + std::to_string(cam.width) + ",height=" + std::to_string(cam.height) + - ",pixel-aspect-ratio=1/1 " - "! queue max-size-buffers=6 max-size-time=0 max-size-bytes=0 " - "! mpph264enc rc-mode=cbr " + " " + + // ⭐ 缓冲拉开,避免瞬时饿死 + "! queue max-size-buffers=8 max-size-time=0 leaky=downstream " + + "! mpph264enc " + "rc-mode=cbr " "bps=" + std::to_string(bitrate) + " " - "max-bps=" + - std::to_string(bitrate) + - " " - "min-bps=" + - std::to_string(bitrate) + - " " "gop=" + - std::to_string(fps) + + std::to_string(gop) + " " - "header-mode=each-idr profile=main " - "! h264parse name=parse config-interval=1 " - "! video/x-h264,stream-format=avc,alignment=au " - "! flvmux streamable=true " - "! rtmpsink location=\"" + - rtmp_url + "\" sync=false async=false"; + "header-mode=each-idr " + "profile=main " - LOG_INFO("[PIPELINE] " + pipeline_str); + "! h264parse config-interval=1 " + "! video/x-h264,stream-format=avc,alignment=au " + + "! flvmux streamable=true " + + "! rtmpsink location=\"" + + rtmp_url + + "\" " + "sync=false async=false"; GError* error = nullptr; GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); @@ -98,6 +167,7 @@ GstElement* RTMPManager::create_pipeline(const Camera& cam) g_error_free(error); return nullptr; } + return pipeline; } @@ -106,136 +176,208 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { const std::string key = make_key(cam.name); - constexpr int64_t START_TIMEOUT_MS = 12000; // 启动宽限 - constexpr int64_t NO_FRAME_TIMEOUT_MS = 15000; // 运行期宽限 + constexpr int64_t START_TIMEOUT_MS = 5000; // 启动阶段无帧 + constexpr int64_t NO_FRAME_TIMEOUT_MS = 10000; // 运行阶段突然无帧 ⇒ pipeline 卡死重启 while (ctx->thread_running) { - if (!device_exists(cam.device)) + // 1) 检查设备节点 + struct stat st{}; + if (stat(cam.device.c_str(), &st) != 0) { - LOG_WARN("[RTMP] " + key + " - device not found"); + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "Device not found: " + cam.device; + LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } + // 2) 创建 pipeline GstElement* pipeline = create_pipeline(cam); if (!pipeline) { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "Pipeline creation failed"; std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } - gst_element_set_name(pipeline, key.c_str()); GstBus* bus = gst_element_get_bus(pipeline); + // 3) 帧探测:记录 last_frame_ms ctx->last_frame_ms.store(0, std::memory_order_relaxed); - - // ⭐ 帧探针挂在 h264parse src - if (GstElement* parse = gst_bin_get_by_name(GST_BIN(pipeline), "parse")) { - if (GstPad* pad = gst_element_get_static_pad(parse, "src")) + GstElement* src = gst_bin_get_by_name(GST_BIN(pipeline), "src"); + if (src) { - gst_pad_add_probe( - pad, GST_PAD_PROBE_TYPE_BUFFER, - [](GstPad*, GstPadProbeInfo*, gpointer data) -> GstPadProbeReturn - { - auto* ts = static_cast*>(data); - auto now = std::chrono::steady_clock::now().time_since_epoch(); - ts->store(std::chrono::duration_cast(now).count(), - std::memory_order_relaxed); - return GST_PAD_PROBE_OK; - }, - &ctx->last_frame_ms, nullptr); - gst_object_unref(pad); + GstPad* pad = gst_element_get_static_pad(src, "src"); + if (pad) + { + gst_pad_add_probe( + pad, GST_PAD_PROBE_TYPE_BUFFER, + [](GstPad*, GstPadProbeInfo*, gpointer data) -> GstPadProbeReturn + { + auto ts = static_cast*>(data); + auto now = std::chrono::steady_clock::now().time_since_epoch(); + auto ms = std::chrono::duration_cast(now).count(); + ts->store(ms, std::memory_order_relaxed); + return GST_PAD_PROBE_OK; + }, + &(ctx->last_frame_ms), nullptr); + gst_object_unref(pad); + } + gst_object_unref(src); } - gst_object_unref(parse); } + // 4) 启动 pipeline LOG_INFO("[RTMP] Starting stream: " + key); gst_element_set_state(pipeline, GST_STATE_PLAYING); - GstState state; - if (gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND) != GST_STATE_CHANGE_SUCCESS || + GstState state = GST_STATE_NULL, pending = GST_STATE_NULL; + if (gst_element_get_state(pipeline, &state, &pending, 5 * GST_SECOND) != GST_STATE_CHANGE_SUCCESS || state != GST_STATE_PLAYING) { - LOG_WARN("[RTMP] " + key + " failed to enter PLAYING"); + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "Failed to enter PLAYING"; + LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); + gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(bus); + if (bus) gst_object_unref(bus); gst_object_unref(pipeline); - std::this_thread::sleep_for(std::chrono::seconds(3)); + + std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); continue; } + // pipeline 成功启动 + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = true; + ctx->status.last_error.clear(); + } LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); auto launch_tp = std::chrono::steady_clock::now(); - bool entered_running = false; + bool need_restart = false; + // 5) 主循环:检测停帧或 error while (ctx->thread_running) { auto now = std::chrono::steady_clock::now(); int64_t now_ms = std::chrono::duration_cast(now.time_since_epoch()).count(); int64_t last_ms = ctx->last_frame_ms.load(std::memory_order_relaxed); - if (!entered_running) + if (last_ms == 0) { - if (last_ms != 0) + auto startup_ms = std::chrono::duration_cast(now - launch_tp).count(); + if (startup_ms > START_TIMEOUT_MS) { - entered_running = true; - LOG_INFO("[RTMP] " + key + " entered RUNNING state"); - } - else if (std::chrono::duration_cast(now - launch_tp).count() > - START_TIMEOUT_MS) - { - // ⭐ 启动阶段允许慢,不重启 - LOG_WARN("[RTMP] " + key + " - no frames yet during startup (tolerated)"); - } - } - else - { - if (now_ms - last_ms > NO_FRAME_TIMEOUT_MS) - { - LOG_ERROR("[RTMP] " + key + " - frame stalled"); + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "No frames detected at startup"; + LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); need_restart = true; break; } } - - if (GstMessage* msg = gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND, - (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS))) + else { - need_restart = true; - gst_message_unref(msg); - break; + int64_t idle_ms = now_ms - last_ms; + if (idle_ms > NO_FRAME_TIMEOUT_MS) + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "Frame stalled for " + std::to_string(idle_ms) + " ms"; + LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); + need_restart = true; + break; + } + else + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = true; + ctx->status.last_error.clear(); + } } + + // 错误消息检测 + GstMessage* msg = gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND, + (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); + + if (!msg) continue; + + if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) + { + GError* err = nullptr; + gst_message_parse_error(msg, &err, nullptr); + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = err ? err->message : "GStreamer error"; + LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); + if (err) g_error_free(err); + need_restart = true; + } + else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "EOS (End of stream)"; + LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); + need_restart = true; + } + + gst_message_unref(msg); + if (need_restart) break; } + // 清理 & 重建 gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(bus); + if (bus) gst_object_unref(bus); gst_object_unref(pipeline); - if (ctx->thread_running && need_restart) + if (ctx->thread_running) { LOG_WARN("[RTMP] Restarting " + key + " in 3s..."); - std::this_thread::sleep_for(std::chrono::seconds(3)); + std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); } } + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + } + LOG_INFO("[RTMP] Stream thread exited for " + key); } -// ========== 启停 ========== +// ========== 启停与状态 ========== void RTMPManager::start_all() { + LOG_INFO("[RTMP] Starting enabled record streams..."); std::lock_guard lock(streams_mutex); int delay_ms = 0; for (const auto& cam : g_app_config.cameras) { - if (!cam.enabled) continue; + // 跳过未启用摄像头 + if (!cam.enabled) + { + LOG_INFO("[RTMP] Skip disabled camera: " + cam.name); + continue; + } auto key = make_key(cam.name); + if (streams.find(key) != streams.end()) + { + LOG_INFO("[RTMP] Stream already running: " + key); + continue; + } + auto ctx = std::make_unique(); ctx->thread_running.store(true); @@ -260,9 +402,61 @@ void RTMPManager::stop_all() streams.clear(); } +bool RTMPManager::is_streaming(const std::string& cam_name) +{ + std::lock_guard lock(streams_mutex); + auto it = streams.find(make_key(cam_name)); + if (it == streams.end()) return false; + + auto& ctx = *(it->second); + std::lock_guard lk(ctx.status_mutex); + return ctx.status.running; +} + std::string RTMPManager::get_stream_url(const std::string& cam_name) { std::string ip = get_ip_address("enP2p33s0"); if (ip.empty()) ip = "127.0.0.1"; return "rtsp://" + ip + ":18554/" + cam_name; } +// ========== 汇总状态 ========== +std::vector RTMPManager::get_all_channels_status() +{ + std::vector result; + std::lock_guard lock(streams_mutex); + + for (size_t i = 0; i < g_app_config.cameras.size(); ++i) + { + const auto& cam = g_app_config.cameras[i]; + auto key = make_key(cam.name); + + ChannelInfo ch; + ch.loc = static_cast(i); + ch.url.clear(); + ch.running = false; + ch.reason = "Not started"; + + auto it = streams.find(key); + if (it != streams.end()) + { + auto& ctx = *(it->second); + + std::lock_guard lk(ctx.status_mutex); + auto& status = it->second->status; + + ch.running = status.running; + if (status.running) + { + ch.url = get_stream_url(cam.name); + ch.reason.clear(); + } + else + { + ch.reason = status.last_error.empty() ? "Unknown error" : status.last_error; + } + } + + result.push_back(ch); + } + return result; +}