diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 8ef1372..eec1605 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -63,6 +64,7 @@ class RTMPManager { std::atomic running{false}; std::thread thread; + std::promise start_result; StreamStatus status; StreamContext() = default; StreamContext(const StreamContext &) = delete; diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index ddd18ec..060a59f 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -86,6 +86,8 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea res.url = get_stream_url(cam.name, type); std::string key = make_stream_key(cam.name, type); + std::shared_ptr ctx = std::make_shared(); + { std::lock_guard lock(streams_mutex); auto it = streams.find(key); @@ -96,14 +98,29 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea return res; } - auto ctx = std::make_unique(); ctx->running.store(true); - ctx->thread = std::thread([cam, type]() { stream_loop(cam, type); }); - streams.emplace(key, std::move(ctx)); + streams[key] = ctx; + } + + std::future fut = ctx->start_result.get_future(); + + ctx->thread = std::thread( + [this, cam, type, ctx]() + { + stream_loop(cam, type, ctx); // stream_loop 接收 StreamContext + }); + + // 等待 pipeline 初始化完成,最多等待 5 秒(可自定义) + if (fut.wait_for(std::chrono::seconds(5)) == std::future_status::ready) + { + res = fut.get(); + } + else + { + res.result = 1; + res.reason = "Start timeout"; } - res.result = 0; - res.reason = "Started OK"; return res; } @@ -174,123 +191,136 @@ std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType return "rtmp://127.0.0.1/live/" + make_stream_key(cam_name, type); } -void RTMPManager::stream_loop(Camera cam, StreamType type) +void RTMPManager::stream_loop(Camera cam, StreamType type, std::shared_ptr ctx) { + StreamResultInfo res; + res.loc = get_camera_index(cam.name); + res.url = get_stream_url(cam.name, type); + std::string key = make_stream_key(cam.name, type); LOG_INFO("[RTMP] Stream loop started for " + key); - const int FIRST_FRAME_TIMEOUT_SEC = 5; + GstElement *pipeline = create_pipeline(cam, type); + if (!pipeline) + { + res.result = 1; + res.reason = "Failed to create pipeline"; + ctx->start_result.set_value(res); + return; + } + + GstBus *bus = gst_element_get_bus(pipeline); + gst_element_set_state(pipeline, GST_STATE_PLAYING); + + bool first_frame_received = false; + auto start_time = std::chrono::steady_clock::now(); while (true) { + // 检查 stop_flag { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it == streams.end() || !it->second->running.load()) break; } - GstElement *pipeline = create_pipeline(cam, type); - if (!pipeline) + GstMessage *msg = + gst_bus_timed_pop_filtered(bus, 100 * GST_MSECOND, + static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | + GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_ELEMENT)); + + // 超时未收到第一帧 + if (!first_frame_received) { - update_status(key, {false, StreamResult::PIPELINE_ERROR, "Failed to create pipeline"}); - break; + auto elapsed = std::chrono::steady_clock::now() - start_time; + if (std::chrono::duration_cast(elapsed).count() > FIRST_FRAME_TIMEOUT_SEC) + { + res.result = 1; + res.reason = "No frames received within timeout"; + ctx->start_result.set_value(res); // 返回失败 + break; + } } - GstBus *bus = gst_element_get_bus(pipeline); - gst_element_set_state(pipeline, GST_STATE_PLAYING); + if (!msg) continue; - bool first_frame_received = false; - bool stop_flag = false; - auto first_frame_start = std::chrono::steady_clock::now(); - - while (!stop_flag) + switch (GST_MESSAGE_TYPE(msg)) { - GstMessage *msg = gst_bus_timed_pop_filtered( - bus, 100 * GST_MSECOND, - static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED)); - + case GST_MESSAGE_ERROR: { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it == streams.end() || !it->second->running.load()) - { - stop_flag = true; - break; - } + GError *err = nullptr; + gchar *debug = nullptr; + gst_message_parse_error(msg, &err, &debug); + std::string err_msg = err ? err->message : "Unknown GStreamer error"; + res.result = 1; + res.reason = "Pipeline error: " + err_msg; + ctx->start_result.set_value(res); + if (err) g_error_free(err); + if (debug) g_free(debug); + break; } - - if (!first_frame_received) + case GST_MESSAGE_EOS: + res.result = 1; + res.reason = "EOS received"; + ctx->start_result.set_value(res); + break; + case GST_MESSAGE_STATE_CHANGED: { - auto elapsed = std::chrono::steady_clock::now() - first_frame_start; - if (std::chrono::duration_cast(elapsed).count() > FIRST_FRAME_TIMEOUT_SEC) + GstState old_state, new_state; + gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); + if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && + !first_frame_received) { - update_status(key, {false, StreamResult::TIMEOUT, "No frames received within timeout"}); - stop_flag = true; + // 这里仅表示 pipeline 播放了,不算真正成功 } + break; } - - if (!msg) continue; - - switch (GST_MESSAGE_TYPE(msg)) - { - case GST_MESSAGE_ERROR: + case GST_MESSAGE_ELEMENT: + // 这里可以通过 caps 或其他方式检测到第一帧到达 + if (!first_frame_received) { - GError *err = nullptr; - gchar *debug = nullptr; - gst_message_parse_error(msg, &err, &debug); - std::string err_msg = err ? err->message : "Unknown GStreamer error"; - update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg}); - if (err) g_error_free(err); - if (debug) g_free(debug); - stop_flag = true; - break; + first_frame_received = true; + res.result = 0; + res.reason = "First frame received, started OK"; + ctx->start_result.set_value(res); // 返回成功 } - case GST_MESSAGE_EOS: - update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"}); - stop_flag = true; - break; - case GST_MESSAGE_STATE_CHANGED: - { - GstState old_state, new_state; - gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); - if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && - !first_frame_received) - { - first_frame_received = true; - update_status(key, {true, StreamResult::OK, ""}); - } - break; - } - default: - break; - } - - gst_message_unref(msg); + break; + default: + break; } - gst_element_set_state(pipeline, GST_STATE_NULL); - if (bus) gst_object_unref(bus); - gst_object_unref(pipeline); - - if (!stop_flag) break; + gst_message_unref(msg); } + gst_element_set_state(pipeline, GST_STATE_NULL); + if (bus) gst_object_unref(bus); + gst_object_unref(pipeline); + + // 清理 streams { std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it != streams.end()) - { - try - { - if (it->second && it->second->thread.get_id() == std::this_thread::get_id()) streams.erase(it); - } - catch (...) - { - streams.erase(it); - } - } + streams.erase(key); } - update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"}); LOG_INFO("[RTMP] Stream loop ended for " + key); } + +{ + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it != streams.end()) + { + try + { + if (it->second && it->second->thread.get_id() == std::this_thread::get_id()) streams.erase(it); + } + catch (...) + { + streams.erase(it); + } + } +} + +update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"}); +LOG_INFO("[RTMP] Stream loop ended for " + key); +}