From 2b910366dfebada2dbb29864f2e41db5df00650a Mon Sep 17 00:00:00 2001 From: cxh Date: Thu, 16 Oct 2025 10:52:51 +0800 Subject: [PATCH] ! --- src/rtmp_manager.cpp | 214 +++++++++++++++++++------------------------ 1 file changed, 93 insertions(+), 121 deletions(-) diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 0c9c730..97c85d4 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -2,6 +2,7 @@ #include "rtmp_manager.hpp" #include +#include #include #include @@ -85,6 +86,9 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea res.url = get_stream_url(cam.name, type); std::string key = make_stream_key(cam.name, type); + std::promise status_promise; + auto status_future = status_promise.get_future(); + { std::lock_guard lock(streams_mutex); auto it = streams.find(key); @@ -97,12 +101,24 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea auto ctx = std::make_unique(); ctx->running.store(true); - ctx->thread = std::thread([cam, type]() { stream_loop(cam, type); }); + // 把 promise 移进线程上下文 + ctx->thread = std::thread([cam, type, &status_promise]() { stream_loop(cam, type, &status_promise); }); streams.emplace(key, std::move(ctx)); } - res.result = 0; - res.reason = "Started OK"; + // 阻塞等待线程内部更新状态,最多等 5 秒 + if (status_future.wait_for(std::chrono::seconds(7)) == std::future_status::ready) + { + StreamStatus status = status_future.get(); + res.result = status.running ? 0 : 1; + res.reason = status.last_error.empty() ? "Started OK" : status.last_error; + } + else + { + res.result = 1; + res.reason = "Timeout waiting for stream"; + } + return res; } @@ -136,6 +152,80 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na return res; } +void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise *status_promise) +{ + std::string key = make_stream_key(cam.name, type); + StreamStatus status; + status.running = false; + + GstElement *pipeline = create_pipeline(cam, type); + if (!pipeline) + { + status.last_result = StreamResult::PIPELINE_ERROR; + status.last_error = "Failed to create pipeline"; + if (status_promise) status_promise->set_value(status); + return; + } + + GstBus *bus = gst_element_get_bus(pipeline); + gst_element_set_state(pipeline, GST_STATE_PLAYING); + + bool first_frame_received = false; + auto start_time = std::chrono::steady_clock::now(); + + while (true) + { + // 检查超时 + if (!first_frame_received && + std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count() > 5) + { + status.last_result = StreamResult::TIMEOUT; + status.last_error = "No frames received within timeout"; + if (status_promise) status_promise->set_value(status); + break; + } + + GstMessage *msg = gst_bus_timed_pop_filtered( + bus, 100 * GST_MSECOND, + static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED)); + + if (!msg) continue; + + switch (GST_MESSAGE_TYPE(msg)) + { + case GST_MESSAGE_STATE_CHANGED: + { + GstState old_state, new_state; + gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); + if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && + !first_frame_received) + { + first_frame_received = true; + status.running = true; + status.last_result = StreamResult::OK; + status.last_error = ""; + if (status_promise) status_promise->set_value(status); // 立刻返回给 start_camera + } + break; + } + case GST_MESSAGE_ERROR: + case GST_MESSAGE_EOS: + status.running = false; + status.last_result = (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) ? StreamResult::CONNECTION_FAIL + : StreamResult::EOS_RECEIVED; + status.last_error = "GStreamer error/EOS"; + if (status_promise) status_promise->set_value(status); + break; + } + + gst_message_unref(msg); + if (!running) break; // 停止逻辑 + } + + gst_element_set_state(pipeline, GST_STATE_NULL); + gst_object_unref(pipeline); +} + void RTMPManager::stop_all() { std::vector> ctxs; @@ -172,124 +262,6 @@ std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType return "rtmp://127.0.0.1/live/" + make_stream_key(cam_name, type); } -void RTMPManager::stream_loop(Camera cam, StreamType type) -{ - std::string key = make_stream_key(cam.name, type); - LOG_INFO("[RTMP] Stream loop started for " + key); - - int retries = 0; - - while (true) - { - { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it == streams.end() || !it->second->running.load()) break; - } - - GstElement *pipeline = create_pipeline(cam, type); - if (!pipeline) - { - update_status(key, {false, StreamResult::PIPELINE_ERROR, "Failed to create pipeline"}); - if (++retries <= MAX_RETRIES) - { - LOG_WARN("[RTMP] Retrying pipeline creation for " + key); - std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_DELAY_MS)); - continue; - } - break; - } - - GstBus *bus = gst_element_get_bus(pipeline); - gst_element_set_state(pipeline, GST_STATE_PLAYING); - - bool first_frame_received = false; - bool stop_flag = false; - auto first_frame_start = std::chrono::steady_clock::now(); - - while (!stop_flag) - { - GstMessage *msg = gst_bus_timed_pop_filtered( - bus, 100 * GST_MSECOND, - static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED)); - - { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it == streams.end() || !it->second->running.load()) - { - stop_flag = true; - break; - } - } - - if (!first_frame_received) - { - auto elapsed = std::chrono::steady_clock::now() - first_frame_start; - if (std::chrono::duration_cast(elapsed).count() > 5) - { - update_status(key, {false, StreamResult::TIMEOUT, "No frames received"}); - stop_flag = true; - } - } - - if (!msg) continue; - - switch (GST_MESSAGE_TYPE(msg)) - { - case GST_MESSAGE_ERROR: - { - GError *err = nullptr; - gst_message_parse_error(msg, &err, nullptr); - update_status(key, {false, StreamResult::CONNECTION_FAIL, err ? err->message : "Unknown error"}); - if (err) g_error_free(err); - stop_flag = true; - break; - } - case GST_MESSAGE_EOS: - update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"}); - stop_flag = true; - break; - case GST_MESSAGE_STATE_CHANGED: - { - GstState old_state, new_state; - gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); - if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && - !first_frame_received) - { - first_frame_received = true; - update_status(key, {true, StreamResult::OK, ""}); - retries = 0; // 成功后重置重试计数 - } - break; - } - default: - break; - } - - gst_message_unref(msg); - } - - gst_element_set_state(pipeline, GST_STATE_NULL); - if (bus) gst_object_unref(bus); - gst_object_unref(pipeline); - - // 自动重试逻辑 - if (stop_flag && retries < MAX_RETRIES) - { - retries++; - LOG_WARN("[RTMP] Stream " + key + " failed, retrying... (" + std::to_string(retries) + ")"); - std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_DELAY_MS)); - continue; - } - - break; - } - - update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"}); - LOG_INFO("[RTMP] Stream loop ended for " + key); -} - std::vector RTMPManager::process_push_request(const VideoPushRequest &req) { std::vector results;