// rtsp_manager.cpp #include "rtmp_manager.hpp" #include "logger.hpp" #include #include #include std::unordered_map RTMPManager::streams; std::mutex RTMPManager::streams_mutex; static inline std::string stream_type_suffix(StreamType type) { return (type == StreamType::MAIN) ? "_main" : "_sub"; } std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type) { return cam_name + stream_type_suffix(type); } void RTMPManager::init() { gst_init(nullptr, nullptr); LOG_INFO("[RTMP] GStreamer initialized."); } GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) { // 拷贝原始参数 int width = cam.width; int height = cam.height; int fps = cam.fps; int bitrate = cam.bitrate; if (type == StreamType::SUB) { // 简单一刀切策略:分辨率 /2,帧率 /2,码率 /2,临界值保护 width = std::max(160, width / 2); height = std::max(120, height / 2); fps = std::max(10, fps / 2); bitrate = std::max(300000, bitrate / 2); // 最低 300kbps } // 构建不同的流名(stream key) std::string stream_name = cam.name + (type == StreamType::MAIN ? "_main" : "_sub"); std::string pipeline_str = "v4l2src device=" + cam.device + " ! video/x-raw,format=NV12,width=" + std::to_string(width) + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + "/1" " ! queue max-size-buffers=1 leaky=downstream " " ! mpph264enc bps=" + std::to_string(bitrate) + " gop=" + std::to_string(fps) + " ! h264parse " " ! flvmux streamable=true name=mux " " ! rtmpsink location=\"rtmp://127.0.0.1/live/" + stream_name + " live=1\" sync=false"; LOG_INFO("[RTMP] Creating pipeline for '" + cam.name + "' (" + (type == StreamType::MAIN ? "MAIN" : "SUB") + ") -> " + pipeline_str); GError *error = nullptr; GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error); if (error) { LOG_ERROR("[RTMP] Failed to parse pipeline: " + std::string(error->message)); g_error_free(error); return nullptr; } return pipeline; } void RTMPManager::stream_loop(Camera cam, StreamType type) { const std::string key = make_stream_key(cam.name, type); LOG_INFO("[RTMP] Stream thread started for '" + key + "'"); while (true) { { std::lock_guard lock(streams_mutex); if (!streams[key].running) break; } GstElement *pipeline = create_pipeline(cam, type); if (!pipeline) { LOG_ERROR("[RTMP] Failed to create pipeline for '" + key + "', retrying in 3s..."); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } GstBus *bus = gst_element_get_bus(pipeline); gst_element_set_state(pipeline, GST_STATE_PLAYING); LOG_INFO("[RTMP] Camera '" + key + "' streaming..."); bool error_occurred = false; // 简单重连计数(可选):连续失败超过 N 次可以退出 int consecutive_failures = 0; const int MAX_RETRIES = 5; while (true) { GstMessage *msg = gst_bus_timed_pop_filtered( bus, GST_CLOCK_TIME_NONE, static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); { std::lock_guard lock(streams_mutex); if (!streams[key].running) break; } if (!msg) continue; if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) { GError *err = nullptr; gchar *debug = nullptr; gst_message_parse_error(msg, &err, &debug); LOG_ERROR("[RTMP] Error on '" + key + "': " + std::string(err->message)); if (debug) LOG_DEBUG(std::string(debug)); g_error_free(err); g_free(debug); error_occurred = true; consecutive_failures++; } else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) { LOG_WARN("[RTMP] EOS received on '" + key + "'"); error_occurred = true; consecutive_failures++; } gst_message_unref(msg); if (error_occurred) break; } gst_element_set_state(pipeline, GST_STATE_NULL); gst_object_unref(pipeline); gst_object_unref(bus); if (!error_occurred) break; if (consecutive_failures >= MAX_RETRIES) { LOG_ERROR("[RTMP] Max retries reached for '" + key + "'. Stopping reconnection attempts."); // 可以选择将 running 置为 false,让上层知道失败 std::lock_guard lock(streams_mutex); streams[key].running = false; break; } LOG_WARN("[RTMP] Reconnecting '" + key + "' in 3s..."); std::this_thread::sleep_for(std::chrono::seconds(3)); } LOG_INFO("[RTMP] Stream thread exited for '" + key + "'"); } void RTMPManager::start_camera(const Camera &cam, StreamType type) { std::lock_guard lock(streams_mutex); std::string key = make_stream_key(cam.name, type); if (streams.count(key) && streams[key].running) { LOG_WARN("[RTMP] Camera '" + key + "' already streaming."); return; } StreamContext ctx; ctx.running = true; ctx.thread = std::thread([cam, type]() { RTMPManager::stream_loop(cam, type); }); streams[key] = std::move(ctx); } void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) { std::lock_guard lock(streams_mutex); std::string key = make_stream_key(cam_name, type); auto it = streams.find(key); if (it == streams.end()) { LOG_WARN("[RTMP] Camera '" + key + "' not found."); return; } it->second.running = false; LOG_INFO("[RTMP] Stopping camera '" + key + "'..."); if (it->second.thread.joinable()) it->second.thread.join(); streams.erase(it); } bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) { std::lock_guard lock(streams_mutex); std::string key = make_stream_key(cam_name, type); auto it = streams.find(key); return it != streams.end() && it->second.running; } bool RTMPManager::is_any_streaming() { std::lock_guard lock(streams_mutex); for (auto &kv : streams) if (kv.second.running) return true; return false; } void RTMPManager::stop_all() { std::vector names; { std::lock_guard lock(streams_mutex); for (auto &kv : streams) names.push_back(kv.first); } for (auto &name : names) { // name 中包含后缀,但是 stop_camera 需要 cam_name + StreamType // 我们可以解析后缀,或新增一个 stop_by_key。为简单起见解析: if (name.size() > 5 && name.find("_sub") == name.size() - 4) { std::string cam_name = name.substr(0, name.size() - 4); stop_camera(cam_name, StreamType::SUB); } else if (name.size() > 6 && name.find("_main") == name.size() - 5) { std::string cam_name = name.substr(0, name.size() - 5); stop_camera(cam_name, StreamType::MAIN); } else { // fallback: stop by treating as MAIN stop_camera(name, StreamType::MAIN); } } } std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type) { // 根据你本地 RTMP 服务地址组装 URL,和 create_pipeline 的 stream_name 保持一致 std::string stream_name = cam_name + stream_type_suffix(type); return std::string("rtmp://127.0.0.1/live/") + stream_name; }