From 4954ab200ebfafee6c264da79f2fb0126cac82fb Mon Sep 17 00:00:00 2001 From: cxh Date: Thu, 16 Oct 2025 11:21:18 +0800 Subject: [PATCH] 1 --- include/rtmp_manager.hpp | 3 +- src/rtmp_manager.cpp | 222 ++++++++++++++++++++------------------- 2 files changed, 116 insertions(+), 109 deletions(-) diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index f9d502b..b95e337 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -76,7 +76,8 @@ class RTMPManager static std::string make_stream_key(const std::string &cam_name, StreamType type); static GstElement *create_pipeline(const Camera &cam, StreamType type); - static void stream_loop(Camera cam, StreamType type, std::promise *status_promise); + static void stream_loop(Camera cam, StreamType type, StreamContext *ctx, + std::promise *status_promise); static void update_status(const std::string &key, const StreamStatus &status); diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index d3cfd1f..68c0c7a 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -86,8 +86,11 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea res.url = get_stream_url(cam.name, type); std::string key = make_stream_key(cam.name, type); + std::promise status_promise; + auto status_future = status_promise.get_future(); + + std::unique_ptr ctx; - // 先检查是否已有有效流 { std::lock_guard lock(streams_mutex); auto it = streams.find(key); @@ -97,46 +100,29 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea res.reason = "Already streaming"; return res; } + + ctx = std::make_unique(); + ctx->running.store(true); + + // 传入 StreamContext 指针 + StreamContext *ctx_ptr = ctx.get(); + ctx->thread = std::thread([cam, type, ctx_ptr, &status_promise]() + { RTMPManager::stream_loop(cam, type, ctx_ptr, &status_promise); }); + + streams.emplace(key, std::move(ctx)); } - // 创建上下文 - auto ctx = std::make_unique(); - ctx->running.store(true); - - // promise/future 用于等待首帧或失败状态 - std::promise status_promise; - auto status_future = status_promise.get_future(); - - ctx->thread = std::thread([cam, type, &status_promise]() { stream_loop(cam, type, &status_promise); }); - - // 阻塞等待线程返回首帧或失败,最多 7 秒 - StreamStatus status; + // 等待首帧确认或错误,最多等待 7 秒 if (status_future.wait_for(std::chrono::seconds(7)) == std::future_status::ready) { - status = status_future.get(); + StreamStatus status = status_future.get(); + res.result = status.running ? 0 : 1; + res.reason = status.last_error.empty() ? "Started OK" : status.last_error; } else { - status.running = false; - status.last_result = StreamResult::TIMEOUT; - status.last_error = "Timeout waiting for stream"; - } - - if (status.running) - { - // 首帧成功才放进 map - std::lock_guard lock(streams_mutex); - streams.emplace(key, std::move(ctx)); - res.result = 0; - res.reason = "Started OK"; - } - else - { - // 启动失败,清理线程 - ctx->running.store(false); - if (ctx->thread.joinable()) ctx->thread.join(); res.result = 1; - res.reason = status.last_error; + res.reason = "Timeout waiting for stream"; } return res; @@ -172,7 +158,8 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na return res; } -void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise *status_promise) +void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx, + std::promise *status_promise) { std::string key = make_stream_key(cam.name, type); StreamStatus status; @@ -183,7 +170,16 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promiseset_value(status); + if (status_promise) + { + try + { + status_promise->set_value(status); + } + catch (...) + { + } + } return; } @@ -195,19 +191,15 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED)); - - // 超时判断,等待首帧最多 5 秒 + // 检查超时 if (!first_frame_received && std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count() > 5) { + status.running = false; status.last_result = StreamResult::TIMEOUT; status.last_error = "No frames received within timeout"; if (status_promise) { - // 非阻塞 set_value,避免重复触发异常 try { status_promise->set_value(status); @@ -220,74 +212,10 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promiseset_value(status); - } - catch (...) - { - } - status_promise = nullptr; - } - goto cleanup; - } - } - gst_message_unref(msg); - } - - // 检查首帧是否到达,这里简单用 pipeline 的 preroll/playing 状态近似 - if (!first_frame_received) - { - GstPad *src_pad = gst_element_get_static_pad(pipeline, "src"); - if (src_pad && gst_pad_is_linked(src_pad)) - { - first_frame_received = true; - status.running = true; - status.last_result = StreamResult::OK; - status.last_error = ""; - if (status_promise) - { - try - { - status_promise->set_value(status); - } - catch (...) - { - } - status_promise = nullptr; - } - } - if (src_pad) gst_object_unref(src_pad); - } - - // 检查是否需要停止 + // 检查 stop 请求 { std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it == streams.end() || !it->second->running.load()) + if (!ctx->running.load()) { status.running = false; status.last_result = StreamResult::UNKNOWN; @@ -306,10 +234,88 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED)); + + if (!msg) continue; + + switch (GST_MESSAGE_TYPE(msg)) + { + case GST_MESSAGE_STATE_CHANGED: + { + GstState old_state, new_state; + gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); + if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && + !first_frame_received) + { + first_frame_received = true; + status.running = true; + status.last_result = StreamResult::OK; + status.last_error = ""; + if (status_promise) + { + try + { + status_promise->set_value(status); + } + catch (...) + { + } + status_promise = nullptr; + } + } + break; + } + + case GST_MESSAGE_ERROR: + { + GError *err = nullptr; + gst_message_parse_error(msg, &err, nullptr); + status.running = false; + status.last_result = StreamResult::CONNECTION_FAIL; + status.last_error = err ? err->message : "GStreamer error"; + if (err) g_error_free(err); + if (status_promise) + { + try + { + status_promise->set_value(status); + } + catch (...) + { + } + status_promise = nullptr; + } + break; + } + + case GST_MESSAGE_EOS: + { + status.running = false; + status.last_result = StreamResult::EOS_RECEIVED; + status.last_error = "EOS received"; + if (status_promise) + { + try + { + status_promise->set_value(status); + } + catch (...) + { + } + status_promise = nullptr; + } + break; + } + } + + gst_message_unref(msg); } -cleanup: gst_element_set_state(pipeline, GST_STATE_NULL); + gst_object_unref(bus); gst_object_unref(pipeline); }