// rtmp_manager.cpp (reliable integrated version) // - record:启动时由配置决定(静态开关) // - live:MQTT 动态开关(运行时可变) // - 关键增强:PLAYING 成功后再“放行 valve”(避免 cold-start mux/sink 饿死) // - 关键增强:rtmpsink 强制 IPv4(protocol=0),减少偶发握手卡住 // - 关键增强:更严谨的 cleanup + 保存/释放 valve 引用,避免泄漏/悬挂引用 #include "rtmp_manager.hpp" #include #include #include #include #include #include #include #include std::atomic RTMPManager::g_live_enabled{false}; // record 是“启动时决定一次”的静态开关:由 config.json 决定 bool RTMPManager::g_record_enabled = true; // ======================================================= // 工具函数 // ======================================================= static bool device_exists(const std::string& path) { struct stat st; return stat(path.c_str(), &st) == 0; } // 获取指定网卡 IPv4 std::string get_ip_address(const std::string& ifname) { struct ifaddrs* ifaddr = nullptr; if (getifaddrs(&ifaddr) != 0) return ""; char ip[INET_ADDRSTRLEN] = {0}; for (auto* ifa = ifaddr; ifa; ifa = ifa->ifa_next) { if (!ifa->ifa_addr) continue; if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name) { auto* addr = &reinterpret_cast(ifa->ifa_addr)->sin_addr; inet_ntop(AF_INET, addr, ip, sizeof(ip)); break; } } freeifaddrs(ifaddr); return ip; } // 安全释放 GstObject* static void safe_unref(GstObject*& obj) { if (obj) { gst_object_unref(obj); obj = nullptr; } } // 安全设置 GStreamer 属性:有则设,无则跳过 static void try_set_property(GstElement* elem, const char* prop, int value) { if (!elem) return; GObjectClass* klass = G_OBJECT_GET_CLASS(elem); if (klass && g_object_class_find_property(klass, prop)) { g_object_set(G_OBJECT(elem), prop, value, nullptr); LOG_INFO(std::string("[RTMP] Set ") + GST_ELEMENT_NAME(elem) + "." + prop + "=" + std::to_string(value)); } else { LOG_WARN(std::string("[RTMP] ") + GST_ELEMENT_NAME(elem) + " has NO property '" + prop + "' (skip)"); } } // ======================================================= // 静态成员 // ======================================================= std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; // ======================================================= // 初始化 // ======================================================= void RTMPManager::init() { gst_init(nullptr, nullptr); LOG_INFO("[RTMP] GStreamer initialized"); } std::string RTMPManager::make_key(const std::string& name) { return name + "_main"; } // ======================================================= // 创建推流管线(稳定版) // ======================================================= GstElement* RTMPManager::create_pipeline(const Camera& cam) { const std::string stream = cam.name + "_main"; const std::string app = "camera"; // 维持你当前 SRS 的 vhost query 方式,避免改动联动太多 const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream + "?vhost=live"; const std::string record_rtmp = "rtmp://127.0.0.1:2935/" + app + "/" + stream + "?vhost=record"; // 关键:rtmpsink protocol=0 强制 IPv4(减少握手卡住/IPv6 干扰) // 关键:valve 默认 drop=true,真正放行在 PLAYING 成功后再做(避免 cold-start 饿死) std::string pipeline_str = "v4l2src device=" + cam.device + " io-mode=dmabuf " "! video/x-raw,format=NV12,width=1920,height=1080,framerate=" + std::to_string(cam.fps) + "/1 " "! videoscale " "! video/x-raw,width=" + std::to_string(cam.width) + ",height=" + std::to_string(cam.height) + " " "! queue max-size-buffers=2 leaky=downstream " "! mpph264enc rc-mode=cbr bps=" + std::to_string(cam.bitrate) + " gop=" + std::to_string(cam.fps) + " header-mode=each-idr profile=baseline " "! h264parse config-interval=1 " "! tee name=t " // ===== record branch ===== "t. ! queue max-size-buffers=8 leaky=downstream " "! valve name=record_valve drop=true " "! queue max-size-buffers=8 leaky=downstream " "! flvmux name=rec_mux streamable=true " "! rtmpsink name=rec_sink location=\"" + record_rtmp + "\" sync=false async=false " // ===== live branch ===== "t. ! queue max-size-buffers=8 leaky=downstream " "! valve name=live_valve drop=true " "! queue max-size-buffers=8 leaky=downstream " "! flvmux name=live_mux streamable=true " "! rtmpsink name=live_sink location=\"" + live_rtmp + "\" sync=false async=false "; GError* err = nullptr; GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &err); if (err) { LOG_ERROR("[RTMP] Pipeline creation failed: " + std::string(err->message)); g_error_free(err); return nullptr; } return pipeline; } // ======================================================= // 主推流线程 // ======================================================= void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { const std::string key = make_key(cam.name); auto mark_error = [&](const std::string& err) { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = err; }; while (ctx->thread_running) { // 1) 设备存在性 if (!device_exists(cam.device)) { mark_error("Device not found: " + cam.device); LOG_WARN("[RTMP] " + key + " - device not found"); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } // 2) 创建 pipeline GstElement* pipeline = create_pipeline(cam); if (!pipeline) { mark_error("Pipeline creation failed"); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } // 2.1) 获取 valve(保存到 ctx,供 MQTT 动态控制 live,record 启动静态决定) GstElement* live_valve = gst_bin_get_by_name(GST_BIN(pipeline), "live_valve"); GstElement* record_valve = gst_bin_get_by_name(GST_BIN(pipeline), "record_valve"); // ===== 获取 rec_sink / live_sink ===== GstElement* rec_sink = gst_bin_get_by_name(GST_BIN(pipeline), "rec_sink"); GstElement* live_sink = gst_bin_get_by_name(GST_BIN(pipeline), "live_sink"); // ===== 尝试设置 protocol=0(兼容:有就设,无就跳过)===== try_set_property(rec_sink, "protocol", 0); try_set_property(live_sink, "protocol", 0); // ===== 引用使用完就释放(不交给 ctx 管理)===== if (rec_sink) gst_object_unref(rec_sink); if (live_sink) gst_object_unref(live_sink); if (!live_valve || !record_valve) { if (!live_valve) LOG_ERROR("[RTMP] " + key + " - live_valve not found"); if (!record_valve) LOG_ERROR("[RTMP] " + key + " - record_valve not found"); mark_error("valve not found"); if (live_valve) gst_object_unref(live_valve); if (record_valve) gst_object_unref(record_valve); gst_object_unref(pipeline); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } { std::lock_guard lk(ctx->status_mutex); // 替换旧引用(避免泄漏) if (ctx->live_valve) gst_object_unref(ctx->live_valve); if (ctx->record_valve) gst_object_unref(ctx->record_valve); ctx->live_valve = live_valve; // 引用交给 ctx 管理 ctx->record_valve = record_valve; // 引用交给 ctx 管理 } // 2.2) 启动前:统一先 drop=true(不放行) // 真正放行在 PLAYING 确认后进行(关键稳定性增强) g_object_set(G_OBJECT(live_valve), "drop", TRUE, nullptr); g_object_set(G_OBJECT(record_valve), "drop", TRUE, nullptr); GstBus* bus = gst_element_get_bus(pipeline); LOG_INFO("[RTMP] Starting stream: " + key); gst_element_set_state(pipeline, GST_STATE_PLAYING); // 3) 等待进入 PLAYING GstState state = GST_STATE_NULL; auto ret = gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND); if (!(ret == GST_STATE_CHANGE_SUCCESS && state == GST_STATE_PLAYING)) { mark_error("Failed to reach PLAYING"); LOG_ERROR("[RTMP] " + key + " failed to PLAY"); goto cleanup; } // 4) PLAYING 成功后,稍微延迟再按最终策略放行 valve(关键稳定性增强) std::this_thread::sleep_for(std::chrono::milliseconds(150)); // record:启动时决定一次(静态开关) g_object_set(G_OBJECT(record_valve), "drop", g_record_enabled ? FALSE : TRUE, nullptr); // live:运行期由 MQTT 控制(动态开关),此处按当前全局状态设置初值 g_object_set(G_OBJECT(live_valve), "drop", g_live_enabled.load() ? FALSE : TRUE, nullptr); { std::lock_guard lk(ctx->status_mutex); ctx->status.running = true; ctx->status.last_error.clear(); } LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); // 5) 正常运行:监听 ERROR / EOS while (ctx->thread_running) { GstMessage* msg = gst_bus_timed_pop_filtered( bus, 300 * GST_MSECOND, static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); if (!msg) { // 心跳:能跑就认为 running std::lock_guard lk(ctx->status_mutex); ctx->status.running = true; continue; } if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) { GError* err = nullptr; gst_message_parse_error(msg, &err, nullptr); mark_error(err ? err->message : "Unknown GStreamer error"); LOG_ERROR("[RTMP] " + key + " ERROR: " + (err ? err->message : "unknown")); if (err) g_error_free(err); gst_message_unref(msg); break; } if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) { mark_error("EOS"); LOG_WARN("[RTMP] " + key + " EOS"); gst_message_unref(msg); break; } gst_message_unref(msg); } cleanup: // 6) 清理:先释放 ctx 保存的 valve 引用(避免下一轮使用旧引用) { std::lock_guard lk(ctx->status_mutex); if (ctx->live_valve) { gst_object_unref(ctx->live_valve); ctx->live_valve = nullptr; } if (ctx->record_valve) { gst_object_unref(ctx->record_valve); ctx->record_valve = nullptr; } } // 7) pipeline 清理 gst_element_set_state(pipeline, GST_STATE_NULL); if (bus) gst_object_unref(bus); gst_object_unref(pipeline); if (ctx->thread_running) { LOG_WARN("[RTMP] Restarting " + key + " in 3s..."); std::this_thread::sleep_for(std::chrono::seconds(3)); } } { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; } LOG_INFO("[RTMP] Stream thread exited for " + key); } // ======================================================= // 启停管理 // ======================================================= void RTMPManager::start_all() { LOG_INFO("[RTMP] Starting enabled streams..."); std::lock_guard lock(streams_mutex); int delay_ms = 0; for (const auto& cam : g_app_config.cameras) { if (!cam.enabled) { LOG_INFO("[RTMP] Skip disabled camera: " + cam.name); continue; } const auto key = make_key(cam.name); if (streams.count(key)) continue; auto ctx = std::make_unique(); ctx->thread_running.store(true); ctx->thread = std::thread( [cam, ptr = ctx.get(), delay_ms]() { std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); stream_loop(cam, ptr); }); streams.emplace(key, std::move(ctx)); delay_ms += 200; } } void RTMPManager::stop_all() { std::lock_guard lock(streams_mutex); for (auto& kv : streams) { kv.second->thread_running.store(false); std::lock_guard lk(kv.second->status_mutex); if (kv.second->live_valve) { gst_object_unref(kv.second->live_valve); kv.second->live_valve = nullptr; } if (kv.second->record_valve) { gst_object_unref(kv.second->record_valve); kv.second->record_valve = nullptr; } } for (auto& kv : streams) if (kv.second->thread.joinable()) kv.second->thread.join(); streams.clear(); } // ======================================================= // 状态 & URL // ======================================================= bool RTMPManager::is_streaming(const std::string& cam_name) { std::lock_guard lock(streams_mutex); auto it = streams.find(make_key(cam_name)); if (it == streams.end()) return false; std::lock_guard lk(it->second->status_mutex); return it->second->status.running; } std::string RTMPManager::get_stream_url(const std::string& cam_name) { std::string ip = get_ip_address("enP2p33s0"); if (ip.empty()) ip = "192.168.4.194"; return "http://" + ip + ":11985/rtc/v1/whep/?app=camera&stream=" + cam_name + "_main&vhost=live"; } // ======================================================= // 汇总状态 // ======================================================= std::vector RTMPManager::get_all_channels_status() { std::vector result; std::lock_guard lock(streams_mutex); for (size_t i = 0; i < g_app_config.cameras.size(); ++i) { const auto& cam = g_app_config.cameras[i]; ChannelInfo ch{}; ch.loc = static_cast(i); if (!cam.enabled) { ch.running = false; ch.reason = "Disabled by config"; result.push_back(ch); continue; } auto it = streams.find(make_key(cam.name)); if (it != streams.end()) { std::lock_guard lk(it->second->status_mutex); ch.running = it->second->status.running; ch.reason = it->second->status.last_error; if (ch.running) ch.url = get_stream_url(cam.name); } else { ch.running = false; ch.reason = "Not started"; } result.push_back(ch); } return result; } // ======================================================= // live:MQTT 动态开关 // ======================================================= void RTMPManager::set_live_enabled_all(bool enable) { std::lock_guard lock(streams_mutex); g_live_enabled.store(enable); for (auto& kv : streams) { auto* ctx = kv.second.get(); std::lock_guard lk(ctx->status_mutex); if (!ctx->live_valve) continue; // enable=true → drop=false(放行 live) g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr); } LOG_INFO(std::string("[RTMP] Live ") + (enable ? "ENABLED" : "DISABLED") + " for all streams"); } void RTMPManager::set_live_enabled(const std::string& cam_name, bool enable) { std::lock_guard lock(streams_mutex); auto it = streams.find(make_key(cam_name)); if (it == streams.end()) return; auto* ctx = it->second.get(); std::lock_guard lk(ctx->status_mutex); if (!ctx->live_valve) return; g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr); LOG_INFO("[RTMP] Live " + std::string(enable ? "ENABLED" : "DISABLED") + " for " + cam_name); }