This commit is contained in:
cxh 2025-10-16 11:15:16 +08:00
parent b162d1ff4d
commit 2c920efa29

View File

@ -195,56 +195,120 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise<StreamSt
while (true) while (true)
{ {
// 超时判断 GstMessage *msg = gst_bus_timed_pop_filtered(
bus, 100 * GST_MSECOND,
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED));
// 超时判断,等待首帧最多 5 秒
if (!first_frame_received && if (!first_frame_received &&
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_time).count() > 5) std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_time).count() > 5)
{ {
status.last_result = StreamResult::TIMEOUT; status.last_result = StreamResult::TIMEOUT;
status.last_error = "No frames received within timeout"; status.last_error = "No frames received within timeout";
if (status_promise) status_promise->set_value(status); if (status_promise)
{
// 非阻塞 set_value避免重复触发异常
try
{
status_promise->set_value(status);
}
catch (...)
{
}
status_promise = nullptr;
}
break; break;
} }
GstMessage *msg = gst_bus_timed_pop_filtered( if (msg)
bus, 100 * GST_MSECOND,
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED));
if (!msg) continue;
switch (GST_MESSAGE_TYPE(msg))
{ {
case GST_MESSAGE_STATE_CHANGED: switch (GST_MESSAGE_TYPE(msg))
{ {
GstState old_state, new_state; case GST_MESSAGE_STATE_CHANGED:
gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr);
if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING &&
!first_frame_received)
{ {
first_frame_received = true; GstState old_state, new_state;
status.running = true; gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr);
status.last_result = StreamResult::OK; if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING)
status.last_error.clear(); {
if (status_promise) status_promise->set_value(status); // PLAYING 不代表首帧到达,但可以用作初步标记
}
break;
}
case GST_MESSAGE_ERROR:
case GST_MESSAGE_EOS:
{
status.running = false;
status.last_result = (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) ? StreamResult::CONNECTION_FAIL
: StreamResult::EOS_RECEIVED;
status.last_error = "GStreamer error/EOS";
if (status_promise)
{
try
{
status_promise->set_value(status);
}
catch (...)
{
}
status_promise = nullptr;
}
goto cleanup;
}
}
gst_message_unref(msg);
}
// 检查首帧是否到达,这里简单用 pipeline 的 preroll/playing 状态近似
if (!first_frame_received)
{
GstPad *src_pad = gst_element_get_static_pad(pipeline, "src");
if (src_pad && gst_pad_is_linked(src_pad))
{
first_frame_received = true;
status.running = true;
status.last_result = StreamResult::OK;
status.last_error = "";
if (status_promise)
{
try
{
status_promise->set_value(status);
}
catch (...)
{
}
status_promise = nullptr;
}
}
if (src_pad) gst_object_unref(src_pad);
}
// 检查是否需要停止
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it == streams.end() || !it->second->running.load())
{
status.running = false;
status.last_result = StreamResult::UNKNOWN;
status.last_error = "Stream stopped manually";
if (status_promise)
{
try
{
status_promise->set_value(status);
}
catch (...)
{
}
status_promise = nullptr;
} }
break; break;
} }
case GST_MESSAGE_ERROR:
case GST_MESSAGE_EOS:
status.running = false;
status.last_result = (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) ? StreamResult::CONNECTION_FAIL
: StreamResult::EOS_RECEIVED;
status.last_error = "GStreamer error/EOS";
if (status_promise) status_promise->set_value(status);
break;
} }
gst_message_unref(msg);
// 如果 running 被外部置 false则退出
if (!status.running) break;
} }
cleanup:
gst_element_set_state(pipeline, GST_STATE_NULL); gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(pipeline); gst_object_unref(pipeline);
} }