From e44e9142972e6354c9746f06289e9d09d08171c0 Mon Sep 17 00:00:00 2001 From: cxh Date: Fri, 17 Oct 2025 13:36:22 +0800 Subject: [PATCH] 1 --- include/rtmp_manager.hpp | 9 +- src/mqtt_client_wrapper.cpp | 6 +- src/rtmp_manager.cpp | 196 +++++++++++++++++++++++++++--------- 3 files changed, 159 insertions(+), 52 deletions(-) diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 96a4d8d..c0a3298 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -57,6 +57,8 @@ class RTMPManager static std::vector process_push_request(const VideoPushRequest &req); static void stop_all(); + // 自动启动所有录像流(上电调用一次) + static void start_all_record_streams(); static bool is_streaming(const std::string &cam_name, StreamType type); static bool is_any_streaming(); static std::string get_stream_url(const std::string &cam_name, StreamType type); @@ -67,6 +69,7 @@ class RTMPManager std::atomic running{false}; std::thread thread; + // 启动阶段的 promise,用于首次启动结果反馈(7s 内) std::promise start_promise; std::atomic start_promise_set{false}; @@ -96,6 +99,7 @@ class RTMPManager static std::string make_stream_key(const std::string &cam_name, StreamType type); static GstElement *create_pipeline(const Camera &cam, StreamType type); static void stream_loop(Camera cam, StreamType type, StreamContext *ctx); + static bool do_stream_once(const Camera &cam, StreamType type, StreamContext *ctx); static void update_status(const std::string &key, const StreamStatus &status); @@ -104,6 +108,7 @@ class RTMPManager static StreamCallback status_callback; // 自动重试参数 - static constexpr int MAX_RETRIES = 3; - static constexpr int RETRY_DELAY_MS = 2000; + // MAIN(record)为永久重试;SUB(live)使用限次重试 + static constexpr int LIVE_MAX_RETRIES = 5; + static constexpr int RETRY_BASE_DELAY_MS = 2000; // 2s -> 可指数退避 }; \ No newline at end of file diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index e19ed48..caf2fed 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -63,7 +63,8 @@ static void on_mqtt_message_received(const std::string &topic, const std::string { VideoPushRequest::DataItem item; item.switchVal = data.value("switch", 0); - item.streamType = data.value("streamType", 0); + item.streamType = data.value("streamType", 1); // ✅ 默认子码流 + if (item.streamType == 0) item.streamType = 1; // ✅ 防止平台误发 main item.channels = data.value("channels", std::vector{}); req.data.push_back(item); } @@ -73,7 +74,8 @@ static void on_mqtt_message_received(const std::string &topic, const std::string { VideoPushRequest::DataItem item; item.switchVal = d.value("switch", 0); - item.streamType = d.value("streamType", 0); + item.streamType = data.value("streamType", 1); // ✅ 默认子码流 + if (item.streamType == 0) item.streamType = 1; // ✅ 防止平台误发 main item.channels = d.value("channels", std::vector{}); req.data.push_back(item); } diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 71cd5e9..c093cc0 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -17,6 +17,8 @@ #include "logger.hpp" +// ---------- 小工具 ---------- + static bool device_exists(const std::string &path) { struct stat st; @@ -38,7 +40,6 @@ static std::string get_ip_address(const std::string &ifname) { if (ifa->ifa_addr == nullptr) continue; - // 只看 IPv4 if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name) { void *addr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr; @@ -56,10 +57,14 @@ static std::string get_ip_address(const std::string &ifname) static inline std::string stream_type_suffix(StreamType type) { return (type == StreamType::MAIN) ? "_main" : "_sub"; } +// ---------- 静态成员 ---------- + std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; +// ---------- 内部工具 ---------- + std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type) { return cam_name + stream_type_suffix(type); @@ -83,23 +88,21 @@ void RTMPManager::update_status(const std::string &key, const StreamStatus &stat if (status_callback) status_callback(key, status); } +// 决定 app:MAIN -> record(录像),SUB -> live(按需) +static inline std::string pick_app(StreamType type) { return (type == StreamType::MAIN) ? "record" : "live"; } + GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) { int width = cam.width, height = cam.height, fps = cam.fps, bitrate = cam.bitrate; if (type == StreamType::SUB) { - width = std::max(160, width / 2); - height = std::max(120, height / 2); - fps = std::max(10, fps / 2); bitrate = std::max(300000, bitrate / 2); } - std::string stream_name = cam.name + stream_type_suffix(type); + const std::string stream_name = cam.name + stream_type_suffix(type); + const std::string app = pick_app(type); - // 说明: - // 1) 维持你当前的视频链路(NV12 -> mpph264enc -> h264parse -> flvmux)。 - // 2) 加一个静音音轨:audiotestsrc wave=silence -> voaacenc -> aacparse -> mux. - // 3) 给 rtmpsink 命名 name=rtmp_sink,供后续在 bus loop 中识别并判断“真正连上”时机。 + // NV12 -> mpph264enc -> h264parse -> flvmux(+静音音轨) -> rtmpsink std::string pipeline_str = "v4l2src device=" + cam.device + " ! video/x-raw,width=" + std::to_string(width) + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + "/1 " @@ -110,10 +113,10 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) "! h264parse " "! flvmux name=mux streamable=true " "audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux. " - "mux. ! rtmpsink name=rtmp_sink location=\"rtmp://127.0.0.1/live/" + - stream_name + " live=1\" sync=false"; + "mux. ! rtmpsink name=rtmp_sink location=\"rtmp://127.0.0.1/" + + app + "/" + stream_name + " live=1\" sync=false"; - LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str); + LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "' -> app '" + app + "': " + pipeline_str); GError *error = nullptr; GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error); @@ -126,13 +129,16 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) return pipeline; } -int get_camera_index(const std::string &name) +// 摄像头在配置中的索引(用于 reply.loc) +static int get_camera_index(const std::string &name) { for (size_t i = 0; i < g_app_config.cameras.size(); ++i) if (g_app_config.cameras[i].name == name) return static_cast(i); return -1; } +// ---------- 对外接口:启动/停止 ---------- + RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, StreamType type) { StreamResultInfo res; @@ -146,8 +152,8 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea return res; } - // 新增:设备节点存在性检查 - if (!device_exists(cam.device)) + // 主码流(record)允许设备缺失时也进入“重试守护”,所以这里只在 SUB 时严格检查 + if (type == StreamType::SUB && !device_exists(cam.device)) { res.result = 1; res.reason = "Device not found: " + cam.device; @@ -163,7 +169,7 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea std::lock_guard lock(streams_mutex); auto it = streams.find(key); - // 若旧线程仍在运行,先安全停止 + // 若旧线程仍在运行,先安全停止并移除 if (it != streams.end()) { if (it->second->running.load()) @@ -246,9 +252,8 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na streams.erase(it); } - bool was_running = ctx->running.load(); // ✅ 改为使用 running + bool was_running = ctx->running.load(); ctx->running.store(false); - if (ctx->thread.joinable()) ctx->thread.join(); if (was_running) @@ -265,6 +270,8 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na return res; } +// ---------- 核心:推流循环(带重试策略) ---------- + void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) { const std::string key = make_stream_key(cam.name, type); @@ -284,7 +291,68 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) } }; - StreamStatus status; + // MAIN(record)= 永久自动重试;SUB(live)= 限次重试 + if (type == StreamType::MAIN) + { + int backoff_ms = RETRY_BASE_DELAY_MS; // 可指数退避 + while (ctx->running.load()) + { + const bool ok = do_stream_once(cam, type, ctx); + if (!ok && ctx->running.load()) + { + LOG_WARN("[RTMP] MAIN(record) stream lost, retry in " + std::to_string(backoff_ms) + "ms: " + cam.name); + std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms)); + backoff_ms = std::min(backoff_ms * 2, 60000); // up to 60s + } + else + { + backoff_ms = RETRY_BASE_DELAY_MS; // 成功后重置退避 + } + } + } + else // SUB / live + { + int retries = 0; + while (ctx->running.load()) + { + const bool ok = do_stream_once(cam, type, ctx); + if (ok) break; // 正常退出(被 stop 或 EOS) + retries++; + if (retries >= LIVE_MAX_RETRIES || !ctx->running.load()) + { + LOG_ERROR("[RTMP] SUB(live) stream failed after retries, giving up: " + cam.name); + break; + } + LOG_WARN("[RTMP] SUB(live) retry " + std::to_string(retries) + "/" + std::to_string(LIVE_MAX_RETRIES) + + " in " + std::to_string(RETRY_BASE_DELAY_MS) + "ms: " + cam.name); + std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); + } + } + + ctx->status.running = false; +} + +// 单次推流:创建 pipeline -> PLAYING -> loop 直到 ERROR/EOS/stop +bool RTMPManager::do_stream_once(const Camera &cam, StreamType type, StreamContext *ctx) +{ + const std::string key = make_stream_key(cam.name, type); + + auto try_set_start = [&](const StreamStatus &st) + { + bool expected = false; + if (ctx && ctx->start_promise_set.compare_exchange_strong(expected, true)) + { + try + { + ctx->start_promise.set_value(st); + } + catch (...) + { + } + } + }; + + RTMPManager::StreamStatus status; status.running = false; status.last_result = StreamResult::UNKNOWN; status.last_error.clear(); @@ -294,17 +362,21 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) { status.last_result = StreamResult::PIPELINE_ERROR; status.last_error = "Failed to create pipeline"; + update_status(key, status); try_set_start(status); - return; + return false; } GstBus *bus = gst_element_get_bus(pipeline); gst_element_set_state(pipeline, GST_STATE_PLAYING); - bool started_reported = false; // 是否已经上报“启动成功” + bool started_reported = false; auto t0 = std::chrono::steady_clock::now(); - const int first_frame_timeout_s = 5; // 首帧/连通性保护上限(与原逻辑一致) - const int connect_guard_ms = 2000; // 连接保护窗口:给 rtmpsink 一点时间去连接 + const int first_frame_timeout_s = 5; + const int connect_guard_ms = 2000; + + bool need_restart = false; + bool ok = false; while (true) { @@ -313,11 +385,12 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) status.running = false; status.last_result = StreamResult::UNKNOWN; status.last_error = "Stream stopped manually"; + update_status(key, status); try_set_start(status); break; } - // 超时保护:既没报错也没确认连接成功,超过上限则判失败 + // 首帧/连通性超时保护 if (!started_reported) { auto elapsed_s = @@ -327,7 +400,9 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) status.running = false; status.last_result = StreamResult::TIMEOUT; status.last_error = "No frames/connection established within timeout"; + update_status(key, status); try_set_start(status); + need_restart = true; break; } } @@ -339,8 +414,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) if (!msg) { - // 保底:过了 connect_guard_ms 仍然没有 ERROR, - // 并且 rtmpsink 进入 PLAYING,就认为真的起来了。 + // 过了 connect_guard_ms 且 rtmpsink 进入 PLAYING,认为启动成功 if (!started_reported) { auto elapsed_ms = @@ -359,6 +433,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) ctx->status.running = true; status.last_result = StreamResult::OK; status.last_error.clear(); + update_status(key, status); try_set_start(status); started_reported = true; LOG_INFO("[RTMP] Guard-start OK for '" + cam.name + "'"); @@ -374,7 +449,6 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) { case GST_MESSAGE_STREAM_STATUS: { - // 优先判断 rtmpsink 的 STREAM_STATUS(CREATE/START)来认定“真正连上” GstStreamStatusType type_ss; GstElement *owner = nullptr; gst_message_parse_stream_status(msg, &type_ss, &owner); @@ -390,6 +464,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) ctx->status.running = true; status.last_result = StreamResult::OK; status.last_error.clear(); + update_status(key, status); try_set_start(status); started_reported = true; LOG_INFO(std::string("[RTMP] Connected (STREAM_STATUS) for '") + cam.name + "'"); @@ -400,11 +475,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) } case GST_MESSAGE_STATE_CHANGED: - { - // 不再用 pipeline 进入 PLAYING 判成功(容易误判) - // 这里仅用于日志或后续扩展 + // 可用于扩展日志;不以 pipeline 的 PLAYING 作为成功判据 break; - } case GST_MESSAGE_ERROR: { @@ -418,15 +490,12 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) LOG_ERROR("[RTMP] Stream error from '" + cam.name + "': " + status.last_error); - // 立刻标记为停止,避免线程继续循环误判 ctx->status.running = false; - ctx->running.store(false); - - gst_element_set_state(pipeline, GST_STATE_NULL); + need_restart = true; if (err) g_error_free(err); if (dbg) g_free(dbg); - + update_status(key, status); try_set_start(status); break; } @@ -436,7 +505,9 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) status.running = false; status.last_result = StreamResult::EOS_RECEIVED; status.last_error = "EOS received"; + update_status(key, status); try_set_start(status); + need_restart = true; break; } @@ -446,20 +517,24 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) gst_message_unref(msg); - // 发生错误或 EOS 就退出循环 - if (status.last_result == StreamResult::CONNECTION_FAIL || status.last_result == StreamResult::EOS_RECEIVED) + if (need_restart) break; + if (started_reported) { - break; + // 正常运行中,继续监听 } } + // 收尾 gst_element_set_state(pipeline, GST_STATE_NULL); if (bus) gst_object_unref(bus); gst_object_unref(pipeline); - ctx->status.running = false; + ok = started_reported && !need_restart; // 若在 stop 前一直运行且未触发重启则认为 ok + return ok; } +// ---------- 其他工具 ---------- + void RTMPManager::stop_all() { std::vector> ctxs; @@ -493,17 +568,18 @@ bool RTMPManager::is_any_streaming() std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type) { - // 获取当前设备 enp1s0 网卡 IP(或者改成你希望的接口) + // 你可以把这里的网卡名替换成你的默认上行网卡 std::string ip = get_ip_address("enP2p33s0"); if (ip.empty()) ip = "127.0.0.1"; - // 构建流名,例如 AHD2_main - std::string stream_name = make_stream_key(cam_name, type); + const std::string app = pick_app(type); + const std::string stream_name = make_stream_key(cam_name, type); - // 最终返回 WebRTC 拉流地址(不加 .flv) - return "http://" + ip + ":1985/rtc/v1/whep/?app=live&stream=" + stream_name; + // 返回 WebRTC 拉流地址(WHEP) + return "http://" + ip + ":1985/rtc/v1/whep/?app=" + app + "&stream=" + stream_name; } +// 批量处理 std::vector RTMPManager::process_push_request(const VideoPushRequest &req) { std::vector results; @@ -521,20 +597,17 @@ std::vector RTMPManager::process_push_request(con if (item.switchVal == 0) { - // 异步启动推流 futures.emplace_back( std::async(std::launch::async, [&, cam, type]() { return start_camera(cam, type); })); } else { - // 异步停止推流 futures.emplace_back( std::async(std::launch::async, [&, cam, type]() { return stop_camera(cam.name, type); })); } } } - // 收集所有结果 for (auto &f : futures) { try @@ -552,3 +625,30 @@ std::vector RTMPManager::process_push_request(con return results; } + +void RTMPManager::start_all_record_streams() +{ + LOG_INFO("[RTMP] Auto-starting all record (MAIN) streams..."); + + for (const auto &cam : g_app_config.cameras) + { + // 检查是否已经存在 + if (is_streaming(cam.name, StreamType::MAIN)) + { + LOG_INFO("[RTMP] Record stream already running: " + cam.name); + continue; + } + + auto res = start_camera(cam, StreamType::MAIN); + if (res.result == 0) + { + LOG_INFO("[RTMP] Record stream started for " + cam.name + ": " + res.reason); + } + else + { + LOG_WARN("[RTMP] Failed to start record stream for " + cam.name + ": " + res.reason); + } + } + + LOG_INFO("[RTMP] Auto-start for record streams done."); +}