开启录像功能

This commit is contained in:
cxh 2026-01-20 13:47:30 +08:00
parent 9c639335ea
commit dc89d480c6
4 changed files with 119 additions and 26 deletions

View File

@ -43,6 +43,10 @@ class RTMPManager
// 新增:获取所有通道详细状态 // 新增:获取所有通道详细状态
static std::vector<ChannelInfo> get_all_channels_status(); static std::vector<ChannelInfo> get_all_channels_status();
// 控制 live 分支开关MQTT 用)
static void set_live_enabled_all(bool enable);
static void set_live_enabled(const std::string& cam_name, bool enable);
private: private:
struct StreamContext struct StreamContext
{ {
@ -51,6 +55,8 @@ class RTMPManager
StreamStatus status; StreamStatus status;
std::mutex status_mutex; std::mutex status_mutex;
GstElement* live_valve{nullptr};
}; };
static void stream_loop(Camera cam, StreamContext* ctx); static void stream_loop(Camera cam, StreamContext* ctx);
@ -59,6 +65,7 @@ class RTMPManager
static std::unordered_map<std::string, std::unique_ptr<StreamContext>> streams; static std::unordered_map<std::string, std::unique_ptr<StreamContext>> streams;
static std::mutex streams_mutex; static std::mutex streams_mutex;
static std::atomic<bool> g_live_enabled{false};
static constexpr int RETRY_BASE_DELAY_MS = 3000; static constexpr int RETRY_BASE_DELAY_MS = 3000;
}; };

View File

@ -38,7 +38,7 @@ int main()
LOG_INFO("[MAIN] ===== Vehicle Video Service Starting ====="); LOG_INFO("[MAIN] ===== Vehicle Video Service Starting =====");
// 创建全局 RecordManager 实例 // 创建全局 RecordManager 实例
// g_record_manager = std::make_shared<RecordManager>("/home/aiec/srs/conf/kun_record.conf"); g_record_manager = std::make_shared<RecordManager>("/home/neardi/video_sweeper/srs/conf/kun_record.conf");
try try
{ {
@ -59,8 +59,8 @@ int main()
RTMPManager::init(); RTMPManager::init();
// ---------- 自动推流8 路录像守护) ---------- // ---------- 自动推流8 路录像守护) ----------
// LOG_INFO("[MAIN] Starting all record streams..."); LOG_INFO("[MAIN] Starting all record streams...");
// RTMPManager::start_all(); RTMPManager::start_all();
// 启动 MQTT 线程 // 启动 MQTT 线程
std::thread mqtt_thread( std::thread mqtt_thread(

View File

@ -258,17 +258,15 @@ static void handle_vehicle_ctrl_request(const nlohmann::json& req)
if (cmd == "startCtrl") if (cmd == "startCtrl")
{ {
LOG_INFO("[vehicle_ctrl] startCtrl → start all RTMP"); LOG_INFO("[vehicle_ctrl] startCtrl → ENABLE live");
RTMPManager::start_all(); RTMPManager::set_live_enabled_all(true);
g_streaming.store(true);
return; return;
} }
if (cmd == "stopCtrl") if (cmd == "stopCtrl")
{ {
LOG_INFO("[vehicle_ctrl] stopCtrl → stop all RTMP"); LOG_INFO("[vehicle_ctrl] stopCtrl → DISABLE live");
RTMPManager::stop_all(); RTMPManager::set_live_enabled_all(false);
g_streaming.store(false);
return; return;
} }

View File

@ -8,8 +8,11 @@
#include <chrono> #include <chrono>
#include <cstring> #include <cstring>
#include <memory>
#include <thread> #include <thread>
std::atomic<bool> RTMPManager::g_live_enabled{false};
// ======================================================= // =======================================================
// 工具函数 // 工具函数
// ======================================================= // =======================================================
@ -66,47 +69,52 @@ GstElement* RTMPManager::create_pipeline(const Camera& cam)
const std::string stream = cam.name + "_main"; const std::string stream = cam.name + "_main";
const std::string app = "camera"; const std::string app = "camera";
// live → 外网
const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream + "?vhost=live"; const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream + "?vhost=live";
// record → 本地 SRS录像
const std::string record_rtmp = "rtmp://127.0.0.1:2935/" + app + "/" + stream + "?vhost=record";
std::string pipeline_str = "v4l2src name=src device=" + cam.device + std::string pipeline_str = "v4l2src name=src device=" + cam.device +
" io-mode=dmabuf " " io-mode=dmabuf "
// ⭐ 相机真实输出:写死 1080p "! video/x-raw,format=NV12,width=1920,height=1080,"
"! video/x-raw,format=NV12,"
"width=1920,height=1080,"
"framerate=" + "framerate=" +
std::to_string(cam.fps) + std::to_string(cam.fps) +
"/1 " "/1 "
// ⭐ 关键:在编码前缩放到“推流分辨率”
"! videoscale " "! videoscale "
"! video/x-raw," "! video/x-raw,width=" +
"width=" +
std::to_string(cam.width) + ",height=" + std::to_string(cam.height) + std::to_string(cam.width) + ",height=" + std::to_string(cam.height) +
" " " "
// 🔻 低延时队列
"! queue max-size-buffers=2 max-size-time=0 leaky=downstream " "! queue max-size-buffers=2 max-size-time=0 leaky=downstream "
// 🔻 编码器SIM 卡友好 "! mpph264enc rc-mode=cbr "
"! mpph264enc "
"rc-mode=cbr "
"bps=" + "bps=" +
std::to_string(cam.bitrate) + std::to_string(cam.bitrate) +
" " " "
"gop=" + "gop=" +
std::to_string(cam.fps) + std::to_string(cam.fps) +
" " " "
"header-mode=each-idr " "header-mode=each-idr profile=baseline "
"profile=baseline "
"! h264parse config-interval=1 " "! h264parse config-interval=1 "
"! identity sync=false single-segment=true " "! tee name=t "
// ===== record永远开 =====
"t. ! queue max-size-buffers=5 leaky=downstream "
"! flvmux streamable=true " "! flvmux streamable=true "
"! rtmpsink location=\"" + "! rtmpsink location=\"" +
live_rtmp + record_rtmp +
"\" " "\" sync=false async=false "
"sync=false async=false";
// ===== livevalve 控制 =====
"t. ! queue max-size-buffers=5 leaky=downstream "
"! valve name=live_valve drop=true "
"! flvmux streamable=true "
"! rtmpsink location=\"" +
live_rtmp + "\" sync=false async=false";
GError* error = nullptr; GError* error = nullptr;
GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
@ -155,6 +163,30 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx)
continue; continue;
} }
// 2.1 获取 live_valve用于 MQTT 控制)
GstElement* live_valve = gst_bin_get_by_name(GST_BIN(pipeline), "live_valve");
if (!live_valve)
{
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = "live_valve not found";
}
LOG_ERROR("[RTMP] " + key + " - live_valve not found");
gst_object_unref(pipeline);
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
// 保存到 ctx
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
if (ctx->live_valve) gst_object_unref(ctx->live_valve);
ctx->live_valve = live_valve; // ⚠️ live_valve 引用交给 ctx 管理
}
g_object_set(G_OBJECT(ctx->live_valve), "drop", g_live_enabled.load() ? FALSE : TRUE, nullptr);
GstBus* bus = gst_element_get_bus(pipeline); GstBus* bus = gst_element_get_bus(pipeline);
LOG_INFO("[RTMP] Starting stream: " + key); LOG_INFO("[RTMP] Starting stream: " + key);
@ -226,6 +258,15 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx)
} }
cleanup: cleanup:
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
if (ctx->live_valve)
{
gst_object_unref(ctx->live_valve);
ctx->live_valve = nullptr;
}
}
gst_element_set_state(pipeline, GST_STATE_NULL); gst_element_set_state(pipeline, GST_STATE_NULL);
if (bus) gst_object_unref(bus); if (bus) gst_object_unref(bus);
gst_object_unref(pipeline); gst_object_unref(pipeline);
@ -282,7 +323,18 @@ void RTMPManager::start_all()
void RTMPManager::stop_all() void RTMPManager::stop_all()
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
for (auto& kv : streams) kv.second->thread_running.store(false);
for (auto& kv : streams)
{
kv.second->thread_running.store(false);
std::lock_guard<std::mutex> lk(kv.second->status_mutex);
if (kv.second->live_valve)
{
gst_object_unref(kv.second->live_valve);
kv.second->live_valve = nullptr;
}
}
for (auto& kv : streams) for (auto& kv : streams)
if (kv.second->thread.joinable()) kv.second->thread.join(); if (kv.second->thread.joinable()) kv.second->thread.join();
@ -352,3 +404,39 @@ std::vector<RTMPManager::ChannelInfo> RTMPManager::get_all_channels_status()
return result; return result;
} }
void RTMPManager::set_live_enabled_all(bool enable)
{
std::lock_guard<std::mutex> lock(streams_mutex);
g_live_enabled.store(enable);
for (auto& kv : streams)
{
auto* ctx = kv.second.get();
std::lock_guard<std::mutex> lk(ctx->status_mutex);
if (!ctx->live_valve) continue;
// enable=true → drop=false放行 live
g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr);
}
LOG_INFO(std::string("[RTMP] Live ") + (enable ? "ENABLED" : "DISABLED") + " for all streams");
}
void RTMPManager::set_live_enabled(const std::string& cam_name, bool enable)
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(make_key(cam_name));
if (it == streams.end()) return;
auto* ctx = it->second.get();
std::lock_guard<std::mutex> lk(ctx->status_mutex);
if (!ctx->live_valve) return;
g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr);
LOG_INFO("[RTMP] Live " + std::string(enable ? "ENABLED" : "DISABLED") + " for " + cam_name);
}