// rtmp_manager.cpp #include "rtmp_manager.hpp" #include #include #include #include #include #include // ========== 工具函数 ========== static bool device_exists(const std::string& path) { struct stat st; return (stat(path.c_str(), &st) == 0); } std::string get_ip_address(const std::string& ifname) { struct ifaddrs* ifaddr = nullptr; if (getifaddrs(&ifaddr) == -1) return ""; char ip[INET_ADDRSTRLEN] = {0}; for (auto* ifa = ifaddr; ifa; ifa = ifa->ifa_next) { if (!ifa->ifa_addr) continue; if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name) { auto* addr = &((struct sockaddr_in*)ifa->ifa_addr)->sin_addr; inet_ntop(AF_INET, addr, ip, sizeof(ip)); break; } } freeifaddrs(ifaddr); return ip; } // ========== 静态成员 ========== std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; // ========== 初始化 ========== void RTMPManager::init() { gst_init(nullptr, nullptr); LOG_INFO("[RTMP] GStreamer initialized."); } std::string RTMPManager::make_key(const std::string& name) { return name + "_main"; } // ========== 创建 RTMP Pipeline ========== GstElement* RTMPManager::create_pipeline(const Camera& cam) { const int gop = 15; const std::string rtmp_url = "rtmp://127.0.0.1:1935/" + cam.name; std::string pipeline_str = "v4l2src device=" + cam.device + " io-mode=dmabuf " // 原始输入 "! video/x-raw,format=NV12,width=1280,height=960,framerate=30/1 " // 裁剪 + 缩放 "! videocrop top=120 bottom=120 " "! videoscale " // ⭐ 关键:在 RAW 阶段强制重打时钟(修复“看着一顿”) "! videorate " "! video/x-raw,framerate=30/1 " // 输出分辨率 "! video/x-raw,width=" + std::to_string(cam.width) + ",height=" + std::to_string(cam.height) + " " // 缓冲 "! queue max-size-buffers=8 max-size-time=0 leaky=downstream " // 编码 "! mpph264enc rc-mode=cbr " "bps=" + std::to_string(cam.bitrate) + " " "gop=" + std::to_string(gop) + " " "header-mode=each-idr profile=main " // H264 处理 "! h264parse name=parse config-interval=1 " "! video/x-h264,stream-format=avc,alignment=au " // RTMP "! flvmux streamable=true " "! rtmpsink location=\"" + rtmp_url + "\" sync=false async=false"; GError* error = nullptr; GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); if (error) { LOG_ERROR(std::string("[RTMP] Pipeline creation failed: ") + error->message); g_error_free(error); return nullptr; } return pipeline; } // ========== 主推流循环 ========== void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { const std::string key = make_key(cam.name); constexpr int64_t START_TIMEOUT_MS = 12000; // ⭐ 启动宽限期 constexpr int64_t NO_FRAME_TIMEOUT_MS = 10000; while (ctx->thread_running) { if (!device_exists(cam.device)) { LOG_WARN("[RTMP] " + key + " - device not found"); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } GstElement* pipeline = create_pipeline(cam); if (!pipeline) { std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } gst_element_set_name(pipeline, key.c_str()); GstBus* bus = gst_element_get_bus(pipeline); ctx->last_frame_ms.store(0, std::memory_order_relaxed); // ⭐ 帧探测挂在 h264parse if (GstElement* parse = gst_bin_get_by_name(GST_BIN(pipeline), "parse")) { if (GstPad* pad = gst_element_get_static_pad(parse, "src")) { gst_pad_add_probe( pad, GST_PAD_PROBE_TYPE_BUFFER, [](GstPad*, GstPadProbeInfo*, gpointer data) -> GstPadProbeReturn { auto* ts = static_cast*>(data); auto now = std::chrono::steady_clock::now().time_since_epoch(); ts->store(std::chrono::duration_cast(now).count(), std::memory_order_relaxed); return GST_PAD_PROBE_OK; }, &ctx->last_frame_ms, nullptr); gst_object_unref(pad); } gst_object_unref(parse); } LOG_INFO("[RTMP] Starting stream: " + key); gst_element_set_state(pipeline, GST_STATE_PLAYING); GstState state; if (gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND) != GST_STATE_CHANGE_SUCCESS || state != GST_STATE_PLAYING) { LOG_WARN("[RTMP] " + key + " failed to enter PLAYING"); gst_element_set_state(pipeline, GST_STATE_NULL); gst_object_unref(bus); gst_object_unref(pipeline); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); auto launch_tp = std::chrono::steady_clock::now(); bool entered_running = false; bool need_restart = false; while (ctx->thread_running) { auto now = std::chrono::steady_clock::now(); int64_t now_ms = std::chrono::duration_cast(now.time_since_epoch()).count(); int64_t last_ms = ctx->last_frame_ms.load(std::memory_order_relaxed); if (!entered_running) { if (last_ms != 0) { entered_running = true; LOG_INFO("[RTMP] " + key + " entered RUNNING state"); } else if (std::chrono::duration_cast(now - launch_tp).count() > START_TIMEOUT_MS) { LOG_ERROR("[RTMP] " + key + " - no frames during startup"); need_restart = true; break; } } else { if (now_ms - last_ms > NO_FRAME_TIMEOUT_MS) { LOG_ERROR("[RTMP] " + key + " - frame stalled"); need_restart = true; break; } } if (GstMessage* msg = gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND, (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS))) { need_restart = true; gst_message_unref(msg); break; } } gst_element_set_state(pipeline, GST_STATE_NULL); gst_object_unref(bus); gst_object_unref(pipeline); if (ctx->thread_running && need_restart) { LOG_WARN("[RTMP] Restarting " + key + " in 3s..."); std::this_thread::sleep_for(std::chrono::seconds(3)); } } LOG_INFO("[RTMP] Stream thread exited for " + key); } // ========== 启停 ========== void RTMPManager::start_all() { std::lock_guard lock(streams_mutex); int delay_ms = 0; for (const auto& cam : g_app_config.cameras) { if (!cam.enabled) continue; auto key = make_key(cam.name); auto ctx = std::make_unique(); ctx->thread_running.store(true); ctx->thread = std::thread( [cam, ptr = ctx.get(), delay_ms]() { std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); stream_loop(cam, ptr); }); streams.emplace(key, std::move(ctx)); delay_ms += 200; } } void RTMPManager::stop_all() { std::lock_guard lock(streams_mutex); for (auto& kv : streams) kv.second->thread_running.store(false); for (auto& kv : streams) if (kv.second->thread.joinable()) kv.second->thread.join(); streams.clear(); } std::string RTMPManager::get_stream_url(const std::string& cam_name) { std::string ip = get_ip_address("enP2p33s0"); if (ip.empty()) ip = "127.0.0.1"; return "rtsp://" + ip + ":18554/" + cam_name; }