From 2c920efa2906cc441db0660f594dacc256d2fd17 Mon Sep 17 00:00:00 2001 From: cxh Date: Thu, 16 Oct 2025 11:15:16 +0800 Subject: [PATCH] 1 --- src/rtmp_manager.cpp | 130 ++++++++++++++++++++++++++++++++----------- 1 file changed, 97 insertions(+), 33 deletions(-) diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 10f1e9a..d3cfd1f 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -195,56 +195,120 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED)); + + // 超时判断,等待首帧最多 5 秒 if (!first_frame_received && std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count() > 5) { status.last_result = StreamResult::TIMEOUT; status.last_error = "No frames received within timeout"; - if (status_promise) status_promise->set_value(status); + if (status_promise) + { + // 非阻塞 set_value,避免重复触发异常 + try + { + status_promise->set_value(status); + } + catch (...) + { + } + status_promise = nullptr; + } break; } - GstMessage *msg = gst_bus_timed_pop_filtered( - bus, 100 * GST_MSECOND, - static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED)); - - if (!msg) continue; - - switch (GST_MESSAGE_TYPE(msg)) + if (msg) { - case GST_MESSAGE_STATE_CHANGED: + switch (GST_MESSAGE_TYPE(msg)) { - GstState old_state, new_state; - gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); - if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && - !first_frame_received) + case GST_MESSAGE_STATE_CHANGED: { - first_frame_received = true; - status.running = true; - status.last_result = StreamResult::OK; - status.last_error.clear(); - if (status_promise) status_promise->set_value(status); + GstState old_state, new_state; + gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); + if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING) + { + // PLAYING 不代表首帧到达,但可以用作初步标记 + } + break; + } + case GST_MESSAGE_ERROR: + case GST_MESSAGE_EOS: + { + status.running = false; + status.last_result = (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) ? StreamResult::CONNECTION_FAIL + : StreamResult::EOS_RECEIVED; + status.last_error = "GStreamer error/EOS"; + if (status_promise) + { + try + { + status_promise->set_value(status); + } + catch (...) + { + } + status_promise = nullptr; + } + goto cleanup; + } + } + gst_message_unref(msg); + } + + // 检查首帧是否到达,这里简单用 pipeline 的 preroll/playing 状态近似 + if (!first_frame_received) + { + GstPad *src_pad = gst_element_get_static_pad(pipeline, "src"); + if (src_pad && gst_pad_is_linked(src_pad)) + { + first_frame_received = true; + status.running = true; + status.last_result = StreamResult::OK; + status.last_error = ""; + if (status_promise) + { + try + { + status_promise->set_value(status); + } + catch (...) + { + } + status_promise = nullptr; + } + } + if (src_pad) gst_object_unref(src_pad); + } + + // 检查是否需要停止 + { + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it == streams.end() || !it->second->running.load()) + { + status.running = false; + status.last_result = StreamResult::UNKNOWN; + status.last_error = "Stream stopped manually"; + if (status_promise) + { + try + { + status_promise->set_value(status); + } + catch (...) + { + } + status_promise = nullptr; } break; } - - case GST_MESSAGE_ERROR: - case GST_MESSAGE_EOS: - status.running = false; - status.last_result = (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) ? StreamResult::CONNECTION_FAIL - : StreamResult::EOS_RECEIVED; - status.last_error = "GStreamer error/EOS"; - if (status_promise) status_promise->set_value(status); - break; } - - gst_message_unref(msg); - - // 如果 running 被外部置 false,则退出 - if (!status.running) break; } +cleanup: gst_element_set_state(pipeline, GST_STATE_NULL); gst_object_unref(pipeline); }