diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index a894508..ebc7a7e 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -63,9 +63,11 @@ class RTMPManager struct StreamContext { std::atomic running{false}; + std::atomic start_result_set{false}; // 新增:保证 promise 只 set 一次 std::thread thread; std::promise start_result; StreamStatus status; + StreamContext() = default; StreamContext(const StreamContext &) = delete; StreamContext &operator=(const StreamContext &) = delete; diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index eb7e0c6..57f5dfd 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -103,10 +103,13 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea streams[key] = std::move(ctx); } + // future 用于等待 first frame 或初始化结果 std::future fut = ctx_ptr->start_result.get_future(); + // 启动线程 ctx_ptr->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); }); + // 等待初始化完成(收到第一帧或错误) if (fut.wait_for(std::chrono::seconds(10)) == std::future_status::ready) { res = fut.get(); @@ -115,6 +118,10 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea { res.result = 1; res.reason = "Start timeout"; + + // 超时后安全地停止线程 + ctx_ptr->running.store(false); + if (ctx_ptr->thread.joinable()) ctx_ptr->thread.join(); } return res; @@ -129,6 +136,7 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na std::unique_ptr ctx; std::string key = make_stream_key(cam_name, type); + { std::lock_guard lock(streams_mutex); auto it = streams.find(key); @@ -136,15 +144,24 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na { res.result = 0; res.reason = "Not streaming"; + LOG_INFO("[RTMP] stop_camera: " + key + " is not streaming"); return res; } + // 设置停止标志,stream_loop 会退出 it->second->running.store(false); + + // 转移所有权,map 里移除 ctx = std::move(it->second); streams.erase(it); } - if (ctx->thread.joinable()) ctx->thread.join(); + // 等待线程退出 + if (ctx && ctx->thread.joinable()) + { + ctx->thread.join(); + LOG_INFO("[RTMP] stop_camera: " + key + " stopped"); + } res.result = 0; res.reason = "Stopped manually"; @@ -167,7 +184,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) { res.result = 1; res.reason = "Failed to create pipeline"; - ctx->start_result.set_value(res); + if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res); return; } @@ -198,7 +215,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) { res.result = 1; res.reason = "No frames received within timeout"; - ctx->start_result.set_value(res); + bool expected = false; + if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res); break; } } @@ -215,7 +233,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) std::string err_msg = err ? err->message : "Unknown GStreamer error"; res.result = 1; res.reason = "Pipeline error: " + err_msg; - ctx->start_result.set_value(res); + bool expected = false; + if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res); if (err) g_error_free(err); if (debug) g_free(debug); break; @@ -223,7 +242,10 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) case GST_MESSAGE_EOS: res.result = 1; res.reason = "EOS received"; - ctx->start_result.set_value(res); + { + bool expected = false; + if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res); + } break; case GST_MESSAGE_STATE_CHANGED: // pipeline 播放状态变化,不算成功 @@ -234,7 +256,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) first_frame_received = true; res.result = 0; res.reason = "First frame received, started OK"; - ctx->start_result.set_value(res); + bool expected = false; + if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res); } break; default: @@ -275,15 +298,29 @@ bool RTMPManager::is_any_streaming() void RTMPManager::stop_all() { std::vector> ctxs; + { std::lock_guard lock(streams_mutex); - for (auto &kv : streams) kv.second->running.store(false); - for (auto &kv : streams) ctxs.push_back(std::move(kv.second)); + + // 设置所有流停止标志,并收集上下文 + for (auto &kv : streams) + { + kv.second->running.store(false); + ctxs.push_back(std::move(kv.second)); + LOG_INFO("[RTMP] stop_all: stopping stream " + kv.first); + } + streams.clear(); } + // 等待所有线程退出 for (auto &ctx : ctxs) - if (ctx->thread.joinable()) ctx->thread.join(); + { + if (ctx && ctx->thread.joinable()) + { + ctx->thread.join(); + } + } LOG_INFO("[RTMP] stop_all completed."); }