From 93c71e1193b444fdaed0fb914e702b346fbfb152 Mon Sep 17 00:00:00 2001 From: cxh Date: Thu, 16 Oct 2025 12:53:41 +0800 Subject: [PATCH] 1 --- include/rtmp_manager.hpp | 9 ++- src/rtmp_manager.cpp | 166 +++++++++++++++------------------------ 2 files changed, 70 insertions(+), 105 deletions(-) diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index b95e337..7bb466a 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -66,8 +66,12 @@ class RTMPManager { std::atomic running{false}; std::thread thread; + + // 新增:把启动阶段的 promise 放到上下文里,避免悬垂引用 + std::promise start_promise; + std::atomic start_promise_set{false}; + StreamStatus status; - int retry_count{0}; StreamContext() = default; StreamContext(const StreamContext &) = delete; @@ -76,8 +80,7 @@ class RTMPManager static std::string make_stream_key(const std::string &cam_name, StreamType type); static GstElement *create_pipeline(const Camera &cam, StreamType type); - static void stream_loop(Camera cam, StreamType type, StreamContext *ctx, - std::promise *status_promise); + static void stream_loop(Camera cam, StreamType type, StreamContext *ctx); static void update_status(const std::string &key, const StreamStatus &status); diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 68c0c7a..3e636cf 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -85,11 +85,10 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea res.loc = get_camera_index(cam.name); res.url = get_stream_url(cam.name, type); - std::string key = make_stream_key(cam.name, type); - std::promise status_promise; - auto status_future = status_promise.get_future(); + const std::string key = make_stream_key(cam.name, type); std::unique_ptr ctx; + std::future status_future; { std::lock_guard lock(streams_mutex); @@ -104,20 +103,21 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea ctx = std::make_unique(); ctx->running.store(true); - // 传入 StreamContext 指针 + // 重要:在把 ctx 放入 map 之前,先从 ctx->start_promise 拿到 future + status_future = ctx->start_promise.get_future(); + StreamContext *ctx_ptr = ctx.get(); - ctx->thread = std::thread([cam, type, ctx_ptr, &status_promise]() - { RTMPManager::stream_loop(cam, type, ctx_ptr, &status_promise); }); + ctx->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); }); streams.emplace(key, std::move(ctx)); } - // 等待首帧确认或错误,最多等待 7 秒 + // 等待首帧或错误,最多 7 秒 if (status_future.wait_for(std::chrono::seconds(7)) == std::future_status::ready) { - StreamStatus status = status_future.get(); - res.result = status.running ? 0 : 1; - res.reason = status.last_error.empty() ? "Started OK" : status.last_error; + const StreamStatus s = status_future.get(); + res.result = s.running ? 0 : 1; + res.reason = s.running ? "Started OK" : (s.last_error.empty() ? "Failed to start" : s.last_error); } else { @@ -158,83 +158,68 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na return res; } -void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx, - std::promise *status_promise) +void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) { - std::string key = make_stream_key(cam.name, type); + const std::string key = make_stream_key(cam.name, type); + + auto try_set_start = [&](const StreamStatus &st) + { + bool expected = false; + if (ctx && ctx->start_promise_set.compare_exchange_strong(expected, true)) + { + try + { + ctx->start_promise.set_value(st); + } + catch (...) + { + } + } + }; + StreamStatus status; status.running = false; + status.last_result = StreamResult::UNKNOWN; + status.last_error.clear(); GstElement *pipeline = create_pipeline(cam, type); if (!pipeline) { status.last_result = StreamResult::PIPELINE_ERROR; status.last_error = "Failed to create pipeline"; - if (status_promise) - { - try - { - status_promise->set_value(status); - } - catch (...) - { - } - } + try_set_start(status); return; } GstBus *bus = gst_element_get_bus(pipeline); gst_element_set_state(pipeline, GST_STATE_PLAYING); - bool first_frame_received = false; - auto start_time = std::chrono::steady_clock::now(); + bool first_frame = false; + const auto t0 = std::chrono::steady_clock::now(); while (true) { - // 检查超时 - if (!first_frame_received && - std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count() > 5) + // 手动停止 + if (!ctx->running.load()) + { + status.running = false; + status.last_result = StreamResult::UNKNOWN; + status.last_error = "Stream stopped manually"; + try_set_start(status); // 若启动阶段还没返回,这里补一次 + break; + } + + // 首帧超时(5s) + if (!first_frame && + std::chrono::duration_cast(std::chrono::steady_clock::now() - t0).count() > 5) { status.running = false; status.last_result = StreamResult::TIMEOUT; status.last_error = "No frames received within timeout"; - if (status_promise) - { - try - { - status_promise->set_value(status); - } - catch (...) - { - } - status_promise = nullptr; - } + try_set_start(status); break; } - // 检查 stop 请求 - { - std::lock_guard lock(streams_mutex); - if (!ctx->running.load()) - { - status.running = false; - status.last_result = StreamResult::UNKNOWN; - status.last_error = "Stream stopped manually"; - if (status_promise) - { - try - { - status_promise->set_value(status); - } - catch (...) - { - } - status_promise = nullptr; - } - break; - } - } - GstMessage *msg = gst_bus_timed_pop_filtered( bus, 100 * GST_MSECOND, static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED)); @@ -247,28 +232,16 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx, { GstState old_state, new_state; gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); - if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && - !first_frame_received) + if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && !first_frame) { - first_frame_received = true; + first_frame = true; status.running = true; status.last_result = StreamResult::OK; - status.last_error = ""; - if (status_promise) - { - try - { - status_promise->set_value(status); - } - catch (...) - { - } - status_promise = nullptr; - } + status.last_error.clear(); + try_set_start(status); // 首帧成功:通知 start_camera } break; } - case GST_MESSAGE_ERROR: { GError *err = nullptr; @@ -277,46 +250,35 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx, status.last_result = StreamResult::CONNECTION_FAIL; status.last_error = err ? err->message : "GStreamer error"; if (err) g_error_free(err); - if (status_promise) - { - try - { - status_promise->set_value(status); - } - catch (...) - { - } - status_promise = nullptr; - } + try_set_start(status); break; } - case GST_MESSAGE_EOS: { status.running = false; status.last_result = StreamResult::EOS_RECEIVED; status.last_error = "EOS received"; - if (status_promise) - { - try - { - status_promise->set_value(status); - } - catch (...) - { - } - status_promise = nullptr; - } + try_set_start(status); break; } + default: + break; } gst_message_unref(msg); + + // 错误/EOS 后退出循环 + if (status.last_result == StreamResult::CONNECTION_FAIL || status.last_result == StreamResult::EOS_RECEIVED) + { + break; + } } gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(bus); + if (bus) gst_object_unref(bus); gst_object_unref(pipeline); + + // 线程收尾:这里不要删 map 里的 ctx,stop_camera 已经负责转移并 join } void RTMPManager::stop_all()