From cb37ca9eb3b86fee1efef744f28a9e0f0b8daf1c Mon Sep 17 00:00:00 2001 From: cxh Date: Wed, 15 Oct 2025 10:18:02 +0800 Subject: [PATCH] temp --- include/rtmp_manager.hpp | 17 ++---- src/rtmp_manager.cpp | 124 ++++++++------------------------------- 2 files changed, 29 insertions(+), 112 deletions(-) diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 99be22b..69137b7 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -9,8 +9,7 @@ #include #include #include -#include -#include +#include #include "app_config.hpp" class RTMPManager @@ -41,7 +40,6 @@ public: static bool is_streaming(const std::string &cam_name, StreamType type); static bool is_any_streaming(); static std::string get_stream_url(const std::string &cam_name, StreamType type); - static void set_status_callback(StreamCallback cb); private: @@ -51,19 +49,10 @@ private: std::thread thread; StreamStatus status; StreamContext() = default; - // non-copyable StreamContext(const StreamContext &) = delete; StreamContext &operator=(const StreamContext &) = delete; - // movable? we avoid moving by using unique_ptr }; - // RTMPManager 内部增加 - static std::mutex stop_queue_mutex; - static std::condition_variable stop_cv; - static std::queue> stop_queue; - static std::atomic stop_thread_running; - static std::thread stop_thread; - // store unique_ptr to avoid copy/move issues with atomic/thread static std::unordered_map> streams; static std::mutex streams_mutex; static StreamCallback status_callback; @@ -72,4 +61,8 @@ private: static GstElement *create_pipeline(const Camera &cam, StreamType type); static std::string make_stream_key(const std::string &cam_name, StreamType type); static void update_status(const std::string &key, const StreamStatus &status); + static inline std::string stream_type_suffix(StreamType type) + { + return (type == StreamType::MAIN) ? "_main" : "_sub"; + } }; diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index e4f958a..745daf8 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -4,24 +4,12 @@ #include #include #include -#include -#include // for move -#include +#include -std::mutex RTMPManager::stop_queue_mutex; -std::condition_variable RTMPManager::stop_cv; -std::queue> RTMPManager::stop_queue; -std::atomic RTMPManager::stop_thread_running{false}; -std::thread RTMPManager::stop_thread; std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; -static inline std::string stream_type_suffix(StreamType type) -{ - return (type == StreamType::MAIN) ? "_main" : "_sub"; -} - std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type) { return cam_name + stream_type_suffix(type); @@ -31,31 +19,6 @@ void RTMPManager::init() { gst_init(nullptr, nullptr); LOG_INFO("[RTMP] GStreamer initialized."); - - // 启动 stop_worker_thread - stop_thread_running.store(true); - stop_thread = std::thread([] - { - while (stop_thread_running.load()) - { - std::unique_ptr ctx; - { - std::unique_lock lock(RTMPManager::stop_queue_mutex); - RTMPManager::stop_cv.wait(lock, []{ - return !RTMPManager::stop_queue.empty() || !RTMPManager::stop_thread_running.load(); - }); - if (!RTMPManager::stop_queue.empty()) - { - ctx = std::move(RTMPManager::stop_queue.front()); - RTMPManager::stop_queue.pop(); - } - } - if (ctx && ctx->thread.joinable()) - { - ctx->thread.join(); - LOG_INFO("[RTMP] Stream thread joined asynchronously"); - } - } }); } void RTMPManager::set_status_callback(StreamCallback cb) @@ -77,10 +40,7 @@ void RTMPManager::update_status(const std::string &key, const StreamStatus &stat GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) { - int width = cam.width; - int height = cam.height; - int fps = cam.fps; - int bitrate = cam.bitrate; + int width = cam.width, height = cam.height, fps = cam.fps, bitrate = cam.bitrate; if (type == StreamType::SUB) { @@ -119,7 +79,7 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) void RTMPManager::stream_loop(Camera cam, StreamType type) { - const std::string key = make_stream_key(cam.name, type); + std::string key = make_stream_key(cam.name, type); LOG_INFO("[RTMP] Stream loop started for " + key); const int MAX_RETRIES = 5; @@ -154,9 +114,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) while (true) { - msg = gst_bus_timed_pop_filtered( - bus, GST_CLOCK_TIME_NONE, - static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); + msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, + static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); { std::lock_guard lock(streams_mutex); @@ -222,6 +181,12 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) void RTMPManager::start_camera(const Camera &cam, StreamType type) { std::string key = make_stream_key(cam.name, type); + auto ctx = std::make_unique(); + ctx->running.store(true); + ctx->status = {true, StreamResult::OK, ""}; + ctx->thread = std::thread([cam, type]() + { stream_loop(cam, type); }); + { std::lock_guard lock(streams_mutex); auto it = streams.find(key); @@ -230,36 +195,16 @@ void RTMPManager::start_camera(const Camera &cam, StreamType type) LOG_WARN("[RTMP] " + key + " already streaming."); return; } - - // create a new context on heap and keep a unique_ptr in map - auto ctx = std::make_unique(); - ctx->running.store(true); - ctx->status = {true, StreamResult::OK, ""}; - - // start thread after moving unique_ptr into map (to avoid racing) - // insert placeholder first to reserve key, then start thread streams.emplace(key, std::move(ctx)); } - // actually start thread outside the lock (we need to get pointer again) - { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it != streams.end()) - { - // start the thread that runs stream_loop; capture cam and type by value - it->second->thread = std::thread([cam, type]() - { stream_loop(cam, type); }); - } - } - LOG_INFO("[RTMP] start_camera requested for " + key); } void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) { std::string key = make_stream_key(cam_name, type); - std::unique_ptr taken_ctx; + std::unique_ptr ctx; { std::lock_guard lock(streams_mutex); @@ -269,54 +214,34 @@ void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) LOG_WARN("[RTMP] " + key + " not found."); return; } - it->second->running.store(false); - taken_ctx = std::move(it->second); + ctx = std::move(it->second); streams.erase(it); } - // 异步加入 stop 队列,由 stop_worker_thread join - if (taken_ctx) - { - { - std::lock_guard lock(stop_queue_mutex); - stop_queue.push(std::move(taken_ctx)); - } - stop_cv.notify_one(); - } + if (ctx->thread.joinable()) + ctx->thread.join(); update_status(key, {false, StreamResult::OK, "Stopped manually"}); - LOG_INFO("[RTMP] stop_camera queued for async stop: " + key); + LOG_INFO("[RTMP] stop_camera completed: " + key); } void RTMPManager::stop_all() { - std::vector> taken; - std::vector keys; + std::vector> ctxs; { std::lock_guard lock(streams_mutex); for (auto &kv : streams) - { - keys.push_back(kv.first); kv.second->running.store(false); - } - // move all contexts out - for (auto &k : keys) - { - auto it = streams.find(k); - if (it != streams.end()) - taken.push_back(std::move(it->second)); - streams.erase(k); - } + for (auto &kv : streams) + ctxs.push_back(std::move(kv.second)); + streams.clear(); } - // join threads - for (size_t i = 0; i < taken.size(); ++i) - { - if (taken[i] && taken[i]->thread.joinable()) - taken[i]->thread.join(); - } + for (auto &ctx : ctxs) + if (ctx->thread.joinable()) + ctx->thread.join(); LOG_INFO("[RTMP] stop_all completed."); } @@ -324,8 +249,7 @@ void RTMPManager::stop_all() bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) { std::lock_guard lock(streams_mutex); - auto key = make_stream_key(cam_name, type); - auto it = streams.find(key); + auto it = streams.find(make_stream_key(cam_name, type)); return it != streams.end() && it->second->running.load(); }