diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 6d09465..18c8a43 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -55,6 +55,12 @@ private: // movable? we avoid moving by using unique_ptr }; + // RTMPManager 内部增加 + std::mutex stop_queue_mutex; + std::condition_variable stop_cv; + std::queue> stop_queue; + std::atomic stop_thread_running{false}; + std::thread stop_thread; // store unique_ptr to avoid copy/move issues with atomic/thread static std::unordered_map> streams; static std::mutex streams_mutex; diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index a49d18d..086537c 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -26,6 +26,31 @@ void RTMPManager::init() { gst_init(nullptr, nullptr); LOG_INFO("[RTMP] GStreamer initialized."); + + // 启动 stop_worker_thread + stop_thread_running.store(true); + stop_thread = std::thread([] + { + while (stop_thread_running.load()) + { + std::unique_ptr ctx; + { + std::unique_lock lock(RTMPManager::stop_queue_mutex); + RTMPManager::stop_cv.wait(lock, []{ + return !RTMPManager::stop_queue.empty() || !RTMPManager::stop_thread_running.load(); + }); + if (!RTMPManager::stop_queue.empty()) + { + ctx = std::move(RTMPManager::stop_queue.front()); + RTMPManager::stop_queue.pop(); + } + } + if (ctx && ctx->thread.joinable()) + { + ctx->thread.join(); + LOG_INFO("[RTMP] Stream thread joined asynchronously: " + ctx->name); + } + } }); } void RTMPManager::set_status_callback(StreamCallback cb) @@ -229,8 +254,8 @@ void RTMPManager::start_camera(const Camera &cam, StreamType type) void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) { std::string key = make_stream_key(cam_name, type); - std::unique_ptr taken_ctx; + { std::lock_guard lock(streams_mutex); auto it = streams.find(key); @@ -240,18 +265,23 @@ void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) return; } - // mark it to stop and take ownership it->second->running.store(false); taken_ctx = std::move(it->second); streams.erase(it); } - // join outside lock - if (taken_ctx && taken_ctx->thread.joinable()) - taken_ctx->thread.join(); + // 异步加入 stop 队列,由 stop_worker_thread join + if (taken_ctx) + { + { + std::lock_guard lock(stop_queue_mutex); + stop_queue.push(std::move(taken_ctx)); + } + stop_cv.notify_one(); + } update_status(key, {false, StreamResult::OK, "Stopped manually"}); - LOG_INFO("[RTMP] stop_camera finished for " + key); + LOG_INFO("[RTMP] stop_camera queued for async stop: " + key); } void RTMPManager::stop_all()