1
This commit is contained in:
parent
2c920efa29
commit
4954ab200e
@ -76,7 +76,8 @@ class RTMPManager
|
|||||||
|
|
||||||
static std::string make_stream_key(const std::string &cam_name, StreamType type);
|
static std::string make_stream_key(const std::string &cam_name, StreamType type);
|
||||||
static GstElement *create_pipeline(const Camera &cam, StreamType type);
|
static GstElement *create_pipeline(const Camera &cam, StreamType type);
|
||||||
static void stream_loop(Camera cam, StreamType type, std::promise<StreamStatus> *status_promise);
|
static void stream_loop(Camera cam, StreamType type, StreamContext *ctx,
|
||||||
|
std::promise<StreamStatus> *status_promise);
|
||||||
|
|
||||||
static void update_status(const std::string &key, const StreamStatus &status);
|
static void update_status(const std::string &key, const StreamStatus &status);
|
||||||
|
|
||||||
|
|||||||
@ -86,8 +86,11 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
|
|||||||
res.url = get_stream_url(cam.name, type);
|
res.url = get_stream_url(cam.name, type);
|
||||||
|
|
||||||
std::string key = make_stream_key(cam.name, type);
|
std::string key = make_stream_key(cam.name, type);
|
||||||
|
std::promise<StreamStatus> status_promise;
|
||||||
|
auto status_future = status_promise.get_future();
|
||||||
|
|
||||||
|
std::unique_ptr<StreamContext> ctx;
|
||||||
|
|
||||||
// 先检查是否已有有效流
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
std::lock_guard<std::mutex> lock(streams_mutex);
|
||||||
auto it = streams.find(key);
|
auto it = streams.find(key);
|
||||||
@ -97,46 +100,29 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
|
|||||||
res.reason = "Already streaming";
|
res.reason = "Already streaming";
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx = std::make_unique<StreamContext>();
|
||||||
|
ctx->running.store(true);
|
||||||
|
|
||||||
|
// 传入 StreamContext 指针
|
||||||
|
StreamContext *ctx_ptr = ctx.get();
|
||||||
|
ctx->thread = std::thread([cam, type, ctx_ptr, &status_promise]()
|
||||||
|
{ RTMPManager::stream_loop(cam, type, ctx_ptr, &status_promise); });
|
||||||
|
|
||||||
|
streams.emplace(key, std::move(ctx));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 创建上下文
|
// 等待首帧确认或错误,最多等待 7 秒
|
||||||
auto ctx = std::make_unique<StreamContext>();
|
|
||||||
ctx->running.store(true);
|
|
||||||
|
|
||||||
// promise/future 用于等待首帧或失败状态
|
|
||||||
std::promise<StreamStatus> status_promise;
|
|
||||||
auto status_future = status_promise.get_future();
|
|
||||||
|
|
||||||
ctx->thread = std::thread([cam, type, &status_promise]() { stream_loop(cam, type, &status_promise); });
|
|
||||||
|
|
||||||
// 阻塞等待线程返回首帧或失败,最多 7 秒
|
|
||||||
StreamStatus status;
|
|
||||||
if (status_future.wait_for(std::chrono::seconds(7)) == std::future_status::ready)
|
if (status_future.wait_for(std::chrono::seconds(7)) == std::future_status::ready)
|
||||||
{
|
{
|
||||||
status = status_future.get();
|
StreamStatus status = status_future.get();
|
||||||
|
res.result = status.running ? 0 : 1;
|
||||||
|
res.reason = status.last_error.empty() ? "Started OK" : status.last_error;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
status.running = false;
|
|
||||||
status.last_result = StreamResult::TIMEOUT;
|
|
||||||
status.last_error = "Timeout waiting for stream";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (status.running)
|
|
||||||
{
|
|
||||||
// 首帧成功才放进 map
|
|
||||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
|
||||||
streams.emplace(key, std::move(ctx));
|
|
||||||
res.result = 0;
|
|
||||||
res.reason = "Started OK";
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// 启动失败,清理线程
|
|
||||||
ctx->running.store(false);
|
|
||||||
if (ctx->thread.joinable()) ctx->thread.join();
|
|
||||||
res.result = 1;
|
res.result = 1;
|
||||||
res.reason = status.last_error;
|
res.reason = "Timeout waiting for stream";
|
||||||
}
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
@ -172,7 +158,8 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise<StreamStatus> *status_promise)
|
void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx,
|
||||||
|
std::promise<StreamStatus> *status_promise)
|
||||||
{
|
{
|
||||||
std::string key = make_stream_key(cam.name, type);
|
std::string key = make_stream_key(cam.name, type);
|
||||||
StreamStatus status;
|
StreamStatus status;
|
||||||
@ -183,7 +170,16 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise<StreamSt
|
|||||||
{
|
{
|
||||||
status.last_result = StreamResult::PIPELINE_ERROR;
|
status.last_result = StreamResult::PIPELINE_ERROR;
|
||||||
status.last_error = "Failed to create pipeline";
|
status.last_error = "Failed to create pipeline";
|
||||||
if (status_promise) status_promise->set_value(status);
|
if (status_promise)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
status_promise->set_value(status);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -195,19 +191,15 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise<StreamSt
|
|||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
GstMessage *msg = gst_bus_timed_pop_filtered(
|
// 检查超时
|
||||||
bus, 100 * GST_MSECOND,
|
|
||||||
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED));
|
|
||||||
|
|
||||||
// 超时判断,等待首帧最多 5 秒
|
|
||||||
if (!first_frame_received &&
|
if (!first_frame_received &&
|
||||||
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_time).count() > 5)
|
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_time).count() > 5)
|
||||||
{
|
{
|
||||||
|
status.running = false;
|
||||||
status.last_result = StreamResult::TIMEOUT;
|
status.last_result = StreamResult::TIMEOUT;
|
||||||
status.last_error = "No frames received within timeout";
|
status.last_error = "No frames received within timeout";
|
||||||
if (status_promise)
|
if (status_promise)
|
||||||
{
|
{
|
||||||
// 非阻塞 set_value,避免重复触发异常
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
status_promise->set_value(status);
|
status_promise->set_value(status);
|
||||||
@ -220,74 +212,10 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise<StreamSt
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msg)
|
// 检查 stop 请求
|
||||||
{
|
|
||||||
switch (GST_MESSAGE_TYPE(msg))
|
|
||||||
{
|
|
||||||
case GST_MESSAGE_STATE_CHANGED:
|
|
||||||
{
|
|
||||||
GstState old_state, new_state;
|
|
||||||
gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr);
|
|
||||||
if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING)
|
|
||||||
{
|
|
||||||
// PLAYING 不代表首帧到达,但可以用作初步标记
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case GST_MESSAGE_ERROR:
|
|
||||||
case GST_MESSAGE_EOS:
|
|
||||||
{
|
|
||||||
status.running = false;
|
|
||||||
status.last_result = (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) ? StreamResult::CONNECTION_FAIL
|
|
||||||
: StreamResult::EOS_RECEIVED;
|
|
||||||
status.last_error = "GStreamer error/EOS";
|
|
||||||
if (status_promise)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
status_promise->set_value(status);
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
status_promise = nullptr;
|
|
||||||
}
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
gst_message_unref(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 检查首帧是否到达,这里简单用 pipeline 的 preroll/playing 状态近似
|
|
||||||
if (!first_frame_received)
|
|
||||||
{
|
|
||||||
GstPad *src_pad = gst_element_get_static_pad(pipeline, "src");
|
|
||||||
if (src_pad && gst_pad_is_linked(src_pad))
|
|
||||||
{
|
|
||||||
first_frame_received = true;
|
|
||||||
status.running = true;
|
|
||||||
status.last_result = StreamResult::OK;
|
|
||||||
status.last_error = "";
|
|
||||||
if (status_promise)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
status_promise->set_value(status);
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
status_promise = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (src_pad) gst_object_unref(src_pad);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 检查是否需要停止
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
std::lock_guard<std::mutex> lock(streams_mutex);
|
||||||
auto it = streams.find(key);
|
if (!ctx->running.load())
|
||||||
if (it == streams.end() || !it->second->running.load())
|
|
||||||
{
|
{
|
||||||
status.running = false;
|
status.running = false;
|
||||||
status.last_result = StreamResult::UNKNOWN;
|
status.last_result = StreamResult::UNKNOWN;
|
||||||
@ -306,10 +234,88 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise<StreamSt
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
GstMessage *msg = gst_bus_timed_pop_filtered(
|
||||||
|
bus, 100 * GST_MSECOND,
|
||||||
|
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED));
|
||||||
|
|
||||||
|
if (!msg) continue;
|
||||||
|
|
||||||
|
switch (GST_MESSAGE_TYPE(msg))
|
||||||
|
{
|
||||||
|
case GST_MESSAGE_STATE_CHANGED:
|
||||||
|
{
|
||||||
|
GstState old_state, new_state;
|
||||||
|
gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr);
|
||||||
|
if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING &&
|
||||||
|
!first_frame_received)
|
||||||
|
{
|
||||||
|
first_frame_received = true;
|
||||||
|
status.running = true;
|
||||||
|
status.last_result = StreamResult::OK;
|
||||||
|
status.last_error = "";
|
||||||
|
if (status_promise)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
status_promise->set_value(status);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
status_promise = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case GST_MESSAGE_ERROR:
|
||||||
|
{
|
||||||
|
GError *err = nullptr;
|
||||||
|
gst_message_parse_error(msg, &err, nullptr);
|
||||||
|
status.running = false;
|
||||||
|
status.last_result = StreamResult::CONNECTION_FAIL;
|
||||||
|
status.last_error = err ? err->message : "GStreamer error";
|
||||||
|
if (err) g_error_free(err);
|
||||||
|
if (status_promise)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
status_promise->set_value(status);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
status_promise = nullptr;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case GST_MESSAGE_EOS:
|
||||||
|
{
|
||||||
|
status.running = false;
|
||||||
|
status.last_result = StreamResult::EOS_RECEIVED;
|
||||||
|
status.last_error = "EOS received";
|
||||||
|
if (status_promise)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
status_promise->set_value(status);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
status_promise = nullptr;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
gst_message_unref(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup:
|
|
||||||
gst_element_set_state(pipeline, GST_STATE_NULL);
|
gst_element_set_state(pipeline, GST_STATE_NULL);
|
||||||
|
gst_object_unref(bus);
|
||||||
gst_object_unref(pipeline);
|
gst_object_unref(pipeline);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user