From d4531c99b0fdeba72fa909936f4a2f04e6b0981b Mon Sep 17 00:00:00 2001 From: cxh Date: Mon, 29 Dec 2025 14:24:40 +0800 Subject: [PATCH] 1 --- src/rtmp_manager.cpp | 330 +++++++++++++++++-------------------------- 1 file changed, 131 insertions(+), 199 deletions(-) diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 06d6e30..ebd7841 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -3,7 +3,6 @@ #include #include -#include #include #include @@ -11,179 +10,146 @@ #include #include -// ========== 工具函数 ========== +// ======================================================= +// 工具函数 +// ======================================================= static bool device_exists(const std::string& path) { struct stat st; - return (stat(path.c_str(), &st) == 0); + return stat(path.c_str(), &st) == 0; } -// 动态获取指定网卡 IPv4 地址 -std::string get_ip_address(const std::string& ifname) +// 获取指定网卡 IPv4 +static std::string get_ip_address(const std::string& ifname) { - struct ifaddrs *ifaddr, *ifa; + struct ifaddrs* ifaddr = nullptr; + if (getifaddrs(&ifaddr) != 0) return ""; + char ip[INET_ADDRSTRLEN] = {0}; - - if (getifaddrs(&ifaddr) == -1) + for (auto* ifa = ifaddr; ifa; ifa = ifa->ifa_next) { - perror("getifaddrs"); - return ""; - } - - for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) - { - if (ifa->ifa_addr == nullptr) continue; - + if (!ifa->ifa_addr) continue; if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name) { - void* addr = &((struct sockaddr_in*)ifa->ifa_addr)->sin_addr; - if (inet_ntop(AF_INET, addr, ip, sizeof(ip))) - { - freeifaddrs(ifaddr); - return ip; - } + auto* addr = &reinterpret_cast(ifa->ifa_addr)->sin_addr; + inet_ntop(AF_INET, addr, ip, sizeof(ip)); + break; } } freeifaddrs(ifaddr); - return ""; + return ip; } -// ========== 静态成员 ========== +// ======================================================= +// 静态成员 +// ======================================================= std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; -// ========== 初始化 ========== +// ======================================================= +// 初始化 +// ======================================================= void RTMPManager::init() { gst_init(nullptr, nullptr); - LOG_INFO("[RTMP] GStreamer initialized."); + LOG_INFO("[RTMP] GStreamer initialized"); } std::string RTMPManager::make_key(const std::string& name) { return name + "_main"; } -// ========== 创建推流管线 ========== +// ======================================================= +// 创建推流管线(稳定版) +// ======================================================= GstElement* RTMPManager::create_pipeline(const Camera& cam) { - const int width = cam.width; - const int height = cam.height; - const int fps = cam.fps; - const int bitrate = cam.bitrate; - - const std::string stream_name = cam.name + "_main"; + const std::string stream = cam.name + "_main"; const std::string app = "camera"; - // 两个不同的 SRS 实例:live 用于远控,record 用于录像 - // const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream_name + "?vhost=live"; - const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream_name + "?vhost=live"; - const std::string record_rtmp = "rtmp://127.0.0.1:2935/" + app + "/" + stream_name + "?vhost=record"; - std::string pipeline_str = "v4l2src device=" + cam.device + + const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream + "?vhost=live"; + + std::string pipeline_str = "v4l2src name=src device=" + cam.device + " io-mode=dmabuf " - "! video/x-raw,format=NV12,width=" + - std::to_string(width) + ",height=" + std::to_string(height) + - ",framerate=" + std::to_string(fps) + + "! video/x-raw,format=NV12," + "width=" + + std::to_string(cam.width) + ",height=" + std::to_string(cam.height) + + ",framerate=" + std::to_string(cam.fps) + "/1 " - "! queue max-size-buffers=2 leaky=downstream " + "! queue max-size-buffers=4 max-size-time=0 leaky=downstream " "! mpph264enc " "bps=" + - std::to_string(bitrate) + + std::to_string(cam.bitrate) + " " "gop=" + - std::to_string(fps) + + std::to_string(cam.fps) + " " "rc-mode=cbr " "header-mode=each-idr " - "! h264parse " + "! h264parse config-interval=1 " "! flvmux streamable=true " "! rtmpsink location=\"" + - live_rtmp + "\" sync=false async=false"; + live_rtmp + + "\" " + "sync=false async=false"; GError* error = nullptr; GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); if (error) { - LOG_ERROR(std::string("[RTMP] Pipeline creation failed: ") + error->message); + LOG_ERROR("[RTMP] Pipeline creation failed: " + std::string(error->message)); g_error_free(error); return nullptr; } + return pipeline; } -// ========== 主推流循环 ========== +// ======================================================= +// 主推流线程 +// ======================================================= void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { const std::string key = make_key(cam.name); while (ctx->thread_running) { - // 1. 检查设备节点是否存在 - struct stat st{}; - if (stat(cam.device.c_str(), &st) != 0) + // 1. 设备存在性 + if (!device_exists(cam.device)) { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "Device not found: " + cam.device; - LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "Device not found: " + cam.device; + } + LOG_WARN("[RTMP] " + key + " - device not found"); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } - // 2. 创建 GStreamer 管线 + // 2. 创建 pipeline GstElement* pipeline = create_pipeline(cam); if (!pipeline) { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "Pipeline creation failed"; + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "Pipeline creation failed"; + } std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } - gst_element_set_name(pipeline, key.c_str()); + GstBus* bus = gst_element_get_bus(pipeline); - // 3. 在 v4l2src 的 src pad 上挂探测 probe - bool got_frame = false; - { - GstElement* src = gst_bin_get_by_name(GST_BIN(pipeline), "src"); - if (src) - { - GstPad* pad = gst_element_get_static_pad(src, "src"); - if (pad) - { - gst_pad_add_probe( - pad, GST_PAD_PROBE_TYPE_BUFFER, - [](GstPad*, GstPadProbeInfo*, gpointer user_data) -> GstPadProbeReturn - { - *static_cast(user_data) = true; - return GST_PAD_PROBE_OK; - }, - &got_frame, nullptr); - gst_object_unref(pad); - } - else - { - LOG_WARN("[RTMP] " + key + " - src has no 'src' pad?"); - } - gst_object_unref(src); - } - else - { - LOG_WARN("[RTMP] " + key + " - cannot find element 'src' for pad-probe"); - } - } - - // 4. 启动播放 LOG_INFO("[RTMP] Starting stream: " + key); gst_element_set_state(pipeline, GST_STATE_PLAYING); - // 等待进入 PLAYING 状态(最长 5s) - GstState state = GST_STATE_NULL, pending = GST_STATE_NULL; - bool confirmed_running = false; - if (gst_element_get_state(pipeline, &state, &pending, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS && + // 3. 等待进入 PLAYING + GstState state = GST_STATE_NULL; + if (gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS && state == GST_STATE_PLAYING) { std::lock_guard lk(ctx->status_mutex); - confirmed_running = true; ctx->status.running = true; ctx->status.last_error.clear(); LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); @@ -192,95 +158,66 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; - ctx->status.last_error = "Pipeline failed to confirm PLAYING"; - LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); + ctx->status.last_error = "Failed to reach PLAYING"; + LOG_ERROR("[RTMP] " + key + " failed to PLAY"); + goto cleanup; } - if (!confirmed_running) - { - gst_element_set_state(pipeline, GST_STATE_NULL); - if (bus) gst_object_unref(bus); - gst_object_unref(pipeline); - std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); - continue; - } - - // 5. 运行阶段:监测帧和错误 - const auto start_t = std::chrono::steady_clock::now(); - bool need_restart = false; - + // 4. 正常运行:只监听 ERROR / EOS while (ctx->thread_running) { - // 检查是否收到帧(前 5s 内) - if (!got_frame) - { - auto elapsed = - std::chrono::duration_cast(std::chrono::steady_clock::now() - start_t) - .count(); - if (elapsed > 5) - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "No frames detected (no video signal)"; - LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); - need_restart = true; - break; - } - } - else + GstMessage* msg = gst_bus_timed_pop_filtered( + bus, 300 * GST_MSECOND, static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); + + if (!msg) { + // 心跳:pipeline 没死就认为在跑 std::lock_guard lk(ctx->status_mutex); ctx->status.running = true; - ctx->status.last_error.clear(); + continue; } - // 等待错误或 EOS 消息 - GstMessage* msg = gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND, - (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); - - if (!msg) continue; - - switch (GST_MESSAGE_TYPE(msg)) + if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) { - case GST_MESSAGE_ERROR: - { - GError* err = nullptr; - gst_message_parse_error(msg, &err, nullptr); - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = err ? err->message : "GStreamer error"; - LOG_ERROR("[RTMP] " + key + " stream error: " + ctx->status.last_error); - if (err) g_error_free(err); - need_restart = true; - break; - } - case GST_MESSAGE_EOS: - { - std::lock_guard lk(ctx->status_mutex); - ctx->status.running = false; - ctx->status.last_error = "End of stream (EOS)"; - LOG_WARN("[RTMP] " + key + " reached EOS"); - need_restart = true; - break; - } - default: - break; - } - gst_message_unref(msg); + GError* err = nullptr; + gst_message_parse_error(msg, &err, nullptr); - if (need_restart) break; + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = err ? err->message : "Unknown GStreamer error"; + } + + LOG_ERROR("[RTMP] " + key + " ERROR: " + (err ? err->message : "unknown")); + if (err) g_error_free(err); + gst_message_unref(msg); + break; + } + + if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) + { + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "EOS"; + } + LOG_WARN("[RTMP] " + key + " EOS"); + gst_message_unref(msg); + break; + } + + gst_message_unref(msg); } - // 6. 收尾清理 + cleanup: gst_element_set_state(pipeline, GST_STATE_NULL); if (bus) gst_object_unref(bus); gst_object_unref(pipeline); - // 7. 若仍在运行状态,准备重启 if (ctx->thread_running) { LOG_WARN("[RTMP] Restarting " + key + " in 3s..."); - std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); + std::this_thread::sleep_for(std::chrono::seconds(3)); } } @@ -291,10 +228,12 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) LOG_INFO("[RTMP] Stream thread exited for " + key); } -// ========== 启停与状态 ========== +// ======================================================= +// 启停管理 +// ======================================================= void RTMPManager::start_all() { - LOG_INFO("[RTMP] Starting enabled record streams..."); + LOG_INFO("[RTMP] Starting enabled streams..."); std::lock_guard lock(streams_mutex); int delay_ms = 0; @@ -306,12 +245,8 @@ void RTMPManager::start_all() continue; } - auto key = make_key(cam.name); - if (streams.find(key) != streams.end()) - { - LOG_INFO("[RTMP] Stream already running: " + key); - continue; - } + const auto key = make_key(cam.name); + if (streams.count(key)) continue; auto ctx = std::make_unique(); ctx->thread_running.store(true); @@ -332,29 +267,37 @@ void RTMPManager::stop_all() { std::lock_guard lock(streams_mutex); for (auto& kv : streams) kv.second->thread_running.store(false); + for (auto& kv : streams) if (kv.second->thread.joinable()) kv.second->thread.join(); + streams.clear(); } +// ======================================================= +// 状态 & URL +// ======================================================= bool RTMPManager::is_streaming(const std::string& cam_name) { std::lock_guard lock(streams_mutex); auto it = streams.find(make_key(cam_name)); if (it == streams.end()) return false; - auto& ctx = *(it->second); - std::lock_guard lk(ctx.status_mutex); - return ctx.status.running; + std::lock_guard lk(it->second->status_mutex); + return it->second->status.running; } std::string RTMPManager::get_stream_url(const std::string& cam_name) { std::string ip = get_ip_address("enP2p33s0"); if (ip.empty()) ip = "127.0.0.1"; - return "http://" + ip + ":1985/rtc/v1/whep/?app=camera&stream=" + cam_name + "_main&vhost=live"; + + return "http://" + ip + ":11985/rtc/v1/whep/?app=camera&stream=" + cam_name + "_main&vhost=live"; } -// ========== 汇总状态 ========== + +// ======================================================= +// 汇总状态 +// ======================================================= std::vector RTMPManager::get_all_channels_status() { std::vector result; @@ -363,40 +306,29 @@ std::vector RTMPManager::get_all_channels_status() for (size_t i = 0; i < g_app_config.cameras.size(); ++i) { const auto& cam = g_app_config.cameras[i]; - auto key = make_key(cam.name); - - ChannelInfo ch; + ChannelInfo ch{}; ch.loc = static_cast(i); - ch.url.clear(); - ch.running = false; if (!cam.enabled) { + ch.running = false; ch.reason = "Disabled by config"; result.push_back(ch); continue; } - auto it = streams.find(key); + auto it = streams.find(make_key(cam.name)); if (it != streams.end()) { - auto& ctx = *(it->second); - std::lock_guard lk(ctx.status_mutex); - - ch.running = ctx.status.running; - if (ctx.status.running) - { - ch.url = get_stream_url(cam.name); - ch.reason.clear(); - } - else - { - ch.reason = ctx.status.last_error.empty() ? "Stopped" : ctx.status.last_error; - } + std::lock_guard lk(it->second->status_mutex); + ch.running = it->second->status.running; + ch.reason = it->second->status.last_error; + if (ch.running) ch.url = get_stream_url(cam.name); } else { - ch.reason = "Enabled but not started"; + ch.running = false; + ch.reason = "Not started"; } result.push_back(ch);