temp
This commit is contained in:
parent
436a7e99d8
commit
ecc8840688
@ -78,7 +78,6 @@ int get_camera_index(const std::string &name)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start camera, return result info (不再 publish)
|
|
||||||
RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, StreamType type)
|
RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, StreamType type)
|
||||||
{
|
{
|
||||||
StreamResultInfo res;
|
StreamResultInfo res;
|
||||||
@ -103,13 +102,10 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
|
|||||||
streams[key] = std::move(ctx);
|
streams[key] = std::move(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
// future 用于等待 first frame 或初始化结果
|
|
||||||
std::future<StreamResultInfo> fut = ctx_ptr->start_result.get_future();
|
std::future<StreamResultInfo> fut = ctx_ptr->start_result.get_future();
|
||||||
|
|
||||||
// 启动线程
|
|
||||||
ctx_ptr->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); });
|
ctx_ptr->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); });
|
||||||
|
|
||||||
// 等待初始化完成(收到第一帧或错误)
|
|
||||||
if (fut.wait_for(std::chrono::seconds(10)) == std::future_status::ready)
|
if (fut.wait_for(std::chrono::seconds(10)) == std::future_status::ready)
|
||||||
{
|
{
|
||||||
res = fut.get();
|
res = fut.get();
|
||||||
@ -118,16 +114,11 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
|
|||||||
{
|
{
|
||||||
res.result = 1;
|
res.result = 1;
|
||||||
res.reason = "Start timeout";
|
res.reason = "Start timeout";
|
||||||
|
|
||||||
// 超时后安全地停止线程
|
|
||||||
ctx_ptr->running.store(false);
|
|
||||||
if (ctx_ptr->thread.joinable()) ctx_ptr->thread.join();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop camera, return result info (不再 publish)
|
|
||||||
RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
|
RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
|
||||||
{
|
{
|
||||||
StreamResultInfo res;
|
StreamResultInfo res;
|
||||||
@ -144,24 +135,19 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na
|
|||||||
{
|
{
|
||||||
res.result = 0;
|
res.result = 0;
|
||||||
res.reason = "Not streaming";
|
res.reason = "Not streaming";
|
||||||
LOG_INFO("[RTMP] stop_camera: " + key + " is not streaming");
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 设置停止标志,stream_loop 会退出
|
// 标记停止
|
||||||
it->second->running.store(false);
|
it->second->running.store(false);
|
||||||
|
|
||||||
// 转移所有权,map 里移除
|
// 移交所有权给局部变量,避免在锁内 join
|
||||||
ctx = std::move(it->second);
|
ctx = std::move(it->second);
|
||||||
streams.erase(it);
|
streams.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 等待线程退出
|
// 安全 join
|
||||||
if (ctx && ctx->thread.joinable())
|
if (ctx->thread.joinable()) ctx->thread.join();
|
||||||
{
|
|
||||||
ctx->thread.join();
|
|
||||||
LOG_INFO("[RTMP] stop_camera: " + key + " stopped");
|
|
||||||
}
|
|
||||||
|
|
||||||
res.result = 0;
|
res.result = 0;
|
||||||
res.reason = "Stopped manually";
|
res.reason = "Stopped manually";
|
||||||
@ -184,8 +170,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
|
|||||||
{
|
{
|
||||||
res.result = 1;
|
res.result = 1;
|
||||||
res.reason = "Failed to create pipeline";
|
res.reason = "Failed to create pipeline";
|
||||||
bool expected = false;
|
ctx->start_result.set_value(res);
|
||||||
if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -193,6 +178,17 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
|
|||||||
gst_element_set_state(pipeline, GST_STATE_PLAYING);
|
gst_element_set_state(pipeline, GST_STATE_PLAYING);
|
||||||
|
|
||||||
bool first_frame_received = false;
|
bool first_frame_received = false;
|
||||||
|
bool start_result_set = false;
|
||||||
|
|
||||||
|
auto set_start_result = [&](const StreamResultInfo &r)
|
||||||
|
{
|
||||||
|
if (!start_result_set)
|
||||||
|
{
|
||||||
|
ctx->start_result.set_value(r);
|
||||||
|
start_result_set = true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
auto start_time = std::chrono::steady_clock::now();
|
auto start_time = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
@ -216,8 +212,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
|
|||||||
{
|
{
|
||||||
res.result = 1;
|
res.result = 1;
|
||||||
res.reason = "No frames received within timeout";
|
res.reason = "No frames received within timeout";
|
||||||
bool expected = false;
|
set_start_result(res);
|
||||||
if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -234,8 +229,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
|
|||||||
std::string err_msg = err ? err->message : "Unknown GStreamer error";
|
std::string err_msg = err ? err->message : "Unknown GStreamer error";
|
||||||
res.result = 1;
|
res.result = 1;
|
||||||
res.reason = "Pipeline error: " + err_msg;
|
res.reason = "Pipeline error: " + err_msg;
|
||||||
bool expected = false;
|
set_start_result(res);
|
||||||
if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res);
|
|
||||||
if (err) g_error_free(err);
|
if (err) g_error_free(err);
|
||||||
if (debug) g_free(debug);
|
if (debug) g_free(debug);
|
||||||
break;
|
break;
|
||||||
@ -243,10 +237,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
|
|||||||
case GST_MESSAGE_EOS:
|
case GST_MESSAGE_EOS:
|
||||||
res.result = 1;
|
res.result = 1;
|
||||||
res.reason = "EOS received";
|
res.reason = "EOS received";
|
||||||
{
|
set_start_result(res);
|
||||||
bool expected = false;
|
|
||||||
if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res);
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case GST_MESSAGE_STATE_CHANGED:
|
case GST_MESSAGE_STATE_CHANGED:
|
||||||
// pipeline 播放状态变化,不算成功
|
// pipeline 播放状态变化,不算成功
|
||||||
@ -257,8 +248,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
|
|||||||
first_frame_received = true;
|
first_frame_received = true;
|
||||||
res.result = 0;
|
res.result = 0;
|
||||||
res.reason = "First frame received, started OK";
|
res.reason = "First frame received, started OK";
|
||||||
bool expected = false;
|
set_start_result(res);
|
||||||
if (ctx->start_result_set.compare_exchange_strong(expected, true)) ctx->start_result.set_value(res);
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
@ -302,25 +292,17 @@ void RTMPManager::stop_all()
|
|||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
std::lock_guard<std::mutex> lock(streams_mutex);
|
||||||
|
for (auto &kv : streams) kv.second->running.store(false); // 标记停止
|
||||||
|
|
||||||
// 设置所有流停止标志,并收集上下文
|
// 移交所有权给局部变量,避免在锁内 join
|
||||||
for (auto &kv : streams)
|
for (auto &kv : streams) ctxs.push_back(std::move(kv.second));
|
||||||
{
|
|
||||||
kv.second->running.store(false);
|
|
||||||
ctxs.push_back(std::move(kv.second));
|
|
||||||
LOG_INFO("[RTMP] stop_all: stopping stream " + kv.first);
|
|
||||||
}
|
|
||||||
|
|
||||||
streams.clear();
|
streams.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
// 等待所有线程退出
|
// 安全 join
|
||||||
for (auto &ctx : ctxs)
|
for (auto &ctx : ctxs)
|
||||||
{
|
{
|
||||||
if (ctx && ctx->thread.joinable())
|
if (ctx->thread.joinable()) ctx->thread.join();
|
||||||
{
|
|
||||||
ctx->thread.join();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_INFO("[RTMP] stop_all completed.");
|
LOG_INFO("[RTMP] stop_all completed.");
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user