This commit is contained in:
cxh 2025-10-15 10:02:01 +08:00
parent 3c4ba67d27
commit 0e3f2b94d5
2 changed files with 42 additions and 6 deletions

View File

@ -55,6 +55,12 @@ private:
// movable? we avoid moving by using unique_ptr // movable? we avoid moving by using unique_ptr
}; };
// RTMPManager 内部增加
std::mutex stop_queue_mutex;
std::condition_variable stop_cv;
std::queue<std::unique_ptr<StreamContext>> stop_queue;
std::atomic<bool> stop_thread_running{false};
std::thread stop_thread;
// store unique_ptr to avoid copy/move issues with atomic/thread // store unique_ptr to avoid copy/move issues with atomic/thread
static std::unordered_map<std::string, std::unique_ptr<StreamContext>> streams; static std::unordered_map<std::string, std::unique_ptr<StreamContext>> streams;
static std::mutex streams_mutex; static std::mutex streams_mutex;

View File

@ -26,6 +26,31 @@ void RTMPManager::init()
{ {
gst_init(nullptr, nullptr); gst_init(nullptr, nullptr);
LOG_INFO("[RTMP] GStreamer initialized."); LOG_INFO("[RTMP] GStreamer initialized.");
// 启动 stop_worker_thread
stop_thread_running.store(true);
stop_thread = std::thread([]
{
while (stop_thread_running.load())
{
std::unique_ptr<StreamContext> ctx;
{
std::unique_lock<std::mutex> lock(RTMPManager::stop_queue_mutex);
RTMPManager::stop_cv.wait(lock, []{
return !RTMPManager::stop_queue.empty() || !RTMPManager::stop_thread_running.load();
});
if (!RTMPManager::stop_queue.empty())
{
ctx = std::move(RTMPManager::stop_queue.front());
RTMPManager::stop_queue.pop();
}
}
if (ctx && ctx->thread.joinable())
{
ctx->thread.join();
LOG_INFO("[RTMP] Stream thread joined asynchronously: " + ctx->name);
}
} });
} }
void RTMPManager::set_status_callback(StreamCallback cb) void RTMPManager::set_status_callback(StreamCallback cb)
@ -229,8 +254,8 @@ void RTMPManager::start_camera(const Camera &cam, StreamType type)
void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) void RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
{ {
std::string key = make_stream_key(cam_name, type); std::string key = make_stream_key(cam_name, type);
std::unique_ptr<StreamContext> taken_ctx; std::unique_ptr<StreamContext> taken_ctx;
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key); auto it = streams.find(key);
@ -240,18 +265,23 @@ void RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
return; return;
} }
// mark it to stop and take ownership
it->second->running.store(false); it->second->running.store(false);
taken_ctx = std::move(it->second); taken_ctx = std::move(it->second);
streams.erase(it); streams.erase(it);
} }
// join outside lock // 异步加入 stop 队列,由 stop_worker_thread join
if (taken_ctx && taken_ctx->thread.joinable()) if (taken_ctx)
taken_ctx->thread.join(); {
{
std::lock_guard<std::mutex> lock(stop_queue_mutex);
stop_queue.push(std::move(taken_ctx));
}
stop_cv.notify_one();
}
update_status(key, {false, StreamResult::OK, "Stopped manually"}); update_status(key, {false, StreamResult::OK, "Stopped manually"});
LOG_INFO("[RTMP] stop_camera finished for " + key); LOG_INFO("[RTMP] stop_camera queued for async stop: " + key);
} }
void RTMPManager::stop_all() void RTMPManager::stop_all()