From a12d902fc0a5c05fb777f4fd50a2f7dff5aafd5f Mon Sep 17 00:00:00 2001 From: cxh Date: Wed, 15 Oct 2025 08:50:01 +0800 Subject: [PATCH] temp --- include/rtmp_manager.hpp | 35 ++++++--- src/main.cpp | 123 ++++++++++-------------------- src/rtmp_manager.cpp | 158 ++++++++++++++++++++++----------------- 3 files changed, 155 insertions(+), 161 deletions(-) diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 35e3a8f..2686d17 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -2,42 +2,59 @@ #pragma once #include -#include "app_config.hpp" #include #include #include #include #include +#include +#include "app_config.hpp" class RTMPManager { public: + enum class StreamResult + { + OK, + PIPELINE_ERROR, + CONNECTION_FAIL, + EOS_RECEIVED, + UNKNOWN + }; + + struct StreamStatus + { + bool running; + StreamResult last_result; + std::string last_error; + }; + + using StreamCallback = std::function; + static void init(); static void start_camera(const Camera &cam, StreamType type); static void stop_camera(const std::string &cam_name, StreamType type); static void stop_all(); static bool is_streaming(const std::string &cam_name, StreamType type); static bool is_any_streaming(); - static std::string get_stream_url(const std::string &cam_name, StreamType type); + static void set_status_callback(StreamCallback cb); + private: struct StreamContext { - std::atomic running; + std::atomic running{false}; std::thread thread; - - StreamContext() : running(false) {} - StreamContext(StreamContext &&) = default; // 允许移动 - StreamContext &operator=(StreamContext &&) = default; - StreamContext(const StreamContext &) = delete; - StreamContext &operator=(const StreamContext &) = delete; + StreamStatus status; }; static std::unordered_map streams; static std::mutex streams_mutex; + static StreamCallback status_callback; static void stream_loop(Camera cam, StreamType type); static GstElement *create_pipeline(const Camera &cam, StreamType type); static std::string make_stream_key(const std::string &cam_name, StreamType type); + static void update_status(const std::string &key, const StreamStatus &status); }; diff --git a/src/main.cpp b/src/main.cpp index 96ac9ef..a6f473b 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3,123 +3,82 @@ #include "rtmp_manager.hpp" #include "logger.hpp" #include "mqtt_client_wrapper.hpp" + #include #include #include -#include // write, STDOUT_FILENO +#include #include -// 可通过这些开关快速启用/禁用线程进行调试 -constexpr bool ENABLE_RTMP_THREAD = true; // 设置为 false 禁用 RTSP 线程 -constexpr bool ENABLE_MQTT_THREAD = true; // 设置为 false 禁用 MQTT 线程 +static std::atomic g_running(true); -std::atomic g_running(true); - -static void minimal_signal_handler(int signum) +static void signal_handler(int signum) { - // 只做非常有限且 async-signal-safe 的操作 g_running.store(false, std::memory_order_relaxed); - const char msg[] = "[MAIN] Signal received, initiating shutdown...\n"; + const char msg[] = "[MAIN] Signal received, shutting down...\n"; write(STDERR_FILENO, msg, sizeof(msg) - 1); - // 不要调用 LOG_*、malloc、std::string、mutex、exit 等 } int main() { - // 安装信号处理(SIGINT 和 SIGTERM) - struct sigaction sigAct{}; - sigAct.sa_handler = minimal_signal_handler; - sigemptyset(&sigAct.sa_mask); - sigAct.sa_flags = 0; - sigaction(SIGINT, &sigAct, nullptr); - sigaction(SIGTERM, &sigAct, nullptr); + // 安装信号处理器 + struct sigaction sa{}; + sa.sa_handler = signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGINT, &sa, nullptr); + sigaction(SIGTERM, &sa, nullptr); signal(SIGPIPE, SIG_IGN); - // 初始化日志文件 + // 初始化日志 Logger::set_log_to_file(get_executable_dir_file_path("app.log")); + LOG_INFO("[MAIN] Application starting..."); try { - // 从可执行文件所在目录读取配置文件 + // 加载配置 g_app_config = AppConfig::load_from_file(get_executable_dir_file_path("config.json")); + LOG_INFO("[MAIN] Loaded config from config.json"); } catch (const std::exception &e) { - LOG_ERROR(std::string("Failed to load config: ") + e.what()); + LOG_ERROR(std::string("[MAIN] Failed to load config: ") + e.what()); return -1; } - // 线程退出标志,用于超时等待 - std::atomic rtsp_thread_exited(false); - std::atomic mqtt_thread_exited(false); - - // 先在主线程初始化 GStreamer + // 初始化 GStreamer RTMPManager::init(); - std::thread rtsp_thread; - std::thread mqtt_thread; + // 启动 MQTT 线程 + std::thread mqtt_thread([] + { + try + { + LOG_INFO("[MAIN] MQTT thread started."); + mqtt_client_thread_func(); // 在回调里执行推流控制 + } + catch (const std::exception &e) + { + LOG_ERROR(std::string("[MAIN] MQTT thread crashed: ") + e.what()); + } + LOG_INFO("[MAIN] MQTT thread exiting..."); }); - // 启动 MQTT 线程(如果启用) - if (ENABLE_MQTT_THREAD) - { - mqtt_thread = std::thread([&]() - { - mqtt_client_thread_func(); - mqtt_thread_exited.store(true, std::memory_order_relaxed); }); - LOG_INFO("[MAIN] MQTT thread started"); - } - else - { - LOG_INFO("[MAIN] MQTT thread disabled by build-time toggle"); - } - - // 等待退出信号 + // 主循环,仅等待退出信号 while (g_running.load(std::memory_order_relaxed)) std::this_thread::sleep_for(std::chrono::milliseconds(200)); - LOG_INFO("[MAIN] Shutdown requested, stopping services..."); + LOG_INFO("[MAIN] Shutdown requested. Cleaning up..."); - // 等线程优雅退出:总等待时间 (可调整) - const auto max_wait = std::chrono::seconds(5); - const auto poll_interval = std::chrono::milliseconds(100); - auto deadline = std::chrono::steady_clock::now() + max_wait; + // 停止所有 RTMP 流 + RTMPManager::stop_all(); - // 重置 MQTT 线程等待的截止时间 - deadline = std::chrono::steady_clock::now() + max_wait; - - if (ENABLE_MQTT_THREAD) + // 等待 MQTT 线程退出 + if (mqtt_thread.joinable()) { - while (!mqtt_thread_exited.load(std::memory_order_relaxed) && - std::chrono::steady_clock::now() < deadline) - { - std::this_thread::sleep_for(poll_interval); - } - - if (mqtt_thread.joinable()) - { - if (mqtt_thread_exited.load(std::memory_order_relaxed)) - { - mqtt_thread.join(); - LOG_INFO("[MAIN] MQTT thread finished and joined."); - } - else - { - LOG_WARN("[MAIN] MQTT thread did not exit within timeout."); - } - } + mqtt_thread.join(); + LOG_INFO("[MAIN] MQTT thread joined."); } - // 如果有线程仍未退出,则强制终止 - bool any_failed = false; - if (ENABLE_MQTT_THREAD && mqtt_thread.joinable() && !mqtt_thread_exited.load(std::memory_order_relaxed)) - any_failed = true; - - if (any_failed) - { - LOG_ERROR("[MAIN] Threads did not exit in time. Forcing immediate termination."); - _exit(1); - } - - LOG_INFO("[MAIN] Program exited cleanly."); + LOG_INFO("[MAIN] Application exited cleanly."); return 0; -} \ No newline at end of file +} diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 10ed4fd..816f2a1 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -1,14 +1,14 @@ // rtsp_manager.cpp #include "rtmp_manager.hpp" #include "logger.hpp" -#include #include #include -#include +#include #include std::unordered_map RTMPManager::streams; std::mutex RTMPManager::streams_mutex; +RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; static inline std::string stream_type_suffix(StreamType type) { @@ -26,6 +26,21 @@ void RTMPManager::init() LOG_INFO("[RTMP] GStreamer initialized."); } +void RTMPManager::set_status_callback(StreamCallback cb) +{ + status_callback = std::move(cb); +} + +void RTMPManager::update_status(const std::string &key, const StreamStatus &status) +{ + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it != streams.end()) + it->second.status = status; + if (status_callback) + status_callback(key, status); +} + GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) { int width = cam.width; @@ -42,30 +57,26 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) } std::string stream_name = cam.name + stream_type_suffix(type); - std::string pipeline_str = "v4l2src device=" + cam.device + " ! video/x-raw,format=NV12,width=" + std::to_string(width) + ",height=" + std::to_string(height) + - ",framerate=" + std::to_string(fps) + "/1" - " ! queue max-size-buffers=1 leaky=downstream " - " ! mpph264enc bps=" + + ",framerate=" + std::to_string(fps) + + "/1 ! queue max-size-buffers=1 leaky=downstream " + "! mpph264enc bps=" + std::to_string(bitrate) + " gop=" + std::to_string(fps) + - " ! h264parse " - " ! flvmux streamable=true name=mux " - " ! rtmpsink location=\"rtmp://127.0.0.1/live/" + - stream_name + - " live=1\" sync=false"; + " ! h264parse ! flvmux streamable=true name=mux " + "! rtmpsink location=\"rtmp://127.0.0.1/live/" + + stream_name + " live=1\" sync=false"; - LOG_INFO("[RTMP] Creating pipeline for '" + cam.name + "' (" + - (type == StreamType::MAIN ? "MAIN" : "SUB") + ") -> " + pipeline_str); + LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str); GError *error = nullptr; GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error); if (error) { - LOG_ERROR("[RTMP] Failed to parse pipeline: " + std::string(error->message)); + LOG_ERROR("[RTMP] Pipeline parse error: " + std::string(error->message)); g_error_free(error); return nullptr; } @@ -75,42 +86,50 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) void RTMPManager::stream_loop(Camera cam, StreamType type) { const std::string key = make_stream_key(cam.name, type); - LOG_INFO("[RTMP] Stream thread started for '" + key + "'"); + LOG_INFO("[RTMP] Stream loop started for " + key); + + const int MAX_RETRIES = 5; + int retry_count = 0; while (true) { { std::lock_guard lock(streams_mutex); - if (!streams[key].running.load()) + if (!streams[key].running) break; } GstElement *pipeline = create_pipeline(cam, type); if (!pipeline) { - LOG_ERROR("[RTMP] Failed to create pipeline for '" + key + "', retrying in 3s..."); + update_status(key, {false, StreamResult::PIPELINE_ERROR, "Failed to create pipeline"}); std::this_thread::sleep_for(std::chrono::seconds(3)); + if (++retry_count > MAX_RETRIES) + break; continue; } GstBus *bus = gst_element_get_bus(pipeline); gst_element_set_state(pipeline, GST_STATE_PLAYING); - LOG_INFO("[RTMP] Camera '" + key + "' streaming..."); + update_status(key, {true, StreamResult::OK, ""}); + LOG_INFO("[RTMP] " + key + " is streaming."); - bool error_occurred = false; - int consecutive_failures = 0; - const int MAX_RETRIES = 5; + bool stop_flag = false; + GstMessage *msg = nullptr; while (true) { - GstMessage *msg = gst_bus_timed_pop_filtered( + msg = gst_bus_timed_pop_filtered( bus, GST_CLOCK_TIME_NONE, static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); { std::lock_guard lock(streams_mutex); - if (!streams[key].running.load()) + if (!streams[key].running) + { + stop_flag = true; break; + } } if (!msg) @@ -121,21 +140,22 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) GError *err = nullptr; gchar *debug = nullptr; gst_message_parse_error(msg, &err, &debug); - LOG_ERROR("[RTMP] Error on '" + key + "': " + std::string(err->message)); + std::string err_msg = err ? err->message : "Unknown GStreamer error"; + LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg); + update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg}); g_error_free(err); g_free(debug); - error_occurred = true; - consecutive_failures++; + stop_flag = true; } else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) { - LOG_WARN("[RTMP] EOS received on '" + key + "'"); - error_occurred = true; - consecutive_failures++; + LOG_WARN("[RTMP] EOS on " + key); + update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"}); + stop_flag = true; } gst_message_unref(msg); - if (error_occurred) + if (stop_flag) break; } @@ -143,100 +163,98 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) gst_object_unref(bus); gst_object_unref(pipeline); - if (!error_occurred) + if (!stop_flag) break; - if (consecutive_failures >= MAX_RETRIES) + if (++retry_count > MAX_RETRIES) { - LOG_ERROR("[RTMP] Max retries reached for '" + key + "'. Stopping reconnection attempts."); - std::lock_guard lock(streams_mutex); - streams[key].running.store(false); + LOG_ERROR("[RTMP] " + key + " reached max retries. Giving up."); break; } - LOG_WARN("[RTMP] Reconnecting '" + key + "' in 3s..."); + LOG_WARN("[RTMP] Reconnecting " + key + " in 3s..."); std::this_thread::sleep_for(std::chrono::seconds(3)); } - LOG_INFO("[RTMP] Stream thread exited for '" + key + "'"); + update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"}); + LOG_INFO("[RTMP] Stream loop ended for " + key); } void RTMPManager::start_camera(const Camera &cam, StreamType type) { - std::lock_guard lock(streams_mutex); std::string key = make_stream_key(cam.name, type); + std::lock_guard lock(streams_mutex); - if (streams.count(key) && streams[key].running.load()) + auto it = streams.find(key); + if (it != streams.end() && it->second.running) { - LOG_WARN("[RTMP] Camera '" + key + "' already streaming."); + LOG_WARN("[RTMP] " + key + " already streaming."); return; } StreamContext ctx; - ctx.running.store(true); + ctx.running = true; ctx.thread = std::thread([cam, type]() - { RTMPManager::stream_loop(cam, type); }); - - // 修复 emplace 构造失败问题 - streams.emplace(std::move(key), std::move(ctx)); + { stream_loop(cam, type); }); + streams[key] = std::move(ctx); } void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) { - std::unique_lock lock(streams_mutex); std::string key = make_stream_key(cam_name, type); + std::unique_lock lock(streams_mutex); auto it = streams.find(key); if (it == streams.end()) { - LOG_WARN("[RTMP] Camera '" + key + "' not found."); + LOG_WARN("[RTMP] " + key + " not found."); return; } - it->second.running.store(false); + it->second.running = false; auto th = std::move(it->second.thread); streams.erase(it); lock.unlock(); if (th.joinable()) th.join(); + + update_status(key, {false, StreamResult::OK, "Stopped manually"}); +} + +void RTMPManager::stop_all() +{ + std::vector keys; + { + std::lock_guard lock(streams_mutex); + for (auto &kv : streams) + keys.push_back(kv.first); + } + for (auto &key : keys) + { + if (key.find("_main") != std::string::npos) + stop_camera(key.substr(0, key.size() - 5), StreamType::MAIN); + else + stop_camera(key.substr(0, key.size() - 4), StreamType::SUB); + } } bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) { std::lock_guard lock(streams_mutex); - std::string key = make_stream_key(cam_name, type); + auto key = make_stream_key(cam_name, type); auto it = streams.find(key); - return it != streams.end() && it->second.running.load(); + return it != streams.end() && it->second.running; } bool RTMPManager::is_any_streaming() { std::lock_guard lock(streams_mutex); for (auto &kv : streams) - if (kv.second.running.load()) + if (kv.second.running) return true; return false; } -void RTMPManager::stop_all() -{ - std::vector> targets; - - { - std::lock_guard lock(streams_mutex); - for (auto &kv : streams) - { - if (kv.first.size() > 4 && kv.first.find("_sub") == kv.first.size() - 4) - targets.emplace_back(kv.first.substr(0, kv.first.size() - 4), StreamType::SUB); - else - targets.emplace_back(kv.first.substr(0, kv.first.size() - 5), StreamType::MAIN); - } - } - - for (auto &[name, type] : targets) - stop_camera(name, type); -} - std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type) { return "rtmp://127.0.0.1/live/" + make_stream_key(cam_name, type);