diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 0cceaa1..10ed4fd 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -81,7 +81,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) { { std::lock_guard lock(streams_mutex); - if (!streams[key].running) + if (!streams[key].running.load()) break; } @@ -109,7 +109,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) { std::lock_guard lock(streams_mutex); - if (!streams[key].running) + if (!streams[key].running.load()) break; } @@ -140,8 +140,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) } gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(pipeline); gst_object_unref(bus); + gst_object_unref(pipeline); if (!error_occurred) break; @@ -150,7 +150,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) { LOG_ERROR("[RTMP] Max retries reached for '" + key + "'. Stopping reconnection attempts."); std::lock_guard lock(streams_mutex); - streams[key].running = false; + streams[key].running.store(false); break; } @@ -165,22 +165,25 @@ void RTMPManager::start_camera(const Camera &cam, StreamType type) { std::lock_guard lock(streams_mutex); std::string key = make_stream_key(cam.name, type); - if (streams.count(key) && streams[key].running) + + if (streams.count(key) && streams[key].running.load()) { LOG_WARN("[RTMP] Camera '" + key + "' already streaming."); return; } StreamContext ctx; - ctx.running = true; + ctx.running.store(true); ctx.thread = std::thread([cam, type]() { RTMPManager::stream_loop(cam, type); }); - streams.emplace(key, std::move(ctx)); + + // 修复 emplace 构造失败问题 + streams.emplace(std::move(key), std::move(ctx)); } void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) { - std::lock_guard lock(streams_mutex); + std::unique_lock lock(streams_mutex); std::string key = make_stream_key(cam_name, type); auto it = streams.find(key); if (it == streams.end()) @@ -189,11 +192,13 @@ void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) return; } - it->second.running = false; - if (it->second.thread.joinable()) - it->second.thread.join(); - + it->second.running.store(false); + auto th = std::move(it->second.thread); streams.erase(it); + lock.unlock(); + + if (th.joinable()) + th.join(); } bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) @@ -201,42 +206,35 @@ bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) std::lock_guard lock(streams_mutex); std::string key = make_stream_key(cam_name, type); auto it = streams.find(key); - return it != streams.end() && it->second.running; + return it != streams.end() && it->second.running.load(); } bool RTMPManager::is_any_streaming() { std::lock_guard lock(streams_mutex); for (auto &kv : streams) - if (kv.second.running) + if (kv.second.running.load()) return true; return false; } void RTMPManager::stop_all() { - std::vector keys; + std::vector> targets; + { std::lock_guard lock(streams_mutex); for (auto &kv : streams) - keys.push_back(kv.first); + { + if (kv.first.size() > 4 && kv.first.find("_sub") == kv.first.size() - 4) + targets.emplace_back(kv.first.substr(0, kv.first.size() - 4), StreamType::SUB); + else + targets.emplace_back(kv.first.substr(0, kv.first.size() - 5), StreamType::MAIN); + } } - for (auto &key : keys) - { - if (key.size() > 4 && key.find("_sub") == key.size() - 4) - { - stop_camera(key.substr(0, key.size() - 4), StreamType::SUB); - } - else if (key.size() > 5 && key.find("_main") == key.size() - 5) - { - stop_camera(key.substr(0, key.size() - 5), StreamType::MAIN); - } - else - { - stop_camera(key, StreamType::MAIN); - } - } + for (auto &[name, type] : targets) + stop_camera(name, type); } std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type)