From b162d1ff4d935bba7a1fb4ecb0d66f329ec97896 Mon Sep 17 00:00:00 2001 From: cxh Date: Thu, 16 Oct 2025 11:08:12 +0800 Subject: [PATCH] 1 --- src/rtmp_manager.cpp | 64 +++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index b68cc16..10f1e9a 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -86,9 +86,8 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea res.url = get_stream_url(cam.name, type); std::string key = make_stream_key(cam.name, type); - std::promise status_promise; - auto status_future = status_promise.get_future(); + // 先检查是否已有有效流 { std::lock_guard lock(streams_mutex); auto it = streams.find(key); @@ -98,25 +97,46 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea res.reason = "Already streaming"; return res; } - - auto ctx = std::make_unique(); - ctx->running.store(true); - // 把 promise 移进线程上下文 - ctx->thread = std::thread([cam, type, &status_promise]() { stream_loop(cam, type, &status_promise); }); - streams.emplace(key, std::move(ctx)); } - // 阻塞等待线程内部更新状态,最多等 5 秒 + // 创建上下文 + auto ctx = std::make_unique(); + ctx->running.store(true); + + // promise/future 用于等待首帧或失败状态 + std::promise status_promise; + auto status_future = status_promise.get_future(); + + ctx->thread = std::thread([cam, type, &status_promise]() { stream_loop(cam, type, &status_promise); }); + + // 阻塞等待线程返回首帧或失败,最多 7 秒 + StreamStatus status; if (status_future.wait_for(std::chrono::seconds(7)) == std::future_status::ready) { - StreamStatus status = status_future.get(); - res.result = status.running ? 0 : 1; - res.reason = status.last_error.empty() ? "Started OK" : status.last_error; + status = status_future.get(); } else { + status.running = false; + status.last_result = StreamResult::TIMEOUT; + status.last_error = "Timeout waiting for stream"; + } + + if (status.running) + { + // 首帧成功才放进 map + std::lock_guard lock(streams_mutex); + streams.emplace(key, std::move(ctx)); + res.result = 0; + res.reason = "Started OK"; + } + else + { + // 启动失败,清理线程 + ctx->running.store(false); + if (ctx->thread.joinable()) ctx->thread.join(); res.result = 1; - res.reason = "Timeout waiting for stream"; + res.reason = status.last_error; } return res; @@ -175,16 +195,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise lock(streams_mutex); - auto it = streams.find(key); - if (it == streams.end() || !it->second->running.load()) - { - break; // 停止逻辑 - } - } - + // 超时判断 if (!first_frame_received && std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count() > 5) { @@ -212,11 +223,12 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promiseset_value(status); } break; } + case GST_MESSAGE_ERROR: case GST_MESSAGE_EOS: status.running = false; @@ -228,10 +240,12 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, std::promise