temp
This commit is contained in:
parent
a11ffc5564
commit
b96a4d45e9
@ -63,9 +63,11 @@ class RTMPManager
|
||||
struct StreamContext
|
||||
{
|
||||
std::atomic<bool> running{false};
|
||||
std::atomic<bool> start_result_set{false}; // 新增:保证 promise 只 set 一次
|
||||
std::thread thread;
|
||||
std::promise<StreamResultInfo> start_result;
|
||||
StreamStatus status;
|
||||
|
||||
StreamContext() = default;
|
||||
StreamContext(const StreamContext &) = delete;
|
||||
StreamContext &operator=(const StreamContext &) = delete;
|
||||
|
||||
@ -103,10 +103,13 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
|
||||
streams[key] = std::move(ctx);
|
||||
}
|
||||
|
||||
// future 用于等待 first frame 或初始化结果
|
||||
std::future<StreamResultInfo> fut = ctx_ptr->start_result.get_future();
|
||||
|
||||
// 启动线程
|
||||
ctx_ptr->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); });
|
||||
|
||||
// 等待初始化完成(收到第一帧或错误)
|
||||
if (fut.wait_for(std::chrono::seconds(10)) == std::future_status::ready)
|
||||
{
|
||||
res = fut.get();
|
||||
@ -115,6 +118,10 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
|
||||
{
|
||||
res.result = 1;
|
||||
res.reason = "Start timeout";
|
||||
|
||||
// 超时后安全地停止线程
|
||||
ctx_ptr->running.store(false);
|
||||
if (ctx_ptr->thread.joinable()) ctx_ptr->thread.join();
|
||||
}
|
||||
|
||||
return res;
|
||||
@ -129,6 +136,7 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na
|
||||
|
||||
std::unique_ptr<StreamContext> ctx;
|
||||
std::string key = make_stream_key(cam_name, type);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
||||
auto it = streams.find(key);
|
||||
@ -136,15 +144,24 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na
|
||||
{
|
||||
res.result = 0;
|
||||
res.reason = "Not streaming";
|
||||
LOG_INFO("[RTMP] stop_camera: " + key + " is not streaming");
|
||||
return res;
|
||||
}
|
||||
|
||||
// 设置停止标志,stream_loop 会退出
|
||||
it->second->running.store(false);
|
||||
|
||||
// 转移所有权,map 里移除
|
||||
ctx = std::move(it->second);
|
||||
streams.erase(it);
|
||||
}
|
||||
|
||||
if (ctx->thread.joinable()) ctx->thread.join();
|
||||
// 等待线程退出
|
||||
if (ctx && ctx->thread.joinable())
|
||||
{
|
||||
ctx->thread.join();
|
||||
LOG_INFO("[RTMP] stop_camera: " + key + " stopped");
|
||||
}
|
||||
|
||||
res.result = 0;
|
||||
res.reason = "Stopped manually";
|
||||
@ -167,7 +184,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
|
||||
{
|
||||
res.result = 1;
|
||||
res.reason = "Failed to create pipeline";
|
||||
ctx->start_result.set_value(res);
|
||||
if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -198,7 +215,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
|
||||
{
|
||||
res.result = 1;
|
||||
res.reason = "No frames received within timeout";
|
||||
ctx->start_result.set_value(res);
|
||||
bool expected = false;
|
||||
if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -215,7 +233,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
|
||||
std::string err_msg = err ? err->message : "Unknown GStreamer error";
|
||||
res.result = 1;
|
||||
res.reason = "Pipeline error: " + err_msg;
|
||||
ctx->start_result.set_value(res);
|
||||
bool expected = false;
|
||||
if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res);
|
||||
if (err) g_error_free(err);
|
||||
if (debug) g_free(debug);
|
||||
break;
|
||||
@ -223,7 +242,10 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
|
||||
case GST_MESSAGE_EOS:
|
||||
res.result = 1;
|
||||
res.reason = "EOS received";
|
||||
ctx->start_result.set_value(res);
|
||||
{
|
||||
bool expected = false;
|
||||
if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res);
|
||||
}
|
||||
break;
|
||||
case GST_MESSAGE_STATE_CHANGED:
|
||||
// pipeline 播放状态变化,不算成功
|
||||
@ -234,7 +256,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
|
||||
first_frame_received = true;
|
||||
res.result = 0;
|
||||
res.reason = "First frame received, started OK";
|
||||
ctx->start_result.set_value(res);
|
||||
bool expected = false;
|
||||
if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@ -275,15 +298,29 @@ bool RTMPManager::is_any_streaming()
|
||||
void RTMPManager::stop_all()
|
||||
{
|
||||
std::vector<std::unique_ptr<StreamContext>> ctxs;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
||||
for (auto &kv : streams) kv.second->running.store(false);
|
||||
for (auto &kv : streams) ctxs.push_back(std::move(kv.second));
|
||||
|
||||
// 设置所有流停止标志,并收集上下文
|
||||
for (auto &kv : streams)
|
||||
{
|
||||
kv.second->running.store(false);
|
||||
ctxs.push_back(std::move(kv.second));
|
||||
LOG_INFO("[RTMP] stop_all: stopping stream " + kv.first);
|
||||
}
|
||||
|
||||
streams.clear();
|
||||
}
|
||||
|
||||
// 等待所有线程退出
|
||||
for (auto &ctx : ctxs)
|
||||
if (ctx->thread.joinable()) ctx->thread.join();
|
||||
{
|
||||
if (ctx && ctx->thread.joinable())
|
||||
{
|
||||
ctx->thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO("[RTMP] stop_all completed.");
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user