From f56d94dbcade0bdd971efa52f3c08e9ded3d6ea2 Mon Sep 17 00:00:00 2001 From: cxh Date: Fri, 17 Oct 2025 15:52:47 +0800 Subject: [PATCH] 1 --- src/mqtt_client_wrapper.cpp | 38 +++++++---- src/rtmp_manager.cpp | 132 +++++++++++++++++++++++++++++------- 2 files changed, 130 insertions(+), 40 deletions(-) diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index a5257e3..e17852c 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -24,32 +24,42 @@ static void send_heartbeat() auto now = std::chrono::system_clock::now(); auto ms = std::chrono::duration_cast(now.time_since_epoch()).count(); - // 获取当前所有通道状态 - auto status_list = RTMPManager::get_all_status(); + // 获取 RTMP 状态 + auto channels_info = RTMPManager::get_all_channels_status(); - int total = static_cast(status_list.size()); - int running_count = 0; nlohmann::json channels = nlohmann::json::array(); + int total = static_cast(channels_info.size()); + int running_count = 0; - for (int i = 0; i < total; ++i) + for (const auto &ch : channels_info) { - const auto &[key, running] = status_list[i]; - if (running) running_count++; - nlohmann::json item; - item["loc"] = i; // 摄像头位置索引 - item["url"] = RTMPManager::get_stream_url(g_app_config.cameras[i].name); - item["running"] = running; + item["loc"] = ch.loc; + item["running"] = ch.running; + + if (ch.running) + { + item["url"] = ch.url; + running_count++; + } + else + { + item["reason"] = ch.reason.empty() ? "Unknown error" : ch.reason; + } + channels.push_back(item); } nlohmann::json hb; hb["timestamp"] = ms; - hb["status"] = (running_count == total) ? 1 : (running_count == 0 ? 0 : 2); + hb["status"] = (running_count == 0) ? 0 // 全部失败 + : (running_count == total ? 1 : 2); // 全部正常 or 部分异常 hb["channels"] = channels; - mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, hb.dump()); - LOG_INFO("[MQTT] Sent video heartbeat: " + hb.dump()); + // 发布心跳 + mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, hb.dump(), 0); + LOG_INFO("[MQTT] Sent video heartbeat (" + std::to_string(running_count) + "/" + std::to_string(total) + + " running): " + hb.dump()); } // MQTT 回调 diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 94bf4cf..feadf47 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -11,6 +11,13 @@ #include #include +// ========== 工具函数 ========== +static bool device_exists(const std::string &path) +{ + struct stat st; + return (stat(path.c_str(), &st) == 0); +} + // 动态获取指定网卡 IPv4 地址 static std::string get_ip_address(const std::string &ifname) { @@ -39,18 +46,14 @@ static std::string get_ip_address(const std::string &ifname) } freeifaddrs(ifaddr); - return ""; // 没找到 + return ""; } +// ========== 静态成员 ========== std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; -static bool device_exists(const std::string &path) -{ - struct stat st; - return (stat(path.c_str(), &st) == 0); -} - +// ========== 初始化 ========== void RTMPManager::init() { gst_init(nullptr, nullptr); @@ -59,6 +62,7 @@ void RTMPManager::init() std::string RTMPManager::make_key(const std::string &name) { return name + "_main"; } +// ========== 创建推流管线 ========== GstElement *RTMPManager::create_pipeline(const Camera &cam) { int width = cam.width; @@ -68,18 +72,22 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam) std::string stream_name = cam.name + "_main"; std::string app = "record"; - std::string location = "rtmp://127.0.0.1:1935/" + app + "/" + stream_name + "?vhost=" + app; - std::string pipeline_str = "v4l2src device=" + cam.device + " ! video/x-raw,width=" + std::to_string(width) + - ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + - "/1 ! videoconvert ! video/x-raw,format=NV12 " - "! mpph264enc bps=" + - std::to_string(bitrate) + " gop=" + std::to_string(fps) + - " ! h264parse ! flvmux name=mux streamable=true " - "audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux. " - "mux. ! rtmpsink location=\"" + - location + "\" sync=false"; + // fpsdisplaysink 探测首帧 + std::string pipeline_str = + "v4l2src device=" + cam.device + " ! video/x-raw,width=" + std::to_string(width) + + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + + "/1 ! videoconvert ! video/x-raw,format=NV12 " + "! tee name=t " + "t. ! queue ! mpph264enc bps=" + + std::to_string(bitrate) + " gop=" + std::to_string(fps) + + " ! h264parse ! flvmux name=mux streamable=true " + "audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux. " + "mux. ! rtmpsink location=\"" + + location + + "\" sync=false " + "t. ! queue ! fpsdisplaysink name=fpsprobe text-overlay=false video-sink=fakesink sync=false"; LOG_INFO("[RTMP] Pipeline: " + pipeline_str); @@ -94,6 +102,7 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam) return pipeline; } +// ========== 主推流循环 ========== void RTMPManager::stream_loop(Camera cam, StreamContext *ctx) { std::string key = make_key(cam.name); @@ -121,14 +130,13 @@ void RTMPManager::stream_loop(Camera cam, StreamContext *ctx) GstBus *bus = gst_element_get_bus(pipeline); gst_element_set_state(pipeline, GST_STATE_PLAYING); - // -------- 等待状态切换结果 -------- + // 等 pipeline 真正进入 PLAYING 状态 GstStateChangeReturn ret = gst_element_get_state(pipeline, nullptr, nullptr, 3 * GST_SECOND); - if (ret != GST_STATE_CHANGE_SUCCESS) { ctx->status.running = false; ctx->status.last_error = "Pipeline failed to reach PLAYING"; - LOG_ERROR("[RTMP] " + key + " failed to start (no video signal?)"); + LOG_ERROR("[RTMP] " + key + " failed to start (maybe no signal)"); gst_element_set_state(pipeline, GST_STATE_NULL); gst_object_unref(bus); gst_object_unref(pipeline); @@ -136,11 +144,49 @@ void RTMPManager::stream_loop(Camera cam, StreamContext *ctx) continue; } + // 等首帧 + bool first_frame = false; + auto start_time = std::chrono::steady_clock::now(); + while (!first_frame && std::chrono::steady_clock::now() - start_time < std::chrono::seconds(3)) + { + GstMessage *msg = gst_bus_timed_pop_filtered(bus, 300 * GST_MSECOND, + (GstMessageType)(GST_MESSAGE_ELEMENT | GST_MESSAGE_ERROR)); + if (!msg) continue; + + if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ELEMENT) + { + const GstStructure *s = gst_message_get_structure(msg); + if (s && g_str_has_prefix(gst_structure_get_name(s), "fpsdisplaysink")) + { + first_frame = true; + break; + } + } + else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) + { + first_frame = false; + break; + } + gst_message_unref(msg); + } + + if (!first_frame) + { + ctx->status.running = false; + ctx->status.last_error = "No frames detected"; + LOG_ERROR("[RTMP] " + key + " - No frames detected (no video signal)"); + gst_element_set_state(pipeline, GST_STATE_NULL); + gst_object_unref(bus); + gst_object_unref(pipeline); + std::this_thread::sleep_for(std::chrono::seconds(3)); + continue; + } + + // 进入正常推流状态 ctx->status.running = true; ctx->status.last_error.clear(); LOG_INFO("[RTMP] Started stream for " + key); - // -------- 主循环 -------- bool need_restart = false; while (ctx->running) { @@ -189,6 +235,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext *ctx) ctx->status.running = false; } +// ========== 启停与状态 ========== void RTMPManager::start_all() { LOG_INFO("[RTMP] Starting all record streams..."); @@ -222,16 +269,49 @@ bool RTMPManager::is_streaming(const std::string &cam_name) std::string RTMPManager::get_stream_url(const std::string &cam_name) { - // 优先动态检测网卡 IP,失败则使用固定值 std::string ip = get_ip_address("enP2p33s0"); - if (ip.empty()) ip = "127.0.0.1"; // 或用动态获取网卡 IP + if (ip.empty()) ip = "192.168.3.211"; // fallback return "http://" + ip + ":1985/rtc/v1/whep/?app=record&stream=" + cam_name + "_main"; } -std::vector> RTMPManager::get_all_status() +// ========== 汇总状态 ========== +std::vector RTMPManager::get_all_channels_status() { - std::vector> result; + std::vector result; std::lock_guard lock(streams_mutex); - for (auto &kv : streams) result.emplace_back(kv.first, kv.second->status.running); + + for (size_t i = 0; i < g_app_config.cameras.size(); ++i) + { + const auto &cam = g_app_config.cameras[i]; + auto key = make_key(cam.name); + + ChannelInfo ch; + ch.loc = static_cast(i); + ch.url.clear(); + ch.running = false; + ch.reason = "Not started"; + + auto it = streams.find(key); + if (it != streams.end()) + { + auto &status = it->second->status; + ch.running = status.running; + if (status.running) + { + ch.url = get_stream_url(cam.name); + ch.reason.clear(); + } + else + { + ch.reason = status.last_error.empty() ? "Unknown error" : status.last_error; + } + } + else + { + ch.reason = "Context missing"; + } + + result.push_back(ch); + } return result; }