This commit is contained in:
cxh 2025-10-16 11:08:12 +08:00
parent 8a676136e3
commit b162d1ff4d

View File

@ -86,9 +86,8 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
res.url = get_stream_url(cam.name, type); res.url = get_stream_url(cam.name, type);
std::string key = make_stream_key(cam.name, type); std::string key = make_stream_key(cam.name, type);
std::promise<StreamStatus> status_promise;
auto status_future = status_promise.get_future();
// 先检查是否已有有效流
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key); auto it = streams.find(key);
@ -98,25 +97,46 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
res.reason = "Already streaming"; res.reason = "Already streaming";
return res; return res;
} }
auto ctx = std::make_unique<StreamContext>();
ctx->running.store(true);
// 把 promise 移进线程上下文
ctx->thread = std::thread([cam, type, &status_promise]() { stream_loop(cam, type, &status_promise); });
streams.emplace(key, std::move(ctx));
} }
// 阻塞等待线程内部更新状态,最多等 5 秒 // 创建上下文
auto ctx = std::make_unique<StreamContext>();
ctx->running.store(true);
// promise/future 用于等待首帧或失败状态
std::promise<StreamStatus> status_promise;
auto status_future = status_promise.get_future();
ctx->thread = std::thread([cam, type, &status_promise]() { stream_loop(cam, type, &status_promise); });
// 阻塞等待线程返回首帧或失败,最多 7 秒
StreamStatus status;
if (status_future.wait_for(std::chrono::seconds(7)) == std::future_status::ready) if (status_future.wait_for(std::chrono::seconds(7)) == std::future_status::ready)
{ {
StreamStatus status = status_future.get(); status = status_future.get();
res.result = status.running ? 0 : 1;
res.reason = status.last_error.empty() ? "Started OK" : status.last_error;
} }
else else
{ {
status.running = false;
status.last_result = StreamResult::TIMEOUT;
status.last_error = "Timeout waiting for stream";
}
if (status.running)
{
// 首帧成功才放进 map
std::lock_guard<std::mutex> lock(streams_mutex);
streams.emplace(key, std::move(ctx));
res.result = 0;
res.reason = "Started OK";
}
else
{
// 启动失败,清理线程
ctx->running.store(false);
if (ctx->thread.joinable()) ctx->thread.join();
res.result = 1; res.result = 1;
res.reason = "Timeout waiting for stream"; res.reason = status.last_error;
} }
return res; return res;
@ -175,16 +195,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise<StreamSt
while (true) while (true)
{ {
// 检查 stop_flag 或超时 // 超时判断
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it == streams.end() || !it->second->running.load())
{
break; // 停止逻辑
}
}
if (!first_frame_received && if (!first_frame_received &&
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_time).count() > 5) std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_time).count() > 5)
{ {
@ -212,11 +223,12 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise<StreamSt
first_frame_received = true; first_frame_received = true;
status.running = true; status.running = true;
status.last_result = StreamResult::OK; status.last_result = StreamResult::OK;
status.last_error = ""; status.last_error.clear();
if (status_promise) status_promise->set_value(status); if (status_promise) status_promise->set_value(status);
} }
break; break;
} }
case GST_MESSAGE_ERROR: case GST_MESSAGE_ERROR:
case GST_MESSAGE_EOS: case GST_MESSAGE_EOS:
status.running = false; status.running = false;
@ -228,10 +240,12 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise<StreamSt
} }
gst_message_unref(msg); gst_message_unref(msg);
// 如果 running 被外部置 false则退出
if (!status.running) break;
} }
gst_element_set_state(pipeline, GST_STATE_NULL); gst_element_set_state(pipeline, GST_STATE_NULL);
if (bus) gst_object_unref(bus);
gst_object_unref(pipeline); gst_object_unref(pipeline);
} }