// rtsp_manager.cpp #include "rtmp_manager.hpp" #include "logger.hpp" #include #include #include #include #include // for move #include std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; static inline std::string stream_type_suffix(StreamType type) { return (type == StreamType::MAIN) ? "_main" : "_sub"; } std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type) { return cam_name + stream_type_suffix(type); } void RTMPManager::init() { gst_init(nullptr, nullptr); LOG_INFO("[RTMP] GStreamer initialized."); // 启动 stop_worker_thread stop_thread_running.store(true); stop_thread = std::thread([] { while (stop_thread_running.load()) { std::unique_ptr ctx; { std::unique_lock lock(RTMPManager::stop_queue_mutex); RTMPManager::stop_cv.wait(lock, []{ return !RTMPManager::stop_queue.empty() || !RTMPManager::stop_thread_running.load(); }); if (!RTMPManager::stop_queue.empty()) { ctx = std::move(RTMPManager::stop_queue.front()); RTMPManager::stop_queue.pop(); } } if (ctx && ctx->thread.joinable()) { ctx->thread.join(); LOG_INFO("[RTMP] Stream thread joined asynchronously"); } } }); } void RTMPManager::set_status_callback(StreamCallback cb) { status_callback = std::move(cb); } void RTMPManager::update_status(const std::string &key, const StreamStatus &status) { { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it != streams.end()) it->second->status = status; } if (status_callback) status_callback(key, status); } GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) { int width = cam.width; int height = cam.height; int fps = cam.fps; int bitrate = cam.bitrate; if (type == StreamType::SUB) { width = std::max(160, width / 2); height = std::max(120, height / 2); fps = std::max(10, fps / 2); bitrate = std::max(300000, bitrate / 2); } std::string stream_name = cam.name + stream_type_suffix(type); std::string pipeline_str = "v4l2src device=" + cam.device + " ! video/x-raw,format=NV12,width=" + std::to_string(width) + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + "/1 ! queue max-size-buffers=1 leaky=downstream " "! mpph264enc bps=" + std::to_string(bitrate) + " gop=" + std::to_string(fps) + " ! h264parse ! flvmux streamable=true name=mux " "! rtmpsink location=\"rtmp://127.0.0.1/live/" + stream_name + " live=1\" sync=false"; LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str); GError *error = nullptr; GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error); if (error) { LOG_ERROR("[RTMP] Pipeline parse error: " + std::string(error->message)); g_error_free(error); return nullptr; } return pipeline; } void RTMPManager::stream_loop(Camera cam, StreamType type) { const std::string key = make_stream_key(cam.name, type); LOG_INFO("[RTMP] Stream loop started for " + key); const int MAX_RETRIES = 5; int retry_count = 0; while (true) { { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it == streams.end() || !it->second->running.load()) break; } GstElement *pipeline = create_pipeline(cam, type); if (!pipeline) { update_status(key, {false, StreamResult::PIPELINE_ERROR, "Failed to create pipeline"}); std::this_thread::sleep_for(std::chrono::seconds(3)); if (++retry_count > MAX_RETRIES) break; continue; } GstBus *bus = gst_element_get_bus(pipeline); gst_element_set_state(pipeline, GST_STATE_PLAYING); update_status(key, {true, StreamResult::OK, ""}); LOG_INFO("[RTMP] " + key + " is streaming."); bool stop_flag = false; GstMessage *msg = nullptr; while (true) { msg = gst_bus_timed_pop_filtered( bus, GST_CLOCK_TIME_NONE, static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it == streams.end() || !it->second->running.load()) { stop_flag = true; break; } } if (!msg) continue; if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) { GError *err = nullptr; gchar *debug = nullptr; gst_message_parse_error(msg, &err, &debug); std::string err_msg = err ? err->message : "Unknown GStreamer error"; LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg); update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg}); if (err) g_error_free(err); if (debug) g_free(debug); stop_flag = true; } else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) { LOG_WARN("[RTMP] EOS on " + key); update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"}); stop_flag = true; } gst_message_unref(msg); if (stop_flag) break; } gst_element_set_state(pipeline, GST_STATE_NULL); if (bus) gst_object_unref(bus); gst_object_unref(pipeline); if (!stop_flag) break; if (++retry_count > MAX_RETRIES) { LOG_ERROR("[RTMP] " + key + " reached max retries. Giving up."); break; } LOG_WARN("[RTMP] Reconnecting " + key + " in 3s..."); std::this_thread::sleep_for(std::chrono::seconds(3)); } update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"}); LOG_INFO("[RTMP] Stream loop ended for " + key); } void RTMPManager::start_camera(const Camera &cam, StreamType type) { std::string key = make_stream_key(cam.name, type); { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it != streams.end() && it->second->running.load()) { LOG_WARN("[RTMP] " + key + " already streaming."); return; } // create a new context on heap and keep a unique_ptr in map auto ctx = std::make_unique(); ctx->running.store(true); ctx->status = {true, StreamResult::OK, ""}; // start thread after moving unique_ptr into map (to avoid racing) // insert placeholder first to reserve key, then start thread streams.emplace(key, std::move(ctx)); } // actually start thread outside the lock (we need to get pointer again) { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it != streams.end()) { // start the thread that runs stream_loop; capture cam and type by value it->second->thread = std::thread([cam, type]() { stream_loop(cam, type); }); } } LOG_INFO("[RTMP] start_camera requested for " + key); } void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) { std::string key = make_stream_key(cam_name, type); std::unique_ptr taken_ctx; { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it == streams.end()) { LOG_WARN("[RTMP] " + key + " not found."); return; } it->second->running.store(false); taken_ctx = std::move(it->second); streams.erase(it); } // 异步加入 stop 队列,由 stop_worker_thread join if (taken_ctx) { { std::lock_guard lock(stop_queue_mutex); stop_queue.push(std::move(taken_ctx)); } stop_cv.notify_one(); } update_status(key, {false, StreamResult::OK, "Stopped manually"}); LOG_INFO("[RTMP] stop_camera queued for async stop: " + key); } void RTMPManager::stop_all() { std::vector> taken; std::vector keys; { std::lock_guard lock(streams_mutex); for (auto &kv : streams) { keys.push_back(kv.first); kv.second->running.store(false); } // move all contexts out for (auto &k : keys) { auto it = streams.find(k); if (it != streams.end()) taken.push_back(std::move(it->second)); streams.erase(k); } } // join threads for (size_t i = 0; i < taken.size(); ++i) { if (taken[i] && taken[i]->thread.joinable()) taken[i]->thread.join(); } LOG_INFO("[RTMP] stop_all completed."); } bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) { std::lock_guard lock(streams_mutex); auto key = make_stream_key(cam_name, type); auto it = streams.find(key); return it != streams.end() && it->second->running.load(); } bool RTMPManager::is_any_streaming() { std::lock_guard lock(streams_mutex); for (auto &kv : streams) if (kv.second->running.load()) return true; return false; } std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type) { return "rtmp://127.0.0.1/live/" + make_stream_key(cam_name, type); }