diff --git a/CMakeLists.txt b/CMakeLists.txt index bc260be..82fecee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,63 +1,56 @@ cmake_minimum_required(VERSION 3.10) -# 工程名称(随便改) -project(rtsp_server) +project(rtmp_publisher) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) add_compile_definitions(GLIB_DISABLE_DEPRECATION_WARNINGS) -# 设置可执行文件输出目录 -set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/../bin) +# 输出到工程 bin/ +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/bin) -# 找 GStreamer +# ---------- GStreamer ---------- find_package(PkgConfig REQUIRED) pkg_check_modules(GSTREAMER REQUIRED gstreamer-1.0) -pkg_check_modules(GSTREAMER_RTSP REQUIRED gstreamer-rtsp-server-1.0) -# 添加头文件目录 -include_directories( +# ---------- 源码 ---------- +file(GLOB SRC_FILES src/*.cpp) + +add_executable(camera_to_rtmp ${SRC_FILES}) + +# ---------- 头文件 ---------- +target_include_directories(camera_to_rtmp PRIVATE ${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/third_party/include ${CMAKE_SOURCE_DIR}/third_party/include/paho_mqtt ${GSTREAMER_INCLUDE_DIRS} - ${GSTREAMER_RTSP_INCLUDE_DIRS} ) -# 源码目录 -file(GLOB SRC_FILES src/*.cpp) - -# 生成可执行文件(名字和 project 不一定要一样) -add_executable(camera_to_rtsp ${SRC_FILES}) - -# 链接库目录 -link_directories( - ${GSTREAMER_LIBRARY_DIRS} - ${GSTREAMER_RTSP_LIBRARY_DIRS} +# ---------- 链接库 ---------- +target_link_directories(camera_to_rtmp PRIVATE ${CMAKE_SOURCE_DIR}/third_party/lib ) -# 链接库 -target_link_libraries(camera_to_rtsp +target_link_libraries(camera_to_rtmp ${GSTREAMER_LIBRARIES} - ${GSTREAMER_RTSP_LIBRARIES} pthread ${CMAKE_SOURCE_DIR}/third_party/lib/libpaho-mqttpp3.a ${CMAKE_SOURCE_DIR}/third_party/lib/libpaho-mqtt3a.a ${CMAKE_SOURCE_DIR}/third_party/lib/libpaho-mqtt3c.a ) -set_target_properties(camera_to_rtsp PROPERTIES +# ---------- RPATH ---------- +set_target_properties(camera_to_rtmp PROPERTIES BUILD_RPATH "\$ORIGIN/lib" INSTALL_RPATH "\$ORIGIN/lib" ) -# 运行时把 config.json 复制到 bin/ 目录 +# ---------- 配置文件 ---------- add_custom_command( - TARGET camera_to_rtsp POST_BUILD - COMMAND ${CMAKE_COMMAND} -E make_directory $ + TARGET camera_to_rtmp POST_BUILD + COMMAND ${CMAKE_COMMAND} -E make_directory $ COMMAND ${CMAKE_COMMAND} -E copy_if_different ${CMAKE_SOURCE_DIR}/config.json - $/config.json.default + $/config.json.default ) diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp new file mode 100644 index 0000000..835e282 --- /dev/null +++ b/include/rtmp_manager.hpp @@ -0,0 +1,64 @@ +// rtmp_manager.hpp +#pragma once + +#include + +#include +#include +#include +#include +#include +#include + +#include "app_config.hpp" +#include "logger.hpp" + +std::string get_ip_address(const std::string& ifname); + +class RTMPManager +{ + public: + struct StreamStatus + { + bool running{false}; + std::string last_error; + }; + + // 初始化与控制接口 + static void init(); + static void start_all(); + static void stop_all(); + static bool is_streaming(const std::string& cam_name); + static std::string get_stream_url(const std::string& cam_name); + + // 新增:单通道状态结构体(用于心跳) + struct ChannelInfo + { + int loc; // 摄像头位置索引(0~7) + std::string url; // 正常时的推流地址 + bool running; // 是否正常运行 + std::string reason; // 错误原因(仅在异常时填) + }; + + // 新增:获取所有通道详细状态 + static std::vector get_all_channels_status(); + + private: + struct StreamContext + { + std::atomic thread_running{false}; + std::thread thread; + + StreamStatus status; + std::mutex status_mutex; + }; + + static void stream_loop(Camera cam, StreamContext* ctx); + static GstElement* create_pipeline(const Camera& cam); + static std::string make_key(const std::string& name); + + static std::unordered_map> streams; + static std::mutex streams_mutex; + + static constexpr int RETRY_BASE_DELAY_MS = 3000; +}; diff --git a/include/rtsp_manager.hpp b/include/rtsp_manager.hpp deleted file mode 100644 index 89099b5..0000000 --- a/include/rtsp_manager.hpp +++ /dev/null @@ -1,50 +0,0 @@ -// rtsp_manager.hpp -#pragma once - -#include -#include -#include "app_config.hpp" -#include -#include -#include -#include - -class RTSPManager -{ -public: - static void init(); - static void start(const std::vector &cameras); - static void stop(); - - static void mount_camera(const Camera &cam); - static void unmount_camera(const Camera &cam); - - static bool is_streaming(const std::string &cam_name); - static bool is_any_streaming(); - -private: - static GMainLoop *loop; - static GMainContext *main_context; - static GstRTSPServer *server; - - static std::unordered_map streaming_status; - - // 工厂创建 - static GstRTSPMediaFactory *create_media_factory(const Camera &cam); - - // 挂载/卸载 - static gboolean mount_camera_in_main(gpointer data); - static gboolean unmount_camera_in_main(gpointer data); - - // factory 管理 - static std::unordered_map mounted_factories; - static std::mutex mounted_factories_mutex; - - // pipeline/media 管理 - static std::unordered_map> media_map; - static std::mutex media_map_mutex; - - // gstreamer 信号 - static void on_media_created(GstRTSPMediaFactory *factory, GstRTSPMedia *media, gpointer user_data); - static void on_media_unprepared(GstRTSPMedia *media, gpointer user_data); -}; diff --git a/src/main.cpp b/src/main.cpp index 5b1b636..8ccec5a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,139 +1,79 @@ // main.cpp +#include + +#include +#include +#include +#include + #include "app_config.hpp" -#include "rtsp_manager.hpp" #include "logger.hpp" #include "mqtt_client_wrapper.hpp" -#include -#include -#include -#include -#include - -constexpr bool ENABLE_RTSP_THREAD = true; -constexpr bool ENABLE_MQTT_THREAD = true; +#include "rtmp_manager.hpp" std::atomic g_running(true); -static void minimal_signal_handler(int signum) +// ---------- 信号处理 ---------- +static void signal_handler(int) { g_running.store(false, std::memory_order_relaxed); - const char msg[] = "[MAIN] Signal received, initiating shutdown...\n"; + const char msg[] = "[MAIN] Signal received, shutting down...\n"; write(STDERR_FILENO, msg, sizeof(msg) - 1); } int main() { - struct sigaction sigAct{}; - sigAct.sa_handler = minimal_signal_handler; - sigemptyset(&sigAct.sa_mask); - sigAct.sa_flags = 0; - sigaction(SIGINT, &sigAct, nullptr); - sigaction(SIGTERM, &sigAct, nullptr); + // ---------- 信号 ---------- + struct sigaction sa{}; + sa.sa_handler = signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGINT, &sa, nullptr); + sigaction(SIGTERM, &sa, nullptr); signal(SIGPIPE, SIG_IGN); - Logger::set_log_to_file(get_executable_dir_file_path("app.log")); + // ---------- 日志 ---------- + Logger::init(get_executable_dir_file_path("logs"), 7); + LOG_INFO("[MAIN] ===== Video RTMP Publisher Starting ====="); + // ---------- 配置 ---------- try { g_app_config = AppConfig::load_from_file(get_executable_dir_file_path("config.json")); + LOG_INFO("[MAIN] Config loaded."); } - catch (const std::exception &e) + catch (const std::exception& e) { - LOG_ERROR(std::string("Failed to load config: ") + e.what()); + LOG_ERROR(std::string("[MAIN] Failed to load config: ") + e.what()); return -1; } - std::atomic rtsp_thread_exited(false); - std::atomic mqtt_thread_exited(false); + // ---------- GStreamer ---------- + RTMPManager::init(); - // 初始化 GStreamer - RTSPManager::init(); + // ---------- 启动 RTMP 推流 ---------- + LOG_INFO("[MAIN] Starting RTMP pipelines..."); + RTMPManager::start_all(); - std::thread rtsp_thread; - std::thread mqtt_thread; - - if (ENABLE_RTSP_THREAD) - { - rtsp_thread = std::thread([&]() - { - RTSPManager::start(g_app_config.cameras); - rtsp_thread_exited.store(true, std::memory_order_relaxed); }); - - LOG_INFO("[MAIN] RTSP thread started"); - } - else - { - LOG_INFO("[MAIN] RTSP thread disabled by toggle"); - } - - if (ENABLE_MQTT_THREAD) - { - mqtt_thread = std::thread([&]() - { + // ---------- MQTT ---------- + std::thread mqtt_thread( + [] + { + LOG_INFO("[MAIN] MQTT thread started."); mqtt_client_thread_func(); - mqtt_thread_exited.store(true, std::memory_order_relaxed); }); + LOG_INFO("[MAIN] MQTT thread exited."); + }); - LOG_INFO("[MAIN] MQTT thread started"); - } - else - { - LOG_INFO("[MAIN] MQTT thread disabled by toggle"); - } + // ---------- 主循环 ---------- + while (g_running.load(std::memory_order_relaxed)) std::this_thread::sleep_for(std::chrono::milliseconds(200)); - while (g_running.load(std::memory_order_relaxed)) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + // ---------- 退出 ---------- + LOG_INFO("[MAIN] Shutdown requested. Stopping services..."); - LOG_INFO("[MAIN] Shutdown requested, stopping services..."); + RTMPManager::stop_all(); - const auto max_wait = std::chrono::seconds(5); - const auto poll_interval = std::chrono::milliseconds(100); + if (mqtt_thread.joinable()) mqtt_thread.join(); - if (ENABLE_RTSP_THREAD) - { - RTSPManager::stop(); - - if (rtsp_thread.joinable()) - rtsp_thread.join(); - - LOG_INFO("[MAIN] RTSP thread finished and joined."); - } - - auto deadline = std::chrono::steady_clock::now() + max_wait; - - if (ENABLE_MQTT_THREAD) - { - while (!mqtt_thread_exited.load(std::memory_order_relaxed) && - std::chrono::steady_clock::now() < deadline) - { - std::this_thread::sleep_for(poll_interval); - } - - if (mqtt_thread.joinable()) - { - if (mqtt_thread_exited.load(std::memory_order_relaxed)) - { - mqtt_thread.join(); - LOG_INFO("[MAIN] MQTT thread finished and joined."); - } - else - { - LOG_WARN("[MAIN] MQTT thread did not exit within timeout."); - } - } - } - - bool any_failed = false; - if (ENABLE_RTSP_THREAD && rtsp_thread.joinable() && !rtsp_thread_exited.load()) - any_failed = true; - if (ENABLE_MQTT_THREAD && mqtt_thread.joinable() && !mqtt_thread_exited.load()) - any_failed = true; - - if (any_failed) - { - LOG_ERROR("[MAIN] Threads did not exit in time. Forcing immediate termination."); - _exit(1); - } - - LOG_INFO("[MAIN] Program exited cleanly."); + LOG_INFO("[MAIN] ===== Video RTMP Publisher Exited Cleanly ====="); return 0; } diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index 16b2be7..176c433 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -56,7 +56,7 @@ static void on_mqtt_message_received(const std::string& topic, const std::string auto j = nlohmann::json::parse(message); auto seqNo = j["data"].value("seqNo", ""); - LOG_INFO("[MQTT] video_down received, RTSP always running (no action)."); + LOG_INFO("[MQTT] video_down received, stream is always available via MediaMTX."); nlohmann::json reply_data; reply_data["time"] = Logger::get_current_time_utc8(); @@ -75,7 +75,7 @@ static void on_mqtt_message_received(const std::string& topic, const std::string // ------- substream_down 应答 ------- if (topic == g_app_config.mqtt.topics.substream_down) { - LOG_INFO("[MQTT] substream_down received (ignored)."); + LOG_WARN("[MQTT] substream_down received, substream is not supported in MediaMTX mode."); return; } diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp new file mode 100644 index 0000000..d569e74 --- /dev/null +++ b/src/rtmp_manager.cpp @@ -0,0 +1,392 @@ +// rtmp_manager.cpp +#include "rtmp_manager.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include + +// ========== 工具函数 ========== +static bool device_exists(const std::string& path) +{ + struct stat st; + return (stat(path.c_str(), &st) == 0); +} + +// 动态获取指定网卡 IPv4 地址 +std::string get_ip_address(const std::string& ifname) +{ + struct ifaddrs *ifaddr, *ifa; + char ip[INET_ADDRSTRLEN] = {0}; + + if (getifaddrs(&ifaddr) == -1) + { + perror("getifaddrs"); + return ""; + } + + for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) + { + if (ifa->ifa_addr == nullptr) continue; + + if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name) + { + void* addr = &((struct sockaddr_in*)ifa->ifa_addr)->sin_addr; + if (inet_ntop(AF_INET, addr, ip, sizeof(ip))) + { + freeifaddrs(ifaddr); + return ip; + } + } + } + + freeifaddrs(ifaddr); + return ""; +} + +// ========== 静态成员 ========== +std::unordered_map> RTMPManager::streams; +std::mutex RTMPManager::streams_mutex; + +// ========== 初始化 ========== +void RTMPManager::init() +{ + gst_init(nullptr, nullptr); + LOG_INFO("[RTMP] GStreamer initialized."); +} + +std::string RTMPManager::make_key(const std::string& name) { return name + "_main"; } + +// ========== 创建推流管线 ========== +GstElement* RTMPManager::create_pipeline(const Camera& cam) +{ + const int width = cam.width; + const int height = cam.height; + const int fps = cam.fps; + const int bitrate = cam.bitrate; + + // MediaMTX 中的 stream key + const std::string stream_name = cam.name + "_main"; + + // RTMP 推送到 MediaMTX + // mediamtx.yml 中 paths 会自动创建 + const std::string rtmp_url = "rtmp://127.0.0.1:1935/" + stream_name; + + /* + * Pipeline 说明: + * v4l2src -> mpph264enc -> h264parse -> flvmux -> rtmpsink + * + * - 不使用 tee(降低死锁概率) + * - 不引入音频(MediaMTX 不强制要求) + * - 纯视频、纯 RTMP、纯 TCP + */ + std::string pipeline_str = "v4l2src name=src device=" + cam.device + + " ! video/x-raw,format=NV12,width=" + std::to_string(width) + + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + + "/1 " + " ! mpph264enc bps=" + + std::to_string(bitrate) + " gop=" + std::to_string(fps) + + " rc-mode=cbr " + " ! h264parse name=parse " + " ! flvmux streamable=true " + " ! rtmpsink location=\"" + + rtmp_url + + "\" " + " sync=false async=false"; + + GError* error = nullptr; + GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); + if (error) + { + LOG_ERROR(std::string("[RTMP] Pipeline creation failed: ") + error->message); + g_error_free(error); + return nullptr; + } + + return pipeline; +} + +// ========== 主推流循环 ========== +void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) +{ + const std::string key = make_key(cam.name); + + while (ctx->thread_running) + { + // 1. 检查设备节点是否存在 + struct stat st{}; + if (stat(cam.device.c_str(), &st) != 0) + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "Device not found: " + cam.device; + LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); + std::this_thread::sleep_for(std::chrono::seconds(3)); + continue; + } + + // 2. 创建 GStreamer 管线 + GstElement* pipeline = create_pipeline(cam); + if (!pipeline) + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "Pipeline creation failed"; + std::this_thread::sleep_for(std::chrono::seconds(3)); + continue; + } + gst_element_set_name(pipeline, key.c_str()); + GstBus* bus = gst_element_get_bus(pipeline); + + // 3. 在 v4l2src 的 src pad 上挂探测 probe + bool got_frame = false; + { + GstElement* src = gst_bin_get_by_name(GST_BIN(pipeline), "src"); + if (src) + { + GstPad* pad = gst_element_get_static_pad(src, "src"); + if (pad) + { + gst_pad_add_probe( + pad, GST_PAD_PROBE_TYPE_BUFFER, + [](GstPad*, GstPadProbeInfo*, gpointer user_data) -> GstPadProbeReturn + { + *static_cast(user_data) = true; + return GST_PAD_PROBE_OK; + }, + &got_frame, nullptr); + gst_object_unref(pad); + } + else + { + LOG_WARN("[RTMP] " + key + " - src has no 'src' pad?"); + } + gst_object_unref(src); + } + else + { + LOG_WARN("[RTMP] " + key + " - cannot find element 'src' for pad-probe"); + } + } + + // 4. 启动播放 + LOG_INFO("[RTMP] Starting stream: " + key); + gst_element_set_state(pipeline, GST_STATE_PLAYING); + + // 等待进入 PLAYING 状态(最长 5s) + GstState state = GST_STATE_NULL, pending = GST_STATE_NULL; + bool confirmed_running = false; + if (gst_element_get_state(pipeline, &state, &pending, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS && + state == GST_STATE_PLAYING) + { + std::lock_guard lk(ctx->status_mutex); + confirmed_running = true; + ctx->status.running = true; + ctx->status.last_error.clear(); + LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); + } + else + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "Pipeline failed to confirm PLAYING"; + LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); + } + + if (!confirmed_running) + { + gst_element_set_state(pipeline, GST_STATE_NULL); + if (bus) gst_object_unref(bus); + gst_object_unref(pipeline); + std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); + continue; + } + + // 5. 运行阶段:监测帧和错误 + const auto start_t = std::chrono::steady_clock::now(); + bool need_restart = false; + + while (ctx->thread_running) + { + // 检查是否收到帧(前 5s 内) + if (!got_frame) + { + auto elapsed = + std::chrono::duration_cast(std::chrono::steady_clock::now() - start_t) + .count(); + if (elapsed > 5) + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "No frames detected (no video signal)"; + LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); + need_restart = true; + break; + } + } + else + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = true; + ctx->status.last_error.clear(); + } + + // 等待错误或 EOS 消息 + GstMessage* msg = gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND, + (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); + + if (!msg) continue; + + switch (GST_MESSAGE_TYPE(msg)) + { + case GST_MESSAGE_ERROR: + { + GError* err = nullptr; + gst_message_parse_error(msg, &err, nullptr); + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = err ? err->message : "GStreamer error"; + LOG_ERROR("[RTMP] " + key + " stream error: " + ctx->status.last_error); + if (err) g_error_free(err); + need_restart = true; + break; + } + case GST_MESSAGE_EOS: + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + ctx->status.last_error = "End of stream (EOS)"; + LOG_WARN("[RTMP] " + key + " reached EOS"); + need_restart = true; + break; + } + default: break; + } + gst_message_unref(msg); + + if (need_restart) break; + } + + // 6. 收尾清理 + gst_element_set_state(pipeline, GST_STATE_NULL); + if (bus) gst_object_unref(bus); + gst_object_unref(pipeline); + + // 7. 若仍在运行状态,准备重启 + if (ctx->thread_running) + { + LOG_WARN("[RTMP] Restarting " + key + " in 3s..."); + std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); + } + } + + { + std::lock_guard lk(ctx->status_mutex); + ctx->status.running = false; + } + LOG_INFO("[RTMP] Stream thread exited for " + key); +} + +// ========== 启停与状态 ========== +void RTMPManager::start_all() +{ + LOG_INFO("[RTMP] Starting all record streams..."); + std::lock_guard lock(streams_mutex); + + int delay_ms = 0; + for (auto& cam : g_app_config.cameras) + { + auto key = make_key(cam.name); + if (streams.find(key) != streams.end()) + { + LOG_INFO("[RTMP] Stream already running: " + key); + continue; + } + + auto ctx = std::make_unique(); + ctx->thread_running.store(true); + + ctx->thread = std::thread( + [cam, ptr = ctx.get(), delay_ms]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + stream_loop(cam, ptr); + }); + + streams.emplace(key, std::move(ctx)); + delay_ms += 200; // 每路错开 200ms + } +} + +void RTMPManager::stop_all() +{ + std::lock_guard lock(streams_mutex); + for (auto& kv : streams) kv.second->thread_running.store(false); + for (auto& kv : streams) + if (kv.second->thread.joinable()) kv.second->thread.join(); + streams.clear(); +} + +bool RTMPManager::is_streaming(const std::string& cam_name) +{ + std::lock_guard lock(streams_mutex); + auto it = streams.find(make_key(cam_name)); + if (it == streams.end()) return false; + + auto& ctx = *(it->second); + std::lock_guard lk(ctx.status_mutex); + return ctx.status.running; +} + +std::string RTMPManager::get_stream_url(const std::string& cam_name) +{ + std::string ip = get_ip_address("enP2p33s0"); + if (ip.empty()) ip = "127.0.0.1"; + return "rtsp://" + ip + ":18554/" + cam_name + "_main"; +} +// ========== 汇总状态 ========== +std::vector RTMPManager::get_all_channels_status() +{ + std::vector result; + std::lock_guard lock(streams_mutex); + + for (size_t i = 0; i < g_app_config.cameras.size(); ++i) + { + const auto& cam = g_app_config.cameras[i]; + auto key = make_key(cam.name); + + ChannelInfo ch; + ch.loc = static_cast(i); + ch.url.clear(); + ch.running = false; + ch.reason = "Not started"; + + auto it = streams.find(key); + if (it != streams.end()) + { + auto& ctx = *(it->second); + + std::lock_guard lk(ctx.status_mutex); + auto& status = it->second->status; + + ch.running = status.running; + if (status.running) + { + ch.url = get_stream_url(cam.name); + ch.reason.clear(); + } + else + { + ch.reason = status.last_error.empty() ? "Unknown error" : status.last_error; + } + } + + result.push_back(ch); + } + return result; +} diff --git a/src/rtsp_manager.cpp b/src/rtsp_manager.cpp deleted file mode 100644 index dee8d46..0000000 --- a/src/rtsp_manager.cpp +++ /dev/null @@ -1,382 +0,0 @@ -// rtsp_manager.cpp -#include "rtsp_manager.hpp" - -#include -#include -#include - -#include -#include -#include -#include -#include - -#include "logger.hpp" - -// 静态变量定义 -GMainLoop* RTSPManager::loop = nullptr; -GMainContext* RTSPManager::main_context = nullptr; -GstRTSPServer* RTSPManager::server = nullptr; - -std::unordered_map RTSPManager::streaming_status; -std::unordered_map RTSPManager::mounted_factories; -std::mutex RTSPManager::mounted_factories_mutex; - -// ============================== -// ✅ 不再保存 GstRTSPMedia* 裸指针 -// 改为:每路 camera 的“活跃 session 计数” -// ============================== -static std::unordered_map g_client_count; -static std::mutex g_client_count_mutex; - -// 日志降噪:media-configure 过于频繁时只提示 -static std::unordered_map last_media_ts; -static std::mutex last_media_ts_mutex; - -bool set_v4l2_format(const std::string& dev, int width, int height) -{ - int fd = open(dev.c_str(), O_RDWR); - if (fd < 0) - { - LOG_ERROR("Failed to open " + dev); - return false; - } - - struct v4l2_format fmt; - memset(&fmt, 0, sizeof(fmt)); - fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE; - - // 读格式,若相同则略过 - if (ioctl(fd, VIDIOC_G_FMT, &fmt) == 0) - { - bool match = true; - if (fmt.fmt.pix_mp.width != (unsigned int)width || fmt.fmt.pix_mp.height != (unsigned int)height || - fmt.fmt.pix_mp.pixelformat != V4L2_PIX_FMT_NV12) - { - match = false; - } - - if (match) - { - close(fd); - LOG_INFO("[RTSP] V4L2 format already NV12 " + std::to_string(width) + "x" + std::to_string(height) + - " for " + dev); - return true; - } - } - - memset(&fmt, 0, sizeof(fmt)); - fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE; - fmt.fmt.pix_mp.width = width; - fmt.fmt.pix_mp.height = height; - fmt.fmt.pix_mp.pixelformat = V4L2_PIX_FMT_NV12; - fmt.fmt.pix_mp.num_planes = 1; - - if (ioctl(fd, VIDIOC_S_FMT, &fmt) < 0) - { - LOG_ERROR("VIDIOC_S_FMT failed for " + dev); - close(fd); - return false; - } - - LOG_INFO("[RTSP] Set V4L2 format to NV12 " + std::to_string(width) + "x" + std::to_string(height) + " for " + dev); - close(fd); - return true; -} - -void RTSPManager::init() -{ - gst_init(nullptr, nullptr); - LOG_INFO("[RTSP] GStreamer initialized."); -} - -GstRTSPMediaFactory* RTSPManager::create_media_factory(const Camera& cam) -{ - // 启动前把 v4l2 格式设成我们想要的 - set_v4l2_format(cam.device, cam.width, cam.height); - - int w = cam.width; - int h = cam.height; - - std::string caps = - "video/x-raw,format=NV12," - "width=" + - std::to_string(w) + ",height=" + std::to_string(h) + ",framerate=" + std::to_string(cam.fps) + "/1"; - - // 关键:tee 一路给 RTSP,一路给 fakesink,保持 v4l2src / mpph264enc 永远在跑 - std::string launch_str = "( v4l2src device=" + cam.device + - " io-mode=2 is-live=true do-timestamp=true" - " ! " + - caps + - " ! tee name=t " - " ! queue leaky=downstream max-size-time=0 max-size-bytes=0 max-size-buffers=0" - " ! mpph264enc name=enc rc-mode=cbr bps=" + - std::to_string(cam.bitrate) + " gop=" + std::to_string(cam.fps) + - " header-mode=1" - " ! h264parse" - " ! rtph264pay name=pay0 pt=96 config-interval=1 " - " t. ! queue leaky=downstream max-size-buffers=3 max-size-bytes=0 max-size-time=0" - " ! fakesink sync=false async=false )"; - - LOG_INFO("[RTSP] Launch for " + cam.name + ": " + launch_str); - - GstRTSPMediaFactory* factory = gst_rtsp_media_factory_new(); - gst_rtsp_media_factory_set_launch(factory, launch_str.c_str()); - - // 所有客户端共享同一个 pipeline,避免频繁拉起 v4l2 / encoder - gst_rtsp_media_factory_set_shared(factory, TRUE); - - // 客户端断开时不要乱 suspend/reset - gst_rtsp_media_factory_set_suspend_mode(factory, GST_RTSP_SUSPEND_MODE_NONE); - - g_signal_connect_data(factory, "media-configure", G_CALLBACK(on_media_created), g_strdup(cam.name.c_str()), - (GClosureNotify)g_free, (GConnectFlags)0); - - return factory; -} - -void RTSPManager::start(const std::vector& cams) -{ - server = gst_rtsp_server_new(); - gst_rtsp_server_set_service(server, "8554"); - - // ✅ attach 之前设置 backlog - gst_rtsp_server_set_backlog(server, 32); - - loop = g_main_loop_new(nullptr, FALSE); - main_context = g_main_loop_get_context(loop); - - GstRTSPMountPoints* mounts = gst_rtsp_server_get_mount_points(server); - - for (const auto& cam : cams) - { - if (!cam.enabled) continue; - - GstRTSPMediaFactory* factory = create_media_factory(cam); - std::string mount_point = "/" + cam.name; - - gst_rtsp_mount_points_add_factory(mounts, mount_point.c_str(), factory); - - { - std::lock_guard lock(mounted_factories_mutex); - mounted_factories[cam.name] = factory; - streaming_status[cam.name] = true; - } - - // 初始化计数 - { - std::lock_guard lock(g_client_count_mutex); - g_client_count.emplace(cam.name, 0); - } - - LOG_INFO("[RTSP] Camera '" + cam.name + "' mounted at rtsp://0.0.0.0:8554" + mount_point); - } - - g_object_unref(mounts); - - gst_rtsp_server_attach(server, nullptr); - - LOG_INFO("[RTSP] Server running on rtsp://0.0.0.0:8554"); - g_main_loop_run(loop); - - if (server) - { - g_object_unref(server); - server = nullptr; - } - if (loop) - { - g_main_loop_unref(loop); - loop = nullptr; - } - - LOG_INFO("[RTSP] Server stopped."); -} - -void RTSPManager::on_media_created(GstRTSPMediaFactory*, GstRTSPMedia* media, gpointer user_data) -{ - const char* cam_name = static_cast(user_data); - - // ✅ 只做日志降噪:别让“抑制”影响生命周期处理 - bool noisy = false; - auto now = std::chrono::steady_clock::now(); - { - std::lock_guard lock(last_media_ts_mutex); - auto& last = last_media_ts[cam_name]; - if (last.time_since_epoch().count() != 0 && now - last < std::chrono::seconds(2)) noisy = true; - last = now; - } - - int count_after = 0; - { - std::lock_guard lock(g_client_count_mutex); - int& c = g_client_count[cam_name]; // 若不存在会创建;这里无所谓 - c++; - count_after = c; - } - - if (noisy) - { - LOG_WARN(std::string("[RTSP] media-configure frequent: ") + cam_name + - " (active_sessions=" + std::to_string(count_after) + ")"); - } - else - { - LOG_INFO(std::string("[RTSP] media-configure for camera: ") + cam_name + - " (active_sessions=" + std::to_string(count_after) + ")"); - } - - // ✅ 生命周期绑定:unprepared 一定会对应减计数 - g_signal_connect_data(media, "unprepared", G_CALLBACK(on_media_unprepared), g_strdup(cam_name), - (GClosureNotify)g_free, (GConnectFlags)0); -} - -void RTSPManager::on_media_unprepared(GstRTSPMedia* /*media*/, gpointer user_data) -{ - const char* cam_name = static_cast(user_data); - - int count_after = 0; - { - std::lock_guard lock(g_client_count_mutex); - auto it = g_client_count.find(cam_name); - if (it == g_client_count.end()) - { - // 可能发生在 unmount 后 teardown 仍在进行的边界情况 - LOG_WARN(std::string("[RTSP] media-unprepared but cam not in client_count: ") + cam_name); - return; - } - - if (it->second > 0) it->second--; - count_after = it->second; - } - - LOG_INFO(std::string("[RTSP] media-unprepared: ") + cam_name + " (active_sessions=" + std::to_string(count_after) + - ")"); - - // ✅ 不要 g_object_unref(media) —— gst-rtsp-server 会处理 -} - -gboolean RTSPManager::mount_camera_in_main(gpointer data) -{ - Camera* cam = static_cast(data); - if (!cam || !server) - { - delete cam; - return G_SOURCE_REMOVE; - } - - GstRTSPMountPoints* mounts = gst_rtsp_server_get_mount_points(server); - - std::string mount_point = "/" + cam->name; - GstRTSPMediaFactory* factory = create_media_factory(*cam); - - gst_rtsp_mount_points_add_factory(mounts, mount_point.c_str(), factory); - g_object_unref(mounts); - - { - std::lock_guard lock(mounted_factories_mutex); - mounted_factories[cam->name] = factory; - streaming_status[cam->name] = true; - } - - { - std::lock_guard lock(g_client_count_mutex); - g_client_count[cam->name] = 0; - } - - LOG_INFO("[RTSP] Camera '" + cam->name + "' mounted at rtsp://localhost:8554" + mount_point); - - delete cam; - return G_SOURCE_REMOVE; -} - -gboolean RTSPManager::unmount_camera_in_main(gpointer data) -{ - Camera* cam = static_cast(data); - if (!cam || !server) - { - delete cam; - return G_SOURCE_REMOVE; - } - - std::string cam_name = cam->name; - std::string mount_point = "/" + cam_name; - - // ✅ 仅移除 mount;media teardown 交给 gst-rtsp-server - GstRTSPMountPoints* mounts = gst_rtsp_server_get_mount_points(server); - if (mounts) - { - gst_rtsp_mount_points_remove_factory(mounts, mount_point.c_str()); - g_object_unref(mounts); - } - - { - std::lock_guard lock(mounted_factories_mutex); - auto it = mounted_factories.find(cam_name); - if (it != mounted_factories.end()) - { - // 为了避免 double-unref:这里不手动 unref factory - mounted_factories.erase(it); - } - streaming_status[cam_name] = false; - } - - { - std::lock_guard lock(g_client_count_mutex); - g_client_count.erase(cam_name); - } - - LOG_INFO("[RTSP] Camera '" + cam_name + "' unmounted."); - delete cam; - return G_SOURCE_REMOVE; -} - -void RTSPManager::mount_camera(const Camera& cam) -{ - Camera* camCopy = new Camera(cam); - g_main_context_invoke( - main_context, [](gpointer data) -> gboolean { return RTSPManager::mount_camera_in_main(data); }, camCopy); -} - -void RTSPManager::unmount_camera(const Camera& cam) -{ - Camera* camCopy = new Camera(cam); - g_main_context_invoke( - main_context, [](gpointer data) -> gboolean { return RTSPManager::unmount_camera_in_main(data); }, camCopy); -} - -bool RTSPManager::is_streaming(const std::string& cam_name) -{ - std::lock_guard lock(mounted_factories_mutex); - auto it = streaming_status.find(cam_name); - return it != streaming_status.end() ? it->second : false; -} - -bool RTSPManager::is_any_streaming() -{ - std::lock_guard lock(mounted_factories_mutex); - for (auto& kv : streaming_status) - if (kv.second) return true; - return false; -} - -void RTSPManager::stop() -{ - if (!loop) return; - - if (server) gst_rtsp_server_set_backlog(server, 0); // optional - - if (main_context) - { - g_main_context_invoke( - main_context, - [](gpointer data) -> gboolean - { - g_main_loop_quit(static_cast(data)); - return G_SOURCE_REMOVE; - }, - loop); - } - - g_main_loop_quit(loop); -}