diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 2686d17..6d09465 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include "app_config.hpp" class RTMPManager @@ -47,9 +48,15 @@ private: std::atomic running{false}; std::thread thread; StreamStatus status; + StreamContext() = default; + // non-copyable + StreamContext(const StreamContext &) = delete; + StreamContext &operator=(const StreamContext &) = delete; + // movable? we avoid moving by using unique_ptr }; - static std::unordered_map streams; + // store unique_ptr to avoid copy/move issues with atomic/thread + static std::unordered_map> streams; static std::mutex streams_mutex; static StreamCallback status_callback; diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 27fdf36..a49d18d 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -5,8 +5,10 @@ #include #include #include +#include // for move +#include -std::unordered_map RTMPManager::streams; +std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; @@ -33,10 +35,12 @@ void RTMPManager::set_status_callback(StreamCallback cb) void RTMPManager::update_status(const std::string &key, const StreamStatus &status) { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it != streams.end()) - it->second.status = status; + { + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it != streams.end()) + it->second->status = status; + } if (status_callback) status_callback(key, status); } @@ -95,7 +99,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) { { std::lock_guard lock(streams_mutex); - if (!streams[key].running) + auto it = streams.find(key); + if (it == streams.end() || !it->second->running.load()) break; } @@ -125,7 +130,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) { std::lock_guard lock(streams_mutex); - if (!streams[key].running) + auto it = streams.find(key); + if (it == streams.end() || !it->second->running.load()) { stop_flag = true; break; @@ -143,8 +149,10 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) std::string err_msg = err ? err->message : "Unknown GStreamer error"; LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg); update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg}); - g_error_free(err); - g_free(debug); + if (err) + g_error_free(err); + if (debug) + g_free(debug); stop_flag = true; } else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) @@ -160,7 +168,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) } gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(bus); + if (bus) + gst_object_unref(bus); gst_object_unref(pipeline); if (!stop_flag) @@ -183,59 +192,98 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) void RTMPManager::start_camera(const Camera &cam, StreamType type) { std::string key = make_stream_key(cam.name, type); - std::lock_guard lock(streams_mutex); - - auto it = streams.find(key); - if (it != streams.end() && it->second.running) { - LOG_WARN("[RTMP] " + key + " already streaming."); - return; + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it != streams.end() && it->second->running.load()) + { + LOG_WARN("[RTMP] " + key + " already streaming."); + return; + } + + // create a new context on heap and keep a unique_ptr in map + auto ctx = std::make_unique(); + ctx->running.store(true); + ctx->status = {true, StreamResult::OK, ""}; + + // start thread after moving unique_ptr into map (to avoid racing) + // insert placeholder first to reserve key, then start thread + streams.emplace(key, std::move(ctx)); } - StreamContext ctx; - ctx.running = true; - ctx.thread = std::thread([cam, type]() - { stream_loop(cam, type); }); - streams.emplace(key, std::move(ctx)); + // actually start thread outside the lock (we need to get pointer again) + { + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it != streams.end()) + { + // start the thread that runs stream_loop; capture cam and type by value + it->second->thread = std::thread([cam, type]() + { stream_loop(cam, type); }); + } + } + + LOG_INFO("[RTMP] start_camera requested for " + key); } void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) { std::string key = make_stream_key(cam_name, type); - std::unique_lock lock(streams_mutex); - auto it = streams.find(key); - if (it == streams.end()) + + std::unique_ptr taken_ctx; { - LOG_WARN("[RTMP] " + key + " not found."); - return; + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it == streams.end()) + { + LOG_WARN("[RTMP] " + key + " not found."); + return; + } + + // mark it to stop and take ownership + it->second->running.store(false); + taken_ctx = std::move(it->second); + streams.erase(it); } - it->second.running = false; - auto th = std::move(it->second.thread); - streams.erase(it); - lock.unlock(); - - if (th.joinable()) - th.join(); + // join outside lock + if (taken_ctx && taken_ctx->thread.joinable()) + taken_ctx->thread.join(); update_status(key, {false, StreamResult::OK, "Stopped manually"}); + LOG_INFO("[RTMP] stop_camera finished for " + key); } void RTMPManager::stop_all() { + std::vector> taken; std::vector keys; + { std::lock_guard lock(streams_mutex); for (auto &kv : streams) + { keys.push_back(kv.first); + kv.second->running.store(false); + } + // move all contexts out + for (auto &k : keys) + { + auto it = streams.find(k); + if (it != streams.end()) + taken.push_back(std::move(it->second)); + streams.erase(k); + } } - for (auto &key : keys) + + // join threads + for (size_t i = 0; i < taken.size(); ++i) { - if (key.find("_main") != std::string::npos) - stop_camera(key.substr(0, key.size() - 5), StreamType::MAIN); - else - stop_camera(key.substr(0, key.size() - 4), StreamType::SUB); + if (taken[i] && taken[i]->thread.joinable()) + taken[i]->thread.join(); } + + LOG_INFO("[RTMP] stop_all completed."); } bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) @@ -243,14 +291,14 @@ bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) std::lock_guard lock(streams_mutex); auto key = make_stream_key(cam_name, type); auto it = streams.find(key); - return it != streams.end() && it->second.running; + return it != streams.end() && it->second->running.load(); } bool RTMPManager::is_any_streaming() { std::lock_guard lock(streams_mutex); for (auto &kv : streams) - if (kv.second.running) + if (kv.second->running.load()) return true; return false; }