diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 835e282..499dfe5 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -43,6 +43,10 @@ class RTMPManager // 新增:获取所有通道详细状态 static std::vector get_all_channels_status(); + // 控制 live 分支开关(MQTT 用) + static void set_live_enabled_all(bool enable); + static void set_live_enabled(const std::string& cam_name, bool enable); + private: struct StreamContext { @@ -51,6 +55,8 @@ class RTMPManager StreamStatus status; std::mutex status_mutex; + + GstElement* live_valve{nullptr}; }; static void stream_loop(Camera cam, StreamContext* ctx); @@ -59,6 +65,7 @@ class RTMPManager static std::unordered_map> streams; static std::mutex streams_mutex; + static std::atomic g_live_enabled{false}; static constexpr int RETRY_BASE_DELAY_MS = 3000; }; diff --git a/src/main.cpp b/src/main.cpp index d0608e4..5ba080a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -38,7 +38,7 @@ int main() LOG_INFO("[MAIN] ===== Vehicle Video Service Starting ====="); // 创建全局 RecordManager 实例 - // g_record_manager = std::make_shared("/home/aiec/srs/conf/kun_record.conf"); + g_record_manager = std::make_shared("/home/neardi/video_sweeper/srs/conf/kun_record.conf"); try { @@ -59,8 +59,8 @@ int main() RTMPManager::init(); // ---------- 自动推流(8 路录像守护) ---------- - // LOG_INFO("[MAIN] Starting all record streams..."); - // RTMPManager::start_all(); + LOG_INFO("[MAIN] Starting all record streams..."); + RTMPManager::start_all(); // 启动 MQTT 线程 std::thread mqtt_thread( diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index 4948ecd..0a9aed2 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -258,17 +258,15 @@ static void handle_vehicle_ctrl_request(const nlohmann::json& req) if (cmd == "startCtrl") { - LOG_INFO("[vehicle_ctrl] startCtrl → start all RTMP"); - RTMPManager::start_all(); - g_streaming.store(true); + LOG_INFO("[vehicle_ctrl] startCtrl → ENABLE live"); + RTMPManager::set_live_enabled_all(true); return; } if (cmd == "stopCtrl") { - LOG_INFO("[vehicle_ctrl] stopCtrl → stop all RTMP"); - RTMPManager::stop_all(); - g_streaming.store(false); + LOG_INFO("[vehicle_ctrl] stopCtrl → DISABLE live"); + RTMPManager::set_live_enabled_all(false); return; } diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 751a450..c58f96b 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -8,8 +8,11 @@ #include #include +#include #include +std::atomic RTMPManager::g_live_enabled{false}; + // ======================================================= // 工具函数 // ======================================================= @@ -66,47 +69,52 @@ GstElement* RTMPManager::create_pipeline(const Camera& cam) const std::string stream = cam.name + "_main"; const std::string app = "camera"; + // live → 外网 const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream + "?vhost=live"; + // record → 本地 SRS(录像) + const std::string record_rtmp = "rtmp://127.0.0.1:2935/" + app + "/" + stream + "?vhost=record"; + std::string pipeline_str = "v4l2src name=src device=" + cam.device + " io-mode=dmabuf " - // ⭐ 相机真实输出:写死 1080p - "! video/x-raw,format=NV12," - "width=1920,height=1080," + "! video/x-raw,format=NV12,width=1920,height=1080," "framerate=" + std::to_string(cam.fps) + "/1 " - // ⭐ 关键:在编码前缩放到“推流分辨率” "! videoscale " - "! video/x-raw," - "width=" + + "! video/x-raw,width=" + std::to_string(cam.width) + ",height=" + std::to_string(cam.height) + " " - // 🔻 低延时队列 "! queue max-size-buffers=2 max-size-time=0 leaky=downstream " - // 🔻 编码器:SIM 卡友好 - "! mpph264enc " - "rc-mode=cbr " + "! mpph264enc rc-mode=cbr " "bps=" + std::to_string(cam.bitrate) + " " "gop=" + std::to_string(cam.fps) + " " - "header-mode=each-idr " - "profile=baseline " + "header-mode=each-idr profile=baseline " "! h264parse config-interval=1 " - "! identity sync=false single-segment=true " + "! tee name=t " + + // ===== record:永远开 ===== + "t. ! queue max-size-buffers=5 leaky=downstream " "! flvmux streamable=true " "! rtmpsink location=\"" + - live_rtmp + - "\" " - "sync=false async=false"; + record_rtmp + + "\" sync=false async=false " + + // ===== live:valve 控制 ===== + "t. ! queue max-size-buffers=5 leaky=downstream " + "! valve name=live_valve drop=true " + "! flvmux streamable=true " + "! rtmpsink location=\"" + + live_rtmp + "\" sync=false async=false"; GError* error = nullptr; GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); @@ -155,6 +163,30 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) continue; } + // 2.1 获取 live_valve(用于 MQTT 控制) + GstElement* live_valve = gst_bin_get_by_name(GST_BIN(pipeline), "live_valve"); + if (!live_valve) + { + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "live_valve not found"; + } + LOG_ERROR("[RTMP] " + key + " - live_valve not found"); + gst_object_unref(pipeline); + std::this_thread::sleep_for(std::chrono::seconds(3)); + continue; + } + + // 保存到 ctx + { + std::lock_guard lk(ctx->status_mutex); + if (ctx->live_valve) gst_object_unref(ctx->live_valve); + ctx->live_valve = live_valve; // ⚠️ live_valve 引用交给 ctx 管理 + } + + g_object_set(G_OBJECT(ctx->live_valve), "drop", g_live_enabled.load() ? FALSE : TRUE, nullptr); + GstBus* bus = gst_element_get_bus(pipeline); LOG_INFO("[RTMP] Starting stream: " + key); @@ -226,6 +258,15 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) } cleanup: + { + std::lock_guard lk(ctx->status_mutex); + if (ctx->live_valve) + { + gst_object_unref(ctx->live_valve); + ctx->live_valve = nullptr; + } + } + gst_element_set_state(pipeline, GST_STATE_NULL); if (bus) gst_object_unref(bus); gst_object_unref(pipeline); @@ -282,7 +323,18 @@ void RTMPManager::start_all() void RTMPManager::stop_all() { std::lock_guard lock(streams_mutex); - for (auto& kv : streams) kv.second->thread_running.store(false); + + for (auto& kv : streams) + { + kv.second->thread_running.store(false); + + std::lock_guard lk(kv.second->status_mutex); + if (kv.second->live_valve) + { + gst_object_unref(kv.second->live_valve); + kv.second->live_valve = nullptr; + } + } for (auto& kv : streams) if (kv.second->thread.joinable()) kv.second->thread.join(); @@ -352,3 +404,39 @@ std::vector RTMPManager::get_all_channels_status() return result; } + +void RTMPManager::set_live_enabled_all(bool enable) +{ + std::lock_guard lock(streams_mutex); + + g_live_enabled.store(enable); + + for (auto& kv : streams) + { + auto* ctx = kv.second.get(); + std::lock_guard lk(ctx->status_mutex); + + if (!ctx->live_valve) continue; + + // enable=true → drop=false(放行 live) + g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr); + } + + LOG_INFO(std::string("[RTMP] Live ") + (enable ? "ENABLED" : "DISABLED") + " for all streams"); +} + +void RTMPManager::set_live_enabled(const std::string& cam_name, bool enable) +{ + std::lock_guard lock(streams_mutex); + auto it = streams.find(make_key(cam_name)); + if (it == streams.end()) return; + + auto* ctx = it->second.get(); + std::lock_guard lk(ctx->status_mutex); + + if (!ctx->live_valve) return; + + g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr); + + LOG_INFO("[RTMP] Live " + std::string(enable ? "ENABLED" : "DISABLED") + " for " + cam_name); +}