From f44d8a46c2e1767b989dd7b5d4ec6e21e75532df Mon Sep 17 00:00:00 2001 From: cxh Date: Wed, 15 Oct 2025 14:14:00 +0800 Subject: [PATCH] temp --- .clang-format | 4 + include/rtmp_manager.hpp | 26 ++--- src/rtmp_manager.cpp | 211 +++++++++++++++++++++------------------ 3 files changed, 131 insertions(+), 110 deletions(-) create mode 100644 .clang-format diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..42a2256 --- /dev/null +++ b/.clang-format @@ -0,0 +1,4 @@ +BasedOnStyle: Google +IndentWidth: 4 +ColumnLimit: 120 +BreakBeforeBraces: Allman diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 17930fd..1fe4719 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -22,8 +22,8 @@ public: PIPELINE_ERROR, CONNECTION_FAIL, EOS_RECEIVED, - UNKNOWN, - TIMEOUT + TIMEOUT, + UNKNOWN }; struct StreamStatus @@ -55,17 +55,19 @@ private: StreamContext &operator=(const StreamContext &) = delete; }; + // helper + static std::string make_stream_key(const std::string &cam_name, StreamType type); + static GstElement *create_pipeline(const Camera &cam, StreamType type); + static void update_status(const std::string &key, const StreamStatus &status); + + // publish a single-camera response on mqtt (uses global mqtt_client) + static void publish_stream_status(const std::string &cam_name, StreamType type, const std::string &seqNo, bool ok, const std::string &reason); + + // actual loop (internal) - note it takes seqNo so it can report back + static void stream_loop(Camera cam, StreamType type, const std::string &seqNo); + + // storage static std::unordered_map> streams; static std::mutex streams_mutex; static StreamCallback status_callback; - - static void stream_loop(Camera cam, StreamType type, const std::string &seqNo); - static GstElement *create_pipeline(const Camera &cam, StreamType type); - static std::string make_stream_key(const std::string &cam_name, StreamType type); - static void update_status(const std::string &key, const StreamStatus &status); - static void publish_stream_status(const std::string &cam_name, StreamType type, const std::string &seqNo, bool ok, const std::string &reason); - static inline std::string stream_type_suffix(StreamType type) - { - return (type == StreamType::MAIN) ? "_main" : "_sub"; - } }; diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 069c157..ca79a43 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -1,11 +1,16 @@ // rtsp_manager.cpp #include "rtmp_manager.hpp" -#include "logger.hpp" + #include -#include #include +#include #include +#include "logger.hpp" + +static inline std::string stream_type_suffix(StreamType type) { return (type == StreamType::MAIN) ? "_main" : "_sub"; } + +// static members std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; @@ -21,21 +26,16 @@ void RTMPManager::init() LOG_INFO("[RTMP] GStreamer initialized."); } -void RTMPManager::set_status_callback(StreamCallback cb) -{ - status_callback = std::move(cb); -} +void RTMPManager::set_status_callback(StreamCallback cb) { status_callback = std::move(cb); } void RTMPManager::update_status(const std::string &key, const StreamStatus &status) { { std::lock_guard lock(streams_mutex); auto it = streams.find(key); - if (it != streams.end()) - it->second->status = status; + if (it != streams.end()) it->second->status = status; } - if (status_callback) - status_callback(key, status); + if (status_callback) status_callback(key, status); } GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) @@ -51,18 +51,15 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) } std::string stream_name = cam.name + stream_type_suffix(type); - std::string pipeline_str = - "v4l2src device=" + cam.device + - " ! video/x-raw,format=NV12,width=" + std::to_string(width) + - ",height=" + std::to_string(height) + - ",framerate=" + std::to_string(fps) + - "/1 ! queue max-size-buffers=1 leaky=downstream " - "! mpph264enc bps=" + - std::to_string(bitrate) + - " gop=" + std::to_string(fps) + - " ! h264parse ! flvmux streamable=true name=mux " - "! rtmpsink location=\"rtmp://127.0.0.1/live/" + - stream_name + " live=1\" sync=false"; + std::string pipeline_str = "v4l2src device=" + cam.device + + " ! video/x-raw,format=NV12,width=" + std::to_string(width) + + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + + "/1 ! queue max-size-buffers=1 leaky=downstream " + "! mpph264enc bps=" + + std::to_string(bitrate) + " gop=" + std::to_string(fps) + + " ! h264parse ! flvmux streamable=true name=mux " + "! rtmpsink location=\"rtmp://127.0.0.1/live/" + + stream_name + " live=1\" sync=false"; LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str); @@ -77,7 +74,8 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) return pipeline; } -void RTMPManager::publish_stream_status(const std::string &cam_name, StreamType type, const std::string &seqNo, bool ok, const std::string &reason) +void RTMPManager::publish_stream_status(const std::string &cam_name, StreamType type, const std::string &seqNo, bool ok, + const std::string &reason) { // 找 camera index int cam_index = -1; @@ -94,7 +92,7 @@ void RTMPManager::publish_stream_status(const std::string &cam_name, StreamType nlohmann::json ch_resp; ch_resp["loc"] = cam_index; - ch_resp["url"] = ok && cam_ptr ? get_stream_url(cam_ptr->name, type) : ""; + ch_resp["url"] = (ok && cam_ptr) ? get_stream_url(cam_ptr->name, type) : ""; ch_resp["result"] = ok ? 0 : 1; ch_resp["reason"] = reason; @@ -103,10 +101,9 @@ void RTMPManager::publish_stream_status(const std::string &cam_name, StreamType reply["seqNo"] = seqNo; reply["data"] = nlohmann::json::array({ch_resp}); - mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump()); + if (mqtt_client) mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump()); } -// 修改 stream_loop,新增 seqNo 参数 void RTMPManager::stream_loop(Camera cam, StreamType type, const std::string &seqNo) { std::string key = make_stream_key(cam.name, type); @@ -115,13 +112,14 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, const std::string &se const int MAX_RETRIES = 0; int retry_count = 0; + const int FIRST_FRAME_TIMEOUT_SEC = 5; // 等待第一帧最大秒数 + while (true) { { std::lock_guard lock(streams_mutex); auto it = streams.find(key); - if (it == streams.end() || !it->second->running.load()) - break; + if (it == streams.end() || !it->second->running.load()) break; } GstElement *pipeline = create_pipeline(cam, type); @@ -130,8 +128,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, const std::string &se std::string reason = "Failed to create pipeline (device unavailable or resolution unsupported)"; update_status(key, {false, StreamResult::PIPELINE_ERROR, reason}); publish_stream_status(cam.name, type, seqNo, false, reason); - if (++retry_count > MAX_RETRIES) - break; + if (++retry_count > MAX_RETRIES) break; std::this_thread::sleep_for(std::chrono::seconds(1)); continue; } @@ -141,8 +138,6 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, const std::string &se bool first_frame_received = false; bool stop_flag = false; - - const int FIRST_FRAME_TIMEOUT_SEC = 5; // 等待第一帧最大秒数 auto first_frame_start = std::chrono::steady_clock::now(); while (!stop_flag) @@ -161,6 +156,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, const std::string &se } } + // check first-frame timeout if (!first_frame_received) { auto elapsed = std::chrono::steady_clock::now() - first_frame_start; @@ -174,58 +170,56 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, const std::string &se } } - if (!msg) - continue; + if (!msg) continue; switch (GST_MESSAGE_TYPE(msg)) { - case GST_MESSAGE_ERROR: - { - GError *err = nullptr; - gchar *debug = nullptr; - gst_message_parse_error(msg, &err, &debug); - std::string err_msg = err ? err->message : "Unknown GStreamer error"; - LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg); - update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg}); - publish_stream_status(cam.name, type, seqNo, false, err_msg); - if (err) - g_error_free(err); - if (debug) - g_free(debug); - stop_flag = true; - break; - } - case GST_MESSAGE_EOS: - LOG_WARN("[RTMP] EOS on " + key); - update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"}); - publish_stream_status(cam.name, type, seqNo, false, "EOS received"); - stop_flag = true; - break; - case GST_MESSAGE_STATE_CHANGED: - { - GstState old_state, new_state; - gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); - if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && !first_frame_received) + case GST_MESSAGE_ERROR: { - first_frame_received = true; - LOG_INFO("[RTMP] First frame received for " + key); - update_status(key, {true, StreamResult::OK, ""}); - publish_stream_status(cam.name, type, seqNo, true, "Streaming OK"); + GError *err = nullptr; + gchar *debug = nullptr; + gst_message_parse_error(msg, &err, &debug); + std::string err_msg = err ? err->message : "Unknown GStreamer error"; + LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg); + update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg}); + publish_stream_status(cam.name, type, seqNo, false, err_msg); + if (err) g_error_free(err); + if (debug) g_free(debug); + stop_flag = true; + break; } - break; - } + case GST_MESSAGE_EOS: + LOG_WARN("[RTMP] EOS on " + key); + update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"}); + publish_stream_status(cam.name, type, seqNo, false, "EOS received"); + stop_flag = true; + break; + case GST_MESSAGE_STATE_CHANGED: + { + GstState old_state, new_state; + gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); + if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && + !first_frame_received) + { + first_frame_received = true; + LOG_INFO("[RTMP] First frame received for " + key); + update_status(key, {true, StreamResult::OK, ""}); + publish_stream_status(cam.name, type, seqNo, true, "Streaming OK"); + } + break; + } + default: + break; } gst_message_unref(msg); } gst_element_set_state(pipeline, GST_STATE_NULL); - if (bus) - gst_object_unref(bus); + if (bus) gst_object_unref(bus); gst_object_unref(pipeline); - if (!stop_flag) - break; + if (!stop_flag) break; if (++retry_count > MAX_RETRIES) { @@ -236,6 +230,29 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, const std::string &se std::this_thread::sleep_for(std::chrono::seconds(1)); } + // ensure we remove ourselves from streams map if still present and belong to this thread + { + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it != streams.end()) + { + // If thread stored in map matches current thread id, erase it. + // This avoids racing with stop_camera which may already have moved/erased it. + try + { + if (it->second && it->second->thread.get_id() == std::this_thread::get_id()) + { + streams.erase(it); + } + } + catch (...) + { + // in case thread id comparisons throw (shouldn't), just ignore + streams.erase(it); + } + } + } + update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"}); LOG_INFO("[RTMP] Stream loop ended for " + key); } @@ -245,24 +262,27 @@ void RTMPManager::start_camera(const Camera &cam, StreamType type, const std::st { std::string key = make_stream_key(cam.name, type); - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it != streams.end() && it->second->running.load()) { - LOG_WARN("[RTMP] " + key + " already streaming."); - // 上报已经在请求状态 - publish_stream_status(cam.name, type, seqNo, false, "Already streaming"); - return; + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it != streams.end() && it->second->running.load()) + { + LOG_WARN("[RTMP] " + key + " already streaming."); + // 按协议:如果已经在推,直接回报已在推(成功),并返回 URL + publish_stream_status(cam.name, type, seqNo, true, "Already streaming"); + return; + } + + // create and store context + auto ctx = std::make_unique(); + ctx->running.store(true); + ctx->status = {true, StreamResult::UNKNOWN, ""}; + // create thread and place it into context. Need to assign thread before inserting to avoid race. + ctx->thread = std::thread([cam, type, seqNo]() { stream_loop(cam, type, seqNo); }); + + streams.emplace(key, std::move(ctx)); } - auto ctx = std::make_unique(); - ctx->running.store(true); - ctx->thread = std::thread([cam, type, seqNo]() - { - stream_loop(cam, type, seqNo); // stream_loop 内部处理 MQTT 上报 - }); - - streams[key] = std::move(ctx); LOG_INFO("[RTMP] start_camera requested for " + key); } @@ -277,8 +297,8 @@ void RTMPManager::stop_camera(const std::string &cam_name, StreamType type, cons if (it == streams.end()) { LOG_WARN("[RTMP] " + key + " not found."); - // 上报停止失败(未找到) - publish_stream_status(cam_name, type, seqNo, false, "Stream not found"); + // 按协议:如果没有在推,则回复“未推”的成功响应(实际不做任何动作) + publish_stream_status(cam_name, type, seqNo, true, "Not streaming"); return; } @@ -287,8 +307,7 @@ void RTMPManager::stop_camera(const std::string &cam_name, StreamType type, cons streams.erase(it); } - if (ctx->thread.joinable()) - ctx->thread.join(); + if (ctx->thread.joinable()) ctx->thread.join(); update_status(key, {false, StreamResult::OK, "Stopped manually"}); LOG_INFO("[RTMP] stop_camera completed: " + key); @@ -303,16 +322,13 @@ void RTMPManager::stop_all() { std::lock_guard lock(streams_mutex); - for (auto &kv : streams) - kv.second->running.store(false); - for (auto &kv : streams) - ctxs.push_back(std::move(kv.second)); + for (auto &kv : streams) kv.second->running.store(false); + for (auto &kv : streams) ctxs.push_back(std::move(kv.second)); streams.clear(); } for (auto &ctx : ctxs) - if (ctx->thread.joinable()) - ctx->thread.join(); + if (ctx->thread.joinable()) ctx->thread.join(); LOG_INFO("[RTMP] stop_all completed."); } @@ -328,8 +344,7 @@ bool RTMPManager::is_any_streaming() { std::lock_guard lock(streams_mutex); for (auto &kv : streams) - if (kv.second->running.load()) - return true; + if (kv.second->running.load()) return true; return false; }