This commit is contained in:
cxh 2025-10-15 15:33:00 +08:00
parent f7343bd23e
commit 3b8be249e3
2 changed files with 121 additions and 89 deletions

View File

@ -6,6 +6,7 @@
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <functional> #include <functional>
#include <future>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <string> #include <string>
@ -63,6 +64,7 @@ class RTMPManager
{ {
std::atomic<bool> running{false}; std::atomic<bool> running{false};
std::thread thread; std::thread thread;
std::promise<StreamResultInfo> start_result;
StreamStatus status; StreamStatus status;
StreamContext() = default; StreamContext() = default;
StreamContext(const StreamContext &) = delete; StreamContext(const StreamContext &) = delete;

View File

@ -86,6 +86,8 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
res.url = get_stream_url(cam.name, type); res.url = get_stream_url(cam.name, type);
std::string key = make_stream_key(cam.name, type); std::string key = make_stream_key(cam.name, type);
std::shared_ptr<StreamContext> ctx = std::make_shared<StreamContext>();
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key); auto it = streams.find(key);
@ -96,14 +98,29 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
return res; return res;
} }
auto ctx = std::make_unique<StreamContext>();
ctx->running.store(true); ctx->running.store(true);
ctx->thread = std::thread([cam, type]() { stream_loop(cam, type); }); streams[key] = ctx;
streams.emplace(key, std::move(ctx)); }
std::future<StreamResultInfo> fut = ctx->start_result.get_future();
ctx->thread = std::thread(
[this, cam, type, ctx]()
{
stream_loop(cam, type, ctx); // stream_loop 接收 StreamContext
});
// 等待 pipeline 初始化完成,最多等待 5 秒(可自定义)
if (fut.wait_for(std::chrono::seconds(5)) == std::future_status::ready)
{
res = fut.get();
}
else
{
res.result = 1;
res.reason = "Start timeout";
} }
res.result = 0;
res.reason = "Started OK";
return res; return res;
} }
@ -174,123 +191,136 @@ std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType
return "rtmp://127.0.0.1/live/" + make_stream_key(cam_name, type); return "rtmp://127.0.0.1/live/" + make_stream_key(cam_name, type);
} }
void RTMPManager::stream_loop(Camera cam, StreamType type) void RTMPManager::stream_loop(Camera cam, StreamType type, std::shared_ptr<StreamContext> ctx)
{ {
StreamResultInfo res;
res.loc = get_camera_index(cam.name);
res.url = get_stream_url(cam.name, type);
std::string key = make_stream_key(cam.name, type); std::string key = make_stream_key(cam.name, type);
LOG_INFO("[RTMP] Stream loop started for " + key); LOG_INFO("[RTMP] Stream loop started for " + key);
const int FIRST_FRAME_TIMEOUT_SEC = 5; GstElement *pipeline = create_pipeline(cam, type);
if (!pipeline)
{
res.result = 1;
res.reason = "Failed to create pipeline";
ctx->start_result.set_value(res);
return;
}
GstBus *bus = gst_element_get_bus(pipeline);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
bool first_frame_received = false;
auto start_time = std::chrono::steady_clock::now();
while (true) while (true)
{ {
// 检查 stop_flag
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key); auto it = streams.find(key);
if (it == streams.end() || !it->second->running.load()) break; if (it == streams.end() || !it->second->running.load()) break;
} }
GstElement *pipeline = create_pipeline(cam, type); GstMessage *msg =
if (!pipeline) gst_bus_timed_pop_filtered(bus, 100 * GST_MSECOND,
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS |
GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_ELEMENT));
// 超时未收到第一帧
if (!first_frame_received)
{ {
update_status(key, {false, StreamResult::PIPELINE_ERROR, "Failed to create pipeline"}); auto elapsed = std::chrono::steady_clock::now() - start_time;
break; if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > FIRST_FRAME_TIMEOUT_SEC)
{
res.result = 1;
res.reason = "No frames received within timeout";
ctx->start_result.set_value(res); // 返回失败
break;
}
} }
GstBus *bus = gst_element_get_bus(pipeline); if (!msg) continue;
gst_element_set_state(pipeline, GST_STATE_PLAYING);
bool first_frame_received = false; switch (GST_MESSAGE_TYPE(msg))
bool stop_flag = false;
auto first_frame_start = std::chrono::steady_clock::now();
while (!stop_flag)
{ {
GstMessage *msg = gst_bus_timed_pop_filtered( case GST_MESSAGE_ERROR:
bus, 100 * GST_MSECOND,
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED));
{ {
std::lock_guard<std::mutex> lock(streams_mutex); GError *err = nullptr;
auto it = streams.find(key); gchar *debug = nullptr;
if (it == streams.end() || !it->second->running.load()) gst_message_parse_error(msg, &err, &debug);
{ std::string err_msg = err ? err->message : "Unknown GStreamer error";
stop_flag = true; res.result = 1;
break; res.reason = "Pipeline error: " + err_msg;
} ctx->start_result.set_value(res);
if (err) g_error_free(err);
if (debug) g_free(debug);
break;
} }
case GST_MESSAGE_EOS:
if (!first_frame_received) res.result = 1;
res.reason = "EOS received";
ctx->start_result.set_value(res);
break;
case GST_MESSAGE_STATE_CHANGED:
{ {
auto elapsed = std::chrono::steady_clock::now() - first_frame_start; GstState old_state, new_state;
if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > FIRST_FRAME_TIMEOUT_SEC) gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr);
if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING &&
!first_frame_received)
{ {
update_status(key, {false, StreamResult::TIMEOUT, "No frames received within timeout"}); // 这里仅表示 pipeline 播放了,不算真正成功
stop_flag = true;
} }
break;
} }
case GST_MESSAGE_ELEMENT:
if (!msg) continue; // 这里可以通过 caps 或其他方式检测到第一帧到达
if (!first_frame_received)
switch (GST_MESSAGE_TYPE(msg))
{
case GST_MESSAGE_ERROR:
{ {
GError *err = nullptr; first_frame_received = true;
gchar *debug = nullptr; res.result = 0;
gst_message_parse_error(msg, &err, &debug); res.reason = "First frame received, started OK";
std::string err_msg = err ? err->message : "Unknown GStreamer error"; ctx->start_result.set_value(res); // 返回成功
update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg});
if (err) g_error_free(err);
if (debug) g_free(debug);
stop_flag = true;
break;
} }
case GST_MESSAGE_EOS: break;
update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"}); default:
stop_flag = true; break;
break;
case GST_MESSAGE_STATE_CHANGED:
{
GstState old_state, new_state;
gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr);
if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING &&
!first_frame_received)
{
first_frame_received = true;
update_status(key, {true, StreamResult::OK, ""});
}
break;
}
default:
break;
}
gst_message_unref(msg);
} }
gst_element_set_state(pipeline, GST_STATE_NULL); gst_message_unref(msg);
if (bus) gst_object_unref(bus);
gst_object_unref(pipeline);
if (!stop_flag) break;
} }
gst_element_set_state(pipeline, GST_STATE_NULL);
if (bus) gst_object_unref(bus);
gst_object_unref(pipeline);
// 清理 streams
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key); streams.erase(key);
if (it != streams.end())
{
try
{
if (it->second && it->second->thread.get_id() == std::this_thread::get_id()) streams.erase(it);
}
catch (...)
{
streams.erase(it);
}
}
} }
update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"});
LOG_INFO("[RTMP] Stream loop ended for " + key); LOG_INFO("[RTMP] Stream loop ended for " + key);
} }
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it != streams.end())
{
try
{
if (it->second && it->second->thread.get_id() == std::this_thread::get_id()) streams.erase(it);
}
catch (...)
{
streams.erase(it);
}
}
}
update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"});
LOG_INFO("[RTMP] Stream loop ended for " + key);
}