diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 45b6e42..835e282 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -31,10 +31,7 @@ class RTMPManager static bool is_streaming(const std::string& cam_name); static std::string get_stream_url(const std::string& cam_name); - // 旧版接口(已保留) - static std::vector> get_all_status(); - - // ✅ 新增:单通道状态结构体(用于心跳) + // 新增:单通道状态结构体(用于心跳) struct ChannelInfo { int loc; // 摄像头位置索引(0~7) @@ -43,15 +40,17 @@ class RTMPManager std::string reason; // 错误原因(仅在异常时填) }; - // ✅ 新增:获取所有通道详细状态 + // 新增:获取所有通道详细状态 static std::vector get_all_channels_status(); private: struct StreamContext { - std::atomic running{false}; + std::atomic thread_running{false}; std::thread thread; + StreamStatus status; + std::mutex status_mutex; }; static void stream_loop(Camera cam, StreamContext* ctx); diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 5ec50cf..2cf1984 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -123,12 +123,13 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { const std::string key = make_key(cam.name); - while (ctx->running) + while (ctx->thread_running) { // 1. 检查设备节点是否存在 struct stat st{}; if (stat(cam.device.c_str(), &st) != 0) { + std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "Device not found: " + cam.device; LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); @@ -140,6 +141,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) GstElement* pipeline = create_pipeline(cam); if (!pipeline) { + std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "Pipeline creation failed"; std::this_thread::sleep_for(std::chrono::seconds(3)); @@ -189,6 +191,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) if (gst_element_get_state(pipeline, &state, &pending, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS && state == GST_STATE_PLAYING) { + std::lock_guard lk(ctx->status_mutex); confirmed_running = true; ctx->status.running = true; ctx->status.last_error.clear(); @@ -196,6 +199,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) } else { + std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "Pipeline failed to confirm PLAYING"; LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); @@ -214,7 +218,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) const auto start_t = std::chrono::steady_clock::now(); bool need_restart = false; - while (ctx->running) + while (ctx->thread_running) { // 检查是否收到帧(前 5s 内) if (!got_frame) @@ -224,6 +228,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) .count(); if (elapsed > 5) { + std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "No frames detected (no video signal)"; LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); @@ -233,6 +238,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) } else { + std::lock_guard lk(ctx->status_mutex); ctx->status.running = true; ctx->status.last_error.clear(); } @@ -249,6 +255,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { GError* err = nullptr; gst_message_parse_error(msg, &err, nullptr); + std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = err ? err->message : "GStreamer error"; LOG_ERROR("[RTMP] " + key + " stream error: " + ctx->status.last_error); @@ -257,11 +264,14 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) break; } case GST_MESSAGE_EOS: + { + std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "End of stream (EOS)"; LOG_WARN("[RTMP] " + key + " reached EOS"); need_restart = true; break; + } default: break; } @@ -276,14 +286,17 @@ void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) gst_object_unref(pipeline); // 7. 若仍在运行状态,准备重启 - if (ctx->running) + if (ctx->thread_running) { LOG_WARN("[RTMP] Restarting " + key + " in 3s..."); std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); } } - ctx->status.running = false; + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + } LOG_INFO("[RTMP] Stream thread exited for " + key); } @@ -304,7 +317,7 @@ void RTMPManager::start_all() } auto ctx = std::make_unique(); - ctx->running.store(true); + ctx->thread_running.store(true); ctx->thread = std::thread( [cam, ptr = ctx.get(), delay_ms]() @@ -331,7 +344,11 @@ bool RTMPManager::is_streaming(const std::string& cam_name) { std::lock_guard lock(streams_mutex); auto it = streams.find(make_key(cam_name)); - return (it != streams.end() && it->second->status.running); + if (it == streams.end()) return false; + + auto& ctx = *(it->second); + std::lock_guard lk(ctx.status_mutex); + return ctx.status.running; } std::string RTMPManager::get_stream_url(const std::string& cam_name) @@ -360,7 +377,11 @@ std::vector RTMPManager::get_all_channels_status() auto it = streams.find(key); if (it != streams.end()) { + auto& ctx = *(it->second); + + std::lock_guard lk(ctx.status_mutex); auto& status = it->second->status; + ch.running = status.running; if (status.running) { @@ -372,10 +393,6 @@ std::vector RTMPManager::get_all_channels_status() ch.reason = status.last_error.empty() ? "Unknown error" : status.last_error; } } - else - { - ch.reason = "Context missing"; - } result.push_back(ch); }